This commit is contained in:
2026-04-28 20:03:57 +02:00
parent d036b4c79f
commit 574f90ddee
8 changed files with 1367 additions and 19 deletions

View File

@@ -129,6 +129,30 @@ impl KbDexDetectService {
};
detection_results.push(detect_result);
}
if decoded_event.protocol_name == "meteora_dbc"
&& decoded_event.event_kind == "meteora_dbc.create_pool"
{
let detect_result = self
.detect_meteora_dbc_pool(&transaction, decoded_event)
.await;
let detect_result = match detect_result {
Ok(detect_result) => detect_result,
Err(error) => return Err(error),
};
detection_results.push(detect_result);
}
if decoded_event.protocol_name == "meteora_dbc"
&& decoded_event.event_kind == "meteora_dbc.swap"
{
let detect_result = self
.detect_meteora_dbc_pool(&transaction, decoded_event)
.await;
let detect_result = match detect_result {
Ok(detect_result) => detect_result,
Err(error) => return Err(error),
};
detection_results.push(detect_result);
}
}
Ok(detection_results)
}
@@ -882,6 +906,276 @@ impl KbDexDetectService {
})
}
async fn detect_meteora_dbc_pool(
&self,
transaction: &crate::KbChainTransactionDto,
decoded_event: &crate::KbDexDecodedEventDto,
) -> Result<crate::KbDexPoolDetectionResult, crate::KbError> {
let decoded_event_id_option = decoded_event.id;
let decoded_event_id = match decoded_event_id_option {
Some(decoded_event_id) => decoded_event_id,
None => {
return Err(crate::KbError::InvalidState(
"decoded dex event has no internal id".to_string(),
));
}
};
let dex_id_result = self.ensure_meteora_dbc_dex().await;
let dex_id = match dex_id_result {
Ok(dex_id) => dex_id,
Err(error) => return Err(error),
};
let pool_address_option = decoded_event.pool_account.clone();
let pool_address = match pool_address_option {
Some(pool_address) => pool_address,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no pool_account",
decoded_event_id
)));
}
};
let token_a_mint_option = decoded_event.token_a_mint.clone();
let token_a_mint = match token_a_mint_option {
Some(token_a_mint) => token_a_mint,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no token_a_mint",
decoded_event_id
)));
}
};
let token_b_mint_option = decoded_event.token_b_mint.clone();
let token_b_mint = match token_b_mint_option {
Some(token_b_mint) => token_b_mint,
None => {
return Err(crate::KbError::InvalidState(format!(
"decoded event '{}' has no token_b_mint",
decoded_event_id
)));
}
};
let base_is_token_a =
kb_choose_base_quote_order(token_a_mint.as_str(), token_b_mint.as_str());
let base_mint = if base_is_token_a {
token_a_mint.clone()
} else {
token_b_mint.clone()
};
let quote_mint = if base_is_token_a {
token_b_mint.clone()
} else {
token_a_mint.clone()
};
let base_token_id_result = self.ensure_token(base_mint.as_str()).await;
let base_token_id = match base_token_id_result {
Ok(base_token_id) => base_token_id,
Err(error) => return Err(error),
};
let quote_token_id_result = self.ensure_token(quote_mint.as_str()).await;
let quote_token_id = match quote_token_id_result {
Ok(quote_token_id) => quote_token_id,
Err(error) => return Err(error),
};
let existing_pool_result =
crate::get_pool_by_address(self.database.as_ref(), pool_address.as_str()).await;
let existing_pool_option = match existing_pool_result {
Ok(existing_pool_option) => existing_pool_option,
Err(error) => return Err(error),
};
let created_pool = existing_pool_option.is_none();
let pool_id = match existing_pool_option {
Some(pool) => {
let pool_id_option = pool.id;
match pool_id_option {
Some(pool_id) => pool_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"pool '{}' has no internal id",
pool.address
)));
}
}
}
None => {
let pool_dto = crate::KbPoolDto::new(
dex_id,
pool_address.clone(),
crate::KbPoolKind::BondingCurve,
crate::KbPoolStatus::Pending,
);
let upsert_result = crate::upsert_pool(self.database.as_ref(), &pool_dto).await;
match upsert_result {
Ok(pool_id) => pool_id,
Err(error) => return Err(error),
}
}
};
let existing_pair_result =
crate::get_pair_by_pool_id(self.database.as_ref(), pool_id).await;
let existing_pair_option = match existing_pair_result {
Ok(existing_pair_option) => existing_pair_option,
Err(error) => return Err(error),
};
let created_pair = existing_pair_option.is_none();
let pair_symbol = kb_build_pair_symbol(base_mint.as_str(), quote_mint.as_str());
let pair_id = match existing_pair_option {
Some(pair) => {
let pair_id_option = pair.id;
match pair_id_option {
Some(pair_id) => pair_id,
None => {
return Err(crate::KbError::InvalidState(format!(
"pair for pool '{}' has no internal id",
pool_id
)));
}
}
}
None => {
let pair_dto = crate::KbPairDto::new(
dex_id,
pool_id,
base_token_id,
quote_token_id,
pair_symbol,
);
let upsert_result = crate::upsert_pair(self.database.as_ref(), &pair_dto).await;
match upsert_result {
Ok(pair_id) => pair_id,
Err(error) => return Err(error),
}
}
};
let upsert_base_pool_token_result = crate::upsert_pool_token(
self.database.as_ref(),
&crate::KbPoolTokenDto::new(
pool_id,
base_token_id,
crate::KbPoolTokenRole::Base,
None,
Some(0),
),
)
.await;
if let Err(error) = upsert_base_pool_token_result {
return Err(error);
}
let upsert_quote_pool_token_result = crate::upsert_pool_token(
self.database.as_ref(),
&crate::KbPoolTokenDto::new(
pool_id,
quote_token_id,
crate::KbPoolTokenRole::Quote,
None,
Some(1),
),
)
.await;
if let Err(error) = upsert_quote_pool_token_result {
return Err(error);
}
let existing_listing_result =
crate::get_pool_listing_by_pool_id(self.database.as_ref(), pool_id).await;
let existing_listing_option = match existing_listing_result {
Ok(existing_listing_option) => existing_listing_option,
Err(error) => return Err(error),
};
let created_listing = existing_listing_option.is_none();
let pool_listing_id = match existing_listing_option {
Some(pool_listing) => pool_listing.id,
None => {
let listing_id_result = self
.upsert_pool_listing_from_decoded_event(dex_id, pool_id, pair_id, transaction)
.await;
match listing_id_result {
Ok(listing_id) => Some(listing_id),
Err(error) => return Err(error),
}
}
};
let payload_value_result = kb_parse_payload_json(decoded_event.payload_json.as_str());
let payload_value = match payload_value_result {
Ok(payload_value) => payload_value,
Err(error) => return Err(error),
};
if created_pool {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.meteora_dbc.new_pool",
crate::KbAnalysisSignalSeverity::Low,
payload_value.clone(),
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
if created_pair {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.meteora_dbc.new_pair",
crate::KbAnalysisSignalSeverity::Low,
payload_value.clone(),
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
if created_listing {
let signal_result = self
.record_detection_signal(
transaction,
"signal.dex.meteora_dbc.first_listing_seen",
crate::KbAnalysisSignalSeverity::Low,
payload_value,
)
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
Ok(crate::KbDexPoolDetectionResult {
decoded_event_id,
dex_id,
pool_id,
pair_id,
pool_listing_id,
created_pool,
created_pair,
created_listing,
})
}
async fn ensure_meteora_dbc_dex(&self) -> Result<i64, crate::KbError> {
let dex_result = crate::get_dex_by_code(self.database.as_ref(), "meteora_dbc").await;
let dex_option = match dex_result {
Ok(dex_option) => dex_option,
Err(error) => return Err(error),
};
match dex_option {
Some(dex) => match dex.id {
Some(dex_id) => Ok(dex_id),
None => Err(crate::KbError::InvalidState(
"meteora_dbc dex has no internal id".to_string(),
)),
},
None => {
let dex_dto = crate::KbDexDto::new(
"meteora_dbc".to_string(),
"Meteora DBC".to_string(),
Some(crate::KB_METEORA_DBC_PROGRAM_ID.to_string()),
None,
true,
);
crate::upsert_dex(self.database.as_ref(), &dex_dto).await
}
}
}
async fn ensure_pump_swap_dex(&self) -> Result<i64, crate::KbError> {
let dex_result = crate::get_dex_by_code(self.database.as_ref(), "pump_swap").await;
let dex_option = match dex_result {
@@ -1399,7 +1693,7 @@ mod tests {
};
assert_eq!(pool_tokens.len(), 2);
}
async fn seed_decoded_pump_swap_event(
database: std::sync::Arc<crate::KbDatabase>,
signature: &str,
@@ -1458,14 +1752,12 @@ mod tests {
panic!("dex decode must succeed: {}", error);
}
}
#[tokio::test]
async fn detect_transaction_by_signature_creates_pump_swap_pool_pair_and_listing() {
let database = make_database().await;
seed_decoded_pump_swap_event(database.clone(), "sig-dex-detect-pumpswap-1").await;
let detect_service = crate::KbDexDetectService::new(database.clone());
let detect_result = detect_service
.detect_transaction_by_signature("sig-dex-detect-pumpswap-1")
.await;
@@ -1473,12 +1765,10 @@ mod tests {
Ok(results) => results,
Err(error) => panic!("dex detect must succeed: {}", error),
};
assert_eq!(results.len(), 1);
assert!(results[0].created_pool);
assert!(results[0].created_pair);
assert!(results[0].created_listing);
let pool_result =
crate::get_pool_by_address(database.as_ref(), "PumpSwapDetectPool111").await;
let pool_option = match pool_result {
@@ -1491,7 +1781,6 @@ mod tests {
};
assert_eq!(pool.id, Some(results[0].pool_id));
assert_eq!(pool.pool_kind, crate::KbPoolKind::Amm);
let pair_result = crate::get_pair_by_pool_id(database.as_ref(), results[0].pool_id).await;
let pair_option = match pair_result {
Ok(pair_option) => pair_option,
@@ -1502,7 +1791,6 @@ mod tests {
None => panic!("pair must exist"),
};
assert_eq!(pair.id, Some(results[0].pair_id));
let listing_result =
crate::get_pool_listing_by_pool_id(database.as_ref(), results[0].pool_id).await;
let listing_option = match listing_result {
@@ -1514,7 +1802,6 @@ mod tests {
None => panic!("listing must exist"),
};
assert_eq!(listing.id, results[0].pool_listing_id);
let pool_tokens_result =
crate::list_pool_tokens_by_pool_id(database.as_ref(), results[0].pool_id).await;
let pool_tokens = match pool_tokens_result {
@@ -1552,4 +1839,122 @@ mod tests {
assert!(!second_results[0].created_pair);
assert!(!second_results[0].created_listing);
}
async fn seed_decoded_meteora_dbc_event(
database: std::sync::Arc<crate::KbDatabase>,
signature: &str,
) {
let transaction_model = crate::KbTransactionModelService::new(database.clone());
let dex_decode = crate::KbDexDecodeService::new(database);
let resolved_transaction = serde_json::json!({
"slot": 910004,
"blockTime": 1779100004,
"version": 0,
"transaction": {
"message": {
"instructions": [
{
"programId": crate::KB_METEORA_DBC_PROGRAM_ID,
"program": "meteora-dbc",
"stackHeight": 1,
"accounts": [
"DbcDetectPool111",
"DbcDetectTokenA111",
"So11111111111111111111111111111111111111112",
"DbcDetectConfig111",
"DbcDetectCreator111"
],
"parsed": {
"info": {
"pool": "DbcDetectPool111",
"baseMint": "DbcDetectTokenA111",
"quoteMint": "So11111111111111111111111111111111111111112",
"poolConfig": "DbcDetectConfig111",
"creator": "DbcDetectCreator111"
}
},
"data": "opaque"
}
]
}
},
"meta": {
"err": null,
"logMessages": [
"Program log: Instruction: CreatePool"
]
}
});
let project_result = transaction_model
.persist_resolved_transaction(
signature,
Some("helius_primary_http".to_string()),
&resolved_transaction,
)
.await;
if let Err(error) = project_result {
panic!("projection must succeed: {}", error);
}
let decode_result = dex_decode.decode_transaction_by_signature(signature).await;
if let Err(error) = decode_result {
panic!("dex decode must succeed: {}", error);
}
}
#[tokio::test]
async fn detect_transaction_by_signature_creates_meteora_dbc_pool_pair_and_listing() {
let database = make_database().await;
seed_decoded_meteora_dbc_event(database.clone(), "sig-dex-detect-dbc-1").await;
let detect_service = crate::KbDexDetectService::new(database.clone());
let detect_result = detect_service
.detect_transaction_by_signature("sig-dex-detect-dbc-1")
.await;
let results = match detect_result {
Ok(results) => results,
Err(error) => panic!("dex detect must succeed: {}", error),
};
assert_eq!(results.len(), 1);
assert!(results[0].created_pool);
assert!(results[0].created_pair);
assert!(results[0].created_listing);
let pool_result = crate::get_pool_by_address(database.as_ref(), "DbcDetectPool111").await;
let pool_option = match pool_result {
Ok(pool_option) => pool_option,
Err(error) => panic!("pool fetch must succeed: {}", error),
};
let pool = match pool_option {
Some(pool) => pool,
None => panic!("pool must exist"),
};
assert_eq!(pool.id, Some(results[0].pool_id));
assert_eq!(pool.pool_kind, crate::KbPoolKind::BondingCurve);
let pair_result = crate::get_pair_by_pool_id(database.as_ref(), results[0].pool_id).await;
let pair_option = match pair_result {
Ok(pair_option) => pair_option,
Err(error) => panic!("pair fetch must succeed: {}", error),
};
let pair = match pair_option {
Some(pair) => pair,
None => panic!("pair must exist"),
};
assert_eq!(pair.id, Some(results[0].pair_id));
let listing_result =
crate::get_pool_listing_by_pool_id(database.as_ref(), results[0].pool_id).await;
let listing_option = match listing_result {
Ok(listing_option) => listing_option,
Err(error) => panic!("listing fetch must succeed: {}", error),
};
let listing = match listing_option {
Some(listing) => listing,
None => panic!("listing must exist"),
};
assert_eq!(listing.id, results[0].pool_listing_id);
let pool_tokens_result =
crate::list_pool_tokens_by_pool_id(database.as_ref(), results[0].pool_id).await;
let pool_tokens = match pool_tokens_result {
Ok(pool_tokens) => pool_tokens,
Err(error) => panic!("pool tokens list must succeed: {}", error),
};
assert_eq!(pool_tokens.len(), 2);
}
}