diff --git a/CHANGELOG.md b/CHANGELOG.md index f3b8664..96f4e43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,3 +50,4 @@ 0.7.17 - Ajout d’une première couche WS hybride avec collecte de cibles `programSubscribe` / `accountSubscribe` et persistance technique dédupliquée des notifications `logs / program / account` 0.7.18 - Ajout d’un premier backfill historique ciblé par token mint, basé sur `getSignaturesForAddress` + `getTransaction`, avec réutilisation du pipeline interne pour reconstruire transactions, pools, swaps, origins, wallets et métriques 0.7.19 - Ajout d’une première couche holdings observés avec agrégation par couple wallet/token et branchement automatique dans le pipeline de résolution transactionnelle +0.7.20 - Ajout d’une première couche candles / OHLCV avec matérialisation en base des timeframes usuels et régénération à la demande pour un timeframe arbitraire depuis les trade events diff --git a/Cargo.toml b/Cargo.toml index 0435d4e..3f911b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.7.19" +version = "0.7.20" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index bb62b9b..8883d99 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -644,15 +644,14 @@ Réalisé : - branchement automatique dans le pipeline de résolution transactionnelle. ### 6.052. Version `0.7.20` — Candles / OHLCV -Objectif : compléter la première couche d’agrégats DEX avec des séries temporelles exploitables par paire. +Réalisé : -À faire : - -- ajouter des candles par paire et par fenêtre temporelle, -- calculer `open`, `high`, `low`, `close`, `volume` et `trade_count`, -- alimenter ces candles à partir des `trade events` déjà normalisés, -- conserver un modèle idempotent apte à être recalculé ou consolidé, -- préparer la couche analytique riche de `0.8.x`. +- ajout d’une première table `pair candles` pour matérialiser les agrégats OHLCV par paire, +- stockage en base des timeframes usuels (`1m`, `5m`, `15m`, `1h`), +- conservation de `trade events` comme source brute de vérité, +- ajout d’un service de régénération à la demande pour un timeframe arbitraire, +- possibilité de choisir dynamiquement le timeframe lors d’une requête analytique, +- branchement automatique dans le pipeline de résolution transactionnelle pour maintenir les candles matérialisées à jour. ### 6.053. Version `0.7.21` — Signaux analytiques plus riches Objectif : préparer, avant `0.8.x`, une première couche de signaux enrichis au-dessus des objets déjà consolidés. diff --git a/kb_lib/src/db.rs b/kb_lib/src/db.rs index b6c5a46..5766ab5 100644 --- a/kb_lib/src/db.rs +++ b/kb_lib/src/db.rs @@ -31,6 +31,7 @@ pub use dtos::KbLaunchSurfaceKeyDto; pub use dtos::KbLiquidityEventDto; pub use dtos::KbObservedTokenDto; pub use dtos::KbOnchainObservationDto; +pub use dtos::KbPairCandleDto; pub use dtos::KbPairDto; pub use dtos::KbPairMetricDto; pub use dtos::KbPoolDto; @@ -61,6 +62,7 @@ pub use entities::KbLaunchSurfaceKeyEntity; pub use entities::KbLiquidityEventEntity; pub use entities::KbObservedTokenEntity; pub use entities::KbOnchainObservationEntity; +pub use entities::KbPairCandleEntity; pub use entities::KbPairEntity; pub use entities::KbPairMetricEntity; pub use entities::KbPoolEntity; @@ -88,6 +90,7 @@ pub use queries::get_launch_surface_by_code; pub use queries::get_launch_surface_key_by_match; pub use queries::get_observed_token_by_mint; pub use queries::get_pair_by_pool_id; +pub use queries::get_pair_candle_by_key; pub use queries::get_pair_metric_by_pair_id; pub use queries::get_pool_by_address; pub use queries::get_pool_listing_by_pool_id; @@ -111,6 +114,7 @@ pub use queries::list_launch_attributions_by_pool_id; pub use queries::list_launch_surface_keys_by_surface_id; pub use queries::list_launch_surfaces; pub use queries::list_observed_tokens; +pub use queries::list_pair_candles_by_pair_and_timeframe; pub use queries::list_pair_metrics; pub use queries::list_pairs; pub use queries::list_pool_listings; @@ -127,6 +131,7 @@ pub use queries::list_recent_swaps; pub use queries::list_recent_token_burn_events; pub use queries::list_recent_token_mint_events; pub use queries::list_trade_events_by_pair_id; +pub use queries::list_trade_events_by_transaction_id; pub use queries::list_wallet_holdings_by_wallet_id; pub use queries::list_wallet_participations_by_pool_id; pub use queries::list_wallet_participations_by_wallet_id; @@ -144,6 +149,7 @@ pub use queries::upsert_launch_surface_key; pub use queries::upsert_liquidity_event; pub use queries::upsert_observed_token; pub use queries::upsert_pair; +pub use queries::upsert_pair_candle; pub use queries::upsert_pair_metric; pub use queries::upsert_pool; pub use queries::upsert_pool_listing; diff --git a/kb_lib/src/db/dtos.rs b/kb_lib/src/db/dtos.rs index 78c0748..4798893 100644 --- a/kb_lib/src/db/dtos.rs +++ b/kb_lib/src/db/dtos.rs @@ -19,6 +19,7 @@ mod liquidity_event; mod observed_token; mod onchain_observation; mod pair; +mod pair_candle; mod pair_metric; mod pool; mod pool_listing; @@ -50,6 +51,7 @@ pub use liquidity_event::KbLiquidityEventDto; pub use observed_token::KbObservedTokenDto; pub use onchain_observation::KbOnchainObservationDto; pub use pair::KbPairDto; +pub use pair_candle::KbPairCandleDto; pub use pair_metric::KbPairMetricDto; pub use pool::KbPoolDto; pub use pool_listing::KbPoolListingDto; diff --git a/kb_lib/src/db/dtos/pair_candle.rs b/kb_lib/src/db/dtos/pair_candle.rs new file mode 100644 index 0000000..c72f17d --- /dev/null +++ b/kb_lib/src/db/dtos/pair_candle.rs @@ -0,0 +1,135 @@ +// file: kb_lib/src/db/dtos/pair_candle.rs + +//! Pair-candle DTO. + +/// Application-facing pair-candle DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbPairCandleDto { + /// Optional numeric primary key. + pub id: std::option::Option, + /// Related pair id. + pub pair_id: i64, + /// Candle timeframe in seconds. + pub timeframe_seconds: i64, + /// Inclusive bucket start in unix seconds. + pub bucket_start_unix: i64, + /// Exclusive bucket end in unix seconds. + pub bucket_end_unix: i64, + /// Open price in quote-per-base units. + pub open_price_quote_per_base: f64, + /// High price in quote-per-base units. + pub high_price_quote_per_base: f64, + /// Low price in quote-per-base units. + pub low_price_quote_per_base: f64, + /// Close price in quote-per-base units. + pub close_price_quote_per_base: f64, + /// Trade count inside the candle. + pub trade_count: i64, + /// Buy count inside the candle. + pub buy_count: i64, + /// Sell count inside the candle. + pub sell_count: i64, + /// Aggregated base volume as decimal text when available. + pub base_volume_raw: std::option::Option, + /// Aggregated quote volume as decimal text when available. + pub quote_volume_raw: std::option::Option, + /// Optional first trade signature inside the candle. + pub first_trade_signature: std::option::Option, + /// Optional last trade signature inside the candle. + pub last_trade_signature: std::option::Option, + /// Creation timestamp. + pub created_at: chrono::DateTime, + /// Update timestamp. + pub updated_at: chrono::DateTime, +} + +impl KbPairCandleDto { + /// Creates a new pair-candle DTO. + #[allow(clippy::too_many_arguments)] + pub fn new( + pair_id: i64, + timeframe_seconds: i64, + bucket_start_unix: i64, + bucket_end_unix: i64, + open_price_quote_per_base: f64, + high_price_quote_per_base: f64, + low_price_quote_per_base: f64, + close_price_quote_per_base: f64, + trade_count: i64, + buy_count: i64, + sell_count: i64, + base_volume_raw: std::option::Option, + quote_volume_raw: std::option::Option, + first_trade_signature: std::option::Option, + last_trade_signature: std::option::Option, + ) -> Self { + let now = chrono::Utc::now(); + Self { + id: None, + pair_id, + timeframe_seconds, + bucket_start_unix, + bucket_end_unix, + open_price_quote_per_base, + high_price_quote_per_base, + low_price_quote_per_base, + close_price_quote_per_base, + trade_count, + buy_count, + sell_count, + base_volume_raw, + quote_volume_raw, + first_trade_signature, + last_trade_signature, + created_at: now, + updated_at: now, + } + } +} + +impl TryFrom for KbPairCandleDto { + type Error = crate::KbError; + + fn try_from(entity: crate::KbPairCandleEntity) -> Result { + let created_at_result = chrono::DateTime::parse_from_rfc3339(&entity.created_at); + let created_at = match created_at_result { + Ok(created_at) => created_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse pair_candle created_at '{}': {}", + entity.created_at, error + ))); + } + }; + let updated_at_result = chrono::DateTime::parse_from_rfc3339(&entity.updated_at); + let updated_at = match updated_at_result { + Ok(updated_at) => updated_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse pair_candle updated_at '{}': {}", + entity.updated_at, error + ))); + } + }; + Ok(Self { + id: Some(entity.id), + pair_id: entity.pair_id, + timeframe_seconds: entity.timeframe_seconds, + bucket_start_unix: entity.bucket_start_unix, + bucket_end_unix: entity.bucket_end_unix, + open_price_quote_per_base: entity.open_price_quote_per_base, + high_price_quote_per_base: entity.high_price_quote_per_base, + low_price_quote_per_base: entity.low_price_quote_per_base, + close_price_quote_per_base: entity.close_price_quote_per_base, + trade_count: entity.trade_count, + buy_count: entity.buy_count, + sell_count: entity.sell_count, + base_volume_raw: entity.base_volume_raw, + quote_volume_raw: entity.quote_volume_raw, + first_trade_signature: entity.first_trade_signature, + last_trade_signature: entity.last_trade_signature, + created_at, + updated_at, + }) + } +} diff --git a/kb_lib/src/db/entities.rs b/kb_lib/src/db/entities.rs index d5895cb..5ae72e7 100644 --- a/kb_lib/src/db/entities.rs +++ b/kb_lib/src/db/entities.rs @@ -21,6 +21,7 @@ mod liquidity_event; mod observed_token; mod onchain_observation; mod pair; +mod pair_candle; mod pair_metric; mod pool; mod pool_listing; @@ -52,6 +53,7 @@ pub use liquidity_event::KbLiquidityEventEntity; pub use observed_token::KbObservedTokenEntity; pub use onchain_observation::KbOnchainObservationEntity; pub use pair::KbPairEntity; +pub use pair_candle::KbPairCandleEntity; pub use pair_metric::KbPairMetricEntity; pub use pool::KbPoolEntity; pub use pool_listing::KbPoolListingEntity; diff --git a/kb_lib/src/db/entities/pair_candle.rs b/kb_lib/src/db/entities/pair_candle.rs new file mode 100644 index 0000000..a2e90e4 --- /dev/null +++ b/kb_lib/src/db/entities/pair_candle.rs @@ -0,0 +1,44 @@ +// file: kb_lib/src/db/entities/pair_candle.rs + +//! Pair-candle entity. + +/// Persisted pair-candle row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbPairCandleEntity { + /// Numeric primary key. + pub id: i64, + /// Related pair id. + pub pair_id: i64, + /// Candle timeframe in seconds. + pub timeframe_seconds: i64, + /// Inclusive bucket start in unix seconds. + pub bucket_start_unix: i64, + /// Exclusive bucket end in unix seconds. + pub bucket_end_unix: i64, + /// Open price in quote-per-base units. + pub open_price_quote_per_base: f64, + /// High price in quote-per-base units. + pub high_price_quote_per_base: f64, + /// Low price in quote-per-base units. + pub low_price_quote_per_base: f64, + /// Close price in quote-per-base units. + pub close_price_quote_per_base: f64, + /// Trade count inside the candle. + pub trade_count: i64, + /// Buy count inside the candle. + pub buy_count: i64, + /// Sell count inside the candle. + pub sell_count: i64, + /// Aggregated base volume as decimal text when available. + pub base_volume_raw: std::option::Option, + /// Aggregated quote volume as decimal text when available. + pub quote_volume_raw: std::option::Option, + /// Optional first trade signature inside the candle. + pub first_trade_signature: std::option::Option, + /// Optional last trade signature inside the candle. + pub last_trade_signature: std::option::Option, + /// Creation timestamp encoded as RFC3339 UTC text. + pub created_at: std::string::String, + /// Update timestamp encoded as RFC3339 UTC text. + pub updated_at: std::string::String, +} diff --git a/kb_lib/src/db/queries.rs b/kb_lib/src/db/queries.rs index 0cfdb72..b3412e2 100644 --- a/kb_lib/src/db/queries.rs +++ b/kb_lib/src/db/queries.rs @@ -23,6 +23,7 @@ mod liquidity_event; mod observed_token; mod onchain_observation; mod pair; +mod pair_candle; mod pair_metric; mod pool; mod pool_listing; @@ -84,6 +85,9 @@ pub use onchain_observation::list_recent_onchain_observations; pub use pair::get_pair_by_pool_id; pub use pair::list_pairs; pub use pair::upsert_pair; +pub use pair_candle::get_pair_candle_by_key; +pub use pair_candle::list_pair_candles_by_pair_and_timeframe; +pub use pair_candle::upsert_pair_candle; pub use pair_metric::get_pair_metric_by_pair_id; pub use pair_metric::list_pair_metrics; pub use pair_metric::upsert_pair_metric; @@ -108,6 +112,7 @@ pub use token_mint_event::list_recent_token_mint_events; pub use token_mint_event::upsert_token_mint_event; pub use trade_event::get_trade_event_by_decoded_event_id; pub use trade_event::list_trade_events_by_pair_id; +pub use trade_event::list_trade_events_by_transaction_id; pub use trade_event::upsert_trade_event; pub use wallet::get_wallet_by_address; pub use wallet::list_wallets; diff --git a/kb_lib/src/db/queries/pair_candle.rs b/kb_lib/src/db/queries/pair_candle.rs new file mode 100644 index 0000000..2211de5 --- /dev/null +++ b/kb_lib/src/db/queries/pair_candle.rs @@ -0,0 +1,216 @@ +// file: kb_lib/src/db/queries/pair_candle.rs + +//! Queries for `kb_pair_candles`. + +/// Inserts or updates one pair-candle row and returns its stable internal id. +pub async fn upsert_pair_candle( + database: &crate::KbDatabase, + dto: &crate::KbPairCandleDto, +) -> Result { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_pair_candles ( + pair_id, + timeframe_seconds, + bucket_start_unix, + bucket_end_unix, + open_price_quote_per_base, + high_price_quote_per_base, + low_price_quote_per_base, + close_price_quote_per_base, + trade_count, + buy_count, + sell_count, + base_volume_raw, + quote_volume_raw, + first_trade_signature, + last_trade_signature, + created_at, + updated_at +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(pair_id, timeframe_seconds, bucket_start_unix) DO UPDATE SET + bucket_end_unix = excluded.bucket_end_unix, + open_price_quote_per_base = excluded.open_price_quote_per_base, + high_price_quote_per_base = excluded.high_price_quote_per_base, + low_price_quote_per_base = excluded.low_price_quote_per_base, + close_price_quote_per_base = excluded.close_price_quote_per_base, + trade_count = excluded.trade_count, + buy_count = excluded.buy_count, + sell_count = excluded.sell_count, + base_volume_raw = excluded.base_volume_raw, + quote_volume_raw = excluded.quote_volume_raw, + first_trade_signature = excluded.first_trade_signature, + last_trade_signature = excluded.last_trade_signature, + updated_at = excluded.updated_at + "#, + ) + .bind(dto.pair_id) + .bind(dto.timeframe_seconds) + .bind(dto.bucket_start_unix) + .bind(dto.bucket_end_unix) + .bind(dto.open_price_quote_per_base) + .bind(dto.high_price_quote_per_base) + .bind(dto.low_price_quote_per_base) + .bind(dto.close_price_quote_per_base) + .bind(dto.trade_count) + .bind(dto.buy_count) + .bind(dto.sell_count) + .bind(dto.base_volume_raw.clone()) + .bind(dto.quote_volume_raw.clone()) + .bind(dto.first_trade_signature.clone()) + .bind(dto.last_trade_signature.clone()) + .bind(dto.created_at.to_rfc3339()) + .bind(dto.updated_at.to_rfc3339()) + .execute(pool) + .await; + if let Err(error) = query_result { + return Err(crate::KbError::Db(format!( + "cannot upsert kb_pair_candles on sqlite: {}", + error + ))); + } + + let id_result = sqlx::query_scalar::( + r#" +SELECT id +FROM kb_pair_candles +WHERE pair_id = ? AND timeframe_seconds = ? AND bucket_start_unix = ? +LIMIT 1 + "#, + ) + .bind(dto.pair_id) + .bind(dto.timeframe_seconds) + .bind(dto.bucket_start_unix) + .fetch_one(pool) + .await; + match id_result { + Ok(id) => Ok(id), + Err(error) => Err(crate::KbError::Db(format!( + "cannot fetch kb_pair_candles id for pair_id '{}' timeframe '{}' bucket '{}' on sqlite: {}", + dto.pair_id, dto.timeframe_seconds, dto.bucket_start_unix, error + ))), + } + } + } +} + +/// Returns one pair-candle row identified by `(pair_id, timeframe_seconds, bucket_start_unix)`. +pub async fn get_pair_candle_by_key( + database: &crate::KbDatabase, + pair_id: i64, + timeframe_seconds: i64, + bucket_start_unix: i64, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + pair_id, + timeframe_seconds, + bucket_start_unix, + bucket_end_unix, + open_price_quote_per_base, + high_price_quote_per_base, + low_price_quote_per_base, + close_price_quote_per_base, + trade_count, + buy_count, + sell_count, + base_volume_raw, + quote_volume_raw, + first_trade_signature, + last_trade_signature, + created_at, + updated_at +FROM kb_pair_candles +WHERE pair_id = ? AND timeframe_seconds = ? AND bucket_start_unix = ? +LIMIT 1 + "#, + ) + .bind(pair_id) + .bind(timeframe_seconds) + .bind(bucket_start_unix) + .fetch_optional(pool) + .await; + let entity_option = match query_result { + Ok(entity_option) => entity_option, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot read kb_pair_candles by key on sqlite: {}", + error + ))); + } + }; + match entity_option { + Some(entity) => crate::KbPairCandleDto::try_from(entity).map(Some), + None => Ok(None), + } + } + } +} + +/// Lists candles for one pair and one timeframe ordered by bucket start. +pub async fn list_pair_candles_by_pair_and_timeframe( + database: &crate::KbDatabase, + pair_id: i64, + timeframe_seconds: i64, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + pair_id, + timeframe_seconds, + bucket_start_unix, + bucket_end_unix, + open_price_quote_per_base, + high_price_quote_per_base, + low_price_quote_per_base, + close_price_quote_per_base, + trade_count, + buy_count, + sell_count, + base_volume_raw, + quote_volume_raw, + first_trade_signature, + last_trade_signature, + created_at, + updated_at +FROM kb_pair_candles +WHERE pair_id = ? AND timeframe_seconds = ? +ORDER BY bucket_start_unix ASC, id ASC + "#, + ) + .bind(pair_id) + .bind(timeframe_seconds) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_pair_candles by pair_id '{}' timeframe '{}' on sqlite: {}", + pair_id, timeframe_seconds, error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbPairCandleDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} diff --git a/kb_lib/src/db/queries/trade_event.rs b/kb_lib/src/db/queries/trade_event.rs index 5580d68..08f533b 100644 --- a/kb_lib/src/db/queries/trade_event.rs +++ b/kb_lib/src/db/queries/trade_event.rs @@ -199,6 +199,66 @@ ORDER BY created_at ASC, id ASC } } +/// Lists trade-event rows for one transaction id ordered by id. +pub async fn list_trade_events_by_transaction_id( + database: &crate::KbDatabase, + transaction_id: i64, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + dex_id, + pool_id, + pair_id, + transaction_id, + decoded_event_id, + signature, + slot, + trade_side, + base_token_id, + quote_token_id, + base_amount_raw, + quote_amount_raw, + price_quote_per_base, + source_kind, + source_endpoint_name, + payload_json, + created_at, + updated_at +FROM kb_trade_events +WHERE transaction_id = ? +ORDER BY id ASC + "#, + ) + .bind(transaction_id) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_trade_events by transaction_id '{}' on sqlite: {}", + transaction_id, error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbTradeEventDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + fn kb_trade_side_to_string(value: crate::KbSwapTradeSide) -> &'static str { match value { crate::KbSwapTradeSide::BuyBase => "BuyBase", diff --git a/kb_lib/src/db/schema.rs b/kb_lib/src/db/schema.rs index f716bcd..1a48226 100644 --- a/kb_lib/src/db/schema.rs +++ b/kb_lib/src/db/schema.rs @@ -322,6 +322,18 @@ pub(crate) async fn ensure_schema(database: &crate::KbDatabase) -> Result<(), cr if let Err(error) = result { return Err(error); } + let result = create_kb_pair_candles_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_pair_candles_pair_timeframe(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_pair_candles_bucket(pool).await; + if let Err(error) = result { + return Err(error); + } Ok(()) } } @@ -1788,3 +1800,61 @@ ON kb_wallet_holdings(token_id) ) .await } + +async fn create_kb_pair_candles_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_pair_candles_table", + r#" +CREATE TABLE IF NOT EXISTS kb_pair_candles ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pair_id INTEGER NOT NULL, + timeframe_seconds INTEGER NOT NULL, + bucket_start_unix INTEGER NOT NULL, + bucket_end_unix INTEGER NOT NULL, + open_price_quote_per_base REAL NOT NULL, + high_price_quote_per_base REAL NOT NULL, + low_price_quote_per_base REAL NOT NULL, + close_price_quote_per_base REAL NOT NULL, + trade_count INTEGER NOT NULL, + buy_count INTEGER NOT NULL, + sell_count INTEGER NOT NULL, + base_volume_raw TEXT NULL, + quote_volume_raw TEXT NULL, + first_trade_signature TEXT NULL, + last_trade_signature TEXT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(pair_id, timeframe_seconds, bucket_start_unix), + FOREIGN KEY(pair_id) REFERENCES kb_pairs(id) ON DELETE CASCADE +) + "#, + ) + .await +} + +async fn create_kb_idx_pair_candles_pair_timeframe( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_pair_candles_pair_timeframe", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_pair_candles_pair_timeframe +ON kb_pair_candles(pair_id, timeframe_seconds) + "#, + ) + .await +} + +async fn create_kb_idx_pair_candles_bucket(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_pair_candles_bucket", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_pair_candles_bucket +ON kb_pair_candles(bucket_start_unix) + "#, + ) + .await +} diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index 04c5806..ddf254c 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -20,6 +20,8 @@ mod http_client; mod http_pool; mod json_rpc_ws; mod launch_origin; +mod pair_candle_aggregation; +mod pair_candle_query; mod pool_origin; mod solana_pubsub_ws; mod token_backfill; @@ -86,6 +88,8 @@ pub use db::KbObservedTokenEntity; pub use db::KbObservedTokenStatus; pub use db::KbOnchainObservationDto; pub use db::KbOnchainObservationEntity; +pub use db::KbPairCandleDto; +pub use db::KbPairCandleEntity; pub use db::KbPairDto; pub use db::KbPairEntity; pub use db::KbPairMetricDto; @@ -131,6 +135,7 @@ pub use db::get_launch_surface_by_code; pub use db::get_launch_surface_key_by_match; pub use db::get_observed_token_by_mint; pub use db::get_pair_by_pool_id; +pub use db::get_pair_candle_by_key; pub use db::get_pair_metric_by_pair_id; pub use db::get_pool_by_address; pub use db::get_pool_listing_by_pool_id; @@ -154,6 +159,7 @@ pub use db::list_launch_attributions_by_pool_id; pub use db::list_launch_surface_keys_by_surface_id; pub use db::list_launch_surfaces; pub use db::list_observed_tokens; +pub use db::list_pair_candles_by_pair_and_timeframe; pub use db::list_pair_metrics; pub use db::list_pairs; pub use db::list_pool_listings; @@ -170,6 +176,7 @@ pub use db::list_recent_swaps; pub use db::list_recent_token_burn_events; pub use db::list_recent_token_mint_events; pub use db::list_trade_events_by_pair_id; +pub use db::list_trade_events_by_transaction_id; pub use db::list_wallet_holdings_by_wallet_id; pub use db::list_wallet_participations_by_pool_id; pub use db::list_wallet_participations_by_wallet_id; @@ -187,6 +194,7 @@ pub use db::upsert_launch_surface_key; pub use db::upsert_liquidity_event; pub use db::upsert_observed_token; pub use db::upsert_pair; +pub use db::upsert_pair_candle; pub use db::upsert_pair_metric; pub use db::upsert_pool; pub use db::upsert_pool_listing; @@ -282,6 +290,9 @@ pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text; pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value; pub use launch_origin::KbLaunchAttributionResult; pub use launch_origin::KbLaunchOriginService; +pub use pair_candle_aggregation::KbPairCandleAggregationResult; +pub use pair_candle_aggregation::KbPairCandleAggregationService; +pub use pair_candle_query::KbPairCandleQueryService; pub use pool_origin::KbPoolOriginResult; pub use pool_origin::KbPoolOriginService; pub use solana_pubsub_ws::KbSolanaWsTypedNotification; diff --git a/kb_lib/src/pair_candle_aggregation.rs b/kb_lib/src/pair_candle_aggregation.rs new file mode 100644 index 0000000..78e137e --- /dev/null +++ b/kb_lib/src/pair_candle_aggregation.rs @@ -0,0 +1,588 @@ +// file: kb_lib/src/pair_candle_aggregation.rs + +//! Pair-candle aggregation service. + +/// One pair-candle aggregation result. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbPairCandleAggregationResult { + /// Related pair id. + pub pair_id: i64, + /// Candle timeframe in seconds. + pub timeframe_seconds: i64, + /// Inclusive bucket start in unix seconds. + pub bucket_start_unix: i64, + /// Persisted candle id. + pub pair_candle_id: i64, +} + +/// Pair-candle aggregation service. +/// +/// This service materializes a small set of standard timeframes in base storage. +/// Arbitrary timeframes are rebuilt on demand through `KbPairCandleQueryService`. +#[derive(Debug, Clone)] +pub struct KbPairCandleAggregationService { + database: std::sync::Arc, + persistence: crate::KbDetectionPersistenceService, +} + +impl KbPairCandleAggregationService { + /// Creates a new pair-candle aggregation service. + pub fn new(database: std::sync::Arc) -> Self { + let persistence = crate::KbDetectionPersistenceService::new(database.clone()); + Self { + database, + persistence, + } + } + + /// Returns the list of materialized timeframes in seconds. + pub fn materialized_timeframes_seconds(&self) -> std::vec::Vec { + vec![60, 300, 900, 3600] + } + + /// Rebuilds all impacted materialized candles for one resolved transaction signature. + pub async fn record_transaction_by_signature( + &self, + signature: &str, + ) -> Result, crate::KbError> { + let transaction_result = + crate::get_chain_transaction_by_signature(self.database.as_ref(), signature).await; + let transaction_option = match transaction_result { + Ok(transaction_option) => transaction_option, + Err(error) => return Err(error), + }; + let transaction = match transaction_option { + Some(transaction) => transaction, + None => { + return Err(crate::KbError::InvalidState(format!( + "cannot aggregate pair candles for unknown transaction '{}'", + signature + ))); + } + }; + let transaction_id = match transaction.id { + Some(transaction_id) => transaction_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "transaction '{}' has no internal id", + signature + ))); + } + }; + let trade_events_result = + crate::list_trade_events_by_transaction_id(self.database.as_ref(), transaction_id) + .await; + let trade_events = match trade_events_result { + Ok(trade_events) => trade_events, + Err(error) => return Err(error), + }; + let materialized_timeframes = self.materialized_timeframes_seconds(); + let mut seen = std::collections::HashSet::<(i64, i64, i64)>::new(); + let mut results = std::vec::Vec::new(); + for trade_event in &trade_events { + let event_time_option = + kb_extract_trade_event_unix_time(self.database.as_ref(), trade_event).await?; + let event_time_unix = match event_time_option { + Some(event_time_unix) => event_time_unix, + None => continue, + }; + for timeframe_seconds in &materialized_timeframes { + let bucket_start_unix = kb_bucket_start_unix(event_time_unix, *timeframe_seconds)?; + let dedupe_key = (trade_event.pair_id, *timeframe_seconds, bucket_start_unix); + if seen.contains(&dedupe_key) { + continue; + } + seen.insert(dedupe_key); + let rebuilt_result = self + .rebuild_one_candle(trade_event.pair_id, *timeframe_seconds, bucket_start_unix) + .await; + let rebuilt = match rebuilt_result { + Ok(rebuilt) => rebuilt, + Err(error) => return Err(error), + }; + if let Some(rebuilt) = rebuilt { + results.push(rebuilt); + } + } + } + if !results.is_empty() { + let payload = serde_json::json!({ + "transactionSignature": signature, + "pairCandleCount": results.len() + }); + let observation_result = self + .persistence + .record_observation(&crate::KbDetectionObservationInput::new( + "pair.candle_aggregation".to_string(), + crate::KbObservationSourceKind::Dex, + transaction.source_endpoint_name.clone(), + transaction.signature.clone(), + transaction.slot, + payload.clone(), + )) + .await; + let observation_id = match observation_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + let signal_result = self + .persistence + .record_signal(&crate::KbDetectionSignalInput::new( + "signal.pair.candle_aggregation.recorded".to_string(), + crate::KbAnalysisSignalSeverity::Low, + transaction.signature.clone(), + Some(observation_id), + None, + payload, + )) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + Ok(results) + } + + async fn rebuild_one_candle( + &self, + pair_id: i64, + timeframe_seconds: i64, + bucket_start_unix: i64, + ) -> Result, crate::KbError> { + let trade_events_result = + crate::list_trade_events_by_pair_id(self.database.as_ref(), pair_id).await; + let trade_events = match trade_events_result { + Ok(trade_events) => trade_events, + Err(error) => return Err(error), + }; + let candle_option_result = kb_build_candle_from_trade_events( + self.database.as_ref(), + pair_id, + timeframe_seconds, + bucket_start_unix, + &trade_events, + ) + .await; + let candle_option = match candle_option_result { + Ok(candle_option) => candle_option, + Err(error) => return Err(error), + }; + let candle = match candle_option { + Some(candle) => candle, + None => return Ok(None), + }; + let pair_candle_id_result = + crate::upsert_pair_candle(self.database.as_ref(), &candle).await; + let pair_candle_id = match pair_candle_id_result { + Ok(pair_candle_id) => pair_candle_id, + Err(error) => return Err(error), + }; + Ok(Some(crate::KbPairCandleAggregationResult { + pair_id, + timeframe_seconds, + bucket_start_unix, + pair_candle_id, + })) + } +} + +pub(crate) async fn kb_build_candle_from_trade_events( + database: &crate::KbDatabase, + pair_id: i64, + timeframe_seconds: i64, + bucket_start_unix: i64, + trade_events: &[crate::KbTradeEventDto], +) -> Result, crate::KbError> { + let bucket_end_unix = bucket_start_unix.saturating_add(timeframe_seconds); + let mut rows = std::vec::Vec::::new(); + for trade_event in trade_events { + if trade_event.pair_id != pair_id { + continue; + } + let event_time_option = kb_extract_trade_event_unix_time(database, trade_event).await?; + let event_time_unix = match event_time_option { + Some(event_time_unix) => event_time_unix, + None => continue, + }; + if event_time_unix < bucket_start_unix || event_time_unix >= bucket_end_unix { + continue; + } + let price_quote_per_base = match trade_event.price_quote_per_base { + Some(price_quote_per_base) => price_quote_per_base, + None => continue, + }; + rows.push(KbTradeEventForCandle { + event_time_unix, + decoded_event_id: trade_event.decoded_event_id, + signature: trade_event.signature.clone(), + trade_side: trade_event.trade_side, + price_quote_per_base, + base_amount_raw: trade_event.base_amount_raw.clone(), + quote_amount_raw: trade_event.quote_amount_raw.clone(), + }); + } + if rows.is_empty() { + return Ok(None); + } + rows.sort_by(|left, right| { + let time_compare = left.event_time_unix.cmp(&right.event_time_unix); + if time_compare != std::cmp::Ordering::Equal { + return time_compare; + } + left.decoded_event_id.cmp(&right.decoded_event_id) + }); + let open_price_quote_per_base = rows[0].price_quote_per_base; + let close_price_quote_per_base = rows[rows.len() - 1].price_quote_per_base; + let mut high_price_quote_per_base = open_price_quote_per_base; + let mut low_price_quote_per_base = open_price_quote_per_base; + let mut trade_count = 0_i64; + let mut buy_count = 0_i64; + let mut sell_count = 0_i64; + let mut base_volume_raw = std::option::Option::::None; + let mut quote_volume_raw = std::option::Option::::None; + for row in &rows { + trade_count += 1; + if row.trade_side == crate::KbSwapTradeSide::BuyBase { + buy_count += 1; + } + if row.trade_side == crate::KbSwapTradeSide::SellBase { + sell_count += 1; + } + if row.price_quote_per_base > high_price_quote_per_base { + high_price_quote_per_base = row.price_quote_per_base; + } + if row.price_quote_per_base < low_price_quote_per_base { + low_price_quote_per_base = row.price_quote_per_base; + } + base_volume_raw = kb_add_raw_amounts(base_volume_raw, row.base_amount_raw.clone()); + quote_volume_raw = kb_add_raw_amounts(quote_volume_raw, row.quote_amount_raw.clone()); + } + Ok(Some(crate::KbPairCandleDto::new( + pair_id, + timeframe_seconds, + bucket_start_unix, + bucket_end_unix, + open_price_quote_per_base, + high_price_quote_per_base, + low_price_quote_per_base, + close_price_quote_per_base, + trade_count, + buy_count, + sell_count, + base_volume_raw, + quote_volume_raw, + Some(rows[0].signature.clone()), + Some(rows[rows.len() - 1].signature.clone()), + ))) +} + +pub(crate) async fn kb_extract_trade_event_unix_time( + database: &crate::KbDatabase, + trade_event: &crate::KbTradeEventDto, +) -> Result, crate::KbError> { + let transaction_result = + crate::get_chain_transaction_by_signature(database, trade_event.signature.as_str()).await; + let transaction_option = match transaction_result { + Ok(transaction_option) => transaction_option, + Err(error) => return Err(error), + }; + let transaction = match transaction_option { + Some(transaction) => transaction, + None => return Ok(Some(trade_event.created_at.timestamp())), + }; + match transaction.block_time_unix { + Some(block_time_unix) => Ok(Some(block_time_unix)), + None => Ok(Some(trade_event.created_at.timestamp())), + } +} + +pub(crate) fn kb_bucket_start_unix( + event_time_unix: i64, + timeframe_seconds: i64, +) -> Result { + if timeframe_seconds <= 0 { + return Err(crate::KbError::InvalidState(format!( + "invalid timeframe_seconds '{}'", + timeframe_seconds + ))); + } + + Ok((event_time_unix / timeframe_seconds) * timeframe_seconds) +} + +fn kb_add_raw_amounts( + left: std::option::Option, + right: std::option::Option, +) -> std::option::Option { + match (left, right) { + (None, None) => None, + (Some(left), None) => Some(left), + (None, Some(right)) => Some(right), + (Some(left), Some(right)) => { + let left_value_result = left.parse::(); + let left_value = match left_value_result { + Ok(left_value) => left_value, + Err(_) => return Some(left), + }; + let right_value_result = right.parse::(); + let right_value = match right_value_result { + Ok(right_value) => right_value, + Err(_) => return Some(left), + }; + Some((left_value + right_value).to_string()) + } + } +} + +#[derive(Debug, Clone)] +struct KbTradeEventForCandle { + event_time_unix: i64, + decoded_event_id: i64, + signature: std::string::String, + trade_side: crate::KbSwapTradeSide, + price_quote_per_base: f64, + base_amount_raw: std::option::Option, + quote_amount_raw: std::option::Option, +} + +#[cfg(test)] +mod tests { + async fn make_database() -> std::sync::Arc { + let tempdir_result = tempfile::tempdir(); + let tempdir = match tempdir_result { + Ok(tempdir) => tempdir, + Err(error) => panic!("tempdir must succeed: {}", error), + }; + let database_path = tempdir.path().join("pair_candle_aggregation.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database_result = crate::KbDatabase::connect_and_initialize(&config).await; + let database = match database_result { + Ok(database) => database, + Err(error) => panic!("database init must succeed: {}", error), + }; + std::sync::Arc::new(database) + } + + async fn seed_fluxbeam_swap_transaction( + database: std::sync::Arc, + signature: &str, + block_time_unix: i64, + base_amount_raw: &str, + quote_amount_raw: &str, + ) { + let transaction_model = crate::KbTransactionModelService::new(database.clone()); + let dex_decode = crate::KbDexDecodeService::new(database.clone()); + let dex_detect = crate::KbDexDetectService::new(database.clone()); + let trade_aggregation = crate::KbTradeAggregationService::new(database.clone()); + let resolved_transaction = serde_json::json!({ + "slot": 960001, + "blockTime": block_time_unix, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": crate::KB_FLUXBEAM_PROGRAM_ID, + "program": "fluxbeam", + "stackHeight": 1, + "accounts": [ + "CandlePool111", + "CandleLpMint111", + "CandleTokenA111", + "So11111111111111111111111111111111111111112" + ], + "parsed": { + "info": { + "instruction": "swap", + "pool": "CandlePool111", + "tokenA": "CandleTokenA111", + "tokenB": "So11111111111111111111111111111111111111112", + "baseAmountRaw": base_amount_raw, + "quoteAmountRaw": quote_amount_raw + } + }, + "data": "opaque" + } + ] + } + }, + "meta": { + "err": null, + "logMessages": [ + "Program log: Instruction: Swap", + "Program log: buy" + ] + } + }); + let project_result = transaction_model + .persist_resolved_transaction( + signature, + Some("helius_primary_http".to_string()), + &resolved_transaction, + ) + .await; + if let Err(error) = project_result { + panic!("projection must succeed: {}", error); + } + let decode_result = dex_decode.decode_transaction_by_signature(signature).await; + if let Err(error) = decode_result { + panic!("dex decode must succeed: {}", error); + } + let detect_result = dex_detect.detect_transaction_by_signature(signature).await; + if let Err(error) = detect_result { + panic!("dex detect must succeed: {}", error); + } + let trade_result = trade_aggregation + .record_transaction_by_signature(signature) + .await; + if let Err(error) = trade_result { + panic!("trade aggregation must succeed: {}", error); + } + } + + #[tokio::test] + async fn record_transaction_by_signature_creates_materialized_candles() { + let database = make_database().await; + seed_fluxbeam_swap_transaction( + database.clone(), + "sig-pair-candle-1", + 1_700_000_000, + "1000", + "2000", + ) + .await; + seed_fluxbeam_swap_transaction( + database.clone(), + "sig-pair-candle-2", + 1_700_000_020, + "1000", + "3000", + ) + .await; + seed_fluxbeam_swap_transaction( + database.clone(), + "sig-pair-candle-3", + 1_700_000_070, + "1000", + "1500", + ) + .await; + let service = crate::KbPairCandleAggregationService::new(database.clone()); + let result_1 = service + .record_transaction_by_signature("sig-pair-candle-1") + .await; + if let Err(error) = result_1 { + panic!("candle aggregation 1 must succeed: {}", error); + } + let result_2 = service + .record_transaction_by_signature("sig-pair-candle-2") + .await; + if let Err(error) = result_2 { + panic!("candle aggregation 2 must succeed: {}", error); + } + let result_3 = service + .record_transaction_by_signature("sig-pair-candle-3") + .await; + if let Err(error) = result_3 { + panic!("candle aggregation 3 must succeed: {}", error); + } + let pools_result = crate::list_pools(database.as_ref()).await; + let pools = match pools_result { + Ok(pools) => pools, + Err(error) => panic!("pool list must succeed: {}", error), + }; + let pool_id = pools[0].id.unwrap_or_default(); + let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await; + let pair_option = match pair_result { + Ok(pair_option) => pair_option, + Err(error) => panic!("pair fetch must succeed: {}", error), + }; + let pair = match pair_option { + Some(pair) => pair, + None => panic!("pair must exist"), + }; + let pair_id = pair.id.unwrap_or_default(); + let candles_result = + crate::list_pair_candles_by_pair_and_timeframe(database.as_ref(), pair_id, 60).await; + let candles = match candles_result { + Ok(candles) => candles, + Err(error) => panic!("candle list must succeed: {}", error), + }; + assert_eq!(candles.len(), 2); + assert_eq!(candles[0].open_price_quote_per_base, 2.0); + assert_eq!(candles[0].high_price_quote_per_base, 3.0); + assert_eq!(candles[0].low_price_quote_per_base, 2.0); + assert_eq!(candles[0].close_price_quote_per_base, 3.0); + assert_eq!(candles[0].trade_count, 2); + assert_eq!(candles[0].base_volume_raw, Some("2000".to_string())); + assert_eq!(candles[0].quote_volume_raw, Some("5000".to_string())); + assert_eq!(candles[1].open_price_quote_per_base, 1.5); + assert_eq!(candles[1].close_price_quote_per_base, 1.5); + assert_eq!(candles[1].trade_count, 1); + } + + #[tokio::test] + async fn materialized_candle_rebuild_is_idempotent() { + let database = make_database().await; + seed_fluxbeam_swap_transaction( + database.clone(), + "sig-pair-candle-idempotent", + 1_700_001_000, + "1000", + "2500", + ) + .await; + let service = crate::KbPairCandleAggregationService::new(database.clone()); + let first_result = service + .record_transaction_by_signature("sig-pair-candle-idempotent") + .await; + let first_results = match first_result { + Ok(first_results) => first_results, + Err(error) => panic!("first candle aggregation must succeed: {}", error), + }; + assert!(!first_results.is_empty()); + let second_result = service + .record_transaction_by_signature("sig-pair-candle-idempotent") + .await; + let second_results = match second_result { + Ok(second_results) => second_results, + Err(error) => panic!("second candle aggregation must succeed: {}", error), + }; + assert!(!second_results.is_empty()); + let pools_result = crate::list_pools(database.as_ref()).await; + let pools = match pools_result { + Ok(pools) => pools, + Err(error) => panic!("pool list must succeed: {}", error), + }; + let pool_id = pools[0].id.unwrap_or_default(); + let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await; + let pair_option = match pair_result { + Ok(pair_option) => pair_option, + Err(error) => panic!("pair fetch must succeed: {}", error), + }; + let pair = match pair_option { + Some(pair) => pair, + None => panic!("pair must exist"), + }; + let pair_id = pair.id.unwrap_or_default(); + let candles_result = + crate::list_pair_candles_by_pair_and_timeframe(database.as_ref(), pair_id, 60).await; + let candles = match candles_result { + Ok(candles) => candles, + Err(error) => panic!("candle list must succeed: {}", error), + }; + assert_eq!(candles.len(), 1); + assert_eq!(candles[0].trade_count, 1); + } +} diff --git a/kb_lib/src/pair_candle_query.rs b/kb_lib/src/pair_candle_query.rs new file mode 100644 index 0000000..05d9d7c --- /dev/null +++ b/kb_lib/src/pair_candle_query.rs @@ -0,0 +1,333 @@ +// file: kb_lib/src/pair_candle_query.rs + +//! Pair-candle query service. + +/// Pair-candle query service. +/// +/// Standard materialized timeframes are served from base storage. +/// Arbitrary timeframes are rebuilt on demand from `trade_events`. +#[derive(Debug, Clone)] +pub struct KbPairCandleQueryService { + database: std::sync::Arc, +} + +impl KbPairCandleQueryService { + /// Creates a new pair-candle query service. + pub fn new(database: std::sync::Arc) -> Self { + Self { database } + } + + /// Lists candles for one pair and one timeframe. + /// + /// When `prefer_materialized` is true and the timeframe is standard, + /// stored candles are returned. Otherwise the candles are rebuilt on demand. + pub async fn list_pair_candles( + &self, + pair_id: i64, + timeframe_seconds: i64, + bucket_start_from: std::option::Option, + bucket_start_to: std::option::Option, + prefer_materialized: bool, + ) -> Result, crate::KbError> { + if timeframe_seconds <= 0 { + return Err(crate::KbError::InvalidState(format!( + "invalid timeframe_seconds '{}'", + timeframe_seconds + ))); + } + if prefer_materialized && kb_is_materialized_timeframe(timeframe_seconds) { + let candles_result = crate::list_pair_candles_by_pair_and_timeframe( + self.database.as_ref(), + pair_id, + timeframe_seconds, + ) + .await; + let candles = match candles_result { + Ok(candles) => candles, + Err(error) => return Err(error), + }; + return Ok(kb_filter_candles_by_bucket_range( + candles, + bucket_start_from, + bucket_start_to, + )); + } + let trade_events_result = + crate::list_trade_events_by_pair_id(self.database.as_ref(), pair_id).await; + let trade_events = match trade_events_result { + Ok(trade_events) => trade_events, + Err(error) => return Err(error), + }; + let mut bucket_starts = std::collections::BTreeSet::::new(); + for trade_event in &trade_events { + let event_time_option = + crate::pair_candle_aggregation::kb_extract_trade_event_unix_time( + self.database.as_ref(), + trade_event, + ) + .await; + let event_time_option = match event_time_option { + Ok(event_time_option) => event_time_option, + Err(error) => return Err(error), + }; + let event_time_unix = match event_time_option { + Some(event_time_unix) => event_time_unix, + None => continue, + }; + let bucket_start_result = crate::pair_candle_aggregation::kb_bucket_start_unix( + event_time_unix, + timeframe_seconds, + ); + let bucket_start_unix = match bucket_start_result { + Ok(bucket_start_unix) => bucket_start_unix, + Err(error) => return Err(error), + }; + if let Some(bucket_start_from) = bucket_start_from { + if bucket_start_unix < bucket_start_from { + continue; + } + } + if let Some(bucket_start_to) = bucket_start_to { + if bucket_start_unix > bucket_start_to { + continue; + } + } + bucket_starts.insert(bucket_start_unix); + } + let mut candles = std::vec::Vec::new(); + for bucket_start_unix in bucket_starts { + let candle_result = crate::pair_candle_aggregation::kb_build_candle_from_trade_events( + self.database.as_ref(), + pair_id, + timeframe_seconds, + bucket_start_unix, + &trade_events, + ) + .await; + let candle_option = match candle_result { + Ok(candle_option) => candle_option, + Err(error) => return Err(error), + }; + if let Some(candle) = candle_option { + candles.push(candle); + } + } + Ok(candles) + } +} + +fn kb_is_materialized_timeframe(timeframe_seconds: i64) -> bool { + timeframe_seconds == 60 + || timeframe_seconds == 300 + || timeframe_seconds == 900 + || timeframe_seconds == 3600 +} + +fn kb_filter_candles_by_bucket_range( + candles: std::vec::Vec, + bucket_start_from: std::option::Option, + bucket_start_to: std::option::Option, +) -> std::vec::Vec { + let mut filtered = std::vec::Vec::new(); + for candle in candles { + if let Some(bucket_start_from) = bucket_start_from { + if candle.bucket_start_unix < bucket_start_from { + continue; + } + } + if let Some(bucket_start_to) = bucket_start_to { + if candle.bucket_start_unix > bucket_start_to { + continue; + } + } + filtered.push(candle); + } + filtered +} + +#[cfg(test)] +mod tests { + async fn make_database() -> std::sync::Arc { + let tempdir_result = tempfile::tempdir(); + let tempdir = match tempdir_result { + Ok(tempdir) => tempdir, + Err(error) => panic!("tempdir must succeed: {}", error), + }; + let database_path = tempdir.path().join("pair_candle_aggregation.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database_result = crate::KbDatabase::connect_and_initialize(&config).await; + let database = match database_result { + Ok(database) => database, + Err(error) => panic!("database init must succeed: {}", error), + }; + std::sync::Arc::new(database) + } + + async fn seed_fluxbeam_swap_transaction( + database: std::sync::Arc, + signature: &str, + block_time_unix: i64, + base_amount_raw: &str, + quote_amount_raw: &str, + ) { + let transaction_model = crate::KbTransactionModelService::new(database.clone()); + let dex_decode = crate::KbDexDecodeService::new(database.clone()); + let dex_detect = crate::KbDexDetectService::new(database.clone()); + let trade_aggregation = crate::KbTradeAggregationService::new(database.clone()); + let resolved_transaction = serde_json::json!({ + "slot": 960001, + "blockTime": block_time_unix, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": crate::KB_FLUXBEAM_PROGRAM_ID, + "program": "fluxbeam", + "stackHeight": 1, + "accounts": [ + "CandlePool111", + "CandleLpMint111", + "CandleTokenA111", + "So11111111111111111111111111111111111111112" + ], + "parsed": { + "info": { + "instruction": "swap", + "pool": "CandlePool111", + "tokenA": "CandleTokenA111", + "tokenB": "So11111111111111111111111111111111111111112", + "baseAmountRaw": base_amount_raw, + "quoteAmountRaw": quote_amount_raw + } + }, + "data": "opaque" + } + ] + } + }, + "meta": { + "err": null, + "logMessages": [ + "Program log: Instruction: Swap", + "Program log: buy" + ] + } + }); + let project_result = transaction_model + .persist_resolved_transaction( + signature, + Some("helius_primary_http".to_string()), + &resolved_transaction, + ) + .await; + if let Err(error) = project_result { + panic!("projection must succeed: {}", error); + } + let decode_result = dex_decode.decode_transaction_by_signature(signature).await; + if let Err(error) = decode_result { + panic!("dex decode must succeed: {}", error); + } + let detect_result = dex_detect.detect_transaction_by_signature(signature).await; + if let Err(error) = detect_result { + panic!("dex detect must succeed: {}", error); + } + let trade_result = trade_aggregation + .record_transaction_by_signature(signature) + .await; + if let Err(error) = trade_result { + panic!("trade aggregation must succeed: {}", error); + } + } + + #[tokio::test] + async fn list_pair_candles_can_rebuild_custom_timeframe_on_demand() { + let database = make_database().await; + seed_fluxbeam_swap_transaction( + database.clone(), + "sig-pair-candle-query-1", + 1_700_010_000, + "1000", + "2000", + ) + .await; + seed_fluxbeam_swap_transaction( + database.clone(), + "sig-pair-candle-query-2", + 1_700_010_020, + "1000", + "3000", + ) + .await; + seed_fluxbeam_swap_transaction( + database.clone(), + "sig-pair-candle-query-3", + 1_700_010_070, + "1000", + "1500", + ) + .await; + let trade_service = crate::KbPairCandleAggregationService::new(database.clone()); + let record_1 = trade_service + .record_transaction_by_signature("sig-pair-candle-query-1") + .await; + if let Err(error) = record_1 { + panic!("candle aggregation 1 must succeed: {}", error); + } + let record_2 = trade_service + .record_transaction_by_signature("sig-pair-candle-query-2") + .await; + if let Err(error) = record_2 { + panic!("candle aggregation 2 must succeed: {}", error); + } + let record_3 = trade_service + .record_transaction_by_signature("sig-pair-candle-query-3") + .await; + if let Err(error) = record_3 { + panic!("candle aggregation 3 must succeed: {}", error); + } + let pools_result = crate::list_pools(database.as_ref()).await; + let pools = match pools_result { + Ok(pools) => pools, + Err(error) => panic!("pool list must succeed: {}", error), + }; + let pool_id = pools[0].id.unwrap_or_default(); + let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await; + let pair_option = match pair_result { + Ok(pair_option) => pair_option, + Err(error) => panic!("pair fetch must succeed: {}", error), + }; + let pair = match pair_option { + Some(pair) => pair, + None => panic!("pair must exist"), + }; + let pair_id = pair.id.unwrap_or_default(); + let query_service = crate::KbPairCandleQueryService::new(database); + let candles_result = query_service + .list_pair_candles(pair_id, 120, None, None, false) + .await; + let candles = match candles_result { + Ok(candles) => candles, + Err(error) => panic!("custom candle query must succeed: {}", error), + }; + assert_eq!(candles.len(), 1); + assert_eq!(candles[0].open_price_quote_per_base, 2.0); + assert_eq!(candles[0].high_price_quote_per_base, 3.0); + assert_eq!(candles[0].low_price_quote_per_base, 1.5); + assert_eq!(candles[0].close_price_quote_per_base, 1.5); + assert_eq!(candles[0].trade_count, 3); + assert_eq!(candles[0].base_volume_raw, Some("3000".to_string())); + assert_eq!(candles[0].quote_volume_raw, Some("6500".to_string())); + } +} diff --git a/kb_lib/src/tx_resolution.rs b/kb_lib/src/tx_resolution.rs index 06b89c2..b7874d4 100644 --- a/kb_lib/src/tx_resolution.rs +++ b/kb_lib/src/tx_resolution.rs @@ -107,6 +107,7 @@ pub struct KbTransactionResolutionService { wallet_observation_service: crate::KbWalletObservationService, trade_aggregation_service: crate::KbTradeAggregationService, wallet_holding_observation_service: crate::KbWalletHoldingObservationService, + pair_candle_aggregation_service: crate::KbPairCandleAggregationService, resolved_signatures: std::sync::Arc>>, } @@ -128,6 +129,8 @@ impl KbTransactionResolutionService { let trade_aggregation_service = crate::KbTradeAggregationService::new(database.clone()); let wallet_holding_observation_service = crate::KbWalletHoldingObservationService::new(database.clone()); + let pair_candle_aggregation_service = + crate::KbPairCandleAggregationService::new(database.clone()); Self { http_pool, persistence, @@ -140,6 +143,7 @@ impl KbTransactionResolutionService { wallet_observation_service, trade_aggregation_service, wallet_holding_observation_service, + pair_candle_aggregation_service, resolved_signatures: std::sync::Arc::new(tokio::sync::Mutex::new( std::collections::HashSet::new(), )), @@ -375,6 +379,15 @@ impl KbTransactionResolutionService { Err(error) => return Err(error), }; let trade_event_count = trade_aggregations.len(); + let pair_candle_aggregations_result = self + .pair_candle_aggregation_service + .record_transaction_by_signature(request.signature.as_str()) + .await; + let pair_candle_aggregations = match pair_candle_aggregations_result { + Ok(pair_candle_aggregations) => pair_candle_aggregations, + Err(error) => return Err(error), + }; + let pair_candle_count = pair_candle_aggregations.len(); let payload = serde_json::json!({ "status": "resolved", "signature": request.signature.clone(), @@ -389,6 +402,7 @@ impl KbTransactionResolutionService { "walletParticipationCount": wallet_participation_count, "walletHoldingCount": wallet_holding_count, "tradeEventCount": trade_event_count, + "pairCandleCount": pair_candle_count, "triggerPayload": request.trigger_payload.clone(), "transaction": transaction_value });