diff --git a/kb_lib/src/db/queries/trade_event.rs b/kb_lib/src/db/queries/trade_event.rs index b7a7db7..257a74a 100644 --- a/kb_lib/src/db/queries/trade_event.rs +++ b/kb_lib/src/db/queries/trade_event.rs @@ -33,9 +33,20 @@ INSERT INTO kb_trade_events ( ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(decoded_event_id) DO UPDATE SET + dex_id = excluded.dex_id, + pool_id = excluded.pool_id, + pair_id = excluded.pair_id, + transaction_id = excluded.transaction_id, + signature = excluded.signature, + slot = COALESCE(excluded.slot, kb_trade_events.slot), + trade_side = excluded.trade_side, + base_token_id = excluded.base_token_id, + quote_token_id = excluded.quote_token_id, base_amount_raw = COALESCE(excluded.base_amount_raw, kb_trade_events.base_amount_raw), quote_amount_raw = COALESCE(excluded.quote_amount_raw, kb_trade_events.quote_amount_raw), price_quote_per_base = COALESCE(excluded.price_quote_per_base, kb_trade_events.price_quote_per_base), + source_kind = excluded.source_kind, + source_endpoint_name = COALESCE(excluded.source_endpoint_name, kb_trade_events.source_endpoint_name), payload_json = excluded.payload_json, updated_at = excluded.updated_at "#, diff --git a/kb_lib/src/dex/pump_swap.rs b/kb_lib/src/dex/pump_swap.rs index 83dfb9e..9c9ed2b 100644 --- a/kb_lib/src/dex/pump_swap.rs +++ b/kb_lib/src/dex/pump_swap.rs @@ -122,7 +122,7 @@ impl KbPumpSwapDecoder { "mint0", ], ) - .or_else(|| kb_extract_account(&accounts, 1)); + .or_else(|| kb_extract_account(&accounts, 3)); let token_b_mint = kb_extract_string_by_candidate_keys( parsed_json.as_ref(), &[ @@ -134,7 +134,7 @@ impl KbPumpSwapDecoder { "mint1", ], ) - .or_else(|| kb_extract_account(&accounts, 2)); + .or_else(|| kb_extract_account(&accounts, 4)); let pool_v2 = kb_extract_string_by_candidate_keys( parsed_json.as_ref(), &[ @@ -144,8 +144,9 @@ impl KbPumpSwapDecoder { "bondingCurveV2", "bonding_curve_v2", ], - ) - .or_else(|| kb_extract_account(&accounts, 3)); + ); + let pool_base_token_account = kb_extract_account(&accounts, 7); + let pool_quote_token_account = kb_extract_account(&accounts, 8); let is_buy = kb_log_messages_contain_keyword(&log_messages, "buy"); let is_sell = kb_log_messages_contain_keyword(&log_messages, "sell"); if !is_buy && !is_sell { @@ -162,7 +163,9 @@ impl KbPumpSwapDecoder { "poolAccount": pool_account, "tokenAMint": token_a_mint, "tokenBMint": token_b_mint, - "poolV2": pool_v2 + "poolV2": pool_v2, + "poolBaseTokenAccount": pool_base_token_account, + "poolQuoteTokenAccount": pool_quote_token_account }); if is_buy { decoded_events.push(crate::KbPumpSwapDecodedEvent::BuyTrade( @@ -376,7 +379,17 @@ mod tests { Some(crate::KB_PUMP_SWAP_PROGRAM_ID.to_string()), Some("pump-amm".to_string()), Some(1), - serde_json::json!(["PumpPool111", "TokenA111", "TokenB111", "PoolV2_111"]).to_string(), + serde_json::json!([ + "PumpPool111", + "User111", + "GlobalConfig111", + "TokenA111", + "TokenB111", + "UserBaseAta111", + "UserQuoteAta111", + "PoolBaseVault111", + "PoolQuoteVault111" + ]).to_string(), None, None, Some( @@ -414,6 +427,14 @@ mod tests { assert_eq!(event.token_a_mint, Some("TokenA111".to_string())); assert_eq!(event.token_b_mint, Some("TokenB111".to_string())); assert_eq!(event.pool_v2, Some("PoolV2_111".to_string())); + assert_eq!( + event.payload_json.get("poolBaseTokenAccount"), + Some(&serde_json::Value::String("PoolBaseVault111".to_string())) + ); + assert_eq!( + event.payload_json.get("poolQuoteTokenAccount"), + Some(&serde_json::Value::String("PoolQuoteVault111".to_string())) + ); assert_eq!(event.trade_side, crate::KbSwapTradeSide::BuyBase); } crate::KbPumpSwapDecodedEvent::SellTrade(_) => { diff --git a/kb_lib/src/dex_detect.rs b/kb_lib/src/dex_detect.rs index dbb5436..a1d9b33 100644 --- a/kb_lib/src/dex_detect.rs +++ b/kb_lib/src/dex_detect.rs @@ -905,33 +905,17 @@ impl KbDexDetectService { }; let created_pair = existing_pair_option.is_none(); let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str()); - let pair_id = match existing_pair_option { - Some(pair) => { - let pair_id_option = pair.id; - match pair_id_option { - Some(pair_id) => pair_id, - None => { - return Err(crate::KbError::InvalidState(format!( - "pair for pool '{}' has no internal id", - pool_id - ))); - } - } - } - None => { - let pair_dto = crate::KbPairDto::new( - dex_id, - pool_id, - base_token_id, - quote_token_id, - pair_symbol, - ); - let upsert_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await; - match upsert_result { - Ok(pair_id) => pair_id, - Err(error) => return Err(error), - } - } + let pair_dto = crate::KbPairDto::new( + dex_id, + pool_id, + base_token_id, + quote_token_id, + pair_symbol, + ); + let pair_id_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await; + let pair_id = match pair_id_result { + Ok(pair_id) => pair_id, + Err(error) => return Err(error), }; let upsert_base_pool_token_result = crate::upsert_pool_token( self.database.as_ref(), diff --git a/kb_lib/src/pair_candle_query.rs b/kb_lib/src/pair_candle_query.rs index 05d9d7c..2824294 100644 --- a/kb_lib/src/pair_candle_query.rs +++ b/kb_lib/src/pair_candle_query.rs @@ -20,7 +20,8 @@ impl KbPairCandleQueryService { /// Lists candles for one pair and one timeframe. /// /// When `prefer_materialized` is true and the timeframe is standard, - /// stored candles are returned. Otherwise the candles are rebuilt on demand. + /// stored candles are returned when available. If no stored candle exists, + /// candles are rebuilt on demand from `trade_events`. pub async fn list_pair_candles( &self, pair_id: i64, @@ -46,11 +47,11 @@ impl KbPairCandleQueryService { Ok(candles) => candles, Err(error) => return Err(error), }; - return Ok(kb_filter_candles_by_bucket_range( - candles, - bucket_start_from, - bucket_start_to, - )); + let filtered_candles = + kb_filter_candles_by_bucket_range(candles, bucket_start_from, bucket_start_to); + if !filtered_candles.is_empty() { + return Ok(filtered_candles); + } } let trade_events_result = crate::list_trade_events_by_pair_id(self.database.as_ref(), pair_id).await; @@ -330,4 +331,63 @@ mod tests { assert_eq!(candles[0].base_volume_raw, Some("3000".to_string())); assert_eq!(candles[0].quote_volume_raw, Some("6500".to_string())); } + + #[tokio::test] + async fn list_pair_candles_falls_back_when_materialized_storage_is_empty() { + let database = make_database().await; + seed_fluxbeam_swap_transaction( + database.clone(), + "sig-pair-candle-fallback-1", + 1_700_020_000, + "1000", + "2000", + ) + .await; + seed_fluxbeam_swap_transaction( + database.clone(), + "sig-pair-candle-fallback-2", + 1_700_020_020, + "1000", + "3000", + ) + .await; + let pools_result = crate::list_pools(database.as_ref()).await; + let pools = match pools_result { + Ok(pools) => pools, + Err(error) => panic!("pool list must succeed: {}", error), + }; + let pool_id = pools[0].id.unwrap_or_default(); + let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await; + let pair_option = match pair_result { + Ok(pair_option) => pair_option, + Err(error) => panic!("pair fetch must succeed: {}", error), + }; + let pair = match pair_option { + Some(pair) => pair, + None => panic!("pair must exist"), + }; + let pair_id = pair.id.unwrap_or_default(); + let materialized_result = + crate::list_pair_candles_by_pair_and_timeframe(database.as_ref(), pair_id, 60).await; + let materialized = match materialized_result { + Ok(materialized) => materialized, + Err(error) => panic!("materialized candle list must succeed: {}", error), + }; + assert_eq!(materialized.len(), 0); + let query_service = crate::KbPairCandleQueryService::new(database); + let candles_result = query_service + .list_pair_candles(pair_id, 60, None, None, true) + .await; + let candles = match candles_result { + Ok(candles) => candles, + Err(error) => panic!("fallback candle query must succeed: {}", error), + }; + assert_eq!(candles.len(), 1); + assert_eq!(candles[0].open_price_quote_per_base, 2.0); + assert_eq!(candles[0].high_price_quote_per_base, 3.0); + assert_eq!(candles[0].low_price_quote_per_base, 2.0); + assert_eq!(candles[0].close_price_quote_per_base, 3.0); + assert_eq!(candles[0].trade_count, 2); + } + } diff --git a/kb_lib/src/token_backfill.rs b/kb_lib/src/token_backfill.rs index b1cf941..c44d006 100644 --- a/kb_lib/src/token_backfill.rs +++ b/kb_lib/src/token_backfill.rs @@ -31,6 +31,8 @@ pub struct KbTokenBackfillResult { pub wallet_participation_count: usize, /// Total number of trade-aggregation results produced during this run. pub trade_event_count: usize, + /// Total number of pair-candle aggregation results produced during this run. + pub pair_candle_count: usize, } /// One pool-backfill result summary. @@ -58,6 +60,8 @@ pub struct KbPoolBackfillResult { pub wallet_participation_count: usize, /// Total number of trade-aggregation results produced during this run. pub trade_event_count: usize, + /// Total number of pair-candle aggregation results produced during this run. + pub pair_candle_count: usize, } /// Historical token backfill service. @@ -77,6 +81,7 @@ pub struct KbTokenBackfillService { pool_origin_service: crate::KbPoolOriginService, wallet_observation_service: crate::KbWalletObservationService, trade_aggregation_service: crate::KbTradeAggregationService, + pair_candle_aggregation_service: crate::KbPairCandleAggregationService, } impl KbTokenBackfillService { @@ -94,6 +99,8 @@ impl KbTokenBackfillService { let pool_origin_service = crate::KbPoolOriginService::new(database.clone()); let wallet_observation_service = crate::KbWalletObservationService::new(database.clone()); let trade_aggregation_service = crate::KbTradeAggregationService::new(database.clone()); + let pair_candle_aggregation_service = + crate::KbPairCandleAggregationService::new(database.clone()); Self { http_pool, database, @@ -106,6 +113,7 @@ impl KbTokenBackfillService { pool_origin_service, wallet_observation_service, trade_aggregation_service, + pair_candle_aggregation_service, } } @@ -130,6 +138,7 @@ impl KbTokenBackfillService { pool_origin_count: 0, wallet_participation_count: 0, trade_event_count: 0, + pair_candle_count: 0, }; let mut seen_signatures = std::collections::HashSet::::new(); let mint_signatures_result = self @@ -199,7 +208,8 @@ impl KbTokenBackfillService { "launchAttributionCount": result.launch_attribution_count, "poolOriginCount": result.pool_origin_count, "walletParticipationCount": result.wallet_participation_count, - "tradeEventCount": result.trade_event_count + "tradeEventCount": result.trade_event_count, + "pairCandleCount": result.pair_candle_count }); let observation_result = self .persistence @@ -335,6 +345,7 @@ impl KbTokenBackfillService { pool_origin_count: 0, wallet_participation_count: 0, trade_event_count: 0, + pair_candle_count: 0, }); } let existing_transaction_result = @@ -405,6 +416,14 @@ impl KbTokenBackfillService { Ok(trade_aggregations) => trade_aggregations, Err(error) => return Err(error), }; + let pair_candle_aggregations_result = self + .pair_candle_aggregation_service + .record_transaction_by_signature(signature.as_str()) + .await; + let pair_candle_aggregations = match pair_candle_aggregations_result { + Ok(pair_candle_aggregations) => pair_candle_aggregations, + Err(error) => return Err(error), + }; Ok(KbTokenBackfillSignatureResult { resolved_transaction_count: 1, missing_transaction_count: 0, @@ -414,6 +433,7 @@ impl KbTokenBackfillService { pool_origin_count: pool_origins.len(), wallet_participation_count: wallet_observations.len(), trade_event_count: trade_aggregations.len(), + pair_candle_count: pair_candle_aggregations.len(), }) } @@ -440,6 +460,7 @@ impl KbTokenBackfillService { pool_origin_count: 0, wallet_participation_count: 0, trade_event_count: 0, + pair_candle_count: 0, }; let mut seen_addresses = std::collections::BTreeSet::::new(); let mut addresses_to_scan = std::vec::Vec::::new(); @@ -522,6 +543,7 @@ impl KbTokenBackfillService { result.pool_origin_count += replay_result.pool_origin_count; result.wallet_participation_count += replay_result.wallet_participation_count; result.trade_event_count += replay_result.trade_event_count; + result.pair_candle_count += replay_result.pair_candle_count; } } let summary_payload = serde_json::json!({ @@ -536,6 +558,7 @@ impl KbTokenBackfillService { "poolOriginCount": result.pool_origin_count, "walletParticipationCount": result.wallet_participation_count, "tradeEventCount": result.trade_event_count, + "pairCandleCount": result.pair_candle_count, "scannedAddressCount": addresses_to_scan.len(), "effectiveSignatureLimit": effective_limit }); @@ -582,6 +605,7 @@ struct KbTokenBackfillSignatureResult { pool_origin_count: usize, wallet_participation_count: usize, trade_event_count: usize, + pair_candle_count: usize, } fn kb_merge_token_backfill_signature_result( @@ -596,6 +620,7 @@ fn kb_merge_token_backfill_signature_result( aggregate.pool_origin_count += value.pool_origin_count; aggregate.wallet_participation_count += value.wallet_participation_count; aggregate.trade_event_count += value.trade_event_count; + aggregate.pair_candle_count += value.pair_candle_count; } #[cfg(test)] @@ -948,6 +973,7 @@ mod tests { assert_eq!(backfill.resolved_transaction_count, 2); assert_eq!(backfill.missing_transaction_count, 0); assert_eq!(backfill.trade_event_count, 1); + assert!(backfill.pair_candle_count > 0); let token_result = crate::get_token_by_mint(database.as_ref(), "BackfillToken111").await; let token_option = match token_result { Ok(token_option) => token_option, @@ -1004,6 +1030,15 @@ mod tests { }; assert_eq!(pair_metric.trade_count, 1); assert_eq!(pair_metric.buy_count, 1); + let candles_result = + crate::list_pair_candles_by_pair_and_timeframe(database.as_ref(), pair_id, 60).await; + let candles = match candles_result { + Ok(candles) => candles, + Err(error) => panic!("pair candle list must succeed: {}", error), + }; + assert_eq!(candles.len(), 1); + assert_eq!(candles[0].trade_count, 1); + assert_eq!(candles[0].close_price_quote_per_base, 2.5); server.shutdown().await; } diff --git a/kb_lib/src/trade_aggregation.rs b/kb_lib/src/trade_aggregation.rs index 7d9e2f5..e231d86 100644 --- a/kb_lib/src/trade_aggregation.rs +++ b/kb_lib/src/trade_aggregation.rs @@ -212,14 +212,7 @@ impl KbTradeAggregationService { ); } let slot_i64 = kb_convert_slot_to_i64(transaction.slot); - let existing_trade_was_empty = match &existing_trade_option { - Some(existing_trade) => { - existing_trade.base_amount_raw.is_none() - && existing_trade.quote_amount_raw.is_none() - && existing_trade.price_quote_per_base.is_none() - } - None => false, - }; + let created_trade_event = existing_trade_option.is_none(); let trade_event_dto = crate::KbTradeEventDto::new( pool.dex_id, pool_id, @@ -242,20 +235,15 @@ impl KbTradeAggregationService { event_kind = %decoded_event.event_kind, pool_account = ?decoded_event.pool_account, decoded_event_id = ?decoded_event.id, + created_trade_event = created_trade_event, "trade aggregation candidate" ); - let upsert_result = + let trade_event_id_result = crate::upsert_trade_event(self.database.as_ref(), &trade_event_dto).await; - let trade_event_id = match upsert_result { + let trade_event_id = match trade_event_id_result { Ok(trade_event_id) => trade_event_id, Err(error) => return Err(error), }; - let created_trade_event = existing_trade_option.is_none(); - let repaired_trade_event = !created_trade_event - && existing_trade_was_empty - && (base_amount_raw.is_some() - || quote_amount_raw.is_some() - || price_quote_per_base.is_some()); let pair_metric_result = crate::get_pair_metric_by_pair_id(self.database.as_ref(), pair_id).await; let pair_metric_option = match pair_metric_result { @@ -271,7 +259,7 @@ impl KbTradeAggregationService { )); } }; - if created_trade_event || repaired_trade_event { + if created_trade_event { let mut updated_metric = existing_metric.clone(); kb_apply_trade_to_pair_metric( &mut updated_metric, diff --git a/khadhroony-bobobot-v0.7.24-pre.1.zip b/khadhroony-bobobot-v0.7.24-pre.1.zip deleted file mode 100644 index 5708ff3..0000000 Binary files a/khadhroony-bobobot-v0.7.24-pre.1.zip and /dev/null differ