0.7.4
This commit is contained in:
@@ -8,6 +8,8 @@ pub struct KbDexDecodeService {
|
||||
database: std::sync::Arc<crate::KbDatabase>,
|
||||
persistence: crate::KbDetectionPersistenceService,
|
||||
raydium_amm_v4_decoder: crate::KbRaydiumAmmV4Decoder,
|
||||
pump_fun_decoder: crate::KbPumpFunDecoder,
|
||||
pump_swap_decoder: crate::KbPumpSwapDecoder,
|
||||
}
|
||||
|
||||
impl KbDexDecodeService {
|
||||
@@ -18,6 +20,8 @@ impl KbDexDecodeService {
|
||||
database,
|
||||
persistence,
|
||||
raydium_amm_v4_decoder: crate::KbRaydiumAmmV4Decoder::new(),
|
||||
pump_fun_decoder: crate::KbPumpFunDecoder::new(),
|
||||
pump_swap_decoder: crate::KbPumpSwapDecoder::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,15 +64,15 @@ impl KbDexDecodeService {
|
||||
Ok(instructions) => instructions,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
let decoded_result = self
|
||||
let mut persisted = std::vec::Vec::new();
|
||||
let raydium_decoded_result = self
|
||||
.raydium_amm_v4_decoder
|
||||
.decode_transaction(&transaction, &instructions);
|
||||
let decoded = match decoded_result {
|
||||
Ok(decoded) => decoded,
|
||||
let raydium_decoded = match raydium_decoded_result {
|
||||
Ok(raydium_decoded) => raydium_decoded,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
let mut persisted = std::vec::Vec::new();
|
||||
for decoded_event in &decoded {
|
||||
for decoded_event in &raydium_decoded {
|
||||
let persist_result = self
|
||||
.persist_raydium_event(&transaction, decoded_event)
|
||||
.await;
|
||||
@@ -78,6 +82,40 @@ impl KbDexDecodeService {
|
||||
};
|
||||
persisted.push(persisted_event);
|
||||
}
|
||||
let pump_fun_decoded_result = self
|
||||
.pump_fun_decoder
|
||||
.decode_transaction(&transaction, &instructions);
|
||||
let pump_fun_decoded = match pump_fun_decoded_result {
|
||||
Ok(pump_fun_decoded) => pump_fun_decoded,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
for decoded_event in &pump_fun_decoded {
|
||||
let persist_result = self
|
||||
.persist_pump_fun_event(&transaction, decoded_event)
|
||||
.await;
|
||||
let persisted_event = match persist_result {
|
||||
Ok(persisted_event) => persisted_event,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
persisted.push(persisted_event);
|
||||
}
|
||||
let pump_swap_decoded_result = self
|
||||
.pump_swap_decoder
|
||||
.decode_transaction(&transaction, &instructions);
|
||||
let pump_swap_decoded = match pump_swap_decoded_result {
|
||||
Ok(pump_swap_decoded) => pump_swap_decoded,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
for decoded_event in &pump_swap_decoded {
|
||||
let persist_result = self
|
||||
.persist_pump_swap_event(&transaction, decoded_event)
|
||||
.await;
|
||||
let persisted_event = match persist_result {
|
||||
Ok(persisted_event) => persisted_event,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
persisted.push(persisted_event);
|
||||
}
|
||||
Ok(persisted)
|
||||
}
|
||||
|
||||
@@ -183,6 +221,239 @@ impl KbDexDecodeService {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn persist_pump_fun_event(
|
||||
&self,
|
||||
transaction: &crate::KbChainTransactionDto,
|
||||
decoded_event: &crate::KbPumpFunDecodedEvent,
|
||||
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
|
||||
match decoded_event {
|
||||
crate::KbPumpFunDecodedEvent::CreateV2Token(event) => {
|
||||
let payload_json_result = serde_json::to_string(&event.payload_json);
|
||||
let payload_json = match payload_json_result {
|
||||
Ok(payload_json) => payload_json,
|
||||
Err(error) => {
|
||||
return Err(crate::KbError::Json(format!(
|
||||
"cannot serialize decoded pump.fun payload: {}",
|
||||
error
|
||||
)));
|
||||
}
|
||||
};
|
||||
let existing_result = crate::get_dex_decoded_event_by_key(
|
||||
self.database.as_ref(),
|
||||
event.transaction_id,
|
||||
Some(event.instruction_id),
|
||||
"pump_fun.create_v2_token",
|
||||
)
|
||||
.await;
|
||||
let existing_option = match existing_result {
|
||||
Ok(existing_option) => existing_option,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
let already_present = existing_option.is_some();
|
||||
let dto = crate::KbDexDecodedEventDto::new(
|
||||
event.transaction_id,
|
||||
Some(event.instruction_id),
|
||||
"pump_fun".to_string(),
|
||||
event.program_id.clone(),
|
||||
"pump_fun.create_v2_token".to_string(),
|
||||
event.bonding_curve.clone(),
|
||||
None,
|
||||
event.mint.clone(),
|
||||
Some(crate::WSOL_MINT_ID.to_string()),
|
||||
event.associated_bonding_curve.clone(),
|
||||
payload_json,
|
||||
);
|
||||
let upsert_result =
|
||||
crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await;
|
||||
if let Err(error) = upsert_result {
|
||||
return Err(error);
|
||||
}
|
||||
let fetched_result = crate::get_dex_decoded_event_by_key(
|
||||
self.database.as_ref(),
|
||||
event.transaction_id,
|
||||
Some(event.instruction_id),
|
||||
"pump_fun.create_v2_token",
|
||||
)
|
||||
.await;
|
||||
let fetched_option = match fetched_result {
|
||||
Ok(fetched_option) => fetched_option,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
let fetched = match fetched_option {
|
||||
Some(fetched) => fetched,
|
||||
None => {
|
||||
return Err(crate::KbError::InvalidState(
|
||||
"decoded event disappeared after upsert".to_string(),
|
||||
));
|
||||
}
|
||||
};
|
||||
if !already_present {
|
||||
let payload_value = event.payload_json.clone();
|
||||
let observation_result = self
|
||||
.persistence
|
||||
.record_observation(&crate::KbDetectionObservationInput::new(
|
||||
"dex.pump_fun.create_v2_token".to_string(),
|
||||
crate::KbObservationSourceKind::HttpRpc,
|
||||
transaction.source_endpoint_name.clone(),
|
||||
transaction.signature.clone(),
|
||||
transaction.slot,
|
||||
payload_value.clone(),
|
||||
))
|
||||
.await;
|
||||
let observation_id = match observation_result {
|
||||
Ok(observation_id) => observation_id,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
let signal_result = self
|
||||
.persistence
|
||||
.record_signal(&crate::KbDetectionSignalInput::new(
|
||||
"signal.dex.pump_fun.create_v2_token".to_string(),
|
||||
crate::KbAnalysisSignalSeverity::Low,
|
||||
transaction.signature.clone(),
|
||||
Some(observation_id),
|
||||
None,
|
||||
payload_value,
|
||||
))
|
||||
.await;
|
||||
if let Err(error) = signal_result {
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
Ok(fetched)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn persist_pump_swap_event(
|
||||
&self,
|
||||
transaction: &crate::KbChainTransactionDto,
|
||||
decoded_event: &crate::KbPumpSwapDecodedEvent,
|
||||
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
|
||||
match decoded_event {
|
||||
crate::KbPumpSwapDecodedEvent::BuyTrade(event) => {
|
||||
self.persist_pump_swap_trade_event(
|
||||
transaction,
|
||||
event,
|
||||
"pump_swap.buy",
|
||||
"signal.dex.pump_swap.buy",
|
||||
"dex.pump_swap.buy",
|
||||
)
|
||||
.await
|
||||
}
|
||||
crate::KbPumpSwapDecodedEvent::SellTrade(event) => {
|
||||
self.persist_pump_swap_trade_event(
|
||||
transaction,
|
||||
event,
|
||||
"pump_swap.sell",
|
||||
"signal.dex.pump_swap.sell",
|
||||
"dex.pump_swap.sell",
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn persist_pump_swap_trade_event(
|
||||
&self,
|
||||
transaction: &crate::KbChainTransactionDto,
|
||||
event: &crate::KbPumpSwapTradeDecoded,
|
||||
event_kind: &str,
|
||||
signal_kind: &str,
|
||||
observation_kind: &str,
|
||||
) -> Result<crate::KbDexDecodedEventDto, crate::KbError> {
|
||||
let payload_json_result = serde_json::to_string(&event.payload_json);
|
||||
let payload_json = match payload_json_result {
|
||||
Ok(payload_json) => payload_json,
|
||||
Err(error) => {
|
||||
return Err(crate::KbError::Json(format!(
|
||||
"cannot serialize decoded pump swap payload: {}",
|
||||
error
|
||||
)));
|
||||
}
|
||||
};
|
||||
let existing_result = crate::get_dex_decoded_event_by_key(
|
||||
self.database.as_ref(),
|
||||
event.transaction_id,
|
||||
Some(event.instruction_id),
|
||||
event_kind,
|
||||
)
|
||||
.await;
|
||||
let existing_option = match existing_result {
|
||||
Ok(existing_option) => existing_option,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
let already_present = existing_option.is_some();
|
||||
let dto = crate::KbDexDecodedEventDto::new(
|
||||
event.transaction_id,
|
||||
Some(event.instruction_id),
|
||||
"pump_swap".to_string(),
|
||||
event.program_id.clone(),
|
||||
event_kind.to_string(),
|
||||
event.pool_account.clone(),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
payload_json,
|
||||
);
|
||||
let upsert_result = crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await;
|
||||
if let Err(error) = upsert_result {
|
||||
return Err(error);
|
||||
}
|
||||
let fetched_result = crate::get_dex_decoded_event_by_key(
|
||||
self.database.as_ref(),
|
||||
event.transaction_id,
|
||||
Some(event.instruction_id),
|
||||
event_kind,
|
||||
)
|
||||
.await;
|
||||
let fetched_option = match fetched_result {
|
||||
Ok(fetched_option) => fetched_option,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
let fetched = match fetched_option {
|
||||
Some(fetched) => fetched,
|
||||
None => {
|
||||
return Err(crate::KbError::InvalidState(
|
||||
"decoded event disappeared after upsert".to_string(),
|
||||
));
|
||||
}
|
||||
};
|
||||
if !already_present {
|
||||
let payload_value = event.payload_json.clone();
|
||||
let observation_result = self
|
||||
.persistence
|
||||
.record_observation(&crate::KbDetectionObservationInput::new(
|
||||
observation_kind.to_string(),
|
||||
crate::KbObservationSourceKind::HttpRpc,
|
||||
transaction.source_endpoint_name.clone(),
|
||||
transaction.signature.clone(),
|
||||
transaction.slot,
|
||||
payload_value.clone(),
|
||||
))
|
||||
.await;
|
||||
let observation_id = match observation_result {
|
||||
Ok(observation_id) => observation_id,
|
||||
Err(error) => return Err(error),
|
||||
};
|
||||
let signal_result = self
|
||||
.persistence
|
||||
.record_signal(&crate::KbDetectionSignalInput::new(
|
||||
signal_kind.to_string(),
|
||||
crate::KbAnalysisSignalSeverity::Low,
|
||||
transaction.signature.clone(),
|
||||
Some(observation_id),
|
||||
None,
|
||||
payload_value,
|
||||
))
|
||||
.await;
|
||||
if let Err(error) = signal_result {
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
Ok(fetched)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -214,7 +485,7 @@ mod tests {
|
||||
std::sync::Arc::new(database)
|
||||
}
|
||||
|
||||
async fn seed_projected_transaction(
|
||||
async fn seed_projected_raydium_transaction(
|
||||
database: std::sync::Arc<crate::KbDatabase>,
|
||||
signature: &str,
|
||||
) {
|
||||
@@ -273,10 +544,61 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
async fn seed_projected_pump_fun_transaction(
|
||||
database: std::sync::Arc<crate::KbDatabase>,
|
||||
signature: &str,
|
||||
) {
|
||||
let service = crate::KbTransactionModelService::new(database);
|
||||
let resolved_transaction = serde_json::json!({
|
||||
"slot": 999002,
|
||||
"blockTime": 1779000002,
|
||||
"version": 0,
|
||||
"transaction": {
|
||||
"message": {
|
||||
"instructions": [
|
||||
{
|
||||
"programId": crate::KB_PUMP_FUN_PROGRAM_ID,
|
||||
"program": "pump",
|
||||
"stackHeight": 1,
|
||||
"accounts": [
|
||||
"MintPF111",
|
||||
"MintAuthorityPF111",
|
||||
"BondingCurvePF111",
|
||||
"AssociatedBondingCurvePF111",
|
||||
"GlobalPF111",
|
||||
"CreatorPF111",
|
||||
"System111",
|
||||
"Token2022Program111",
|
||||
"AtaProgram111"
|
||||
],
|
||||
"data": "opaque"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"meta": {
|
||||
"err": null,
|
||||
"logMessages": [
|
||||
"Program log: Instruction: CreateV2"
|
||||
]
|
||||
}
|
||||
});
|
||||
let persist_result = service
|
||||
.persist_resolved_transaction(
|
||||
signature,
|
||||
Some("helius_primary_http".to_string()),
|
||||
&resolved_transaction,
|
||||
)
|
||||
.await;
|
||||
if let Err(error) = persist_result {
|
||||
panic!("projection must succeed: {}", error);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn decode_transaction_by_signature_persists_decoded_event() {
|
||||
async fn decode_transaction_by_signature_persists_decoded_raydium_event() {
|
||||
let database = make_database().await;
|
||||
seed_projected_transaction(database.clone(), "sig-dex-decode-1").await;
|
||||
seed_projected_raydium_transaction(database.clone(), "sig-dex-decode-1").await;
|
||||
let service = crate::KbDexDecodeService::new(database.clone());
|
||||
let decoded_result = service
|
||||
.decode_transaction_by_signature("sig-dex-decode-1")
|
||||
@@ -289,73 +611,27 @@ mod tests {
|
||||
assert_eq!(decoded[0].protocol_name, "raydium_amm_v4");
|
||||
assert_eq!(decoded[0].event_kind, "raydium_amm_v4.initialize2_pool");
|
||||
assert_eq!(decoded[0].pool_account, Some("PoolXYZ".to_string()));
|
||||
let transaction_result =
|
||||
crate::get_chain_transaction_by_signature(database.as_ref(), "sig-dex-decode-1").await;
|
||||
let transaction_option = match transaction_result {
|
||||
Ok(transaction_option) => transaction_option,
|
||||
Err(error) => panic!("transaction fetch must succeed: {}", error),
|
||||
};
|
||||
let transaction = match transaction_option {
|
||||
Some(transaction) => transaction,
|
||||
None => panic!("transaction must exist"),
|
||||
};
|
||||
let transaction_id_option = transaction.id;
|
||||
let transaction_id = match transaction_id_option {
|
||||
Some(transaction_id) => transaction_id,
|
||||
None => panic!("transaction id must exist"),
|
||||
};
|
||||
let listed_result =
|
||||
crate::list_dex_decoded_events_by_transaction_id(database.as_ref(), transaction_id)
|
||||
.await;
|
||||
let listed = match listed_result {
|
||||
Ok(listed) => listed,
|
||||
Err(error) => panic!("dex event list must succeed: {}", error),
|
||||
};
|
||||
assert_eq!(listed.len(), 1);
|
||||
assert_eq!(listed[0].lp_mint, Some("LpMintXYZ".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn decode_transaction_by_signature_is_idempotent_on_same_transaction() {
|
||||
async fn decode_transaction_by_signature_persists_decoded_pump_fun_event() {
|
||||
let database = make_database().await;
|
||||
seed_projected_transaction(database.clone(), "sig-dex-decode-2").await;
|
||||
seed_projected_pump_fun_transaction(database.clone(), "sig-dex-decode-pump-1").await;
|
||||
let service = crate::KbDexDecodeService::new(database.clone());
|
||||
let first_result = service
|
||||
.decode_transaction_by_signature("sig-dex-decode-2")
|
||||
let decoded_result = service
|
||||
.decode_transaction_by_signature("sig-dex-decode-pump-1")
|
||||
.await;
|
||||
if let Err(error) = first_result {
|
||||
panic!("first decode must succeed: {}", error);
|
||||
}
|
||||
let second_result = service
|
||||
.decode_transaction_by_signature("sig-dex-decode-2")
|
||||
.await;
|
||||
let second = match second_result {
|
||||
Ok(second) => second,
|
||||
Err(error) => panic!("second decode must succeed: {}", error),
|
||||
let decoded = match decoded_result {
|
||||
Ok(decoded) => decoded,
|
||||
Err(error) => panic!("decode must succeed: {}", error),
|
||||
};
|
||||
assert_eq!(second.len(), 1);
|
||||
let transaction_result =
|
||||
crate::get_chain_transaction_by_signature(database.as_ref(), "sig-dex-decode-2").await;
|
||||
let transaction_option = match transaction_result {
|
||||
Ok(transaction_option) => transaction_option,
|
||||
Err(error) => panic!("transaction fetch must succeed: {}", error),
|
||||
};
|
||||
let transaction = match transaction_option {
|
||||
Some(transaction) => transaction,
|
||||
None => panic!("transaction must exist"),
|
||||
};
|
||||
let transaction_id_option = transaction.id;
|
||||
let transaction_id = match transaction_id_option {
|
||||
Some(transaction_id) => transaction_id,
|
||||
None => panic!("transaction id must exist"),
|
||||
};
|
||||
let listed_result =
|
||||
crate::list_dex_decoded_events_by_transaction_id(database.as_ref(), transaction_id)
|
||||
.await;
|
||||
let listed = match listed_result {
|
||||
Ok(listed) => listed,
|
||||
Err(error) => panic!("dex event list must succeed: {}", error),
|
||||
};
|
||||
assert_eq!(listed.len(), 1);
|
||||
assert_eq!(decoded.len(), 1);
|
||||
assert_eq!(decoded[0].protocol_name, "pump_fun");
|
||||
assert_eq!(decoded[0].event_kind, "pump_fun.create_v2_token");
|
||||
assert_eq!(
|
||||
decoded[0].pool_account,
|
||||
Some("BondingCurvePF111".to_string())
|
||||
);
|
||||
assert_eq!(decoded[0].token_a_mint, Some("MintPF111".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user