0.7.24-pre.1

This commit is contained in:
2026-05-02 13:17:23 +02:00
parent aaff2dbd94
commit d10a2270d8
7 changed files with 156 additions and 57 deletions

View File

@@ -33,9 +33,20 @@ INSERT INTO kb_trade_events (
) )
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(decoded_event_id) DO UPDATE SET ON CONFLICT(decoded_event_id) DO UPDATE SET
dex_id = excluded.dex_id,
pool_id = excluded.pool_id,
pair_id = excluded.pair_id,
transaction_id = excluded.transaction_id,
signature = excluded.signature,
slot = COALESCE(excluded.slot, kb_trade_events.slot),
trade_side = excluded.trade_side,
base_token_id = excluded.base_token_id,
quote_token_id = excluded.quote_token_id,
base_amount_raw = COALESCE(excluded.base_amount_raw, kb_trade_events.base_amount_raw), base_amount_raw = COALESCE(excluded.base_amount_raw, kb_trade_events.base_amount_raw),
quote_amount_raw = COALESCE(excluded.quote_amount_raw, kb_trade_events.quote_amount_raw), quote_amount_raw = COALESCE(excluded.quote_amount_raw, kb_trade_events.quote_amount_raw),
price_quote_per_base = COALESCE(excluded.price_quote_per_base, kb_trade_events.price_quote_per_base), price_quote_per_base = COALESCE(excluded.price_quote_per_base, kb_trade_events.price_quote_per_base),
source_kind = excluded.source_kind,
source_endpoint_name = COALESCE(excluded.source_endpoint_name, kb_trade_events.source_endpoint_name),
payload_json = excluded.payload_json, payload_json = excluded.payload_json,
updated_at = excluded.updated_at updated_at = excluded.updated_at
"#, "#,

View File

@@ -122,7 +122,7 @@ impl KbPumpSwapDecoder {
"mint0", "mint0",
], ],
) )
.or_else(|| kb_extract_account(&accounts, 1)); .or_else(|| kb_extract_account(&accounts, 3));
let token_b_mint = kb_extract_string_by_candidate_keys( let token_b_mint = kb_extract_string_by_candidate_keys(
parsed_json.as_ref(), parsed_json.as_ref(),
&[ &[
@@ -134,7 +134,7 @@ impl KbPumpSwapDecoder {
"mint1", "mint1",
], ],
) )
.or_else(|| kb_extract_account(&accounts, 2)); .or_else(|| kb_extract_account(&accounts, 4));
let pool_v2 = kb_extract_string_by_candidate_keys( let pool_v2 = kb_extract_string_by_candidate_keys(
parsed_json.as_ref(), parsed_json.as_ref(),
&[ &[
@@ -144,8 +144,9 @@ impl KbPumpSwapDecoder {
"bondingCurveV2", "bondingCurveV2",
"bonding_curve_v2", "bonding_curve_v2",
], ],
) );
.or_else(|| kb_extract_account(&accounts, 3)); let pool_base_token_account = kb_extract_account(&accounts, 7);
let pool_quote_token_account = kb_extract_account(&accounts, 8);
let is_buy = kb_log_messages_contain_keyword(&log_messages, "buy"); let is_buy = kb_log_messages_contain_keyword(&log_messages, "buy");
let is_sell = kb_log_messages_contain_keyword(&log_messages, "sell"); let is_sell = kb_log_messages_contain_keyword(&log_messages, "sell");
if !is_buy && !is_sell { if !is_buy && !is_sell {
@@ -162,7 +163,9 @@ impl KbPumpSwapDecoder {
"poolAccount": pool_account, "poolAccount": pool_account,
"tokenAMint": token_a_mint, "tokenAMint": token_a_mint,
"tokenBMint": token_b_mint, "tokenBMint": token_b_mint,
"poolV2": pool_v2 "poolV2": pool_v2,
"poolBaseTokenAccount": pool_base_token_account,
"poolQuoteTokenAccount": pool_quote_token_account
}); });
if is_buy { if is_buy {
decoded_events.push(crate::KbPumpSwapDecodedEvent::BuyTrade( decoded_events.push(crate::KbPumpSwapDecodedEvent::BuyTrade(
@@ -376,7 +379,17 @@ mod tests {
Some(crate::KB_PUMP_SWAP_PROGRAM_ID.to_string()), Some(crate::KB_PUMP_SWAP_PROGRAM_ID.to_string()),
Some("pump-amm".to_string()), Some("pump-amm".to_string()),
Some(1), Some(1),
serde_json::json!(["PumpPool111", "TokenA111", "TokenB111", "PoolV2_111"]).to_string(), serde_json::json!([
"PumpPool111",
"User111",
"GlobalConfig111",
"TokenA111",
"TokenB111",
"UserBaseAta111",
"UserQuoteAta111",
"PoolBaseVault111",
"PoolQuoteVault111"
]).to_string(),
None, None,
None, None,
Some( Some(
@@ -414,6 +427,14 @@ mod tests {
assert_eq!(event.token_a_mint, Some("TokenA111".to_string())); assert_eq!(event.token_a_mint, Some("TokenA111".to_string()));
assert_eq!(event.token_b_mint, Some("TokenB111".to_string())); assert_eq!(event.token_b_mint, Some("TokenB111".to_string()));
assert_eq!(event.pool_v2, Some("PoolV2_111".to_string())); assert_eq!(event.pool_v2, Some("PoolV2_111".to_string()));
assert_eq!(
event.payload_json.get("poolBaseTokenAccount"),
Some(&serde_json::Value::String("PoolBaseVault111".to_string()))
);
assert_eq!(
event.payload_json.get("poolQuoteTokenAccount"),
Some(&serde_json::Value::String("PoolQuoteVault111".to_string()))
);
assert_eq!(event.trade_side, crate::KbSwapTradeSide::BuyBase); assert_eq!(event.trade_side, crate::KbSwapTradeSide::BuyBase);
} }
crate::KbPumpSwapDecodedEvent::SellTrade(_) => { crate::KbPumpSwapDecodedEvent::SellTrade(_) => {

View File

@@ -905,20 +905,6 @@ impl KbDexDetectService {
}; };
let created_pair = existing_pair_option.is_none(); let created_pair = existing_pair_option.is_none();
let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str()); let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str());
let pair_id = match existing_pair_option {
Some(pair) => {
let pair_id_option = pair.id;
match pair_id_option {
Some(pair_id) => pair_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"pair for pool '{}' has no internal id",
pool_id
)));
}
}
}
None => {
let pair_dto = crate::KbPairDto::new( let pair_dto = crate::KbPairDto::new(
dex_id, dex_id,
pool_id, pool_id,
@@ -926,12 +912,10 @@ impl KbDexDetectService {
quote_token_id, quote_token_id,
pair_symbol, pair_symbol,
); );
let upsert_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await; let pair_id_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await;
match upsert_result { let pair_id = match pair_id_result {
Ok(pair_id) => pair_id, Ok(pair_id) => pair_id,
Err(error) => return Err(error), Err(error) => return Err(error),
}
}
}; };
let upsert_base_pool_token_result = crate::upsert_pool_token( let upsert_base_pool_token_result = crate::upsert_pool_token(
self.database.as_ref(), self.database.as_ref(),

View File

@@ -20,7 +20,8 @@ impl KbPairCandleQueryService {
/// Lists candles for one pair and one timeframe. /// Lists candles for one pair and one timeframe.
/// ///
/// When `prefer_materialized` is true and the timeframe is standard, /// When `prefer_materialized` is true and the timeframe is standard,
/// stored candles are returned. Otherwise the candles are rebuilt on demand. /// stored candles are returned when available. If no stored candle exists,
/// candles are rebuilt on demand from `trade_events`.
pub async fn list_pair_candles( pub async fn list_pair_candles(
&self, &self,
pair_id: i64, pair_id: i64,
@@ -46,11 +47,11 @@ impl KbPairCandleQueryService {
Ok(candles) => candles, Ok(candles) => candles,
Err(error) => return Err(error), Err(error) => return Err(error),
}; };
return Ok(kb_filter_candles_by_bucket_range( let filtered_candles =
candles, kb_filter_candles_by_bucket_range(candles, bucket_start_from, bucket_start_to);
bucket_start_from, if !filtered_candles.is_empty() {
bucket_start_to, return Ok(filtered_candles);
)); }
} }
let trade_events_result = let trade_events_result =
crate::list_trade_events_by_pair_id(self.database.as_ref(), pair_id).await; crate::list_trade_events_by_pair_id(self.database.as_ref(), pair_id).await;
@@ -330,4 +331,63 @@ mod tests {
assert_eq!(candles[0].base_volume_raw, Some("3000".to_string())); assert_eq!(candles[0].base_volume_raw, Some("3000".to_string()));
assert_eq!(candles[0].quote_volume_raw, Some("6500".to_string())); assert_eq!(candles[0].quote_volume_raw, Some("6500".to_string()));
} }
#[tokio::test]
async fn list_pair_candles_falls_back_when_materialized_storage_is_empty() {
let database = make_database().await;
seed_fluxbeam_swap_transaction(
database.clone(),
"sig-pair-candle-fallback-1",
1_700_020_000,
"1000",
"2000",
)
.await;
seed_fluxbeam_swap_transaction(
database.clone(),
"sig-pair-candle-fallback-2",
1_700_020_020,
"1000",
"3000",
)
.await;
let pools_result = crate::list_pools(database.as_ref()).await;
let pools = match pools_result {
Ok(pools) => pools,
Err(error) => panic!("pool list must succeed: {}", error),
};
let pool_id = pools[0].id.unwrap_or_default();
let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await;
let pair_option = match pair_result {
Ok(pair_option) => pair_option,
Err(error) => panic!("pair fetch must succeed: {}", error),
};
let pair = match pair_option {
Some(pair) => pair,
None => panic!("pair must exist"),
};
let pair_id = pair.id.unwrap_or_default();
let materialized_result =
crate::list_pair_candles_by_pair_and_timeframe(database.as_ref(), pair_id, 60).await;
let materialized = match materialized_result {
Ok(materialized) => materialized,
Err(error) => panic!("materialized candle list must succeed: {}", error),
};
assert_eq!(materialized.len(), 0);
let query_service = crate::KbPairCandleQueryService::new(database);
let candles_result = query_service
.list_pair_candles(pair_id, 60, None, None, true)
.await;
let candles = match candles_result {
Ok(candles) => candles,
Err(error) => panic!("fallback candle query must succeed: {}", error),
};
assert_eq!(candles.len(), 1);
assert_eq!(candles[0].open_price_quote_per_base, 2.0);
assert_eq!(candles[0].high_price_quote_per_base, 3.0);
assert_eq!(candles[0].low_price_quote_per_base, 2.0);
assert_eq!(candles[0].close_price_quote_per_base, 3.0);
assert_eq!(candles[0].trade_count, 2);
}
} }

View File

@@ -31,6 +31,8 @@ pub struct KbTokenBackfillResult {
pub wallet_participation_count: usize, pub wallet_participation_count: usize,
/// Total number of trade-aggregation results produced during this run. /// Total number of trade-aggregation results produced during this run.
pub trade_event_count: usize, pub trade_event_count: usize,
/// Total number of pair-candle aggregation results produced during this run.
pub pair_candle_count: usize,
} }
/// One pool-backfill result summary. /// One pool-backfill result summary.
@@ -58,6 +60,8 @@ pub struct KbPoolBackfillResult {
pub wallet_participation_count: usize, pub wallet_participation_count: usize,
/// Total number of trade-aggregation results produced during this run. /// Total number of trade-aggregation results produced during this run.
pub trade_event_count: usize, pub trade_event_count: usize,
/// Total number of pair-candle aggregation results produced during this run.
pub pair_candle_count: usize,
} }
/// Historical token backfill service. /// Historical token backfill service.
@@ -77,6 +81,7 @@ pub struct KbTokenBackfillService {
pool_origin_service: crate::KbPoolOriginService, pool_origin_service: crate::KbPoolOriginService,
wallet_observation_service: crate::KbWalletObservationService, wallet_observation_service: crate::KbWalletObservationService,
trade_aggregation_service: crate::KbTradeAggregationService, trade_aggregation_service: crate::KbTradeAggregationService,
pair_candle_aggregation_service: crate::KbPairCandleAggregationService,
} }
impl KbTokenBackfillService { impl KbTokenBackfillService {
@@ -94,6 +99,8 @@ impl KbTokenBackfillService {
let pool_origin_service = crate::KbPoolOriginService::new(database.clone()); let pool_origin_service = crate::KbPoolOriginService::new(database.clone());
let wallet_observation_service = crate::KbWalletObservationService::new(database.clone()); let wallet_observation_service = crate::KbWalletObservationService::new(database.clone());
let trade_aggregation_service = crate::KbTradeAggregationService::new(database.clone()); let trade_aggregation_service = crate::KbTradeAggregationService::new(database.clone());
let pair_candle_aggregation_service =
crate::KbPairCandleAggregationService::new(database.clone());
Self { Self {
http_pool, http_pool,
database, database,
@@ -106,6 +113,7 @@ impl KbTokenBackfillService {
pool_origin_service, pool_origin_service,
wallet_observation_service, wallet_observation_service,
trade_aggregation_service, trade_aggregation_service,
pair_candle_aggregation_service,
} }
} }
@@ -130,6 +138,7 @@ impl KbTokenBackfillService {
pool_origin_count: 0, pool_origin_count: 0,
wallet_participation_count: 0, wallet_participation_count: 0,
trade_event_count: 0, trade_event_count: 0,
pair_candle_count: 0,
}; };
let mut seen_signatures = std::collections::HashSet::<std::string::String>::new(); let mut seen_signatures = std::collections::HashSet::<std::string::String>::new();
let mint_signatures_result = self let mint_signatures_result = self
@@ -199,7 +208,8 @@ impl KbTokenBackfillService {
"launchAttributionCount": result.launch_attribution_count, "launchAttributionCount": result.launch_attribution_count,
"poolOriginCount": result.pool_origin_count, "poolOriginCount": result.pool_origin_count,
"walletParticipationCount": result.wallet_participation_count, "walletParticipationCount": result.wallet_participation_count,
"tradeEventCount": result.trade_event_count "tradeEventCount": result.trade_event_count,
"pairCandleCount": result.pair_candle_count
}); });
let observation_result = self let observation_result = self
.persistence .persistence
@@ -335,6 +345,7 @@ impl KbTokenBackfillService {
pool_origin_count: 0, pool_origin_count: 0,
wallet_participation_count: 0, wallet_participation_count: 0,
trade_event_count: 0, trade_event_count: 0,
pair_candle_count: 0,
}); });
} }
let existing_transaction_result = let existing_transaction_result =
@@ -405,6 +416,14 @@ impl KbTokenBackfillService {
Ok(trade_aggregations) => trade_aggregations, Ok(trade_aggregations) => trade_aggregations,
Err(error) => return Err(error), Err(error) => return Err(error),
}; };
let pair_candle_aggregations_result = self
.pair_candle_aggregation_service
.record_transaction_by_signature(signature.as_str())
.await;
let pair_candle_aggregations = match pair_candle_aggregations_result {
Ok(pair_candle_aggregations) => pair_candle_aggregations,
Err(error) => return Err(error),
};
Ok(KbTokenBackfillSignatureResult { Ok(KbTokenBackfillSignatureResult {
resolved_transaction_count: 1, resolved_transaction_count: 1,
missing_transaction_count: 0, missing_transaction_count: 0,
@@ -414,6 +433,7 @@ impl KbTokenBackfillService {
pool_origin_count: pool_origins.len(), pool_origin_count: pool_origins.len(),
wallet_participation_count: wallet_observations.len(), wallet_participation_count: wallet_observations.len(),
trade_event_count: trade_aggregations.len(), trade_event_count: trade_aggregations.len(),
pair_candle_count: pair_candle_aggregations.len(),
}) })
} }
@@ -440,6 +460,7 @@ impl KbTokenBackfillService {
pool_origin_count: 0, pool_origin_count: 0,
wallet_participation_count: 0, wallet_participation_count: 0,
trade_event_count: 0, trade_event_count: 0,
pair_candle_count: 0,
}; };
let mut seen_addresses = std::collections::BTreeSet::<std::string::String>::new(); let mut seen_addresses = std::collections::BTreeSet::<std::string::String>::new();
let mut addresses_to_scan = std::vec::Vec::<std::string::String>::new(); let mut addresses_to_scan = std::vec::Vec::<std::string::String>::new();
@@ -522,6 +543,7 @@ impl KbTokenBackfillService {
result.pool_origin_count += replay_result.pool_origin_count; result.pool_origin_count += replay_result.pool_origin_count;
result.wallet_participation_count += replay_result.wallet_participation_count; result.wallet_participation_count += replay_result.wallet_participation_count;
result.trade_event_count += replay_result.trade_event_count; result.trade_event_count += replay_result.trade_event_count;
result.pair_candle_count += replay_result.pair_candle_count;
} }
} }
let summary_payload = serde_json::json!({ let summary_payload = serde_json::json!({
@@ -536,6 +558,7 @@ impl KbTokenBackfillService {
"poolOriginCount": result.pool_origin_count, "poolOriginCount": result.pool_origin_count,
"walletParticipationCount": result.wallet_participation_count, "walletParticipationCount": result.wallet_participation_count,
"tradeEventCount": result.trade_event_count, "tradeEventCount": result.trade_event_count,
"pairCandleCount": result.pair_candle_count,
"scannedAddressCount": addresses_to_scan.len(), "scannedAddressCount": addresses_to_scan.len(),
"effectiveSignatureLimit": effective_limit "effectiveSignatureLimit": effective_limit
}); });
@@ -582,6 +605,7 @@ struct KbTokenBackfillSignatureResult {
pool_origin_count: usize, pool_origin_count: usize,
wallet_participation_count: usize, wallet_participation_count: usize,
trade_event_count: usize, trade_event_count: usize,
pair_candle_count: usize,
} }
fn kb_merge_token_backfill_signature_result( fn kb_merge_token_backfill_signature_result(
@@ -596,6 +620,7 @@ fn kb_merge_token_backfill_signature_result(
aggregate.pool_origin_count += value.pool_origin_count; aggregate.pool_origin_count += value.pool_origin_count;
aggregate.wallet_participation_count += value.wallet_participation_count; aggregate.wallet_participation_count += value.wallet_participation_count;
aggregate.trade_event_count += value.trade_event_count; aggregate.trade_event_count += value.trade_event_count;
aggregate.pair_candle_count += value.pair_candle_count;
} }
#[cfg(test)] #[cfg(test)]
@@ -948,6 +973,7 @@ mod tests {
assert_eq!(backfill.resolved_transaction_count, 2); assert_eq!(backfill.resolved_transaction_count, 2);
assert_eq!(backfill.missing_transaction_count, 0); assert_eq!(backfill.missing_transaction_count, 0);
assert_eq!(backfill.trade_event_count, 1); assert_eq!(backfill.trade_event_count, 1);
assert!(backfill.pair_candle_count > 0);
let token_result = crate::get_token_by_mint(database.as_ref(), "BackfillToken111").await; let token_result = crate::get_token_by_mint(database.as_ref(), "BackfillToken111").await;
let token_option = match token_result { let token_option = match token_result {
Ok(token_option) => token_option, Ok(token_option) => token_option,
@@ -1004,6 +1030,15 @@ mod tests {
}; };
assert_eq!(pair_metric.trade_count, 1); assert_eq!(pair_metric.trade_count, 1);
assert_eq!(pair_metric.buy_count, 1); assert_eq!(pair_metric.buy_count, 1);
let candles_result =
crate::list_pair_candles_by_pair_and_timeframe(database.as_ref(), pair_id, 60).await;
let candles = match candles_result {
Ok(candles) => candles,
Err(error) => panic!("pair candle list must succeed: {}", error),
};
assert_eq!(candles.len(), 1);
assert_eq!(candles[0].trade_count, 1);
assert_eq!(candles[0].close_price_quote_per_base, 2.5);
server.shutdown().await; server.shutdown().await;
} }

View File

@@ -212,14 +212,7 @@ impl KbTradeAggregationService {
); );
} }
let slot_i64 = kb_convert_slot_to_i64(transaction.slot); let slot_i64 = kb_convert_slot_to_i64(transaction.slot);
let existing_trade_was_empty = match &existing_trade_option { let created_trade_event = existing_trade_option.is_none();
Some(existing_trade) => {
existing_trade.base_amount_raw.is_none()
&& existing_trade.quote_amount_raw.is_none()
&& existing_trade.price_quote_per_base.is_none()
}
None => false,
};
let trade_event_dto = crate::KbTradeEventDto::new( let trade_event_dto = crate::KbTradeEventDto::new(
pool.dex_id, pool.dex_id,
pool_id, pool_id,
@@ -242,20 +235,15 @@ impl KbTradeAggregationService {
event_kind = %decoded_event.event_kind, event_kind = %decoded_event.event_kind,
pool_account = ?decoded_event.pool_account, pool_account = ?decoded_event.pool_account,
decoded_event_id = ?decoded_event.id, decoded_event_id = ?decoded_event.id,
created_trade_event = created_trade_event,
"trade aggregation candidate" "trade aggregation candidate"
); );
let upsert_result = let trade_event_id_result =
crate::upsert_trade_event(self.database.as_ref(), &trade_event_dto).await; crate::upsert_trade_event(self.database.as_ref(), &trade_event_dto).await;
let trade_event_id = match upsert_result { let trade_event_id = match trade_event_id_result {
Ok(trade_event_id) => trade_event_id, Ok(trade_event_id) => trade_event_id,
Err(error) => return Err(error), Err(error) => return Err(error),
}; };
let created_trade_event = existing_trade_option.is_none();
let repaired_trade_event = !created_trade_event
&& existing_trade_was_empty
&& (base_amount_raw.is_some()
|| quote_amount_raw.is_some()
|| price_quote_per_base.is_some());
let pair_metric_result = let pair_metric_result =
crate::get_pair_metric_by_pair_id(self.database.as_ref(), pair_id).await; crate::get_pair_metric_by_pair_id(self.database.as_ref(), pair_id).await;
let pair_metric_option = match pair_metric_result { let pair_metric_option = match pair_metric_result {
@@ -271,7 +259,7 @@ impl KbTradeAggregationService {
)); ));
} }
}; };
if created_trade_event || repaired_trade_event { if created_trade_event {
let mut updated_metric = existing_metric.clone(); let mut updated_metric = existing_metric.clone();
kb_apply_trade_to_pair_metric( kb_apply_trade_to_pair_metric(
&mut updated_metric, &mut updated_metric,

Binary file not shown.