Files
khadhroony-bobobot/kb_lib/src/dex_decoded_event_materialization.rs
2026-05-12 21:40:03 +02:00

180 lines
6.8 KiB
Rust

// file: kb_lib/src/dex_decoded_event_materialization.rs
//! Decoded DEX event materialization helpers.
//!
//! This module centralizes persistence of decoded DEX events:
//! payload enrichment, upsert, fetch-after-upsert, observation recording
//! and signal recording.
/// Input required to persist one decoded DEX event.
///
/// Consumed by value by `materialize_dex_decoded_event`: the owned strings and
/// JSON payloads are moved into the stored DTO, the observation and the signal,
/// while the borrowed services and parent transaction only need to outlive the
/// call (lifetime `'a`).
pub(crate) struct DexDecodedEventMaterializationInput<'a> {
/// Database connection used for the lookups by key and for the upsert.
pub(crate) database: &'a crate::Database,
/// Detection persistence service used to record the observation and the signal.
pub(crate) persistence: &'a crate::DetectionPersistenceService,
/// Parent transaction. Its `err_json` decides whether the payload is flagged
/// as coming from a failed transaction; its `source_endpoint_name`,
/// `signature` and `slot` are copied into the recorded observation, and its
/// `signature` into the recorded signal.
pub(crate) transaction: &'a crate::ChainTransactionDto,
/// Internal transaction id. Part of the lookup key and stored in the DTO.
pub(crate) transaction_id: i64,
/// Optional internal instruction id. Part of the lookup key and stored in the DTO.
pub(crate) instruction_id: std::option::Option<i64>,
/// Stable protocol name. Passed to payload enrichment and stored in the DTO.
pub(crate) protocol_name: std::string::String,
/// Program id that produced the event. Stored in the DTO.
pub(crate) program_id: std::string::String,
/// Stable decoded event kind. Passed to payload enrichment, used as part of
/// the lookup key and stored in the DTO.
pub(crate) event_kind: std::string::String,
/// Optional pool account. Forwarded to the DTO as-is.
pub(crate) pool_account: std::option::Option<std::string::String>,
/// Optional market account. Forwarded to the DTO as-is.
pub(crate) market_account: std::option::Option<std::string::String>,
/// Optional token A mint. Forwarded to the DTO as-is.
pub(crate) token_a_mint: std::option::Option<std::string::String>,
/// Optional token B mint. Forwarded to the DTO as-is.
pub(crate) token_b_mint: std::option::Option<std::string::String>,
/// Optional LP mint or protocol-specific secondary mint. Forwarded to the DTO as-is.
pub(crate) lp_mint: std::option::Option<std::string::String>,
/// Payload used for classification enrichment and DB storage. It is first
/// flagged when the parent transaction failed, then enriched and serialized;
/// the serialized result is what the DTO carries.
pub(crate) enrichment_payload_json: serde_json::Value,
/// Payload recorded in the detection observation. The same payload is also
/// attached to the detection signal.
pub(crate) observation_payload_json: serde_json::Value,
/// Detection observation kind.
pub(crate) observation_kind: std::string::String,
/// Detection signal kind.
pub(crate) signal_kind: std::string::String,
/// Diagnostic message emitted when fetch-after-upsert fails: it becomes the
/// content of the `crate::Error::InvalidState` error returned when the event
/// cannot be found right after the upsert.
pub(crate) missing_after_upsert_message: std::string::String,
}
/// Persists one decoded DEX event and records its first-seen observation/signal.
pub(crate) async fn materialize_dex_decoded_event(
input: crate::dex_decoded_event_materialization::DexDecodedEventMaterializationInput<'_>,
) -> Result<crate::DexDecodedEventDto, crate::Error> {
let enrichment_payload_json =
crate::dex_decoded_event_materialization::prepare_payload_for_transaction_status(
input.transaction,
input.enrichment_payload_json,
);
let payload_json_result = crate::enrich_and_serialize_dex_decoded_payload(
input.protocol_name.as_str(),
input.event_kind.as_str(),
enrichment_payload_json,
);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => return Err(error),
};
let existing_result = crate::query_dex_decoded_events_get_by_key(
input.database,
input.transaction_id,
input.instruction_id,
input.event_kind.as_str(),
)
.await;
let existing_option = match existing_result {
Ok(existing_option) => existing_option,
Err(error) => return Err(error),
};
let already_present = existing_option.is_some();
let dto = crate::DexDecodedEventDto::new(
input.transaction_id,
input.instruction_id,
input.protocol_name,
input.program_id,
input.event_kind.clone(),
input.pool_account,
input.market_account,
input.token_a_mint,
input.token_b_mint,
input.lp_mint,
payload_json,
);
let upsert_result = crate::query_dex_decoded_events_upsert(input.database, &dto).await;
if let Err(error) = upsert_result {
return Err(error);
}
let fetched_result = crate::query_dex_decoded_events_get_by_key(
input.database,
input.transaction_id,
input.instruction_id,
input.event_kind.as_str(),
)
.await;
let fetched_option = match fetched_result {
Ok(fetched_option) => fetched_option,
Err(error) => return Err(error),
};
let fetched = match fetched_option {
Some(fetched) => fetched,
None => {
return Err(crate::Error::InvalidState(input.missing_after_upsert_message));
},
};
if !already_present {
let observation_result = input
.persistence
.record_observation(&crate::DetectionObservationInput::new(
input.observation_kind,
crate::ObservationSourceKind::HttpRpc,
input.transaction.source_endpoint_name.clone(),
input.transaction.signature.clone(),
input.transaction.slot,
input.observation_payload_json.clone(),
))
.await;
let observation_id = match observation_result {
Ok(observation_id) => observation_id,
Err(error) => return Err(error),
};
let signal_result = input
.persistence
.record_signal(&crate::DetectionSignalInput::new(
input.signal_kind,
crate::AnalysisSignalSeverity::Low,
input.transaction.signature.clone(),
Some(observation_id),
None,
input.observation_payload_json,
))
.await;
if let Err(error) = signal_result {
return Err(error);
}
}
return Ok(fetched);
}
/// Flags a payload as originating from a failed transaction.
///
/// When `transaction.err_json` is `None` the payload is returned untouched.
/// Otherwise the payload is turned into a JSON object (a non-object payload is
/// wrapped under the `rawPayload` key) and the failure markers are written
/// into it, overwriting any value already stored under the same keys:
/// `transactionFailed = true`, `eventActionability`, `skipTradeReason` and
/// `skipCandleReason` set to `"failed_transaction"`, and `nonTradeUseful`,
/// `tradeCandidate` and `candleCandidate` set to `false`.
fn prepare_payload_for_transaction_status(
    transaction: &crate::ChainTransactionDto,
    payload_json: serde_json::Value,
) -> serde_json::Value {
    if transaction.err_json.is_none() {
        return payload_json;
    }

    let mut fields = match payload_json {
        serde_json::Value::Object(existing) => existing,
        // Keep non-object payloads by nesting them instead of dropping them.
        non_object => {
            let mut wrapper = serde_json::Map::new();
            wrapper.insert("rawPayload".to_string(), non_object);
            wrapper
        }
    };

    let failed_reason = || serde_json::Value::String("failed_transaction".to_string());
    // Applied in this exact order so the resulting key order does not change
    // for maps that preserve insertion order.
    let overrides = [
        ("transactionFailed", serde_json::Value::Bool(true)),
        ("eventActionability", failed_reason()),
        ("nonTradeUseful", serde_json::Value::Bool(false)),
        ("tradeCandidate", serde_json::Value::Bool(false)),
        ("candleCandidate", serde_json::Value::Bool(false)),
        ("skipTradeReason", failed_reason()),
        ("skipCandleReason", failed_reason()),
    ];
    for (key, value) in overrides {
        fields.insert(key.to_string(), value);
    }

    serde_json::Value::Object(fields)
}