0.7.24-pre.0

This commit is contained in:
2026-05-02 11:27:10 +02:00
parent 60db521a88
commit aaff2dbd94
38 changed files with 3074 additions and 207 deletions

View File

@@ -33,6 +33,33 @@ pub struct KbTokenBackfillResult {
pub trade_event_count: usize,
}
/// One pool-backfill result summary.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbPoolBackfillResult {
/// Input pool address.
pub pool_address: std::string::String,
/// Number of signatures returned directly for the pool address.
pub pool_signature_count: usize,
/// Number of unique signatures processed during this run.
pub unique_signature_count: usize,
/// Number of transactions resolved through HTTP during this run.
pub resolved_transaction_count: usize,
/// Number of signatures whose `getTransaction` lookup returned `null`.
pub missing_transaction_count: usize,
/// Total number of decoded DEX events replayed during this run.
pub decoded_event_count: usize,
/// Total number of DEX detection results produced during this run.
pub detection_count: usize,
/// Total number of launch-attribution results produced during this run.
pub launch_attribution_count: usize,
/// Total number of pool-origin results produced during this run.
pub pool_origin_count: usize,
/// Total number of wallet-participation observations produced during this run.
pub wallet_participation_count: usize,
/// Total number of trade-aggregation results produced during this run.
pub trade_event_count: usize,
}
/// Historical token backfill service.
///
/// This service reuses the existing transaction projection and downstream
@@ -310,11 +337,9 @@ impl KbTokenBackfillService {
trade_event_count: 0,
});
}
let existing_transaction_result = crate::get_chain_transaction_by_signature(
self.database.as_ref(),
signature.as_str(),
)
.await;
let existing_transaction_result =
crate::get_chain_transaction_by_signature(self.database.as_ref(), signature.as_str())
.await;
let existing_transaction_option = match existing_transaction_result {
Ok(existing_transaction_option) => existing_transaction_option,
Err(error) => return Err(error),
@@ -391,6 +416,160 @@ impl KbTokenBackfillService {
trade_event_count: trade_aggregations.len(),
})
}
/// Replays the historical activity of one pool address through the existing pipeline.
pub async fn backfill_pool_by_address(
&self,
pool_address: &str,
pool_signature_limit: usize,
) -> Result<crate::KbPoolBackfillResult, crate::KbError> {
let effective_limit = if pool_signature_limit > 1000 {
1000
} else {
pool_signature_limit
};
let mut result = crate::KbPoolBackfillResult {
pool_address: pool_address.to_string(),
pool_signature_count: 0,
unique_signature_count: 0,
resolved_transaction_count: 0,
missing_transaction_count: 0,
decoded_event_count: 0,
detection_count: 0,
launch_attribution_count: 0,
pool_origin_count: 0,
wallet_participation_count: 0,
trade_event_count: 0,
};
let mut seen_addresses = std::collections::BTreeSet::<std::string::String>::new();
let mut addresses_to_scan = std::vec::Vec::<std::string::String>::new();
let trimmed_pool_address = pool_address.trim().to_string();
if trimmed_pool_address.is_empty() {
return Err(crate::KbError::Config(
"pool_address must not be empty".to_string(),
));
}
seen_addresses.insert(trimmed_pool_address.clone());
addresses_to_scan.push(trimmed_pool_address.clone());
let pool_result =
crate::get_pool_by_address(self.database.as_ref(), trimmed_pool_address.as_str()).await;
let pool_option = match pool_result {
Ok(pool_option) => pool_option,
Err(error) => return Err(error),
};
if let Some(pool) = pool_option {
let pool_id = match pool.id {
Some(pool_id) => pool_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"pool '{}' has no internal id",
pool.address
)));
}
};
let pool_tokens_result =
crate::list_pool_tokens_by_pool_id(self.database.as_ref(), pool_id).await;
let pool_tokens = match pool_tokens_result {
Ok(pool_tokens) => pool_tokens,
Err(error) => return Err(error),
};
for pool_token in pool_tokens {
let vault_address_option = pool_token.vault_address.clone();
let vault_address = match vault_address_option {
Some(vault_address) => vault_address.trim().to_string(),
None => continue,
};
if vault_address.is_empty() {
continue;
}
if seen_addresses.contains(vault_address.as_str()) {
continue;
}
seen_addresses.insert(vault_address.clone());
addresses_to_scan.push(vault_address);
}
}
let mut seen_signatures = std::collections::HashSet::<std::string::String>::new();
for address in &addresses_to_scan {
let signatures_result = self
.fetch_signatures_for_address(address.clone(), effective_limit)
.await;
let mut signatures = match signatures_result {
Ok(signatures) => signatures,
Err(error) => return Err(error),
};
if address == &trimmed_pool_address {
result.pool_signature_count = signatures.len();
}
signatures.reverse();
for signature_status in signatures {
let signature = signature_status.signature.clone();
if seen_signatures.contains(signature.as_str()) {
continue;
}
seen_signatures.insert(signature.clone());
result.unique_signature_count += 1;
let replay_result = self.replay_signature(signature).await;
let replay_result = match replay_result {
Ok(replay_result) => replay_result,
Err(error) => return Err(error),
};
result.resolved_transaction_count += replay_result.resolved_transaction_count;
result.missing_transaction_count += replay_result.missing_transaction_count;
result.decoded_event_count += replay_result.decoded_event_count;
result.detection_count += replay_result.detection_count;
result.launch_attribution_count += replay_result.launch_attribution_count;
result.pool_origin_count += replay_result.pool_origin_count;
result.wallet_participation_count += replay_result.wallet_participation_count;
result.trade_event_count += replay_result.trade_event_count;
}
}
let summary_payload = serde_json::json!({
"poolAddress": result.pool_address,
"poolSignatureCount": result.pool_signature_count,
"uniqueSignatureCount": result.unique_signature_count,
"resolvedTransactionCount": result.resolved_transaction_count,
"missingTransactionCount": result.missing_transaction_count,
"decodedEventCount": result.decoded_event_count,
"detectionCount": result.detection_count,
"launchAttributionCount": result.launch_attribution_count,
"poolOriginCount": result.pool_origin_count,
"walletParticipationCount": result.wallet_participation_count,
"tradeEventCount": result.trade_event_count,
"scannedAddressCount": addresses_to_scan.len(),
"effectiveSignatureLimit": effective_limit
});
let observation_result = self
.persistence
.record_observation(&crate::KbDetectionObservationInput::new(
"pool.backfill.completed".to_string(),
crate::KbObservationSourceKind::HttpRpc,
Some(format!("backfill:{}", self.http_role)),
pool_address.to_string(),
None,
summary_payload.clone(),
))
.await;
let observation_id = match observation_result {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let signal_result = self
.persistence
.record_signal(&crate::KbDetectionSignalInput::new(
"signal.pool.backfill.completed".to_string(),
crate::KbAnalysisSignalSeverity::Low,
pool_address.to_string(),
Some(observation_id),
None,
summary_payload,
))
.await;
if let Err(error) = signal_result {
return Err(error);
}
Ok(result)
}
}
#[derive(Debug, Clone, Default)]