This commit is contained in:
2026-04-30 00:04:43 +02:00
parent 0f228b2ae5
commit 2f4d80d5ef
6 changed files with 982 additions and 11 deletions

View File

@@ -48,3 +48,4 @@
0.7.15 - Ajout dune couche wallets observés et participations observées, avec extraction des rôles depuis les payloads décodés et rattachement transaction / decoded event / pool / pair 0.7.15 - Ajout dune couche wallets observés et participations observées, avec extraction des rôles depuis les payloads décodés et rattachement transaction / decoded event / pool / pair
0.7.16 - Ajout dune première couche trade events et pair metrics avec normalisation des swaps, agrégation par paire et branchement automatique dans le pipeline de résolution transactionnelle 0.7.16 - Ajout dune première couche trade events et pair metrics avec normalisation des swaps, agrégation par paire et branchement automatique dans le pipeline de résolution transactionnelle
0.7.17 - Ajout dune première couche WS hybride avec collecte de cibles `programSubscribe` / `accountSubscribe` et persistance technique dédupliquée des notifications `logs / program / account` 0.7.17 - Ajout dune première couche WS hybride avec collecte de cibles `programSubscribe` / `accountSubscribe` et persistance technique dédupliquée des notifications `logs / program / account`
0.7.18 - Ajout dun premier backfill historique ciblé par token mint, basé sur `getSignaturesForAddress` + `getTransaction`, avec réutilisation du pipeline interne pour reconstruire transactions, pools, swaps, origins, wallets et métriques

View File

@@ -8,7 +8,7 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "0.7.17" version = "0.7.18"
edition = "2024" edition = "2024"
license = "MIT" license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"

View File

@@ -625,17 +625,14 @@ Réalisé :
- ajout dune façade runtime pour exposer ce comportement au futur branchement `ws_manager`. - ajout dune façade runtime pour exposer ce comportement au futur branchement `ws_manager`.
### 6.050. Version `0.7.18` — Backfill historique ciblé par token ### 6.050. Version `0.7.18` — Backfill historique ciblé par token
Objectif : permettre la reconstruction historique ciblée dun token encore actif non observé en live. Réalisé :
À faire : - ajout dun premier service de backfill ciblé par `token_mint`,
- récupération des signatures historiques via `getSignaturesForAddress`,
- ajouter une entrée `token_mint -> backfill`, - résolution des transactions pertinentes via `getTransaction`,
- retrouver les signatures historiques via `getSignaturesForAddress`, - relecture du pipeline interne pour reconstruire transactions, décodage DEX, détection métier, origins, wallets et trade metrics,
- résoudre les transactions pertinentes via `getTransaction`, - ajout dune seconde passe sur les pools découverts pour le token afin de récupérer des signatures supplémentaires liées à lactivité du pool,
- retrouver les pools et paires liés au token, - conservation dun périmètre ciblé sur des tokens encore actifs au lieu dun scan exhaustif de la blockchain.
- reconstruire les swaps observables et les métriques dérivées,
- cibler prioritairement des tokens encore actifs comme `USDC`, `USDT`, `RAY`, `JUP`,
- éviter un scan exhaustif de toute la blockchain.
### 6.051. Version `0.7.19` — Holdings observés ### 6.051. Version `0.7.19` — Holdings observés
Objectif : compléter la couche acteurs observés avec une première vision des balances et comptes token observés, sans viser encore un portefeuille temps réel exhaustif. Objectif : compléter la couche acteurs observés avec une première vision des balances et comptes token observés, sans viser encore un portefeuille temps réel exhaustif.

View File

@@ -278,6 +278,60 @@ impl HttpEndpointPool {
}; };
client.get_transaction_raw(signature, config).await client.get_transaction_raw(signature, config).await
} }
/// Executes `getProgramAccounts` through the pool and returns the raw result value.
pub async fn get_program_accounts_raw_for_role(
&self,
required_role: &str,
program_id: std::string::String,
config: std::option::Option<serde_json::Value>,
) -> Result<serde_json::Value, crate::KbError> {
let client_result = self
.select_client_for_role_and_method(required_role, "getProgramAccounts")
.await;
let client = match client_result {
Ok(client) => client,
Err(error) => return Err(error),
};
client.get_program_accounts_raw(program_id, config).await
}
/// Executes `getSignaturesForAddress` through the pool and returns the raw result value.
pub async fn get_signatures_for_address_raw_for_role(
&self,
required_role: &str,
address: std::string::String,
config: std::option::Option<serde_json::Value>,
) -> Result<serde_json::Value, crate::KbError> {
let client_result = self
.select_client_for_role_and_method(required_role, "getSignaturesForAddress")
.await;
let client = match client_result {
Ok(client) => client,
Err(error) => return Err(error),
};
client.get_signatures_for_address_raw(address, config).await
}
/// Executes typed `getSignaturesForAddress` through the pool.
pub async fn get_signatures_for_address_for_role(
&self,
required_role: &str,
address: std::string::String,
config: std::option::Option<solana_rpc_client_api::config::RpcSignaturesForAddressConfig>,
) -> Result<
std::vec::Vec<solana_rpc_client_api::response::RpcConfirmedTransactionStatusWithSignature>,
crate::KbError,
> {
let client_result = self
.select_client_for_role_and_method(required_role, "getSignaturesForAddress")
.await;
let client = match client_result {
Ok(client) => client,
Err(error) => return Err(error),
};
client.get_signatures_for_address(address, config).await
}
} }
fn kb_pool_build_optional_config_only_params( fn kb_pool_build_optional_config_only_params(

View File

@@ -22,6 +22,7 @@ mod json_rpc_ws;
mod launch_origin; mod launch_origin;
mod pool_origin; mod pool_origin;
mod solana_pubsub_ws; mod solana_pubsub_ws;
mod token_backfill;
mod tracing; mod tracing;
mod trade_aggregation; mod trade_aggregation;
mod tx_model; mod tx_model;
@@ -280,6 +281,8 @@ pub use pool_origin::KbPoolOriginService;
pub use solana_pubsub_ws::KbSolanaWsTypedNotification; pub use solana_pubsub_ws::KbSolanaWsTypedNotification;
pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification; pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification;
pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event; pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event;
pub use token_backfill::KbTokenBackfillResult;
pub use token_backfill::KbTokenBackfillService;
pub use tracing::KbTracingGuard; pub use tracing::KbTracingGuard;
pub use tracing::init_tracing; pub use tracing::init_tracing;
pub use trade_aggregation::KbTradeAggregationResult; pub use trade_aggregation::KbTradeAggregationResult;

View File

@@ -0,0 +1,916 @@
// file: kb_lib/src/token_backfill.rs
//! Historical token backfill service.
/// One token-backfill result summary.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbTokenBackfillResult {
/// Input token mint.
pub token_mint: std::string::String,
/// Number of signatures returned directly for the mint.
pub mint_signature_count: usize,
/// Number of pool addresses discovered for the token after the first replay pass.
pub pool_address_count: usize,
/// Number of signatures returned from those pool addresses.
pub pool_signature_count: usize,
/// Number of unique signatures processed during this run.
pub unique_signature_count: usize,
/// Number of transactions resolved through HTTP during this run.
pub resolved_transaction_count: usize,
/// Number of signatures whose `getTransaction` lookup returned `null`.
pub missing_transaction_count: usize,
/// Total number of decoded DEX events replayed during this run.
pub decoded_event_count: usize,
/// Total number of DEX detection results produced during this run.
pub detection_count: usize,
/// Total number of launch-attribution results produced during this run.
pub launch_attribution_count: usize,
/// Total number of pool-origin results produced during this run.
pub pool_origin_count: usize,
/// Total number of wallet-participation observations produced during this run.
pub wallet_participation_count: usize,
/// Total number of trade-aggregation results produced during this run.
pub trade_event_count: usize,
}
/// Historical token backfill service.
///
/// This service reuses the existing transaction projection and downstream
/// DEX pipeline instead of introducing a separate historical code path.
#[derive(Debug, Clone)]
pub struct KbTokenBackfillService {
http_pool: std::sync::Arc<crate::HttpEndpointPool>,
database: std::sync::Arc<crate::KbDatabase>,
persistence: crate::KbDetectionPersistenceService,
http_role: std::string::String,
transaction_model: crate::KbTransactionModelService,
dex_decode_service: crate::KbDexDecodeService,
dex_detect_service: crate::KbDexDetectService,
launch_origin_service: crate::KbLaunchOriginService,
pool_origin_service: crate::KbPoolOriginService,
wallet_observation_service: crate::KbWalletObservationService,
trade_aggregation_service: crate::KbTradeAggregationService,
}
impl KbTokenBackfillService {
/// Creates a new token-backfill service.
pub fn new(
http_pool: std::sync::Arc<crate::HttpEndpointPool>,
database: std::sync::Arc<crate::KbDatabase>,
http_role: std::string::String,
) -> Self {
let persistence = crate::KbDetectionPersistenceService::new(database.clone());
let transaction_model = crate::KbTransactionModelService::new(database.clone());
let dex_decode_service = crate::KbDexDecodeService::new(database.clone());
let dex_detect_service = crate::KbDexDetectService::new(database.clone());
let launch_origin_service = crate::KbLaunchOriginService::new(database.clone());
let pool_origin_service = crate::KbPoolOriginService::new(database.clone());
let wallet_observation_service = crate::KbWalletObservationService::new(database.clone());
let trade_aggregation_service = crate::KbTradeAggregationService::new(database.clone());
Self {
http_pool,
database,
persistence,
http_role,
transaction_model,
dex_decode_service,
dex_detect_service,
launch_origin_service,
pool_origin_service,
wallet_observation_service,
trade_aggregation_service,
}
}
/// Replays the historical activity of one token mint through the existing pipeline.
pub async fn backfill_token_by_mint(
&self,
token_mint: &str,
mint_signature_limit: usize,
pool_signature_limit: usize,
) -> Result<crate::KbTokenBackfillResult, crate::KbError> {
let mut result = crate::KbTokenBackfillResult {
token_mint: token_mint.to_string(),
mint_signature_count: 0,
pool_address_count: 0,
pool_signature_count: 0,
unique_signature_count: 0,
resolved_transaction_count: 0,
missing_transaction_count: 0,
decoded_event_count: 0,
detection_count: 0,
launch_attribution_count: 0,
pool_origin_count: 0,
wallet_participation_count: 0,
trade_event_count: 0,
};
let mut seen_signatures = std::collections::HashSet::<std::string::String>::new();
let mint_signatures_result = self
.fetch_signatures_for_address(token_mint.to_string(), mint_signature_limit)
.await;
let mut mint_signatures = match mint_signatures_result {
Ok(mint_signatures) => mint_signatures,
Err(error) => return Err(error),
};
result.mint_signature_count = mint_signatures.len();
mint_signatures.reverse();
for signature_status in mint_signatures {
let signature = signature_status.signature.clone();
if seen_signatures.contains(signature.as_str()) {
continue;
}
seen_signatures.insert(signature.clone());
result.unique_signature_count += 1;
let replay_result = self.replay_signature(signature).await;
let replay_result = match replay_result {
Ok(replay_result) => replay_result,
Err(error) => return Err(error),
};
kb_merge_token_backfill_signature_result(&mut result, replay_result);
}
let pool_addresses_result = self.collect_pool_addresses_for_token_mint(token_mint).await;
let pool_addresses = match pool_addresses_result {
Ok(pool_addresses) => pool_addresses,
Err(error) => return Err(error),
};
result.pool_address_count = pool_addresses.len();
for pool_address in pool_addresses {
let pool_signatures_result = self
.fetch_signatures_for_address(pool_address.clone(), pool_signature_limit)
.await;
let mut pool_signatures = match pool_signatures_result {
Ok(pool_signatures) => pool_signatures,
Err(error) => return Err(error),
};
result.pool_signature_count += pool_signatures.len();
pool_signatures.reverse();
for signature_status in pool_signatures {
let signature = signature_status.signature.clone();
if seen_signatures.contains(signature.as_str()) {
continue;
}
seen_signatures.insert(signature.clone());
result.unique_signature_count += 1;
let replay_result = self.replay_signature(signature).await;
let replay_result = match replay_result {
Ok(replay_result) => replay_result,
Err(error) => return Err(error),
};
kb_merge_token_backfill_signature_result(&mut result, replay_result);
}
}
let summary_payload = serde_json::json!({
"tokenMint": result.token_mint,
"mintSignatureCount": result.mint_signature_count,
"poolAddressCount": result.pool_address_count,
"poolSignatureCount": result.pool_signature_count,
"uniqueSignatureCount": result.unique_signature_count,
"resolvedTransactionCount": result.resolved_transaction_count,
"missingTransactionCount": result.missing_transaction_count,
"decodedEventCount": result.decoded_event_count,
"detectionCount": result.detection_count,
"launchAttributionCount": result.launch_attribution_count,
"poolOriginCount": result.pool_origin_count,
"walletParticipationCount": result.wallet_participation_count,
"tradeEventCount": result.trade_event_count
});
let observation_result = self
.persistence
.record_observation(&crate::KbDetectionObservationInput::new(
"token.backfill.completed".to_string(),
crate::KbObservationSourceKind::HttpRpc,
Some(format!("backfill:{}", self.http_role)),
token_mint.to_string(),
None,
summary_payload.clone(),
))
.await;
let observation_id = match observation_result {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let signal_result = self
.persistence
.record_signal(&crate::KbDetectionSignalInput::new(
"signal.token.backfill.completed".to_string(),
crate::KbAnalysisSignalSeverity::Low,
token_mint.to_string(),
Some(observation_id),
None,
summary_payload,
))
.await;
if let Err(error) = signal_result {
return Err(error);
}
Ok(result)
}
async fn fetch_signatures_for_address(
&self,
address: std::string::String,
limit: usize,
) -> Result<
std::vec::Vec<solana_rpc_client_api::response::RpcConfirmedTransactionStatusWithSignature>,
crate::KbError,
> {
let config = solana_rpc_client_api::config::RpcSignaturesForAddressConfig {
before: None,
until: None,
limit: Some(limit),
commitment: None,
min_context_slot: None,
};
self.http_pool
.get_signatures_for_address_for_role(self.http_role.as_str(), address, Some(config))
.await
}
async fn collect_pool_addresses_for_token_mint(
&self,
token_mint: &str,
) -> Result<std::vec::Vec<std::string::String>, crate::KbError> {
let token_result = crate::get_token_by_mint(self.database.as_ref(), token_mint).await;
let token_option = match token_result {
Ok(token_option) => token_option,
Err(error) => return Err(error),
};
let token = match token_option {
Some(token) => token,
None => return Ok(std::vec::Vec::new()),
};
let token_id = match token.id {
Some(token_id) => token_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"token '{}' has no internal id",
token.mint
)));
}
};
let pools_result = crate::list_pools(self.database.as_ref()).await;
let pools = match pools_result {
Ok(pools) => pools,
Err(error) => return Err(error),
};
let mut addresses = std::vec::Vec::new();
let mut seen = std::collections::HashSet::<std::string::String>::new();
for pool in pools {
let pool_id = match pool.id {
Some(pool_id) => pool_id,
None => continue,
};
let pool_tokens_result =
crate::list_pool_tokens_by_pool_id(self.database.as_ref(), pool_id).await;
let pool_tokens = match pool_tokens_result {
Ok(pool_tokens) => pool_tokens,
Err(error) => return Err(error),
};
let mut contains_token = false;
for pool_token in pool_tokens {
if pool_token.token_id == token_id {
contains_token = true;
break;
}
}
if contains_token && !seen.contains(pool.address.as_str()) {
seen.insert(pool.address.clone());
addresses.push(pool.address.clone());
}
}
addresses.sort();
Ok(addresses)
}
async fn replay_signature(
&self,
signature: std::string::String,
) -> Result<KbTokenBackfillSignatureResult, crate::KbError> {
let config = Some(serde_json::json!({
"encoding": "jsonParsed",
"maxSupportedTransactionVersion": 0
}));
let transaction_value_result = self
.http_pool
.get_transaction_raw_for_role(self.http_role.as_str(), signature.clone(), config)
.await;
let transaction_value = match transaction_value_result {
Ok(transaction_value) => transaction_value,
Err(error) => return Err(error),
};
if transaction_value.is_null() {
return Ok(KbTokenBackfillSignatureResult {
resolved_transaction_count: 0,
missing_transaction_count: 1,
decoded_event_count: 0,
detection_count: 0,
launch_attribution_count: 0,
pool_origin_count: 0,
wallet_participation_count: 0,
trade_event_count: 0,
});
}
let existing_transaction_result = crate::get_chain_transaction_by_signature(
self.database.as_ref(),
signature.as_str(),
)
.await;
let existing_transaction_option = match existing_transaction_result {
Ok(existing_transaction_option) => existing_transaction_option,
Err(error) => return Err(error),
};
if existing_transaction_option.is_none() {
let persist_result = self
.transaction_model
.persist_resolved_transaction(
signature.as_str(),
Some(format!("backfill:{}", self.http_role)),
&transaction_value,
)
.await;
if let Err(error) = persist_result {
return Err(error);
}
}
let decoded_result = self
.dex_decode_service
.decode_transaction_by_signature(signature.as_str())
.await;
let decoded = match decoded_result {
Ok(decoded) => decoded,
Err(error) => return Err(error),
};
let detections_result = self
.dex_detect_service
.detect_transaction_by_signature(signature.as_str())
.await;
let detections = match detections_result {
Ok(detections) => detections,
Err(error) => return Err(error),
};
let launch_attributions_result = self
.launch_origin_service
.attribute_transaction_by_signature(signature.as_str())
.await;
let launch_attributions = match launch_attributions_result {
Ok(launch_attributions) => launch_attributions,
Err(error) => return Err(error),
};
let pool_origins_result = self
.pool_origin_service
.record_transaction_by_signature(signature.as_str())
.await;
let pool_origins = match pool_origins_result {
Ok(pool_origins) => pool_origins,
Err(error) => return Err(error),
};
let wallet_observations_result = self
.wallet_observation_service
.record_transaction_by_signature(signature.as_str())
.await;
let wallet_observations = match wallet_observations_result {
Ok(wallet_observations) => wallet_observations,
Err(error) => return Err(error),
};
let trade_aggregations_result = self
.trade_aggregation_service
.record_transaction_by_signature(signature.as_str())
.await;
let trade_aggregations = match trade_aggregations_result {
Ok(trade_aggregations) => trade_aggregations,
Err(error) => return Err(error),
};
Ok(KbTokenBackfillSignatureResult {
resolved_transaction_count: 1,
missing_transaction_count: 0,
decoded_event_count: decoded.len(),
detection_count: detections.len(),
launch_attribution_count: launch_attributions.len(),
pool_origin_count: pool_origins.len(),
wallet_participation_count: wallet_observations.len(),
trade_event_count: trade_aggregations.len(),
})
}
}
#[derive(Debug, Clone, Default)]
struct KbTokenBackfillSignatureResult {
resolved_transaction_count: usize,
missing_transaction_count: usize,
decoded_event_count: usize,
detection_count: usize,
launch_attribution_count: usize,
pool_origin_count: usize,
wallet_participation_count: usize,
trade_event_count: usize,
}
fn kb_merge_token_backfill_signature_result(
aggregate: &mut crate::KbTokenBackfillResult,
value: KbTokenBackfillSignatureResult,
) {
aggregate.resolved_transaction_count += value.resolved_transaction_count;
aggregate.missing_transaction_count += value.missing_transaction_count;
aggregate.decoded_event_count += value.decoded_event_count;
aggregate.detection_count += value.detection_count;
aggregate.launch_attribution_count += value.launch_attribution_count;
aggregate.pool_origin_count += value.pool_origin_count;
aggregate.wallet_participation_count += value.wallet_participation_count;
aggregate.trade_event_count += value.trade_event_count;
}
#[cfg(test)]
mod tests {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
#[derive(Debug)]
struct TestBackfillHttpServer {
url: std::string::String,
shutdown_tx: std::option::Option<tokio::sync::oneshot::Sender<()>>,
}
impl TestBackfillHttpServer {
async fn spawn() -> Self {
let listener_result = tokio::net::TcpListener::bind("127.0.0.1:0").await;
let listener = match listener_result {
Ok(listener) => listener,
Err(error) => panic!("listener bind must succeed: {error}"),
};
let local_addr_result = listener.local_addr();
let local_addr = match local_addr_result {
Ok(local_addr) => local_addr,
Err(error) => panic!("local addr must exist: {error}"),
};
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
}
accept_result = listener.accept() => {
let (mut stream, _) = match accept_result {
Ok(pair) => pair,
Err(_) => break,
};
tokio::spawn(async move {
let mut buffer = vec![0u8; 65536];
let read_result = stream.read(&mut buffer).await;
let bytes_read = match read_result {
Ok(bytes_read) => bytes_read,
Err(_) => return,
};
if bytes_read == 0 {
return;
}
let request_text = std::string::String::from_utf8_lossy(&buffer[..bytes_read]).to_string();
let body_split = request_text.split("\r\n\r\n").collect::<std::vec::Vec<_>>();
if body_split.len() < 2 {
return;
}
let body = body_split[1];
let request_value_result = serde_json::from_str::<serde_json::Value>(body);
let request_value = match request_value_result {
Ok(request_value) => request_value,
Err(_) => return,
};
let method = request_value
.get("method")
.and_then(serde_json::Value::as_str)
.unwrap_or_default()
.to_string();
let id_value = match request_value.get("id") {
Some(id_value) => id_value.clone(),
None => serde_json::Value::from(1_u64),
};
let response_body = if method == "getSignaturesForAddress" {
let params = request_value
.get("params")
.and_then(serde_json::Value::as_array)
.cloned()
.unwrap_or_default();
let address = params
.first()
.and_then(serde_json::Value::as_str)
.unwrap_or_default()
.to_string();
if address == "BackfillToken111" {
serde_json::json!({
"jsonrpc": "2.0",
"result": [
{
"signature": "sig-backfill-swap-1",
"slot": 2002_u64,
"err": null,
"memo": null,
"blockTime": 1779500002_i64,
"confirmationStatus": "finalized"
},
{
"signature": "sig-backfill-create-1",
"slot": 2001_u64,
"err": null,
"memo": null,
"blockTime": 1779500001_i64,
"confirmationStatus": "finalized"
}
],
"id": id_value
}).to_string()
} else if address == "BackfillPool111" {
serde_json::json!({
"jsonrpc": "2.0",
"result": [
{
"signature": "sig-backfill-swap-1",
"slot": 2002_u64,
"err": null,
"memo": null,
"blockTime": 1779500002_i64,
"confirmationStatus": "finalized"
}
],
"id": id_value
}).to_string()
} else {
serde_json::json!({
"jsonrpc": "2.0",
"result": [],
"id": id_value
}).to_string()
}
} else if method == "getTransaction" {
let params = request_value
.get("params")
.and_then(serde_json::Value::as_array)
.cloned()
.unwrap_or_default();
let signature = params
.first()
.and_then(serde_json::Value::as_str)
.unwrap_or_default()
.to_string();
if signature == "sig-backfill-create-1" {
serde_json::json!({
"jsonrpc": "2.0",
"result": {
"slot": 2001,
"blockTime": 1779500001,
"version": 0,
"transaction": {
"message": {
"instructions": [
{
"programId": crate::KB_FLUXBEAM_PROGRAM_ID,
"program": "fluxbeam",
"stackHeight": 1,
"accounts": [
"BackfillPool111",
"BackfillLpMint111",
"BackfillToken111",
"So11111111111111111111111111111111111111112",
"BackfillCreator111"
],
"parsed": {
"info": {
"instruction": "create_pool",
"pool": "BackfillPool111",
"lpMint": "BackfillLpMint111",
"tokenA": "BackfillToken111",
"tokenB": "So11111111111111111111111111111111111111112",
"payer": "BackfillCreator111"
}
},
"data": "opaque"
}
]
}
},
"meta": {
"err": null,
"logMessages": [
"Program log: Instruction: CreatePool"
]
}
},
"id": id_value
}).to_string()
} else if signature == "sig-backfill-swap-1" {
serde_json::json!({
"jsonrpc": "2.0",
"result": {
"slot": 2002,
"blockTime": 1779500002,
"version": 0,
"transaction": {
"message": {
"instructions": [
{
"programId": crate::KB_FLUXBEAM_PROGRAM_ID,
"program": "fluxbeam",
"stackHeight": 1,
"accounts": [
"BackfillPool111",
"BackfillLpMint111",
"BackfillToken111",
"So11111111111111111111111111111111111111112"
],
"parsed": {
"info": {
"instruction": "swap",
"pool": "BackfillPool111",
"tokenA": "BackfillToken111",
"tokenB": "So11111111111111111111111111111111111111112",
"baseAmountRaw": "1000",
"quoteAmountRaw": "2500"
}
},
"data": "opaque"
}
]
}
},
"meta": {
"err": null,
"logMessages": [
"Program log: Instruction: Swap",
"Program log: buy"
]
}
},
"id": id_value
}).to_string()
} else {
serde_json::json!({
"jsonrpc": "2.0",
"result": serde_json::Value::Null,
"id": id_value
}).to_string()
}
} else if method == "getProgramAccounts" {
serde_json::json!({
"jsonrpc": "2.0",
"result": [],
"id": id_value
}).to_string()
} else {
serde_json::json!({
"jsonrpc": "2.0",
"result": serde_json::Value::Null,
"id": id_value
}).to_string()
};
let response_text = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
response_body.len(),
response_body
);
let write_result = stream.write_all(response_text.as_bytes()).await;
if write_result.is_err() {
return;
}
let _ = stream.shutdown().await;
});
}
}
}
});
Self {
url: format!("http://{}", local_addr),
shutdown_tx: Some(shutdown_tx),
}
}
async fn shutdown(mut self) {
let shutdown_tx_option = self.shutdown_tx.take();
if let Some(shutdown_tx) = shutdown_tx_option {
let _ = shutdown_tx.send(());
}
}
}
async fn make_database() -> std::sync::Arc<crate::KbDatabase> {
let tempdir_result = tempfile::tempdir();
let tempdir = match tempdir_result {
Ok(tempdir) => tempdir,
Err(error) => panic!("tempdir must succeed: {}", error),
};
let database_path = tempdir.path().join("token_backfill.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
let database_result = crate::KbDatabase::connect_and_initialize(&config).await;
let database = match database_result {
Ok(database) => database,
Err(error) => panic!("database init must succeed: {}", error),
};
std::sync::Arc::new(database)
}
fn make_http_endpoint_config(url: std::string::String) -> crate::KbHttpEndpointConfig {
crate::KbHttpEndpointConfig {
name: "backfill_http".to_string(),
enabled: true,
provider: "test".to_string(),
url,
api_key_env_var: None,
roles: vec!["history_backfill".to_string()],
requests_per_second: 100,
burst_capacity: 100,
send_transaction_requests_per_second: None,
send_transaction_burst_capacity: None,
heavy_requests_per_second: None,
heavy_burst_capacity: None,
connect_timeout_ms: 5_000,
request_timeout_ms: 5_000,
max_idle_connections_per_host: 8,
pause_after_http_429_ms: None,
max_concurrent_requests_per_endpoint: 16,
}
}
#[tokio::test]
async fn backfill_token_by_mint_reconstructs_pool_and_trade() {
let server = TestBackfillHttpServer::spawn().await;
let database = make_database().await;
let pool_result =
crate::HttpEndpointPool::from_endpoint_configs(vec![make_http_endpoint_config(
server.url.clone(),
)]);
let http_pool = match pool_result {
Ok(http_pool) => std::sync::Arc::new(http_pool),
Err(error) => panic!("http pool creation must succeed: {}", error),
};
let service = crate::KbTokenBackfillService::new(
http_pool,
database.clone(),
"history_backfill".to_string(),
);
let backfill_result = service
.backfill_token_by_mint("BackfillToken111", 20, 20)
.await;
let backfill = match backfill_result {
Ok(backfill) => backfill,
Err(error) => panic!("backfill must succeed: {}", error),
};
assert_eq!(backfill.mint_signature_count, 2);
assert_eq!(backfill.pool_address_count, 1);
assert_eq!(backfill.pool_signature_count, 1);
assert_eq!(backfill.unique_signature_count, 2);
assert_eq!(backfill.resolved_transaction_count, 2);
assert_eq!(backfill.missing_transaction_count, 0);
assert_eq!(backfill.trade_event_count, 1);
let token_result = crate::get_token_by_mint(database.as_ref(), "BackfillToken111").await;
let token_option = match token_result {
Ok(token_option) => token_option,
Err(error) => panic!("token fetch must succeed: {}", error),
};
let token = match token_option {
Some(token) => token,
None => panic!("token must exist"),
};
assert!(token.id.is_some());
let pool_result = crate::get_pool_by_address(database.as_ref(), "BackfillPool111").await;
let pool_option = match pool_result {
Ok(pool_option) => pool_option,
Err(error) => panic!("pool fetch must succeed: {}", error),
};
let pool = match pool_option {
Some(pool) => pool,
None => panic!("pool must exist"),
};
let pool_id = match pool.id {
Some(pool_id) => pool_id,
None => panic!("pool must have an id"),
};
let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await;
let pair_option = match pair_result {
Ok(pair_option) => pair_option,
Err(error) => panic!("pair fetch must succeed: {}", error),
};
let pair = match pair_option {
Some(pair) => pair,
None => panic!("pair must exist"),
};
let pair_id = match pair.id {
Some(pair_id) => pair_id,
None => panic!("pair must have an id"),
};
let trade_events_result =
crate::list_trade_events_by_pair_id(database.as_ref(), pair_id).await;
let trade_events = match trade_events_result {
Ok(trade_events) => trade_events,
Err(error) => panic!("trade event list must succeed: {}", error),
};
assert_eq!(trade_events.len(), 1);
assert_eq!(trade_events[0].price_quote_per_base, Some(2.5));
let pair_metric_result =
crate::get_pair_metric_by_pair_id(database.as_ref(), pair_id).await;
let pair_metric_option = match pair_metric_result {
Ok(pair_metric_option) => pair_metric_option,
Err(error) => panic!("pair metric fetch must succeed: {}", error),
};
let pair_metric = match pair_metric_option {
Some(pair_metric) => pair_metric,
None => panic!("pair metric must exist"),
};
assert_eq!(pair_metric.trade_count, 1);
assert_eq!(pair_metric.buy_count, 1);
server.shutdown().await;
}
#[tokio::test]
async fn backfill_token_by_mint_is_state_idempotent() {
let server = TestBackfillHttpServer::spawn().await;
let database = make_database().await;
let pool_result =
crate::HttpEndpointPool::from_endpoint_configs(vec![make_http_endpoint_config(
server.url.clone(),
)]);
let http_pool = match pool_result {
Ok(http_pool) => std::sync::Arc::new(http_pool),
Err(error) => panic!("http pool creation must succeed: {}", error),
};
let service = crate::KbTokenBackfillService::new(
http_pool,
database.clone(),
"history_backfill".to_string(),
);
let first_result = service
.backfill_token_by_mint("BackfillToken111", 20, 20)
.await;
if let Err(error) = first_result {
panic!("first backfill must succeed: {}", error);
}
let second_result = service
.backfill_token_by_mint("BackfillToken111", 20, 20)
.await;
if let Err(error) = second_result {
panic!("second backfill must succeed: {}", error);
}
let token_result = crate::get_token_by_mint(database.as_ref(), "BackfillToken111").await;
let token_option = match token_result {
Ok(token_option) => token_option,
Err(error) => panic!("token fetch must succeed: {}", error),
};
let token = match token_option {
Some(token) => token,
None => panic!("token must exist"),
};
let token_id = token.id.unwrap_or_default();
let pools_result = crate::list_pools(database.as_ref()).await;
let pools = match pools_result {
Ok(pools) => pools,
Err(error) => panic!("pool list must succeed: {}", error),
};
assert_eq!(pools.len(), 1);
let pool_id = pools[0].id.unwrap_or_default();
let pool_tokens_result =
crate::list_pool_tokens_by_pool_id(database.as_ref(), pool_id).await;
let pool_tokens = match pool_tokens_result {
Ok(pool_tokens) => pool_tokens,
Err(error) => panic!("pool token list must succeed: {}", error),
};
let mut found_token = false;
for pool_token in pool_tokens {
if pool_token.token_id == token_id {
found_token = true;
}
}
assert!(found_token);
let pair_result = crate::get_pair_by_pool_id(database.as_ref(), pool_id).await;
let pair_option = match pair_result {
Ok(pair_option) => pair_option,
Err(error) => panic!("pair fetch must succeed: {}", error),
};
let pair = match pair_option {
Some(pair) => pair,
None => panic!("pair must exist"),
};
let pair_id = pair.id.unwrap_or_default();
let trade_events_result =
crate::list_trade_events_by_pair_id(database.as_ref(), pair_id).await;
let trade_events = match trade_events_result {
Ok(trade_events) => trade_events,
Err(error) => panic!("trade event list must succeed: {}", error),
};
assert_eq!(trade_events.len(), 1);
let pair_metrics_result = crate::list_pair_metrics(database.as_ref()).await;
let pair_metrics = match pair_metrics_result {
Ok(pair_metrics) => pair_metrics,
Err(error) => panic!("pair metric list must succeed: {}", error),
};
assert_eq!(pair_metrics.len(), 1);
server.shutdown().await;
}
}