0.7.27 +Refactor

This commit is contained in:
2026-05-10 00:33:01 +02:00
parent cb2e8e7096
commit 1f0137b9de
261 changed files with 12308 additions and 8928 deletions

View File

@@ -4,7 +4,7 @@
/// One token-backfill result summary.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbTokenBackfillResult {
pub struct TokenBackfillResult {
/// Input token mint.
pub token_mint: std::string::String,
/// Number of signatures returned directly for the mint.
@@ -37,7 +37,7 @@ pub struct KbTokenBackfillResult {
/// One pool-backfill result summary.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbPoolBackfillResult {
pub struct PoolBackfillResult {
/// Input pool address.
pub pool_address: std::string::String,
/// Number of signatures returned directly for the pool address.
@@ -69,40 +69,40 @@ pub struct KbPoolBackfillResult {
/// This service reuses the existing transaction projection and downstream
/// DEX pipeline instead of introducing a separate historical code path.
#[derive(Debug, Clone)]
pub struct KbTokenBackfillService {
pub struct TokenBackfillService {
http_pool: std::sync::Arc<crate::HttpEndpointPool>,
database: std::sync::Arc<crate::KbDatabase>,
persistence: crate::KbDetectionPersistenceService,
database: std::sync::Arc<crate::Database>,
persistence: crate::DetectionPersistenceService,
http_role: std::string::String,
transaction_model: crate::KbTransactionModelService,
dex_decode_service: crate::KbDexDecodeService,
dex_detect_service: crate::KbDexDetectService,
launch_origin_service: crate::KbLaunchOriginService,
pool_origin_service: crate::KbPoolOriginService,
wallet_observation_service: crate::KbWalletObservationService,
trade_aggregation_service: crate::KbTradeAggregationService,
pair_candle_aggregation_service: crate::KbPairCandleAggregationService,
token_metadata_service: crate::KbTokenMetadataBackfillService,
transaction_model: crate::TransactionModelService,
dex_decode_service: crate::DexDecodeService,
dex_detect_service: crate::DexDetectService,
launch_origin_service: crate::LaunchOriginService,
pool_origin_service: crate::PoolOriginService,
wallet_observation_service: crate::WalletObservationService,
trade_aggregation_service: crate::TradeAggregationService,
pair_candle_aggregation_service: crate::PairCandleAggregationService,
token_metadata_service: crate::TokenMetadataBackfillService,
}
impl KbTokenBackfillService {
impl TokenBackfillService {
/// Creates a new token-backfill service.
pub fn new(
http_pool: std::sync::Arc<crate::HttpEndpointPool>,
database: std::sync::Arc<crate::KbDatabase>,
database: std::sync::Arc<crate::Database>,
http_role: std::string::String,
) -> Self {
let persistence = crate::KbDetectionPersistenceService::new(database.clone());
let transaction_model = crate::KbTransactionModelService::new(database.clone());
let dex_decode_service = crate::KbDexDecodeService::new(database.clone());
let dex_detect_service = crate::KbDexDetectService::new(database.clone());
let launch_origin_service = crate::KbLaunchOriginService::new(database.clone());
let pool_origin_service = crate::KbPoolOriginService::new(database.clone());
let wallet_observation_service = crate::KbWalletObservationService::new(database.clone());
let trade_aggregation_service = crate::KbTradeAggregationService::new(database.clone());
let persistence = crate::DetectionPersistenceService::new(database.clone());
let transaction_model = crate::TransactionModelService::new(database.clone());
let dex_decode_service = crate::DexDecodeService::new(database.clone());
let dex_detect_service = crate::DexDetectService::new(database.clone());
let launch_origin_service = crate::LaunchOriginService::new(database.clone());
let pool_origin_service = crate::PoolOriginService::new(database.clone());
let wallet_observation_service = crate::WalletObservationService::new(database.clone());
let trade_aggregation_service = crate::TradeAggregationService::new(database.clone());
let pair_candle_aggregation_service =
crate::KbPairCandleAggregationService::new(database.clone());
let token_metadata_service = crate::KbTokenMetadataBackfillService::new(
crate::PairCandleAggregationService::new(database.clone());
let token_metadata_service = crate::TokenMetadataBackfillService::new(
http_pool.clone(),
database.clone(),
http_role.clone(),
@@ -130,8 +130,8 @@ impl KbTokenBackfillService {
token_mint: &str,
mint_signature_limit: usize,
pool_signature_limit: usize,
) -> Result<crate::KbTokenBackfillResult, crate::KbError> {
let mut result = crate::KbTokenBackfillResult {
) -> Result<crate::TokenBackfillResult, crate::Error> {
let mut result = crate::TokenBackfillResult {
token_mint: token_mint.to_string(),
mint_signature_count: 0,
pool_address_count: 0,
@@ -169,7 +169,7 @@ impl KbTokenBackfillService {
Ok(replay_result) => replay_result,
Err(error) => return Err(error),
};
kb_merge_token_backfill_signature_result(&mut result, replay_result);
merge_token_backfill_signature_result(&mut result, replay_result);
}
let pool_addresses_result = self.collect_pool_addresses_for_token_mint(token_mint).await;
let pool_addresses = match pool_addresses_result {
@@ -199,7 +199,7 @@ impl KbTokenBackfillService {
Ok(replay_result) => replay_result,
Err(error) => return Err(error),
};
kb_merge_token_backfill_signature_result(&mut result, replay_result);
merge_token_backfill_signature_result(&mut result, replay_result);
}
}
self.backfill_missing_token_metadata_best_effort(100).await;
@@ -221,9 +221,9 @@ impl KbTokenBackfillService {
});
let observation_result = self
.persistence
.record_observation(&crate::KbDetectionObservationInput::new(
.record_observation(&crate::DetectionObservationInput::new(
"token.backfill.completed".to_string(),
crate::KbObservationSourceKind::HttpRpc,
crate::ObservationSourceKind::HttpRpc,
Some(format!("backfill:{}", self.http_role)),
token_mint.to_string(),
None,
@@ -236,9 +236,9 @@ impl KbTokenBackfillService {
};
let signal_result = self
.persistence
.record_signal(&crate::KbDetectionSignalInput::new(
.record_signal(&crate::DetectionSignalInput::new(
"signal.token.backfill.completed".to_string(),
crate::KbAnalysisSignalSeverity::Low,
crate::AnalysisSignalSeverity::Low,
token_mint.to_string(),
Some(observation_id),
None,
@@ -257,7 +257,7 @@ impl KbTokenBackfillService {
limit: usize,
) -> Result<
std::vec::Vec<solana_rpc_client_api::response::RpcConfirmedTransactionStatusWithSignature>,
crate::KbError,
crate::Error,
> {
let config = solana_rpc_client_api::config::RpcSignaturesForAddressConfig {
before: None,
@@ -275,8 +275,9 @@ impl KbTokenBackfillService {
async fn collect_pool_addresses_for_token_mint(
&self,
token_mint: &str,
) -> Result<std::vec::Vec<std::string::String>, crate::KbError> {
let token_result = crate::get_token_by_mint(self.database.as_ref(), token_mint).await;
) -> Result<std::vec::Vec<std::string::String>, crate::Error> {
let token_result =
crate::query_tokens_get_by_mint(self.database.as_ref(), token_mint).await;
let token_option = match token_result {
Ok(token_option) => token_option,
Err(error) => return Err(error),
@@ -288,13 +289,13 @@ impl KbTokenBackfillService {
let token_id = match token.id {
Some(token_id) => token_id,
None => {
return Err(crate::KbError::InvalidState(format!(
return Err(crate::Error::InvalidState(format!(
"token '{}' has no internal id",
token.mint
)));
},
};
let pools_result = crate::list_pools(self.database.as_ref()).await;
let pools_result = crate::query_pools_list(self.database.as_ref()).await;
let pools = match pools_result {
Ok(pools) => pools,
Err(error) => return Err(error),
@@ -307,7 +308,7 @@ impl KbTokenBackfillService {
None => continue,
};
let pool_tokens_result =
crate::list_pool_tokens_by_pool_id(self.database.as_ref(), pool_id).await;
crate::query_pool_tokens_list_by_pool_id(self.database.as_ref(), pool_id).await;
let pool_tokens = match pool_tokens_result {
Ok(pool_tokens) => pool_tokens,
Err(error) => return Err(error),
@@ -331,7 +332,7 @@ impl KbTokenBackfillService {
async fn replay_signature(
&self,
signature: std::string::String,
) -> Result<KbTokenBackfillSignatureResult, crate::KbError> {
) -> Result<TokenBackfillSignatureResult, crate::Error> {
let config = Some(serde_json::json!({
"encoding": "jsonParsed",
"maxSupportedTransactionVersion": 0
@@ -345,7 +346,7 @@ impl KbTokenBackfillService {
Err(error) => return Err(error),
};
if transaction_value.is_null() {
return Ok(KbTokenBackfillSignatureResult {
return Ok(TokenBackfillSignatureResult {
resolved_transaction_count: 0,
missing_transaction_count: 1,
decoded_event_count: 0,
@@ -357,9 +358,11 @@ impl KbTokenBackfillService {
pair_candle_count: 0,
});
}
let existing_transaction_result =
crate::get_chain_transaction_by_signature(self.database.as_ref(), signature.as_str())
.await;
let existing_transaction_result = crate::query_chain_transactions_get_by_signature(
self.database.as_ref(),
signature.as_str(),
)
.await;
let existing_transaction_option = match existing_transaction_result {
Ok(existing_transaction_option) => existing_transaction_option,
Err(error) => return Err(error),
@@ -433,7 +436,7 @@ impl KbTokenBackfillService {
Ok(pair_candle_aggregations) => pair_candle_aggregations,
Err(error) => return Err(error),
};
return Ok(KbTokenBackfillSignatureResult {
return Ok(TokenBackfillSignatureResult {
resolved_transaction_count: 1,
missing_transaction_count: 0,
decoded_event_count: decoded.len(),
@@ -451,9 +454,9 @@ impl KbTokenBackfillService {
&self,
pool_address: &str,
pool_signature_limit: usize,
) -> Result<crate::KbPoolBackfillResult, crate::KbError> {
) -> Result<crate::PoolBackfillResult, crate::Error> {
let effective_limit = if pool_signature_limit > 1000 { 1000 } else { pool_signature_limit };
let mut result = crate::KbPoolBackfillResult {
let mut result = crate::PoolBackfillResult {
pool_address: pool_address.to_string(),
pool_signature_count: 0,
unique_signature_count: 0,
@@ -471,12 +474,15 @@ impl KbTokenBackfillService {
let mut addresses_to_scan = std::vec::Vec::<std::string::String>::new();
let trimmed_pool_address = pool_address.trim().to_string();
if trimmed_pool_address.is_empty() {
return Err(crate::KbError::Config("pool_address must not be empty".to_string()));
return Err(crate::Error::Config("pool_address must not be empty".to_string()));
}
seen_addresses.insert(trimmed_pool_address.clone());
addresses_to_scan.push(trimmed_pool_address.clone());
let pool_result =
crate::get_pool_by_address(self.database.as_ref(), trimmed_pool_address.as_str()).await;
let pool_result = crate::query_pools_get_by_address(
self.database.as_ref(),
trimmed_pool_address.as_str(),
)
.await;
let pool_option = match pool_result {
Ok(pool_option) => pool_option,
Err(error) => return Err(error),
@@ -485,14 +491,14 @@ impl KbTokenBackfillService {
let pool_id = match pool.id {
Some(pool_id) => pool_id,
None => {
return Err(crate::KbError::InvalidState(format!(
return Err(crate::Error::InvalidState(format!(
"pool '{}' has no internal id",
pool.address
)));
},
};
let pool_tokens_result =
crate::list_pool_tokens_by_pool_id(self.database.as_ref(), pool_id).await;
crate::query_pool_tokens_list_by_pool_id(self.database.as_ref(), pool_id).await;
let pool_tokens = match pool_tokens_result {
Ok(pool_tokens) => pool_tokens,
Err(error) => return Err(error),
@@ -567,9 +573,9 @@ impl KbTokenBackfillService {
});
let observation_result = self
.persistence
.record_observation(&crate::KbDetectionObservationInput::new(
.record_observation(&crate::DetectionObservationInput::new(
"pool.backfill.completed".to_string(),
crate::KbObservationSourceKind::HttpRpc,
crate::ObservationSourceKind::HttpRpc,
Some(format!("backfill:{}", self.http_role)),
pool_address.to_string(),
None,
@@ -582,9 +588,9 @@ impl KbTokenBackfillService {
};
let signal_result = self
.persistence
.record_signal(&crate::KbDetectionSignalInput::new(
.record_signal(&crate::DetectionSignalInput::new(
"signal.pool.backfill.completed".to_string(),
crate::KbAnalysisSignalSeverity::Low,
crate::AnalysisSignalSeverity::Low,
pool_address.to_string(),
Some(observation_id),
None,
@@ -625,7 +631,7 @@ impl KbTokenBackfillService {
}
#[derive(Debug, Clone, Default)]
struct KbTokenBackfillSignatureResult {
struct TokenBackfillSignatureResult {
resolved_transaction_count: usize,
missing_transaction_count: usize,
decoded_event_count: usize,
@@ -637,9 +643,9 @@ struct KbTokenBackfillSignatureResult {
pair_candle_count: usize,
}
fn kb_merge_token_backfill_signature_result(
aggregate: &mut crate::KbTokenBackfillResult,
value: KbTokenBackfillSignatureResult,
fn merge_token_backfill_signature_result(
aggregate: &mut crate::TokenBackfillResult,
value: TokenBackfillSignatureResult,
) {
aggregate.resolved_transaction_count += value.resolved_transaction_count;
aggregate.missing_transaction_count += value.missing_transaction_count;
@@ -795,14 +801,14 @@ mod tests {
"message": {
"instructions": [
{
"programId": crate::KB_FLUXBEAM_PROGRAM_ID,
"programId": crate::FLUXBEAM_PROGRAM_ID,
"program": "fluxbeam",
"stackHeight": 1,
"accounts": [
"BackfillPool111",
"BackfillLpMint111",
"BackfillToken111",
"So11111111111111111111111111111111111111112",
crate::WSOL_MINT_ID,
"BackfillCreator111"
],
"parsed": {
@@ -811,7 +817,7 @@ mod tests {
"pool": "BackfillPool111",
"lpMint": "BackfillLpMint111",
"tokenA": "BackfillToken111",
"tokenB": "So11111111111111111111111111111111111111112",
"tokenB": crate::WSOL_MINT_ID,
"payer": "BackfillCreator111"
}
},
@@ -840,21 +846,21 @@ mod tests {
"message": {
"instructions": [
{
"programId": crate::KB_FLUXBEAM_PROGRAM_ID,
"programId": crate::FLUXBEAM_PROGRAM_ID,
"program": "fluxbeam",
"stackHeight": 1,
"accounts": [
"BackfillPool111",
"BackfillLpMint111",
"BackfillToken111",
"So11111111111111111111111111111111111111112"
crate::WSOL_MINT_ID
],
"parsed": {
"info": {
"instruction": "swap",
"pool": "BackfillPool111",
"tokenA": "BackfillToken111",
"tokenB": "So11111111111111111111111111111111111111112",
"tokenB": crate::WSOL_MINT_ID,
"baseAmountRaw": "1000",
"quoteAmountRaw": "2500"
}
@@ -922,17 +928,17 @@ mod tests {
}
}
async fn make_database() -> std::sync::Arc<crate::KbDatabase> {
async fn make_database() -> std::sync::Arc<crate::Database> {
let tempdir_result = tempfile::tempdir();
let tempdir = match tempdir_result {
Ok(tempdir) => tempdir,
Err(error) => panic!("tempdir must succeed: {}", error),
};
let database_path = tempdir.path().join("token_backfill.sqlite3");
let config = crate::KbDatabaseConfig {
let config = crate::DatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
backend: crate::DatabaseBackend::Sqlite,
sqlite: crate::SqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
@@ -941,7 +947,7 @@ mod tests {
use_wal: true,
},
};
let database_result = crate::KbDatabase::connect_and_initialize(&config).await;
let database_result = crate::Database::connect_and_initialize(&config).await;
let database = match database_result {
Ok(database) => database,
Err(error) => panic!("database init must succeed: {}", error),
@@ -949,8 +955,8 @@ mod tests {
return std::sync::Arc::new(database);
}
fn make_http_endpoint_config(url: std::string::String) -> crate::KbHttpEndpointConfig {
return crate::KbHttpEndpointConfig {
fn make_http_endpoint_config(url: std::string::String) -> crate::HttpEndpointConfig {
return crate::HttpEndpointConfig {
name: "backfill_http".to_string(),
enabled: true,
provider: "test".to_string(),
@@ -983,7 +989,7 @@ mod tests {
Ok(http_pool) => std::sync::Arc::new(http_pool),
Err(error) => panic!("http pool creation must succeed: {}", error),
};
let service = crate::KbTokenBackfillService::new(
let service = crate::TokenBackfillService::new(
http_pool,
database.clone(),
"history_backfill".to_string(),
@@ -1001,7 +1007,8 @@ mod tests {
assert_eq!(backfill.missing_transaction_count, 0);
assert_eq!(backfill.trade_event_count, 1);
assert!(backfill.pair_candle_count > 0);
let token_result = crate::get_token_by_mint(database.as_ref(), "BackfillToken111").await;
let token_result =
crate::query_tokens_get_by_mint(database.as_ref(), "BackfillToken111").await;
let token_option = match token_result {
Ok(token_option) => token_option,
Err(error) => panic!("token fetch must succeed: {}", error),
@@ -1011,7 +1018,8 @@ mod tests {
None => panic!("token must exist"),
};
assert!(token.id.is_some());
let pool_result = crate::get_pool_by_address(database.as_ref(), "BackfillPool111").await;
let pool_result =
crate::query_pools_get_by_address(database.as_ref(), "BackfillPool111").await;
let pool_option = match pool_result {
Ok(pool_option) => pool_option,
Err(error) => panic!("pool fetch must succeed: {}", error),
@@ -1024,7 +1032,7 @@ mod tests {
Some(pool_id) => pool_id,
None => panic!("pool must have an id"),
};
let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await;
let pair_result = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id).await;
let pair_option = match pair_result {
Ok(pair_option) => pair_option,
Err(error) => panic!("pair fetch must succeed: {}", error),
@@ -1038,7 +1046,7 @@ mod tests {
None => panic!("pair must have an id"),
};
let trade_events_result =
crate::list_trade_events_by_pair_id(database.as_ref(), pair_id).await;
crate::query_trade_events_list_by_pair_id(database.as_ref(), pair_id).await;
let trade_events = match trade_events_result {
Ok(trade_events) => trade_events,
Err(error) => panic!("trade event list must succeed: {}", error),
@@ -1046,7 +1054,7 @@ mod tests {
assert_eq!(trade_events.len(), 1);
assert_eq!(trade_events[0].price_quote_per_base, Some(2.5));
let pair_metric_result =
crate::get_pair_metric_by_pair_id(database.as_ref(), pair_id).await;
crate::query_pair_metrics_get_by_pair_id(database.as_ref(), pair_id).await;
let pair_metric_option = match pair_metric_result {
Ok(pair_metric_option) => pair_metric_option,
Err(error) => panic!("pair metric fetch must succeed: {}", error),
@@ -1058,7 +1066,8 @@ mod tests {
assert_eq!(pair_metric.trade_count, 1);
assert_eq!(pair_metric.buy_count, 1);
let candles_result =
crate::list_pair_candles_by_pair_and_timeframe(database.as_ref(), pair_id, 60).await;
crate::query_pair_candles_list_by_pair_and_timeframe(database.as_ref(), pair_id, 60)
.await;
let candles = match candles_result {
Ok(candles) => candles,
Err(error) => panic!("pair candle list must succeed: {}", error),
@@ -1081,7 +1090,7 @@ mod tests {
Ok(http_pool) => std::sync::Arc::new(http_pool),
Err(error) => panic!("http pool creation must succeed: {}", error),
};
let service = crate::KbTokenBackfillService::new(
let service = crate::TokenBackfillService::new(
http_pool,
database.clone(),
"history_backfill".to_string(),
@@ -1095,7 +1104,8 @@ mod tests {
if let Err(error) = second_result {
panic!("second backfill must succeed: {}", error);
}
let token_result = crate::get_token_by_mint(database.as_ref(), "BackfillToken111").await;
let token_result =
crate::query_tokens_get_by_mint(database.as_ref(), "BackfillToken111").await;
let token_option = match token_result {
Ok(token_option) => token_option,
Err(error) => panic!("token fetch must succeed: {}", error),
@@ -1105,7 +1115,7 @@ mod tests {
None => panic!("token must exist"),
};
let token_id = token.id.unwrap_or_default();
let pools_result = crate::list_pools(database.as_ref()).await;
let pools_result = crate::query_pools_list(database.as_ref()).await;
let pools = match pools_result {
Ok(pools) => pools,
Err(error) => panic!("pool list must succeed: {}", error),
@@ -1113,7 +1123,7 @@ mod tests {
assert_eq!(pools.len(), 1);
let pool_id = pools[0].id.unwrap_or_default();
let pool_tokens_result =
crate::list_pool_tokens_by_pool_id(database.as_ref(), pool_id).await;
crate::query_pool_tokens_list_by_pool_id(database.as_ref(), pool_id).await;
let pool_tokens = match pool_tokens_result {
Ok(pool_tokens) => pool_tokens,
Err(error) => panic!("pool token list must succeed: {}", error),
@@ -1125,7 +1135,7 @@ mod tests {
}
}
assert!(found_token);
let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await;
let pair_result = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id).await;
let pair_option = match pair_result {
Ok(pair_option) => pair_option,
Err(error) => panic!("pair fetch must succeed: {}", error),
@@ -1136,13 +1146,13 @@ mod tests {
};
let pair_id = pair.id.unwrap_or_default();
let trade_events_result =
crate::list_trade_events_by_pair_id(database.as_ref(), pair_id).await;
crate::query_trade_events_list_by_pair_id(database.as_ref(), pair_id).await;
let trade_events = match trade_events_result {
Ok(trade_events) => trade_events,
Err(error) => panic!("trade event list must succeed: {}", error),
};
assert_eq!(trade_events.len(), 1);
let pair_metrics_result = crate::list_pair_metrics(database.as_ref()).await;
let pair_metrics_result = crate::query_pair_metrics_list(database.as_ref()).await;
let pair_metrics = match pair_metrics_result {
Ok(pair_metrics) => pair_metrics,
Err(error) => panic!("pair metric list must succeed: {}", error),