This commit is contained in:
2026-04-29 11:21:40 +02:00
parent 7fbeb8826b
commit e37f8415fb
8 changed files with 1349 additions and 9 deletions

View File

@@ -15,6 +15,7 @@ pub struct KbDexDecodeService {
meteora_damm_v1_decoder: crate::KbMeteoraDammV1Decoder,
meteora_damm_v2_decoder: crate::KbMeteoraDammV2Decoder,
fluxbeam_decoder: crate::KbFluxbeamDecoder,
dexlab_decoder: crate::KbDexlabDecoder,
}
impl KbDexDecodeService {
@@ -32,6 +33,7 @@ impl KbDexDecodeService {
meteora_damm_v1_decoder: crate::KbMeteoraDammV1Decoder::new(),
meteora_damm_v2_decoder: crate::KbMeteoraDammV2Decoder::new(),
fluxbeam_decoder: crate::KbFluxbeamDecoder::new(),
dexlab_decoder: crate::KbDexlabDecoder::new(),
}
}
@@ -211,9 +213,221 @@ impl KbDexDecodeService {
};
persisted.push(persisted_event);
}
let dexlab_decoded_result = self
.dexlab_decoder
.decode_transaction(&transaction, &instructions);
let dexlab_decoded = match dexlab_decoded_result {
Ok(dexlab_decoded) => dexlab_decoded,
Err(error) => return Err(error),
};
for decoded_event in &dexlab_decoded {
let persist_result = self.persist_dexlab_event(&transaction, decoded_event).await;
let persisted_event = match persist_result {
Ok(persisted_event) => persisted_event,
Err(error) => return Err(error),
};
persisted.push(persisted_event);
}
Ok(persisted)
}
async fn persist_dexlab_event(
&self,
transaction: &crate::KbChainTransactionDto,
decoded_event: &crate::KbDexlabDecodedEvent,
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
match decoded_event {
crate::KbDexlabDecodedEvent::CreatePool(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded dexlab payload: {}",
error
)));
}
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
event.transaction_id,
Some(event.instruction_id),
"dexlab.create_pool",
)
.await;
let existing_option = match existing_result {
Ok(existing_option) => existing_option,
Err(error) => return Err(error),
};
let already_present = existing_option.is_some();
let dto = crate::KbDexDecodedEventDto::new(
event.transaction_id,
Some(event.instruction_id),
"dexlab".to_string(),
event.program_id.clone(),
"dexlab.create_pool".to_string(),
event.pool_account.clone(),
None,
event.token_a_mint.clone(),
event.token_b_mint.clone(),
None,
payload_json,
);
let upsert_result =
crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await;
if let Err(error) = upsert_result {
return Err(error);
}
let fetched_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
event.transaction_id,
Some(event.instruction_id),
"dexlab.create_pool",
)
.await;
let fetched_option = match fetched_result {
Ok(fetched_option) => fetched_option,
Err(error) => return Err(error),
};
let fetched = match fetched_option {
Some(fetched) => fetched,
None => {
return Err(crate::KbError::InvalidState(
"decoded event disappeared after upsert".to_string(),
));
}
};
if !already_present {
let payload_value = event.payload_json.clone();
let observation_result = self
.persistence
.record_observation(&crate::KbDetectionObservationInput::new(
"dex.dexlab.create_pool".to_string(),
crate::KbObservationSourceKind::HttpRpc,
transaction.source_endpoint_name.clone(),
transaction.signature.clone(),
transaction.slot,
payload_value.clone(),
))
.await;
let observation_id = match observation_result {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let signal_result = self
.persistence
.record_signal(&crate::KbDetectionSignalInput::new(
"signal.dex.dexlab.create_pool".to_string(),
crate::KbAnalysisSignalSeverity::Low,
transaction.signature.clone(),
Some(observation_id),
None,
payload_value,
))
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
Ok(fetched)
}
crate::KbDexlabDecodedEvent::Swap(event) => {
let payload_json_result = serde_json::to_string(&event.payload_json);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize decoded dexlab payload: {}",
error
)));
}
};
let existing_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
event.transaction_id,
Some(event.instruction_id),
"dexlab.swap",
)
.await;
let existing_option = match existing_result {
Ok(existing_option) => existing_option,
Err(error) => return Err(error),
};
let already_present = existing_option.is_some();
let dto = crate::KbDexDecodedEventDto::new(
event.transaction_id,
Some(event.instruction_id),
"dexlab".to_string(),
event.program_id.clone(),
"dexlab.swap".to_string(),
event.pool_account.clone(),
None,
event.token_a_mint.clone(),
event.token_b_mint.clone(),
None,
payload_json,
);
let upsert_result =
crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await;
if let Err(error) = upsert_result {
return Err(error);
}
let fetched_result = crate::get_dex_decoded_event_by_key(
self.database.as_ref(),
event.transaction_id,
Some(event.instruction_id),
"dexlab.swap",
)
.await;
let fetched_option = match fetched_result {
Ok(fetched_option) => fetched_option,
Err(error) => return Err(error),
};
let fetched = match fetched_option {
Some(fetched) => fetched,
None => {
return Err(crate::KbError::InvalidState(
"decoded event disappeared after upsert".to_string(),
));
}
};
if !already_present {
let payload_value = event.payload_json.clone();
let observation_result = self
.persistence
.record_observation(&crate::KbDetectionObservationInput::new(
"dex.dexlab.swap".to_string(),
crate::KbObservationSourceKind::HttpRpc,
transaction.source_endpoint_name.clone(),
transaction.signature.clone(),
transaction.slot,
payload_value.clone(),
))
.await;
let observation_id = match observation_result {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let signal_result = self
.persistence
.record_signal(&crate::KbDetectionSignalInput::new(
"signal.dex.dexlab.swap".to_string(),
crate::KbAnalysisSignalSeverity::Low,
transaction.signature.clone(),
Some(observation_id),
None,
payload_value,
))
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
Ok(fetched)
}
}
}
async fn persist_fluxbeam_event(
&self,
transaction: &crate::KbChainTransactionDto,
@@ -2244,4 +2458,89 @@ mod tests {
Some("So11111111111111111111111111111111111111112".to_string())
);
}
async fn seed_projected_dexlab_transaction(
database: std::sync::Arc<crate::KbDatabase>,
signature: &str,
) {
let service = crate::KbTransactionModelService::new(database);
let resolved_transaction = serde_json::json!({
"slot": 999009,
"blockTime": 1779000009,
"version": 0,
"transaction": {
"message": {
"instructions": [
{
"programId": crate::KB_DEXLAB_PROGRAM_ID,
"program": "dexlab",
"stackHeight": 1,
"accounts": [
"DexlabDecodePool111",
"DexlabDecodeTokenA111",
"So11111111111111111111111111111111111111112",
"DexlabDecodeCreator111"
],
"parsed": {
"info": {
"instruction": "create_pool",
"pool": "DexlabDecodePool111",
"tokenA": "DexlabDecodeTokenA111",
"tokenB": "So11111111111111111111111111111111111111112",
"payer": "DexlabDecodeCreator111",
"feeTier": "0.3%"
}
},
"data": "opaque"
}
]
}
},
"meta": {
"err": null,
"logMessages": [
"Program log: Instruction: CreatePool"
]
}
});
let persist_result = service
.persist_resolved_transaction(
signature,
Some("helius_primary_http".to_string()),
&resolved_transaction,
)
.await;
if let Err(error) = persist_result {
panic!("projection must succeed: {}", error);
}
}
#[tokio::test]
async fn decode_transaction_by_signature_persists_decoded_dexlab_event() {
let database = make_database().await;
seed_projected_dexlab_transaction(database.clone(), "sig-dex-decode-dexlab-1").await;
let service = crate::KbDexDecodeService::new(database.clone());
let decoded_result = service
.decode_transaction_by_signature("sig-dex-decode-dexlab-1")
.await;
let decoded = match decoded_result {
Ok(decoded) => decoded,
Err(error) => panic!("decode must succeed: {}", error),
};
assert_eq!(decoded.len(), 1);
assert_eq!(decoded[0].protocol_name, "dexlab");
assert_eq!(decoded[0].event_kind, "dexlab.create_pool");
assert_eq!(
decoded[0].pool_account,
Some("DexlabDecodePool111".to_string())
);
assert_eq!(
decoded[0].token_a_mint,
Some("DexlabDecodeTokenA111".to_string())
);
assert_eq!(
decoded[0].token_b_mint,
Some("So11111111111111111111111111111111111111112".to_string())
);
}
}