Files
khadhroony-bobobot/kb_lib/src/dex_detect.rs
2026-06-08 12:32:58 +02:00

2189 lines
88 KiB
Rust

// file: kb_lib/src/dex_detect.rs
//! Business-level detection built from decoded DEX events.
/// Outcome of one business-level DEX pool detection.
///
/// Carries the ids of the objects involved plus one flag per object telling
/// whether this detection created it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DexPoolDetectionResult {
    /// Id of the parent decoded event.
    pub decoded_event_id: i64,
    /// Id of the DEX.
    pub dex_id: i64,
    /// Id of the pool.
    pub pool_id: i64,
    /// Id of the pair.
    pub pair_id: i64,
    /// Id of the pool listing, when there is one.
    pub pool_listing_id: std::option::Option<i64>,
    /// `true` when this detection newly created the pool.
    pub created_pool: bool,
    /// `true` when this detection newly created the pair.
    pub created_pair: bool,
    /// `true` when this detection newly created the listing.
    pub created_listing: bool,
}
/// Service that turns decoded DEX events into business-level detections.
///
/// Holds a shared database handle and a persistence service built from the
/// same handle (see `new`).
#[derive(Debug, Clone)]
pub struct DexDetectService {
    /// Shared database handle used for lookups and pool materialization.
    database: std::sync::Arc<crate::Database>,
    /// Persistence service used to record observations and signals.
    persistence: crate::DetectionPersistenceService,
}
impl DexDetectService {
/// Builds a DEX detection service around a shared database handle.
///
/// The handle is cloned once for the internal
/// `crate::DetectionPersistenceService`; the service keeps the original.
pub fn new(database: std::sync::Arc<crate::Database>) -> Self {
    Self {
        // Evaluated first, while `database` is still available to clone.
        persistence: crate::DetectionPersistenceService::new(database.clone()),
        database,
    }
}
/// Detects business-level DEX objects from one transaction signature.
pub async fn detect_transaction_by_signature(
&self,
signature: &str,
) -> Result<std::vec::Vec<crate::DexPoolDetectionResult>, crate::Error> {
let transaction_result =
crate::query_chain_transactions_get_by_signature(self.database.as_ref(), signature)
.await;
let transaction_option = match transaction_result {
Ok(transaction_option) => transaction_option,
Err(error) => return Err(error),
};
let transaction = match transaction_option {
Some(transaction) => transaction,
None => {
return Err(crate::Error::InvalidState(format!(
"cannot detect dex objects from unknown transaction '{}'",
signature
)));
},
};
let transaction_id_option = transaction.id;
let transaction_id = match transaction_id_option {
Some(transaction_id) => transaction_id,
None => {
return Err(crate::Error::InvalidState(format!(
"transaction '{}' has no internal id",
signature
)));
},
};
let decoded_events_result = crate::query_dex_decoded_events_list_by_transaction_id(
self.database.as_ref(),
transaction_id,
)
.await;
let decoded_events = match decoded_events_result {
Ok(decoded_events) => decoded_events,
Err(error) => return Err(error),
};
let mut detection_results = std::vec::Vec::new();
for decoded_event in &decoded_events {
let route_option = crate::dex_detection_route::dex_detection_route(decoded_event);
let route = match route_option {
Some(route) => route,
None => continue,
};
if crate::dex_detection_route::dex_detection_route_requires_full_pool_context(route)
&& !crate::dex_detection_route::decoded_event_has_full_pool_context(decoded_event)
{
tracing::trace!(
decoded_event_id = ?decoded_event.id,
protocol_name = %decoded_event.protocol_name,
event_kind = %decoded_event.event_kind,
pool_account = ?decoded_event.pool_account,
token_a_mint = ?decoded_event.token_a_mint,
token_b_mint = ?decoded_event.token_b_mint,
"skipping business-level dex detection for incomplete decoded pool context"
);
continue;
}
let detect_result = match route {
crate::dex_detection_route::DexDetectionRoute::RaydiumAmmV4Initialize2Pool => {
self.detect_raydium_initialize2_pool(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::RaydiumAmmV4Trade => {
self.detect_raydium_amm_v4_trade(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::RaydiumCpmmTrade => {
self.detect_raydium_cpmm_trade(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::RaydiumClmmTrade => {
self.detect_raydium_clmm_trade(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::RaydiumLaunchpadPool => {
self.detect_raydium_launchpad_pool(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::PumpFunCreateV2Token => {
self.detect_pump_fun_create_v2_token(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::PumpFunTrade => {
self.detect_pump_fun_trade(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::PumpSwapTrade => {
self.detect_pump_swap_trade(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::SkipIncompletePumpSwapTrade => {
tracing::trace!(
decoded_event_id = ?decoded_event.id,
event_kind = %decoded_event.event_kind,
pool_account = ?decoded_event.pool_account,
"skipping incomplete pump_swap decoded event during detection"
);
continue;
},
crate::dex_detection_route::DexDetectionRoute::MeteoraDbcPool => {
self.detect_meteora_dbc_pool(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::MeteoraDlmmPool => {
self.detect_meteora_dlmm_pool(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::MeteoraDammV1Pool => {
self.detect_meteora_damm_v1_pool(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::MeteoraDammV2Pool => {
self.detect_meteora_damm_v2_pool(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::OrcaWhirlpoolsPool => {
self.detect_orca_whirlpools_pool(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::FluxbeamPool => {
self.detect_fluxbeam_pool(&transaction, decoded_event).await
},
crate::dex_detection_route::DexDetectionRoute::DexlabPool => {
self.detect_dexlab_pool(&transaction, decoded_event).await
},
};
let detect_result = match detect_result {
Ok(detect_result) => detect_result,
Err(error) => return Err(error),
};
detection_results.push(detect_result);
}
return Ok(detection_results);
}
/// Detects the pool for the `RaydiumAmmV4Initialize2Pool` route.
///
/// Delegates to `detect_materialized_pool_from_decoded_event`, which performs
/// the same steps this method used to spell out inline: resolve the
/// `raydium_amm_v4` DEX, materialize an active AMM pool without vault
/// addresses, parse the payload and record the creation signals.
///
/// # Errors
///
/// Forwards any `crate::Error` returned by the shared helper.
async fn detect_raydium_initialize2_pool(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    self.detect_materialized_pool_from_decoded_event(
        transaction,
        decoded_event,
        "raydium_amm_v4",
        crate::PoolKind::Amm,
        crate::PoolStatus::Active,
        // NOTE(review): the other detectors use a "signal.dex.<dex_code>" prefix;
        // this generic prefix is kept unchanged — confirm it is intentional.
        "signal.dex",
    )
    .await
}
/// Materializes the pending bonding-curve pool for a `pump_fun` token creation
/// event and records its creation signals.
///
/// The pool input is built with the event's required token A mint and
/// `crate::WSOL_MINT_ID` as the two mints, plus the event's `lp_mint`; no
/// vault addresses are supplied.
///
/// # Errors
///
/// Forwards any `crate::Error` from the DEX lookup, the mint extraction, the
/// input construction, the materialization, the payload parsing or the signal
/// recording. The payload is parsed only after the pool has been materialized.
async fn detect_pump_fun_create_v2_token(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    let dex_id =
        crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "pump_fun").await?;
    let token_mint = crate::dex_pool_materialization::required_token_a_mint(decoded_event)?;
    let input =
        crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event_with_mints(
            decoded_event,
            dex_id,
            token_mint,
            crate::WSOL_MINT_ID.to_string(),
            decoded_event.lp_mint.clone(),
            crate::PoolKind::BondingCurve,
            crate::PoolStatus::Pending,
            crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB,
            None,
            None,
            transaction.source_endpoint_name.clone(),
        )?;
    let detection =
        crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input)
            .await?;
    let payload = parse_payload_json(decoded_event.payload_json.as_str())?;
    self.record_pool_detection_signals(transaction, "signal.dex.pump_fun", &detection, payload)
        .await?;
    Ok(detection)
}
/// Materializes the active bonding-curve pool touched by a `pump_fun` trade
/// event and records its creation signals.
///
/// Vault addresses are taken from the payload through
/// `extract_pump_fun_vault_addresses`; either of them may be absent.
///
/// # Errors
///
/// Forwards any `crate::Error` from the DEX lookup, the payload parsing, the
/// input construction, the materialization or the signal recording.
async fn detect_pump_fun_trade(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    let dex_id =
        crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "pump_fun").await?;
    let payload = parse_payload_json(decoded_event.payload_json.as_str())?;
    let (token_a_vault, token_b_vault) = extract_pump_fun_vault_addresses(&payload);
    let input =
        crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event(
            decoded_event,
            dex_id,
            crate::PoolKind::BondingCurve,
            crate::PoolStatus::Active,
            crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB,
            token_a_vault,
            token_b_vault,
            transaction.source_endpoint_name.clone(),
        )?;
    let detection =
        crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input)
            .await?;
    self.record_pool_detection_signals(transaction, "signal.dex.pump_fun", &detection, payload)
        .await?;
    Ok(detection)
}
/// Materializes the active AMM pool touched by a `pump_swap` trade event and
/// records its creation signals.
///
/// Vault addresses are taken from the payload through
/// `extract_pump_swap_vault_addresses`; either of them may be absent.
///
/// # Errors
///
/// Forwards any `crate::Error` from the DEX lookup, the payload parsing, the
/// input construction, the materialization or the signal recording.
async fn detect_pump_swap_trade(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    let dex_id =
        crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "pump_swap").await?;
    let payload = parse_payload_json(decoded_event.payload_json.as_str())?;
    let (token_a_vault, token_b_vault) = extract_pump_swap_vault_addresses(&payload);
    let input =
        crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event(
            decoded_event,
            dex_id,
            crate::PoolKind::Amm,
            crate::PoolStatus::Active,
            crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB,
            token_a_vault,
            token_b_vault,
            transaction.source_endpoint_name.clone(),
        )?;
    let detection =
        crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input)
            .await?;
    self.record_pool_detection_signals(transaction, "signal.dex.pump_swap", &detection, payload)
        .await?;
    Ok(detection)
}
/// Detects a `meteora_dbc` pool from a decoded event.
///
/// Thin wrapper over
/// `detect_materialized_pool_from_decoded_event_with_payload_vaults` using a
/// pending bonding-curve pool and the `signal.dex.meteora_dbc` signal prefix.
async fn detect_meteora_dbc_pool(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    self.detect_materialized_pool_from_decoded_event_with_payload_vaults(
        transaction,
        decoded_event,
        "meteora_dbc",
        crate::PoolKind::BondingCurve,
        crate::PoolStatus::Pending,
        "signal.dex.meteora_dbc",
    )
    .await
}
/// Materializes the active CLMM-kind pool for a `meteora_dlmm` event and
/// records its creation signals.
///
/// The payload fields `reserveXAccount` and `reserveYAccount` are passed, in
/// that order, as the two optional vault addresses of the pool input.
///
/// # Errors
///
/// Forwards any `crate::Error` from the DEX lookup, the payload parsing, the
/// input construction, the materialization or the signal recording.
async fn detect_meteora_dlmm_pool(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    let dex_id =
        crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "meteora_dlmm").await?;
    let payload = parse_payload_json(decoded_event.payload_json.as_str())?;
    let reserve_x = extract_payload_string_field(&payload, "reserveXAccount");
    let reserve_y = extract_payload_string_field(&payload, "reserveYAccount");
    let input =
        crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event(
            decoded_event,
            dex_id,
            crate::PoolKind::Clmm,
            crate::PoolStatus::Active,
            crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB,
            reserve_x,
            reserve_y,
            transaction.source_endpoint_name.clone(),
        )?;
    let detection =
        crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input)
            .await?;
    self.record_pool_detection_signals(
        transaction,
        "signal.dex.meteora_dlmm",
        &detection,
        payload,
    )
    .await?;
    Ok(detection)
}
/// Detects a `meteora_damm_v1` pool from a decoded event.
///
/// Thin wrapper over
/// `detect_materialized_pool_from_decoded_event_with_payload_vaults` using an
/// active AMM pool and the `signal.dex.meteora_damm_v1` signal prefix.
async fn detect_meteora_damm_v1_pool(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    self.detect_materialized_pool_from_decoded_event_with_payload_vaults(
        transaction,
        decoded_event,
        "meteora_damm_v1",
        crate::PoolKind::Amm,
        crate::PoolStatus::Active,
        "signal.dex.meteora_damm_v1",
    )
    .await
}
/// Detects a `meteora_damm_v2` pool from a decoded event.
///
/// Thin wrapper over
/// `detect_materialized_pool_from_decoded_event_with_payload_vaults` using an
/// active AMM pool and the `signal.dex.meteora_damm_v2` signal prefix.
async fn detect_meteora_damm_v2_pool(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    self.detect_materialized_pool_from_decoded_event_with_payload_vaults(
        transaction,
        decoded_event,
        "meteora_damm_v2",
        crate::PoolKind::Amm,
        crate::PoolStatus::Active,
        "signal.dex.meteora_damm_v2",
    )
    .await
}
/// Detects an `orca_whirlpools` pool from a decoded event.
///
/// Thin wrapper over `detect_materialized_pool_from_decoded_event` using an
/// active CLMM-kind pool and the `signal.dex.orca_whirlpools` signal prefix.
async fn detect_orca_whirlpools_pool(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    self.detect_materialized_pool_from_decoded_event(
        transaction,
        decoded_event,
        "orca_whirlpools",
        crate::PoolKind::Clmm,
        crate::PoolStatus::Active,
        "signal.dex.orca_whirlpools",
    )
    .await
}
/// Detects a `fluxbeam` pool from a decoded event.
///
/// Thin wrapper over `detect_materialized_pool_from_decoded_event` using an
/// active AMM pool and the `signal.dex.fluxbeam` signal prefix.
async fn detect_fluxbeam_pool(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    self.detect_materialized_pool_from_decoded_event(
        transaction,
        decoded_event,
        "fluxbeam",
        crate::PoolKind::Amm,
        crate::PoolStatus::Active,
        "signal.dex.fluxbeam",
    )
    .await
}
/// Detects a `dexlab` pool from a decoded event.
///
/// Thin wrapper over `detect_materialized_pool_from_decoded_event` using an
/// active AMM pool and the `signal.dex.dexlab` signal prefix.
async fn detect_dexlab_pool(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    self.detect_materialized_pool_from_decoded_event(
        transaction,
        decoded_event,
        "dexlab",
        crate::PoolKind::Amm,
        crate::PoolStatus::Active,
        "signal.dex.dexlab",
    )
    .await
}
/// Materializes the active AMM pool touched by a `raydium_amm_v4` trade event
/// and records its creation signals.
///
/// The payload fields `baseVault` and `quoteVault` supply the optional vault
/// addresses, and the token order is passed as `AlreadyBaseQuote`.
///
/// # Errors
///
/// Forwards any `crate::Error` from the DEX lookup, the payload parsing, the
/// input construction, the materialization or the signal recording.
async fn detect_raydium_amm_v4_trade(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    let dex_id =
        crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "raydium_amm_v4").await?;
    let payload = parse_payload_json(decoded_event.payload_json.as_str())?;
    // NOTE(review): these keys are camelCase while the CLMM/CPMM trade detectors
    // read snake_case keys — presumably mirrors each decoder's payload; confirm.
    let base_vault = extract_payload_string_field(&payload, "baseVault");
    let quote_vault = extract_payload_string_field(&payload, "quoteVault");
    let input =
        crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event(
            decoded_event,
            dex_id,
            crate::PoolKind::Amm,
            crate::PoolStatus::Active,
            crate::dex_pool_materialization::DexPoolTokenOrder::AlreadyBaseQuote,
            base_vault,
            quote_vault,
            transaction.source_endpoint_name.clone(),
        )?;
    let detection =
        crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input)
            .await?;
    self.record_pool_detection_signals(
        transaction,
        "signal.dex.raydium_amm_v4",
        &detection,
        payload,
    )
    .await?;
    Ok(detection)
}
/// Materializes the active CLMM pool touched by a `raydium_clmm` trade event
/// and records its creation signals.
///
/// The payload fields `base_vault` and `quote_vault` supply the optional vault
/// addresses, and the token order is passed as `AlreadyBaseQuote`.
///
/// # Errors
///
/// Forwards any `crate::Error` from the DEX lookup, the payload parsing, the
/// input construction, the materialization or the signal recording.
async fn detect_raydium_clmm_trade(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    let dex_id =
        crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "raydium_clmm").await?;
    let payload = parse_payload_json(decoded_event.payload_json.as_str())?;
    let base_vault = extract_payload_string_field(&payload, "base_vault");
    let quote_vault = extract_payload_string_field(&payload, "quote_vault");
    let input =
        crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event(
            decoded_event,
            dex_id,
            crate::PoolKind::Clmm,
            crate::PoolStatus::Active,
            crate::dex_pool_materialization::DexPoolTokenOrder::AlreadyBaseQuote,
            base_vault,
            quote_vault,
            transaction.source_endpoint_name.clone(),
        )?;
    let detection =
        crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input)
            .await?;
    self.record_pool_detection_signals(
        transaction,
        "signal.dex.raydium_clmm",
        &detection,
        payload,
    )
    .await?;
    Ok(detection)
}
/// Detects a `raydium_launchpad` pool from a decoded event.
///
/// Thin wrapper over `detect_materialized_pool_from_decoded_event` using a
/// pending bonding-curve pool and the `signal.dex.raydium_launchpad` signal
/// prefix.
async fn detect_raydium_launchpad_pool(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    self.detect_materialized_pool_from_decoded_event(
        transaction,
        decoded_event,
        "raydium_launchpad",
        crate::PoolKind::BondingCurve,
        crate::PoolStatus::Pending,
        "signal.dex.raydium_launchpad",
    )
    .await
}
/// Materializes the active AMM pool touched by a `raydium_cpmm` trade event
/// and records its creation signals.
///
/// The payload fields `base_vault` and `quote_vault` supply the optional vault
/// addresses, and the token order is passed as `AlreadyBaseQuote`.
///
/// # Errors
///
/// Forwards any `crate::Error` from the DEX lookup, the payload parsing, the
/// input construction, the materialization or the signal recording.
async fn detect_raydium_cpmm_trade(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    let dex_id =
        crate::dex_catalog::ensure_known_dex(self.database.as_ref(), "raydium_cpmm").await?;
    let payload = parse_payload_json(decoded_event.payload_json.as_str())?;
    let base_vault = extract_payload_string_field(&payload, "base_vault");
    let quote_vault = extract_payload_string_field(&payload, "quote_vault");
    let input =
        crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event(
            decoded_event,
            dex_id,
            crate::PoolKind::Amm,
            crate::PoolStatus::Active,
            crate::dex_pool_materialization::DexPoolTokenOrder::AlreadyBaseQuote,
            base_vault,
            quote_vault,
            transaction.source_endpoint_name.clone(),
        )?;
    let detection =
        crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input)
            .await?;
    self.record_pool_detection_signals(
        transaction,
        "signal.dex.raydium_cpmm",
        &detection,
        payload,
    )
    .await?;
    Ok(detection)
}
/// Shared detector for DEXes whose pool input needs no vault addresses.
///
/// Resolves `dex_code` to a DEX id, builds the pool input from
/// `decoded_event` with the given `pool_kind` and `pool_status` (both vault
/// arguments are `None`), materializes the pool, then parses the event payload
/// and records creation signals under `signal_prefix`.
///
/// # Errors
///
/// Forwards any `crate::Error` from those steps. The payload is parsed only
/// after the pool has been materialized.
async fn detect_materialized_pool_from_decoded_event(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
    dex_code: &str,
    pool_kind: crate::PoolKind,
    pool_status: crate::PoolStatus,
    signal_prefix: &str,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    let dex_id = crate::dex_catalog::ensure_known_dex(self.database.as_ref(), dex_code).await?;
    let input =
        crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event(
            decoded_event,
            dex_id,
            pool_kind,
            pool_status,
            crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB,
            None,
            None,
            transaction.source_endpoint_name.clone(),
        )?;
    let detection =
        crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input)
            .await?;
    let payload = parse_payload_json(decoded_event.payload_json.as_str())?;
    self.record_pool_detection_signals(transaction, signal_prefix, &detection, payload)
        .await?;
    Ok(detection)
}
/// Shared detector for DEXes whose payload carries `baseVault` / `quoteVault`.
///
/// Resolves `dex_code` to a DEX id, parses the event payload, reads the two
/// optional vault fields, builds the pool input with the given `pool_kind`
/// and `pool_status`, materializes the pool and records creation signals
/// under `signal_prefix`.
///
/// # Errors
///
/// Forwards any `crate::Error` from those steps. The payload is parsed before
/// anything is materialized.
async fn detect_materialized_pool_from_decoded_event_with_payload_vaults(
    &self,
    transaction: &crate::ChainTransactionDto,
    decoded_event: &crate::DexDecodedEventDto,
    dex_code: &str,
    pool_kind: crate::PoolKind,
    pool_status: crate::PoolStatus,
    signal_prefix: &str,
) -> Result<crate::DexPoolDetectionResult, crate::Error> {
    let dex_id = crate::dex_catalog::ensure_known_dex(self.database.as_ref(), dex_code).await?;
    let payload = parse_payload_json(decoded_event.payload_json.as_str())?;
    let base_vault = extract_payload_string_field(&payload, "baseVault");
    let quote_vault = extract_payload_string_field(&payload, "quoteVault");
    let input =
        crate::dex_pool_materialization::DexPoolMaterializationInput::from_decoded_event(
            decoded_event,
            dex_id,
            pool_kind,
            pool_status,
            crate::dex_pool_materialization::DexPoolTokenOrder::ChooseBaseQuoteFromTokenAB,
            base_vault,
            quote_vault,
            transaction.source_endpoint_name.clone(),
        )?;
    let detection =
        crate::dex_pool_materialization::materialize_dex_pool(self.database.as_ref(), &input)
            .await?;
    self.record_pool_detection_signals(transaction, signal_prefix, &detection, payload)
        .await?;
    Ok(detection)
}
/// Records one observation and one signal for `transaction`.
///
/// The observation is stored first under the kind `dex.business_detection`
/// with a copy of `payload`; the signal is then stored with `signal_kind`,
/// `severity`, the id of that observation and `payload` itself.
///
/// # Errors
///
/// Forwards the error of either persistence call; when recording the
/// observation fails, no signal is recorded.
async fn record_detection_signal(
    &self,
    transaction: &crate::ChainTransactionDto,
    signal_kind: &str,
    severity: crate::AnalysisSignalSeverity,
    payload: serde_json::Value,
) -> Result<i64, crate::Error> {
    let observation_input = crate::DetectionObservationInput::new(
        "dex.business_detection".to_string(),
        crate::ObservationSourceKind::HttpRpc,
        transaction.source_endpoint_name.clone(),
        transaction.signature.clone(),
        transaction.slot,
        payload.clone(),
    );
    let observation_id = self.persistence.record_observation(&observation_input).await?;

    let signal_input = crate::DetectionSignalInput::new(
        signal_kind.to_string(),
        severity,
        transaction.signature.clone(),
        Some(observation_id),
        None,
        payload,
    );
    self.persistence.record_signal(&signal_input).await
}
/// Records one low-severity signal per object the detection created.
///
/// In order, `created_pool`, `created_pair` and `created_listing` map to the
/// signal kinds `<signal_prefix>.new_pool`, `<signal_prefix>.new_pair` and
/// `<signal_prefix>.first_listing_seen`. Flags that are `false` record
/// nothing.
///
/// # Errors
///
/// Stops at, and returns, the first error from `record_detection_signal`.
async fn record_pool_detection_signals(
    &self,
    transaction: &crate::ChainTransactionDto,
    signal_prefix: &str,
    detection_result: &crate::DexPoolDetectionResult,
    payload: serde_json::Value,
) -> Result<(), crate::Error> {
    let creation_signals = [
        (detection_result.created_pool, "new_pool"),
        (detection_result.created_pair, "new_pair"),
        (detection_result.created_listing, "first_listing_seen"),
    ];
    for (was_created, suffix) in creation_signals {
        if !was_created {
            continue;
        }
        let signal_kind = format!("{signal_prefix}.{suffix}");
        self.record_detection_signal(
            transaction,
            signal_kind.as_str(),
            crate::AnalysisSignalSeverity::Low,
            payload.clone(),
        )
        .await?;
    }
    Ok(())
}
}
fn parse_payload_json(payload_json: &str) -> Result<serde_json::Value, crate::Error> {
let parse_result = serde_json::from_str::<serde_json::Value>(payload_json);
match parse_result {
Ok(value) => return Ok(value),
Err(error) => {
return Err(crate::Error::Json(format!(
"cannot parse dex decoded event payload_json '{}': {}",
payload_json, error
)));
},
}
}
/// Returns the trimmed string stored at `index` in `values`.
///
/// Yields `None` when `index` is out of bounds, when the element is not a JSON
/// string, or when the string is empty once trimmed.
fn extract_string_from_array_index(
    values: &[serde_json::Value],
    index: usize,
) -> std::option::Option<std::string::String> {
    values
        .get(index)
        .and_then(|value| value.as_str())
        .map(|text| text.trim())
        .filter(|text| !text.is_empty())
        .map(|text| text.to_string())
}
/// Reads the two pump_fun vault addresses from the payload `accounts` array.
///
/// The first element of the returned pair comes from `accounts[4]`, the second
/// from `accounts[3]`. Both are `None` when `accounts` is missing or is not an
/// array; each is individually `None` when its entry is absent or blank.
fn extract_pump_fun_vault_addresses(
    payload_value: &serde_json::Value,
) -> (
    std::option::Option<std::string::String>,
    std::option::Option<std::string::String>,
) {
    match payload_value.get("accounts").and_then(|accounts| accounts.as_array()) {
        Some(accounts) => (
            extract_string_from_array_index(accounts, 4),
            extract_string_from_array_index(accounts, 3),
        ),
        None => (None, None),
    }
}
/// Reads the two pump_swap vault addresses from the payload `accounts` array.
///
/// The first element of the returned pair comes from `accounts[7]`, the second
/// from `accounts[8]`. Both are `None` when `accounts` is missing or is not an
/// array; each is individually `None` when its entry is absent or blank.
fn extract_pump_swap_vault_addresses(
    payload_value: &serde_json::Value,
) -> (
    std::option::Option<std::string::String>,
    std::option::Option<std::string::String>,
) {
    match payload_value.get("accounts").and_then(|accounts| accounts.as_array()) {
        Some(accounts) => (
            extract_string_from_array_index(accounts, 7),
            extract_string_from_array_index(accounts, 8),
        ),
        None => (None, None),
    }
}
/// Returns the trimmed string stored under `field_name` in `payload_value`.
///
/// Yields `None` when the field is missing, when it is not a JSON string, or
/// when the string is empty once trimmed. Blank values are treated as absent
/// so that an empty address is never handed on as if it were a real one; this
/// matches how `extract_string_from_array_index` treats array entries.
fn extract_payload_string_field(
    payload_value: &serde_json::Value,
    field_name: &str,
) -> std::option::Option<std::string::String> {
    payload_value
        .get(field_name)
        .and_then(|value| value.as_str())
        .map(|text| text.trim())
        // A blank string carries no address; report it as missing.
        .filter(|text| !text.is_empty())
        .map(|text| text.to_string())
}
#[cfg(test)]
mod tests {
/// Creates and initializes a SQLite test database named `dex_detect.sqlite3`
/// inside a fresh temporary directory.
///
/// Panics when the temporary directory or the database cannot be created.
async fn make_database() -> std::sync::Arc<crate::Database> {
    // NOTE(review): `tempdir` is dropped when this function returns, while the
    // database handle outlives it — confirm the database does not need the
    // directory to stay on disk afterwards.
    let tempdir = tempfile::tempdir()
        .unwrap_or_else(|error| panic!("tempdir must succeed: {}", error));
    let sqlite = crate::SqliteDatabaseConfig {
        path: tempdir.path().join("dex_detect.sqlite3").to_string_lossy().to_string(),
        create_if_missing: true,
        busy_timeout_ms: 5000,
        max_connections: 1,
        auto_initialize_schema: true,
        use_wal: true,
    };
    let config = crate::DatabaseConfig {
        enabled: true,
        backend: crate::DatabaseBackend::Sqlite,
        sqlite,
    };
    let database = crate::Database::connect_and_initialize(&config)
        .await
        .unwrap_or_else(|error| panic!("database init must succeed: {}", error));
    std::sync::Arc::new(database)
}
/// Persists one synthetic Raydium AMM v4 transaction under `signature` and
/// runs DEX decoding on it.
///
/// The fixture holds a single instruction with seventeen accounts and an
/// `initialize2` log line. Panics when persisting or decoding fails.
async fn seed_decoded_raydium_event(
    database: std::sync::Arc<crate::Database>,
    signature: &str,
) {
    let transaction_model = crate::TransactionModelService::new(database.clone());
    let dex_decode = crate::DexDecodeService::new(database);
    let resolved_transaction = serde_json::json!({
        "slot": 910001,
        "blockTime": 1779100001,
        "version": 0,
        "transaction": {
            "message": {
                "instructions": [
                    {
                        "programId": crate::RAYDIUM_AMM_V4_PROGRAM_ID,
                        "program": "raydium_amm_v4",
                        "stackHeight": 1,
                        "accounts": [
                            "Account0",
                            "Account1",
                            "Account2",
                            "Account3",
                            "PoolDetect111",
                            "Account5",
                            "Account6",
                            "LpDetect111",
                            "TokenDetectA111",
                            crate::WSOL_MINT_ID,
                            "Account10",
                            "Account11",
                            "Account12",
                            "Account13",
                            "Account14",
                            "Account15",
                            "MarketDetect111"
                        ],
                        "data": "opaque"
                    }
                ]
            }
        },
        "meta": {
            "err": null,
            "logMessages": [
                "Program log: initialize2"
            ]
        }
    });
    if let Err(error) = transaction_model
        .persist_resolved_transaction(
            signature,
            Some("helius_primary_http".to_string()),
            &resolved_transaction,
        )
        .await
    {
        panic!("projection must succeed: {}", error);
    }
    if let Err(error) = dex_decode.decode_transaction_by_signature(signature).await {
        panic!("dex decode must succeed: {}", error);
    }
}
/// Detection on a freshly seeded Raydium event must report exactly one result
/// that created a pool, a pair and a listing, and those rows must be readable
/// back from the database together with three pool tokens.
#[tokio::test]
async fn detect_transaction_by_signature_creates_pool_pair_and_listing() {
    let database = make_database().await;
    seed_decoded_raydium_event(database.clone(), "sig-dex-detect-1").await;
    let detect_service = crate::DexDetectService::new(database.clone());

    let results = detect_service
        .detect_transaction_by_signature("sig-dex-detect-1")
        .await
        .unwrap_or_else(|error| panic!("dex detect must succeed: {}", error));
    assert_eq!(results.len(), 1);
    let detection = &results[0];
    assert!(detection.created_pool);
    assert!(detection.created_pair);
    assert!(detection.created_listing);

    let pool = crate::query_pools_get_by_address(database.as_ref(), "PoolDetect111")
        .await
        .unwrap_or_else(|error| panic!("pool fetch must succeed: {}", error))
        .unwrap_or_else(|| panic!("pool must exist"));
    assert_eq!(pool.id, Some(detection.pool_id));

    let pair = crate::query_pairs_get_by_pool_id(database.as_ref(), detection.pool_id)
        .await
        .unwrap_or_else(|error| panic!("pair fetch must succeed: {}", error))
        .unwrap_or_else(|| panic!("pair must exist"));
    assert_eq!(pair.id, Some(detection.pair_id));

    let listing =
        crate::query_pool_listings_get_by_pool_id(database.as_ref(), detection.pool_id)
            .await
            .unwrap_or_else(|error| panic!("listing fetch must succeed: {}", error))
            .unwrap_or_else(|| panic!("listing must exist"));
    assert_eq!(listing.id, detection.pool_listing_id);

    let pool_tokens =
        crate::query_pool_tokens_list_by_pool_id(database.as_ref(), detection.pool_id)
            .await
            .unwrap_or_else(|error| panic!("pool tokens list must succeed: {}", error));
    assert_eq!(pool_tokens.len(), 3);
}
/// Running detection twice on the same Raydium signature must create the pool,
/// pair and listing only on the first pass and leave exactly one row of each.
#[tokio::test]
async fn detect_transaction_by_signature_is_idempotent() {
    let database = make_database().await;
    seed_decoded_raydium_event(database.clone(), "sig-dex-detect-2").await;
    let service = crate::DexDetectService::new(database.clone());

    // First pass: everything is newly created.
    let first_results = service
        .detect_transaction_by_signature("sig-dex-detect-2")
        .await
        .unwrap_or_else(|error| panic!("first dex detect must succeed: {}", error));
    assert_eq!(first_results.len(), 1);
    assert!(first_results[0].created_pool);
    assert!(first_results[0].created_pair);
    assert!(first_results[0].created_listing);

    // Second pass: the same objects are reported, none of them as created.
    let second_results = service
        .detect_transaction_by_signature("sig-dex-detect-2")
        .await
        .unwrap_or_else(|error| panic!("second dex detect must succeed: {}", error));
    assert_eq!(second_results.len(), 1);
    assert!(!second_results[0].created_pool);
    assert!(!second_results[0].created_pair);
    assert!(!second_results[0].created_listing);

    // No duplicate rows may have been written.
    let pools = crate::query_pools_list(database.as_ref())
        .await
        .unwrap_or_else(|error| panic!("pools list must succeed: {}", error));
    assert_eq!(pools.len(), 1);
    let pairs = crate::query_pairs_list(database.as_ref())
        .await
        .unwrap_or_else(|error| panic!("pairs list must succeed: {}", error));
    assert_eq!(pairs.len(), 1);
    let listings = crate::query_pool_listings_list(database.as_ref())
        .await
        .unwrap_or_else(|error| panic!("listings list must succeed: {}", error));
    assert_eq!(listings.len(), 1);
}
/// Persists and decodes a synthetic Pump.fun transaction for detection tests.
///
/// The fixture holds a single instruction addressed to
/// `crate::PUMP_FUN_PROGRAM_ID` plus an `Instruction: CreateV2` program log.
///
/// # Panics
///
/// Panics when the projection or the DEX decode step returns an error.
async fn seed_decoded_pump_fun_event(
    database: std::sync::Arc<crate::Database>,
    signature: &str,
) {
    let payload = serde_json::json!({
        "slot": 910002,
        "blockTime": 1779100002,
        "version": 0,
        "transaction": {
            "message": {
                "instructions": [
                    {
                        "programId": crate::PUMP_FUN_PROGRAM_ID,
                        "program": "pump",
                        "stackHeight": 1,
                        "accounts": [
                            "MintPumpDetect111",
                            "MintAuthority111",
                            "BondingCurveDetect111",
                            "AssociatedBondingCurveDetect111",
                            "Global111",
                            "CreatorDetect111",
                            "System111",
                            "Token2022Program111",
                            "AtaProgram111"
                        ],
                        "data": "opaque"
                    }
                ]
            }
        },
        "meta": {
            "err": null,
            "logMessages": ["Program log: Instruction: CreateV2"]
        }
    });
    let projector = crate::TransactionModelService::new(database.clone());
    let decoder = crate::DexDecodeService::new(database);
    // Project the transaction first, then decode it by signature.
    if let Err(error) = projector
        .persist_resolved_transaction(
            signature,
            Some(String::from("helius_primary_http")),
            &payload,
        )
        .await
    {
        panic!("projection must succeed: {}", error);
    }
    if let Err(error) = decoder.decode_transaction_by_signature(signature).await {
        panic!("dex decode must succeed: {}", error);
    }
}
/// Detecting a seeded Pump.fun transaction must create a pool of kind
/// `BondingCurve` at `BondingCurveDetect111`, plus a pair and a listing whose
/// ids match the detection result. Two pool token rows are expected.
#[tokio::test]
async fn detect_transaction_by_signature_creates_pump_fun_pool_pair_and_listing() {
    let database = make_database().await;
    seed_decoded_pump_fun_event(database.clone(), "sig-dex-detect-pump-1").await;
    let service = crate::DexDetectService::new(database.clone());
    let results = service
        .detect_transaction_by_signature("sig-dex-detect-pump-1")
        .await
        .unwrap_or_else(|error| panic!("dex detect must succeed: {}", error));
    assert_eq!(results.len(), 1);
    assert!(results[0].created_pool);
    assert!(results[0].created_pair);
    assert!(results[0].created_listing);
    let pool_id = results[0].pool_id;

    // Pool row, looked up by its on-chain address.
    let pool = crate::query_pools_get_by_address(database.as_ref(), "BondingCurveDetect111")
        .await
        .unwrap_or_else(|error| panic!("pool fetch must succeed: {}", error))
        .expect("pool must exist");
    assert_eq!(pool.id, Some(pool_id));
    assert_eq!(pool.pool_kind, crate::PoolKind::BondingCurve);

    // Pair row attached to the pool.
    let pair = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pair fetch must succeed: {}", error))
        .expect("pair must exist");
    assert_eq!(pair.id, Some(results[0].pair_id));

    // Listing row attached to the pool.
    let listing = crate::query_pool_listings_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("listing fetch must succeed: {}", error))
        .expect("listing must exist");
    assert_eq!(listing.id, results[0].pool_listing_id);

    let pool_tokens = crate::query_pool_tokens_list_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pool tokens list must succeed: {}", error));
    assert_eq!(pool_tokens.len(), 2);
}
/// Persists and decodes a synthetic PumpSwap transaction for detection tests.
///
/// The fixture holds a single instruction addressed to
/// `crate::PUMP_SWAP_PROGRAM_ID`, with a `parsed.info` object naming the pool
/// and both mints, plus an `Instruction: Buy` program log.
///
/// # Panics
///
/// Panics when the projection or the DEX decode step returns an error.
async fn seed_decoded_pump_swap_event(
    database: std::sync::Arc<crate::Database>,
    signature: &str,
) {
    let payload = serde_json::json!({
        "slot": 910003,
        "blockTime": 1779100003,
        "version": 0,
        "transaction": {
            "message": {
                "instructions": [
                    {
                        "programId": crate::PUMP_SWAP_PROGRAM_ID,
                        "program": "pump-amm",
                        "stackHeight": 1,
                        "accounts": [
                            "PumpSwapDetectPool111",
                            "PumpSwapDetectTokenA111",
                            crate::WSOL_MINT_ID,
                            "PumpSwapDetectPoolV2_111"
                        ],
                        "parsed": {
                            "info": {
                                "pool": "PumpSwapDetectPool111",
                                "baseMint": "PumpSwapDetectTokenA111",
                                "quoteMint": crate::WSOL_MINT_ID,
                                "poolV2": "PumpSwapDetectPoolV2_111"
                            }
                        },
                        "data": "AJTQ2h9DXrBfqJi53PDQG2Fvki5tkaTU3"
                    }
                ]
            }
        },
        "meta": {
            "err": null,
            "logMessages": ["Program log: Instruction: Buy"]
        }
    });
    let projector = crate::TransactionModelService::new(database.clone());
    let decoder = crate::DexDecodeService::new(database);
    // Project the transaction first, then decode it by signature.
    if let Err(error) = projector
        .persist_resolved_transaction(
            signature,
            Some(String::from("helius_primary_http")),
            &payload,
        )
        .await
    {
        panic!("projection must succeed: {}", error);
    }
    if let Err(error) = decoder.decode_transaction_by_signature(signature).await {
        panic!("dex decode must succeed: {}", error);
    }
}
/// Detecting a seeded PumpSwap transaction must create a pool of kind `Amm` at
/// `PumpSwapDetectPool111`, plus a pair and a listing whose ids match the
/// detection result. Two pool token rows are expected.
#[tokio::test]
async fn detect_transaction_by_signature_creates_pump_swap_pool_pair_and_listing() {
    let database = make_database().await;
    seed_decoded_pump_swap_event(database.clone(), "sig-dex-detect-pumpswap-1").await;
    let service = crate::DexDetectService::new(database.clone());
    let results = service
        .detect_transaction_by_signature("sig-dex-detect-pumpswap-1")
        .await
        .unwrap_or_else(|error| panic!("dex detect must succeed: {}", error));
    assert_eq!(results.len(), 1);
    assert!(results[0].created_pool);
    assert!(results[0].created_pair);
    assert!(results[0].created_listing);
    let pool_id = results[0].pool_id;

    // Pool row, looked up by its on-chain address.
    let pool = crate::query_pools_get_by_address(database.as_ref(), "PumpSwapDetectPool111")
        .await
        .unwrap_or_else(|error| panic!("pool fetch must succeed: {}", error))
        .expect("pool must exist");
    assert_eq!(pool.id, Some(pool_id));
    assert_eq!(pool.pool_kind, crate::PoolKind::Amm);

    // Pair row attached to the pool.
    let pair = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pair fetch must succeed: {}", error))
        .expect("pair must exist");
    assert_eq!(pair.id, Some(results[0].pair_id));

    // Listing row attached to the pool.
    let listing = crate::query_pool_listings_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("listing fetch must succeed: {}", error))
        .expect("listing must exist");
    assert_eq!(listing.id, results[0].pool_listing_id);

    let pool_tokens = crate::query_pool_tokens_list_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pool tokens list must succeed: {}", error));
    assert_eq!(pool_tokens.len(), 2);
}
/// Running detection twice on the same PumpSwap signature must flag the pool,
/// pair and listing as created on the first pass only.
#[tokio::test]
async fn detect_transaction_by_signature_is_idempotent_for_pump_swap() {
    let database = make_database().await;
    seed_decoded_pump_swap_event(database.clone(), "sig-dex-detect-pumpswap-2").await;
    let service = crate::DexDetectService::new(database.clone());

    // First pass: everything is newly created.
    let first_results = service
        .detect_transaction_by_signature("sig-dex-detect-pumpswap-2")
        .await
        .unwrap_or_else(|error| panic!("first dex detect must succeed: {}", error));
    assert_eq!(first_results.len(), 1);
    assert!(first_results[0].created_pool);
    assert!(first_results[0].created_pair);
    assert!(first_results[0].created_listing);

    // Second pass: one result again, with none of the creation flags set.
    let second_results = service
        .detect_transaction_by_signature("sig-dex-detect-pumpswap-2")
        .await
        .unwrap_or_else(|error| panic!("second dex detect must succeed: {}", error));
    assert_eq!(second_results.len(), 1);
    assert!(!second_results[0].created_pool);
    assert!(!second_results[0].created_pair);
    assert!(!second_results[0].created_listing);
}
/// Persists and decodes a synthetic Meteora DBC transaction for detection tests.
///
/// The fixture holds a single instruction addressed to
/// `crate::METEORA_DBC_PROGRAM_ID`, with a `parsed.info` object naming the pool,
/// both mints, the pool config and the creator, plus an
/// `Instruction: CreatePool` program log.
///
/// # Panics
///
/// Panics when the projection or the DEX decode step returns an error.
async fn seed_decoded_meteora_dbc_event(
    database: std::sync::Arc<crate::Database>,
    signature: &str,
) {
    let payload = serde_json::json!({
        "slot": 910004,
        "blockTime": 1779100004,
        "version": 0,
        "transaction": {
            "message": {
                "instructions": [
                    {
                        "programId": crate::METEORA_DBC_PROGRAM_ID,
                        "program": "meteora_dbc",
                        "stackHeight": 1,
                        "accounts": [
                            "DbcDetectPool111",
                            "DbcDetectTokenA111",
                            crate::WSOL_MINT_ID,
                            "DbcDetectConfig111",
                            "DbcDetectCreator111"
                        ],
                        "parsed": {
                            "info": {
                                "pool": "DbcDetectPool111",
                                "baseMint": "DbcDetectTokenA111",
                                "quoteMint": crate::WSOL_MINT_ID,
                                "poolConfig": "DbcDetectConfig111",
                                "creator": "DbcDetectCreator111"
                            }
                        },
                        "data": "opaque"
                    }
                ]
            }
        },
        "meta": {
            "err": null,
            "logMessages": ["Program log: Instruction: CreatePool"]
        }
    });
    let projector = crate::TransactionModelService::new(database.clone());
    let decoder = crate::DexDecodeService::new(database);
    // Project the transaction first, then decode it by signature.
    if let Err(error) = projector
        .persist_resolved_transaction(
            signature,
            Some(String::from("helius_primary_http")),
            &payload,
        )
        .await
    {
        panic!("projection must succeed: {}", error);
    }
    if let Err(error) = decoder.decode_transaction_by_signature(signature).await {
        panic!("dex decode must succeed: {}", error);
    }
}
/// Detecting a seeded Meteora DBC transaction must create a pool of kind
/// `BondingCurve` at `DbcDetectPool111`, plus a pair and a listing whose ids
/// match the detection result. Two pool token rows are expected.
#[tokio::test]
async fn detect_transaction_by_signature_creates_meteora_dbc_pool_pair_and_listing() {
    let database = make_database().await;
    seed_decoded_meteora_dbc_event(database.clone(), "sig-dex-detect-dbc-1").await;
    let service = crate::DexDetectService::new(database.clone());
    let results = service
        .detect_transaction_by_signature("sig-dex-detect-dbc-1")
        .await
        .unwrap_or_else(|error| panic!("dex detect must succeed: {}", error));
    assert_eq!(results.len(), 1);
    assert!(results[0].created_pool);
    assert!(results[0].created_pair);
    assert!(results[0].created_listing);
    let pool_id = results[0].pool_id;

    // Pool row, looked up by its on-chain address.
    let pool = crate::query_pools_get_by_address(database.as_ref(), "DbcDetectPool111")
        .await
        .unwrap_or_else(|error| panic!("pool fetch must succeed: {}", error))
        .expect("pool must exist");
    assert_eq!(pool.id, Some(pool_id));
    assert_eq!(pool.pool_kind, crate::PoolKind::BondingCurve);

    // Pair row attached to the pool.
    let pair = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pair fetch must succeed: {}", error))
        .expect("pair must exist");
    assert_eq!(pair.id, Some(results[0].pair_id));

    // Listing row attached to the pool.
    let listing = crate::query_pool_listings_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("listing fetch must succeed: {}", error))
        .expect("listing must exist");
    assert_eq!(listing.id, results[0].pool_listing_id);

    let pool_tokens = crate::query_pool_tokens_list_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pool tokens list must succeed: {}", error));
    assert_eq!(pool_tokens.len(), 2);
}
/// Persists and decodes a synthetic Meteora DAMM v2 transaction for detection tests.
///
/// The fixture holds a single instruction addressed to
/// `crate::METEORA_DAMM_V2_PROGRAM_ID`, with a `parsed.info` object describing an
/// `initialize_customizable_pool` call, plus an
/// `Instruction: InitializeCustomizablePool` program log.
///
/// # Panics
///
/// Panics when the projection or the DEX decode step returns an error.
async fn seed_decoded_meteora_damm_v2_event(
    database: std::sync::Arc<crate::Database>,
    signature: &str,
) {
    let payload = serde_json::json!({
        "slot": 910005,
        "blockTime": 1779100005,
        "version": 0,
        "transaction": {
            "message": {
                "instructions": [
                    {
                        "programId": crate::METEORA_DAMM_V2_PROGRAM_ID,
                        "program": "meteora_damm_v2",
                        "stackHeight": 1,
                        "accounts": [
                            "DammV2DetectPool111",
                            "DammV2DetectTokenA111",
                            crate::WSOL_MINT_ID,
                            "DammV2DetectConfig111",
                            "DammV2DetectCreator111"
                        ],
                        "parsed": {
                            "info": {
                                "instruction": "initialize_customizable_pool",
                                "pool": "DammV2DetectPool111",
                                "tokenAMint": "DammV2DetectTokenA111",
                                "tokenBMint": crate::WSOL_MINT_ID,
                                "creator": "DammV2DetectCreator111",
                                "isCustomizablePool": true
                            }
                        },
                        "data": "opaque"
                    }
                ]
            }
        },
        "meta": {
            "err": null,
            "logMessages": ["Program log: Instruction: InitializeCustomizablePool"]
        }
    });
    let projector = crate::TransactionModelService::new(database.clone());
    let decoder = crate::DexDecodeService::new(database);
    // Project the transaction first, then decode it by signature.
    if let Err(error) = projector
        .persist_resolved_transaction(
            signature,
            Some(String::from("helius_primary_http")),
            &payload,
        )
        .await
    {
        panic!("projection must succeed: {}", error);
    }
    if let Err(error) = decoder.decode_transaction_by_signature(signature).await {
        panic!("dex decode must succeed: {}", error);
    }
}
/// Detecting a seeded Meteora DAMM v2 transaction must create a pool of kind
/// `Amm` at `DammV2DetectPool111`, plus a pair and a listing whose ids match the
/// detection result. Two pool token rows are expected.
#[tokio::test]
async fn detect_transaction_by_signature_creates_meteora_damm_v2_pool_pair_and_listing() {
    let database = make_database().await;
    seed_decoded_meteora_damm_v2_event(database.clone(), "sig-dex-detect-dammv2-1").await;
    let service = crate::DexDetectService::new(database.clone());
    let results = service
        .detect_transaction_by_signature("sig-dex-detect-dammv2-1")
        .await
        .unwrap_or_else(|error| panic!("dex detect must succeed: {}", error));
    assert_eq!(results.len(), 1);
    assert!(results[0].created_pool);
    assert!(results[0].created_pair);
    assert!(results[0].created_listing);
    let pool_id = results[0].pool_id;

    // Pool row, looked up by its on-chain address.
    let pool = crate::query_pools_get_by_address(database.as_ref(), "DammV2DetectPool111")
        .await
        .unwrap_or_else(|error| panic!("pool fetch must succeed: {}", error))
        .expect("pool must exist");
    assert_eq!(pool.id, Some(pool_id));
    assert_eq!(pool.pool_kind, crate::PoolKind::Amm);

    // Pair row attached to the pool.
    let pair = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pair fetch must succeed: {}", error))
        .expect("pair must exist");
    assert_eq!(pair.id, Some(results[0].pair_id));

    // Listing row attached to the pool.
    let listing = crate::query_pool_listings_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("listing fetch must succeed: {}", error))
        .expect("listing must exist");
    assert_eq!(listing.id, results[0].pool_listing_id);

    let pool_tokens = crate::query_pool_tokens_list_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pool tokens list must succeed: {}", error));
    assert_eq!(pool_tokens.len(), 2);
}
/// Persists and decodes a synthetic Meteora DAMM v1 transaction for detection tests.
///
/// The fixture holds a single instruction addressed to
/// `crate::METEORA_DAMM_V1_PROGRAM_ID`, with a `parsed.info` object describing an
/// `initialize_pool_with_config` call, plus an
/// `Instruction: InitializePoolWithConfig` program log.
///
/// # Panics
///
/// Panics when the projection or the DEX decode step returns an error.
async fn seed_decoded_meteora_damm_v1_event(
    database: std::sync::Arc<crate::Database>,
    signature: &str,
) {
    let payload = serde_json::json!({
        "slot": 910006,
        "blockTime": 1779100006,
        "version": 0,
        "transaction": {
            "message": {
                "instructions": [
                    {
                        "programId": crate::METEORA_DAMM_V1_PROGRAM_ID,
                        "program": "meteora_damm_v1",
                        "stackHeight": 1,
                        "accounts": [
                            "DammV1DetectPool111",
                            "DammV1DetectTokenA111",
                            crate::WSOL_MINT_ID,
                            "DammV1DetectConfig111",
                            "DammV1DetectCreator111"
                        ],
                        "parsed": {
                            "info": {
                                "instruction": "initialize_pool_with_config",
                                "pool": "DammV1DetectPool111",
                                "tokenAMint": "DammV1DetectTokenA111",
                                "tokenBMint": crate::WSOL_MINT_ID,
                                "config": "DammV1DetectConfig111",
                                "creator": "DammV1DetectCreator111"
                            }
                        },
                        "data": "opaque"
                    }
                ]
            }
        },
        "meta": {
            "err": null,
            "logMessages": ["Program log: Instruction: InitializePoolWithConfig"]
        }
    });
    let projector = crate::TransactionModelService::new(database.clone());
    let decoder = crate::DexDecodeService::new(database);
    // Project the transaction first, then decode it by signature.
    if let Err(error) = projector
        .persist_resolved_transaction(
            signature,
            Some(String::from("helius_primary_http")),
            &payload,
        )
        .await
    {
        panic!("projection must succeed: {}", error);
    }
    if let Err(error) = decoder.decode_transaction_by_signature(signature).await {
        panic!("dex decode must succeed: {}", error);
    }
}
/// Detecting a seeded Meteora DAMM v1 transaction must create a pool of kind
/// `Amm` at `DammV1DetectPool111`, plus a pair and a listing whose ids match the
/// detection result. Two pool token rows are expected.
#[tokio::test]
async fn detect_transaction_by_signature_creates_meteora_damm_v1_pool_pair_and_listing() {
    let database = make_database().await;
    seed_decoded_meteora_damm_v1_event(database.clone(), "sig-dex-detect-dammv1-1").await;
    let service = crate::DexDetectService::new(database.clone());
    let results = service
        .detect_transaction_by_signature("sig-dex-detect-dammv1-1")
        .await
        .unwrap_or_else(|error| panic!("dex detect must succeed: {}", error));
    assert_eq!(results.len(), 1);
    assert!(results[0].created_pool);
    assert!(results[0].created_pair);
    assert!(results[0].created_listing);
    let pool_id = results[0].pool_id;

    // Pool row, looked up by its on-chain address.
    let pool = crate::query_pools_get_by_address(database.as_ref(), "DammV1DetectPool111")
        .await
        .unwrap_or_else(|error| panic!("pool fetch must succeed: {}", error))
        .expect("pool must exist");
    assert_eq!(pool.id, Some(pool_id));
    assert_eq!(pool.pool_kind, crate::PoolKind::Amm);

    // Pair row attached to the pool.
    let pair = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pair fetch must succeed: {}", error))
        .expect("pair must exist");
    assert_eq!(pair.id, Some(results[0].pair_id));

    // Listing row attached to the pool.
    let listing = crate::query_pool_listings_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("listing fetch must succeed: {}", error))
        .expect("listing must exist");
    assert_eq!(listing.id, results[0].pool_listing_id);

    let pool_tokens = crate::query_pool_tokens_list_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pool tokens list must succeed: {}", error));
    assert_eq!(pool_tokens.len(), 2);
}
/// Persists and decodes a synthetic Orca Whirlpools transaction for detection tests.
///
/// The fixture holds a single instruction addressed to
/// `crate::ORCA_WHIRLPOOLS_PROGRAM_ID`, with a `parsed.info` object describing an
/// `initialize_pool_v2` call, plus an `Instruction: InitializePoolV2` program log.
///
/// # Panics
///
/// Panics when the projection or the DEX decode step returns an error.
async fn seed_decoded_orca_whirlpools_event(
    database: std::sync::Arc<crate::Database>,
    signature: &str,
) {
    let payload = serde_json::json!({
        "slot": 910007,
        "blockTime": 1779100007,
        "version": 0,
        "transaction": {
            "message": {
                "instructions": [
                    {
                        "programId": crate::ORCA_WHIRLPOOLS_PROGRAM_ID,
                        "program": "orca_whirlpools",
                        "stackHeight": 1,
                        "accounts": [
                            "OrcaDetectPool111",
                            "OrcaDetectTokenA111",
                            crate::WSOL_MINT_ID,
                            "OrcaDetectConfig111",
                            "OrcaDetectCreator111"
                        ],
                        "parsed": {
                            "info": {
                                "instruction": "initialize_pool_v2",
                                "whirlpool": "OrcaDetectPool111",
                                "tokenMintA": "OrcaDetectTokenA111",
                                "tokenMintB": crate::WSOL_MINT_ID,
                                "whirlpoolsConfig": "OrcaDetectConfig111",
                                "funder": "OrcaDetectCreator111",
                                "tokenProgramA": crate::SPL_TOKEN_PROGRAM_ID,
                                "tokenProgramB": crate::SPL_TOKEN_PROGRAM_ID
                            }
                        },
                        "data": "opaque"
                    }
                ]
            }
        },
        "meta": {
            "err": null,
            "logMessages": ["Program log: Instruction: InitializePoolV2"]
        }
    });
    let projector = crate::TransactionModelService::new(database.clone());
    let decoder = crate::DexDecodeService::new(database);
    // Project the transaction first, then decode it by signature.
    if let Err(error) = projector
        .persist_resolved_transaction(
            signature,
            Some(String::from("helius_primary_http")),
            &payload,
        )
        .await
    {
        panic!("projection must succeed: {}", error);
    }
    if let Err(error) = decoder.decode_transaction_by_signature(signature).await {
        panic!("dex decode must succeed: {}", error);
    }
}
/// Detecting a seeded Orca Whirlpools transaction must create a pool of kind
/// `Clmm` at `OrcaDetectPool111`, plus a pair and a listing whose ids match the
/// detection result. Two pool token rows are expected.
#[tokio::test]
async fn detect_transaction_by_signature_creates_orca_whirlpools_pool_pair_and_listing() {
    let database = make_database().await;
    seed_decoded_orca_whirlpools_event(database.clone(), "sig-dex-detect-orca-1").await;
    let service = crate::DexDetectService::new(database.clone());
    let results = service
        .detect_transaction_by_signature("sig-dex-detect-orca-1")
        .await
        .unwrap_or_else(|error| panic!("dex detect must succeed: {}", error));
    assert_eq!(results.len(), 1);
    assert!(results[0].created_pool);
    assert!(results[0].created_pair);
    assert!(results[0].created_listing);
    let pool_id = results[0].pool_id;

    // Pool row, looked up by its on-chain address.
    let pool = crate::query_pools_get_by_address(database.as_ref(), "OrcaDetectPool111")
        .await
        .unwrap_or_else(|error| panic!("pool fetch must succeed: {}", error))
        .expect("pool must exist");
    assert_eq!(pool.id, Some(pool_id));
    assert_eq!(pool.pool_kind, crate::PoolKind::Clmm);

    // Pair row attached to the pool.
    let pair = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pair fetch must succeed: {}", error))
        .expect("pair must exist");
    assert_eq!(pair.id, Some(results[0].pair_id));

    // Listing row attached to the pool.
    let listing = crate::query_pool_listings_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("listing fetch must succeed: {}", error))
        .expect("listing must exist");
    assert_eq!(listing.id, results[0].pool_listing_id);

    let pool_tokens = crate::query_pool_tokens_list_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pool tokens list must succeed: {}", error));
    assert_eq!(pool_tokens.len(), 2);
}
/// Persists and decodes a synthetic FluxBeam transaction for detection tests.
///
/// The fixture holds a single instruction addressed to
/// `crate::FLUXBEAM_PROGRAM_ID`, with a `parsed.info` object describing a
/// `create_pool` call (pool, LP mint, both tokens, payer), plus an
/// `Instruction: CreatePool` program log.
///
/// # Panics
///
/// Panics when the projection or the DEX decode step returns an error.
async fn seed_decoded_fluxbeam_event(
    database: std::sync::Arc<crate::Database>,
    signature: &str,
) {
    let payload = serde_json::json!({
        "slot": 910008,
        "blockTime": 1779100008,
        "version": 0,
        "transaction": {
            "message": {
                "instructions": [
                    {
                        "programId": crate::FLUXBEAM_PROGRAM_ID,
                        "program": "fluxbeam",
                        "stackHeight": 1,
                        "accounts": [
                            "FluxDetectPool111",
                            "FluxDetectLpMint111",
                            "FluxDetectTokenA111",
                            crate::WSOL_MINT_ID,
                            "FluxDetectCreator111"
                        ],
                        "parsed": {
                            "info": {
                                "instruction": "create_pool",
                                "pool": "FluxDetectPool111",
                                "lpMint": "FluxDetectLpMint111",
                                "tokenA": "FluxDetectTokenA111",
                                "tokenB": crate::WSOL_MINT_ID,
                                "payer": "FluxDetectCreator111"
                            }
                        },
                        "data": "opaque"
                    }
                ]
            }
        },
        "meta": {
            "err": null,
            "logMessages": ["Program log: Instruction: CreatePool"]
        }
    });
    let projector = crate::TransactionModelService::new(database.clone());
    let decoder = crate::DexDecodeService::new(database);
    // Project the transaction first, then decode it by signature.
    if let Err(error) = projector
        .persist_resolved_transaction(
            signature,
            Some(String::from("helius_primary_http")),
            &payload,
        )
        .await
    {
        panic!("projection must succeed: {}", error);
    }
    if let Err(error) = decoder.decode_transaction_by_signature(signature).await {
        panic!("dex decode must succeed: {}", error);
    }
}
/// Detecting a seeded FluxBeam transaction must create a pool of kind `Amm` at
/// `FluxDetectPool111`, plus a pair and a listing whose ids match the detection
/// result. Two pool token rows are expected.
#[tokio::test]
async fn detect_transaction_by_signature_creates_fluxbeam_pool_pair_and_listing() {
    let database = make_database().await;
    seed_decoded_fluxbeam_event(database.clone(), "sig-dex-detect-fluxbeam-1").await;
    let service = crate::DexDetectService::new(database.clone());
    let results = service
        .detect_transaction_by_signature("sig-dex-detect-fluxbeam-1")
        .await
        .unwrap_or_else(|error| panic!("dex detect must succeed: {}", error));
    assert_eq!(results.len(), 1);
    assert!(results[0].created_pool);
    assert!(results[0].created_pair);
    assert!(results[0].created_listing);
    let pool_id = results[0].pool_id;

    // Pool row, looked up by its on-chain address.
    let pool = crate::query_pools_get_by_address(database.as_ref(), "FluxDetectPool111")
        .await
        .unwrap_or_else(|error| panic!("pool fetch must succeed: {}", error))
        .expect("pool must exist");
    assert_eq!(pool.id, Some(pool_id));
    assert_eq!(pool.pool_kind, crate::PoolKind::Amm);

    // Pair row attached to the pool.
    let pair = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pair fetch must succeed: {}", error))
        .expect("pair must exist");
    assert_eq!(pair.id, Some(results[0].pair_id));

    // Listing row attached to the pool.
    let listing = crate::query_pool_listings_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("listing fetch must succeed: {}", error))
        .expect("listing must exist");
    assert_eq!(listing.id, results[0].pool_listing_id);

    let pool_tokens = crate::query_pool_tokens_list_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pool tokens list must succeed: {}", error));
    assert_eq!(pool_tokens.len(), 2);
}
/// Persists and decodes a synthetic Dexlab transaction for detection tests.
///
/// The fixture holds a single instruction addressed to
/// `crate::DEXLAB_PROGRAM_ID`, with a `parsed.info` object describing a
/// `create_pool` call (pool, both tokens, payer, fee tier), plus an
/// `Instruction: CreatePool` program log.
///
/// # Panics
///
/// Panics when the projection or the DEX decode step returns an error.
async fn seed_decoded_dexlab_event(database: std::sync::Arc<crate::Database>, signature: &str) {
    let payload = serde_json::json!({
        "slot": 910009,
        "blockTime": 1779100009,
        "version": 0,
        "transaction": {
            "message": {
                "instructions": [
                    {
                        "programId": crate::DEXLAB_PROGRAM_ID,
                        "program": "dexlab",
                        "stackHeight": 1,
                        "accounts": [
                            "DexlabDetectPool111",
                            "DexlabDetectTokenA111",
                            crate::WSOL_MINT_ID,
                            "DexlabDetectCreator111"
                        ],
                        "parsed": {
                            "info": {
                                "instruction": "create_pool",
                                "pool": "DexlabDetectPool111",
                                "tokenA": "DexlabDetectTokenA111",
                                "tokenB": crate::WSOL_MINT_ID,
                                "payer": "DexlabDetectCreator111",
                                "feeTier": "0.3%"
                            }
                        },
                        "data": "opaque"
                    }
                ]
            }
        },
        "meta": {
            "err": null,
            "logMessages": ["Program log: Instruction: CreatePool"]
        }
    });
    let projector = crate::TransactionModelService::new(database.clone());
    let decoder = crate::DexDecodeService::new(database);
    // Project the transaction first, then decode it by signature.
    if let Err(error) = projector
        .persist_resolved_transaction(
            signature,
            Some(String::from("helius_primary_http")),
            &payload,
        )
        .await
    {
        panic!("projection must succeed: {}", error);
    }
    if let Err(error) = decoder.decode_transaction_by_signature(signature).await {
        panic!("dex decode must succeed: {}", error);
    }
}
/// Detecting a seeded Dexlab transaction must create a pool of kind `Amm` at
/// `DexlabDetectPool111`, plus a pair and a listing whose ids match the detection
/// result. Two pool token rows are expected.
#[tokio::test]
async fn detect_transaction_by_signature_creates_dexlab_pool_pair_and_listing() {
    let database = make_database().await;
    seed_decoded_dexlab_event(database.clone(), "sig-dex-detect-dexlab-1").await;
    let service = crate::DexDetectService::new(database.clone());
    let results = service
        .detect_transaction_by_signature("sig-dex-detect-dexlab-1")
        .await
        .unwrap_or_else(|error| panic!("dex detect must succeed: {}", error));
    assert_eq!(results.len(), 1);
    assert!(results[0].created_pool);
    assert!(results[0].created_pair);
    assert!(results[0].created_listing);
    let pool_id = results[0].pool_id;

    // Pool row, looked up by its on-chain address.
    let pool = crate::query_pools_get_by_address(database.as_ref(), "DexlabDetectPool111")
        .await
        .unwrap_or_else(|error| panic!("pool fetch must succeed: {}", error))
        .expect("pool must exist");
    assert_eq!(pool.id, Some(pool_id));
    assert_eq!(pool.pool_kind, crate::PoolKind::Amm);

    // Pair row attached to the pool.
    let pair = crate::query_pairs_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pair fetch must succeed: {}", error))
        .expect("pair must exist");
    assert_eq!(pair.id, Some(results[0].pair_id));

    // Listing row attached to the pool.
    let listing = crate::query_pool_listings_get_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("listing fetch must succeed: {}", error))
        .expect("listing must exist");
    assert_eq!(listing.id, results[0].pool_listing_id);

    let pool_tokens = crate::query_pool_tokens_list_by_pool_id(database.as_ref(), pool_id)
        .await
        .unwrap_or_else(|error| panic!("pool tokens list must succeed: {}", error));
    assert_eq!(pool_tokens.len(), 2);
}
}