This commit is contained in:
2026-05-05 20:49:45 +02:00
parent f2c227e08f
commit 348e76660c
28 changed files with 3279 additions and 210 deletions

View File

@@ -16,6 +16,7 @@ mod launch_attribution;
mod launch_surface;
mod launch_surface_key;
mod liquidity_event;
mod local_pipeline_diagnostics;
mod observed_token;
mod onchain_observation;
mod pair;
@@ -49,6 +50,23 @@ pub use launch_attribution::KbLaunchAttributionDto;
pub use launch_surface::KbLaunchSurfaceDto;
pub use launch_surface_key::KbLaunchSurfaceKeyDto;
pub use liquidity_event::KbLiquidityEventDto;
pub use local_pipeline_diagnostics::KbLocalDecodedEventDiagnosticSummaryDto;
pub(crate) use local_pipeline_diagnostics::KbLocalDecodedEventDiagnosticSummaryRow;
pub use local_pipeline_diagnostics::KbLocalDexDiagnosticSummaryDto;
pub(crate) use local_pipeline_diagnostics::KbLocalDexDiagnosticSummaryRow;
pub use local_pipeline_diagnostics::KbLocalDuplicateDecodedEventTradeDiagnosticSampleDto;
pub(crate) use local_pipeline_diagnostics::KbLocalDuplicateDecodedEventTradeDiagnosticSampleRow;
pub use local_pipeline_diagnostics::KbLocalMissingTradeEventDiagnosticSampleDto;
pub(crate) use local_pipeline_diagnostics::KbLocalMissingTradeEventDiagnosticSampleRow;
pub use local_pipeline_diagnostics::KbLocalMultiTradeSignaturePairDiagnosticSampleDto;
pub(crate) use local_pipeline_diagnostics::KbLocalMultiTradeSignaturePairDiagnosticSampleRow;
pub use local_pipeline_diagnostics::KbLocalPairDiagnosticSummaryDto;
pub(crate) use local_pipeline_diagnostics::KbLocalPairDiagnosticSummaryRow;
pub use local_pipeline_diagnostics::KbLocalPairGapDiagnosticSampleDto;
pub(crate) use local_pipeline_diagnostics::KbLocalPairGapDiagnosticSampleRow;
pub use local_pipeline_diagnostics::KbLocalPipelineDiagnosticCountersDto;
pub(crate) use local_pipeline_diagnostics::KbLocalPipelineDiagnosticCountersRow;
pub use local_pipeline_diagnostics::KbLocalPipelineDiagnosticSummaryDto;
pub use observed_token::KbObservedTokenDto;
pub use onchain_observation::KbOnchainObservationDto;
pub use pair::KbPairDto;

View File

@@ -0,0 +1,444 @@
// file: kb_lib/src/db/dtos/local_pipeline_diagnostics.rs
//! DTOs for local pipeline diagnostics.
/// Local pipeline diagnostics summary.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbLocalPipelineDiagnosticSummaryDto {
/// Total persisted chain transactions.
pub transaction_count: i64,
/// Total successful chain transactions.
pub ok_transaction_count: i64,
/// Total failed chain transactions.
pub failed_transaction_count: i64,
/// Total decoded DEX events.
pub decoded_event_count: i64,
/// Total decoded DEX trade candidates.
pub decoded_trade_candidate_count: i64,
/// Total decoded DEX candle candidates.
pub decoded_candle_candidate_count: i64,
/// Whether the local persisted pipeline has no blocking diagnostic issue.
pub diagnostics_clean: bool,
/// Number of blocking diagnostic issues.
///
/// This currently includes actionable missing trade events, invalid trade
/// events, duplicate decoded-event trade rows, and duplicate candle buckets.
pub blocking_issue_count: i64,
/// Total decoded DEX events that are trade candidates but have no trade event,
/// including intentionally ignored failed transactions.
pub missing_trade_event_count: i64,
/// Total persisted trade events.
pub trade_event_count: i64,
/// Explicit alias for decoded trade candidates without linked trade event.
pub decoded_trade_candidate_without_trade_event_count: i64,
/// Trade candidates without linked trade event on successful transactions.
pub decoded_trade_candidate_without_trade_event_on_ok_transaction_count: i64,
/// Trade candidates without linked trade event on failed transactions.
pub decoded_trade_candidate_without_trade_event_on_failed_transaction_count: i64,
/// Actionable missing trade events on successful transactions.
pub actionable_missing_trade_event_count: i64,
/// Ignored missing trade events caused by failed transactions.
pub ignored_failed_transaction_trade_candidate_count: i64,
/// Trade candidates without linked trade event and without explicit base/quote payload amounts.
pub decoded_trade_candidate_without_amount_payload_count: i64,
/// Total trade events with missing or invalid pricing fields.
pub invalid_trade_event_count: i64,
/// Total persisted pair candles.
pub pair_candle_count: i64,
/// Real duplicate trade rows grouped by decoded event id.
pub duplicate_decoded_event_trade_count: i64,
/// Multi-trade groups sharing the same signature and pair id.
pub multi_trade_signature_pair_count: i64,
/// Total duplicate candle buckets.
pub duplicate_candle_bucket_count: i64,
/// Total known tokens.
pub token_count: i64,
/// Total tokens missing symbol or name.
pub token_metadata_missing_count: i64,
/// Total known pools.
pub pool_count: i64,
/// Total known pairs.
pub pair_count: i64,
/// Total pairs without trade event.
pub pair_without_trade_count: i64,
/// Total pairs without candle.
pub pair_without_candle_count: i64,
/// Diagnostics grouped by DEX.
pub dex_summaries: std::vec::Vec<crate::KbLocalDexDiagnosticSummaryDto>,
/// Diagnostics grouped by pair.
pub pair_summaries: std::vec::Vec<crate::KbLocalPairDiagnosticSummaryDto>,
/// Diagnostics grouped by decoded event kind.
pub decoded_event_summaries: std::vec::Vec<crate::KbLocalDecodedEventDiagnosticSummaryDto>,
/// Samples of decoded trade candidates without linked trade event.
pub missing_trade_event_samples:
std::vec::Vec<crate::KbLocalMissingTradeEventDiagnosticSampleDto>,
/// Samples of duplicated trade rows by decoded event id.
pub duplicate_decoded_event_trade_samples:
std::vec::Vec<crate::KbLocalDuplicateDecodedEventTradeDiagnosticSampleDto>,
/// Samples of multi-trade signature/pair groups.
pub multi_trade_signature_pair_samples:
std::vec::Vec<crate::KbLocalMultiTradeSignaturePairDiagnosticSampleDto>,
/// Samples of pairs without trade.
pub pair_without_trade_samples: std::vec::Vec<crate::KbLocalPairGapDiagnosticSampleDto>,
/// Samples of pairs without candle.
pub pair_without_candle_samples: std::vec::Vec<crate::KbLocalPairGapDiagnosticSampleDto>,
}
/// Local DEX diagnostics summary.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbLocalDexDiagnosticSummaryDto {
/// DEX code or protocol name.
pub dex_code: std::string::String,
/// Total known pools for this DEX.
pub pool_count: i64,
/// Total known pairs for this DEX.
pub pair_count: i64,
/// Total decoded events for this DEX.
pub decoded_event_count: i64,
/// Total decoded trade candidates for this DEX.
pub decoded_trade_candidate_count: i64,
/// Total decoded candle candidates for this DEX.
pub decoded_candle_candidate_count: i64,
/// Total persisted trade events for this DEX.
pub trade_event_count: i64,
/// Total persisted candles for this DEX.
pub pair_candle_count: i64,
}
/// Local pair diagnostics summary.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbLocalPairDiagnosticSummaryDto {
/// Pair id.
pub pair_id: i64,
/// Pool address.
pub pool_address: std::string::String,
/// DEX code.
pub dex_code: std::string::String,
/// Base token mint.
pub base_mint: std::string::String,
/// Base token symbol.
pub base_symbol: std::option::Option<std::string::String>,
/// Quote token mint.
pub quote_mint: std::string::String,
/// Quote token symbol.
pub quote_symbol: std::option::Option<std::string::String>,
/// Pair symbol.
pub pair_symbol: std::option::Option<std::string::String>,
/// Total decoded events attached to the pool.
pub decoded_event_count: i64,
/// Total decoded trade candidates attached to the pool.
pub decoded_trade_candidate_count: i64,
/// Total decoded candle candidates attached to the pool.
pub decoded_candle_candidate_count: i64,
/// Total trade events attached to the pair.
pub trade_event_count: i64,
/// Total invalid trade events attached to the pair.
pub invalid_trade_event_count: i64,
/// Total candle buckets attached to the pair.
pub pair_candle_count: i64,
/// Last known price.
pub last_price_quote_per_base: std::option::Option<f64>,
}
/// Local decoded-event diagnostics summary.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbLocalDecodedEventDiagnosticSummaryDto {
/// Protocol name.
pub protocol_name: std::string::String,
/// Event kind.
pub event_kind: std::string::String,
/// Event category.
pub event_category: std::option::Option<std::string::String>,
/// Whether payload says this event is a trade candidate.
pub trade_candidate: std::option::Option<bool>,
/// Whether payload says this event is a candle candidate.
pub candle_candidate: std::option::Option<bool>,
/// Event count.
pub event_count: i64,
/// Trade-event count linked to these decoded events.
pub trade_event_count: i64,
}
/// Internal flat counter row for local diagnostics.
#[derive(Debug, Clone)]
pub struct KbLocalPipelineDiagnosticCountersDto {
/// Total persisted chain transactions.
pub transaction_count: i64,
/// Total successful chain transactions.
pub ok_transaction_count: i64,
/// Total failed chain transactions.
pub failed_transaction_count: i64,
/// Total decoded DEX events.
pub decoded_event_count: i64,
/// Total decoded DEX trade candidates.
pub decoded_trade_candidate_count: i64,
/// Total decoded DEX candle candidates.
pub decoded_candle_candidate_count: i64,
/// Total decoded trade candidates without trade event, including ignored failed transactions.
pub missing_trade_event_count: i64,
/// Explicit alias for decoded trade candidates without linked trade event.
pub decoded_trade_candidate_without_trade_event_count: i64,
/// Trade candidates without linked trade event on successful transactions.
pub decoded_trade_candidate_without_trade_event_on_ok_transaction_count: i64,
/// Trade candidates without linked trade event on failed transactions.
pub decoded_trade_candidate_without_trade_event_on_failed_transaction_count: i64,
/// Actionable missing trade events on successful transactions.
pub actionable_missing_trade_event_count: i64,
/// Ignored missing trade events caused by failed transactions.
pub ignored_failed_transaction_trade_candidate_count: i64,
/// Trade candidates without linked trade event and without explicit base/quote payload amounts.
pub decoded_trade_candidate_without_amount_payload_count: i64,
/// Total persisted trade events.
pub trade_event_count: i64,
/// Total invalid trade events.
pub invalid_trade_event_count: i64,
/// Total persisted candles.
pub pair_candle_count: i64,
/// Real duplicate trade rows grouped by decoded event id.
pub duplicate_decoded_event_trade_count: i64,
/// Multi-trade groups sharing the same signature and pair id.
pub multi_trade_signature_pair_count: i64,
/// Total duplicate candle groups.
pub duplicate_candle_bucket_count: i64,
/// Total known tokens.
pub token_count: i64,
/// Total tokens missing metadata.
pub token_metadata_missing_count: i64,
/// Total known pools.
pub pool_count: i64,
/// Total known pairs.
pub pair_count: i64,
/// Total pairs without trade.
pub pair_without_trade_count: i64,
/// Total pairs without candle.
pub pair_without_candle_count: i64,
}
/// SQL row for global local pipeline diagnostic counters.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct KbLocalPipelineDiagnosticCountersRow {
pub(crate) transaction_count: i64,
pub(crate) ok_transaction_count: i64,
pub(crate) failed_transaction_count: i64,
pub(crate) decoded_event_count: i64,
pub(crate) decoded_trade_candidate_count: i64,
pub(crate) decoded_candle_candidate_count: i64,
pub(crate) missing_trade_event_count: i64,
pub(crate) decoded_trade_candidate_without_trade_event_count: i64,
pub(crate) decoded_trade_candidate_without_trade_event_on_ok_transaction_count: i64,
pub(crate) decoded_trade_candidate_without_trade_event_on_failed_transaction_count: i64,
pub(crate) actionable_missing_trade_event_count: i64,
pub(crate) ignored_failed_transaction_trade_candidate_count: i64,
pub(crate) decoded_trade_candidate_without_amount_payload_count: i64,
pub(crate) trade_event_count: i64,
pub(crate) invalid_trade_event_count: i64,
pub(crate) pair_candle_count: i64,
pub(crate) duplicate_decoded_event_trade_count: i64,
pub(crate) multi_trade_signature_pair_count: i64,
pub(crate) duplicate_candle_bucket_count: i64,
pub(crate) token_count: i64,
pub(crate) token_metadata_missing_count: i64,
pub(crate) pool_count: i64,
pub(crate) pair_count: i64,
pub(crate) pair_without_trade_count: i64,
pub(crate) pair_without_candle_count: i64,
}
/// SQL row for local DEX diagnostics.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct KbLocalDexDiagnosticSummaryRow {
pub(crate) dex_code: std::string::String,
pub(crate) pool_count: i64,
pub(crate) pair_count: i64,
pub(crate) decoded_event_count: i64,
pub(crate) decoded_trade_candidate_count: i64,
pub(crate) decoded_candle_candidate_count: i64,
pub(crate) trade_event_count: i64,
pub(crate) pair_candle_count: i64,
}
/// SQL row for local pair diagnostics.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct KbLocalPairDiagnosticSummaryRow {
pub(crate) pair_id: i64,
pub(crate) pool_address: std::string::String,
pub(crate) dex_code: std::string::String,
pub(crate) base_mint: std::string::String,
pub(crate) base_symbol: std::option::Option<std::string::String>,
pub(crate) quote_mint: std::string::String,
pub(crate) quote_symbol: std::option::Option<std::string::String>,
pub(crate) pair_symbol: std::option::Option<std::string::String>,
pub(crate) decoded_event_count: i64,
pub(crate) decoded_trade_candidate_count: i64,
pub(crate) decoded_candle_candidate_count: i64,
pub(crate) trade_event_count: i64,
pub(crate) invalid_trade_event_count: i64,
pub(crate) pair_candle_count: i64,
pub(crate) last_price_quote_per_base: std::option::Option<f64>,
}
/// SQL row for local decoded-event diagnostics.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct KbLocalDecodedEventDiagnosticSummaryRow {
pub(crate) protocol_name: std::string::String,
pub(crate) event_kind: std::string::String,
pub(crate) event_category: std::option::Option<std::string::String>,
pub(crate) trade_candidate: std::option::Option<i64>,
pub(crate) candle_candidate: std::option::Option<i64>,
pub(crate) event_count: i64,
pub(crate) trade_event_count: i64,
}
/// Sample of a decoded trade candidate without linked trade event.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbLocalMissingTradeEventDiagnosticSampleDto {
/// Decoded event id.
pub decoded_event_id: i64,
/// Chain transaction id.
pub transaction_id: std::option::Option<i64>,
/// Transaction signature.
pub signature: std::option::Option<std::string::String>,
/// Protocol name.
pub protocol_name: std::string::String,
/// Event kind.
pub event_kind: std::string::String,
/// Pool account.
pub pool_account: std::option::Option<std::string::String>,
/// Whether the source transaction failed.
pub transaction_failed: bool,
/// Diagnostic reason explaining why no trade event was linked.
pub reason: std::string::String,
/// Whether payload has an explicit base amount.
pub has_base_amount_payload: bool,
/// Whether payload has an explicit quote amount.
pub has_quote_amount_payload: bool,
/// Whether payload has an explicit price.
pub has_price_payload: bool,
}
/// Sample of duplicated trade rows grouped by decoded event id.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbLocalDuplicateDecodedEventTradeDiagnosticSampleDto {
/// Decoded event id.
pub decoded_event_id: i64,
/// Protocol name.
pub protocol_name: std::option::Option<std::string::String>,
/// Event kind.
pub event_kind: std::option::Option<std::string::String>,
/// Pool account.
pub pool_account: std::option::Option<std::string::String>,
/// Duplicate trade row count.
pub trade_event_count: i64,
/// Trade event ids.
pub trade_event_ids: std::option::Option<std::string::String>,
/// Signatures.
pub signatures: std::option::Option<std::string::String>,
}
/// Sample of multi-trade groups sharing the same signature and pair id.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbLocalMultiTradeSignaturePairDiagnosticSampleDto {
/// Transaction signature.
pub signature: std::string::String,
/// Pair id.
pub pair_id: i64,
/// Pool address.
pub pool_address: std::option::Option<std::string::String>,
/// DEX code.
pub dex_code: std::option::Option<std::string::String>,
/// Trade event count.
pub trade_event_count: i64,
/// Distinct decoded event count.
pub decoded_event_count: i64,
/// Trade event ids.
pub trade_event_ids: std::option::Option<std::string::String>,
/// Decoded event ids.
pub decoded_event_ids: std::option::Option<std::string::String>,
}
/// Sample of a pair gap.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbLocalPairGapDiagnosticSampleDto {
/// Pair id.
pub pair_id: i64,
/// Pool address.
pub pool_address: std::string::String,
/// DEX code.
pub dex_code: std::string::String,
/// Base mint.
pub base_mint: std::string::String,
/// Base symbol.
pub base_symbol: std::option::Option<std::string::String>,
/// Quote mint.
pub quote_mint: std::string::String,
/// Quote symbol.
pub quote_symbol: std::option::Option<std::string::String>,
/// Pair symbol.
pub pair_symbol: std::option::Option<std::string::String>,
/// Decoded event count.
pub decoded_event_count: i64,
/// Decoded trade candidate count.
pub decoded_trade_candidate_count: i64,
/// Trade event count.
pub trade_event_count: i64,
/// Pair candle count.
pub pair_candle_count: i64,
}
/// SQL row for missing trade event samples.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct KbLocalMissingTradeEventDiagnosticSampleRow {
pub(crate) decoded_event_id: i64,
pub(crate) transaction_id: std::option::Option<i64>,
pub(crate) signature: std::option::Option<std::string::String>,
pub(crate) protocol_name: std::string::String,
pub(crate) event_kind: std::string::String,
pub(crate) pool_account: std::option::Option<std::string::String>,
pub(crate) transaction_failed: i64,
pub(crate) reason: std::string::String,
pub(crate) has_base_amount_payload: i64,
pub(crate) has_quote_amount_payload: i64,
pub(crate) has_price_payload: i64,
}
/// SQL row for duplicated decoded event trade samples.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct KbLocalDuplicateDecodedEventTradeDiagnosticSampleRow {
pub(crate) decoded_event_id: i64,
pub(crate) protocol_name: std::option::Option<std::string::String>,
pub(crate) event_kind: std::option::Option<std::string::String>,
pub(crate) pool_account: std::option::Option<std::string::String>,
pub(crate) trade_event_count: i64,
pub(crate) trade_event_ids: std::option::Option<std::string::String>,
pub(crate) signatures: std::option::Option<std::string::String>,
}
/// SQL row for multi-trade signature/pair samples.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct KbLocalMultiTradeSignaturePairDiagnosticSampleRow {
pub(crate) signature: std::string::String,
pub(crate) pair_id: i64,
pub(crate) pool_address: std::option::Option<std::string::String>,
pub(crate) dex_code: std::option::Option<std::string::String>,
pub(crate) trade_event_count: i64,
pub(crate) decoded_event_count: i64,
pub(crate) trade_event_ids: std::option::Option<std::string::String>,
pub(crate) decoded_event_ids: std::option::Option<std::string::String>,
}
/// SQL row for pair gap samples.
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct KbLocalPairGapDiagnosticSampleRow {
pub(crate) pair_id: i64,
pub(crate) pool_address: std::string::String,
pub(crate) dex_code: std::string::String,
pub(crate) base_mint: std::string::String,
pub(crate) base_symbol: std::option::Option<std::string::String>,
pub(crate) quote_mint: std::string::String,
pub(crate) quote_symbol: std::option::Option<std::string::String>,
pub(crate) pair_symbol: std::option::Option<std::string::String>,
pub(crate) decoded_event_count: i64,
pub(crate) decoded_trade_candidate_count: i64,
pub(crate) trade_event_count: i64,
pub(crate) pair_candle_count: i64,
}

View File

@@ -20,6 +20,7 @@ mod launch_attribution;
mod launch_surface;
mod launch_surface_key;
mod liquidity_event;
mod local_pipeline_diagnostics;
mod observed_token;
mod onchain_observation;
mod pair;
@@ -42,6 +43,7 @@ mod wallet_participation;
pub use analysis_signal::insert_analysis_signal;
pub use analysis_signal::list_recent_analysis_signals;
pub use chain_instruction::delete_chain_instructions_by_transaction_id;
pub use chain_instruction::get_chain_instruction_by_id;
pub use chain_instruction::insert_chain_instruction;
pub use chain_instruction::list_chain_instructions_by_transaction_id;
pub use chain_slot::get_chain_slot;
@@ -80,6 +82,15 @@ pub use launch_surface_key::list_launch_surface_keys_by_surface_id;
pub use launch_surface_key::upsert_launch_surface_key;
pub use liquidity_event::list_recent_liquidity_events;
pub use liquidity_event::upsert_liquidity_event;
pub use local_pipeline_diagnostics::get_local_pipeline_diagnostic_counters;
pub use local_pipeline_diagnostics::list_local_decoded_event_diagnostic_summaries;
pub use local_pipeline_diagnostics::list_local_dex_diagnostic_summaries;
pub use local_pipeline_diagnostics::list_local_duplicate_decoded_event_trade_diagnostic_samples;
pub use local_pipeline_diagnostics::list_local_missing_trade_event_diagnostic_samples;
pub use local_pipeline_diagnostics::list_local_multi_trade_signature_pair_diagnostic_samples;
pub use local_pipeline_diagnostics::list_local_pair_diagnostic_summaries;
pub use local_pipeline_diagnostics::list_local_pair_without_candle_diagnostic_samples;
pub use local_pipeline_diagnostics::list_local_pair_without_trade_diagnostic_samples;
pub use observed_token::get_observed_token_by_mint;
pub use observed_token::list_observed_tokens;
pub use observed_token::upsert_observed_token;

View File

@@ -65,6 +65,60 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
}
}
/// Reads one chain instruction by its internal id.
pub async fn get_chain_instruction_by_id(
database: &crate::KbDatabase,
instruction_id: i64,
) -> Result<std::option::Option<crate::KbChainInstructionDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbChainInstructionEntity>(
r#"
SELECT
id,
transaction_id,
parent_instruction_id,
instruction_index,
inner_instruction_index,
program_id,
program_name,
stack_height,
accounts_json,
data_json,
parsed_type,
parsed_json,
created_at
FROM kb_chain_instructions
WHERE id = ?
LIMIT 1
"#,
)
.bind(instruction_id)
.fetch_optional(pool)
.await;
let entity_option = match query_result {
Ok(entity_option) => entity_option,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot fetch kb_chain_instructions id '{}' on sqlite: {}",
instruction_id, error
)));
},
};
match entity_option {
Some(entity) => {
let dto_result = crate::KbChainInstructionDto::try_from(entity);
match dto_result {
Ok(dto) => return Ok(Some(dto)),
Err(error) => return Err(error),
}
},
None => return Ok(None),
}
},
}
}
/// Lists instructions for one transaction ordered from outer to inner.
pub async fn list_chain_instructions_by_transaction_id(
database: &crate::KbDatabase,

View File

@@ -0,0 +1,836 @@
// file: kb_lib/src/db/queries/local_pipeline_diagnostics.rs
//! Local pipeline diagnostics SQL queries.
/// Returns global local-pipeline diagnostic counters.
pub async fn get_local_pipeline_diagnostic_counters(
database: &crate::KbDatabase,
) -> Result<crate::KbLocalPipelineDiagnosticCountersDto, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let row_result =
sqlx::query_as::<sqlx::Sqlite, crate::KbLocalPipelineDiagnosticCountersRow>(
r#"
SELECT
(SELECT COUNT(*) FROM kb_chain_transactions) AS transaction_count,
(SELECT COUNT(*) FROM kb_chain_transactions WHERE err_json IS NULL) AS ok_transaction_count,
(SELECT COUNT(*) FROM kb_chain_transactions WHERE err_json IS NOT NULL) AS failed_transaction_count,
(SELECT COUNT(*) FROM kb_dex_decoded_events) AS decoded_event_count,
(
SELECT COUNT(*)
FROM kb_dex_decoded_events
WHERE json_extract(payload_json, '$.tradeCandidate') = 1
) AS decoded_trade_candidate_count,
(
SELECT COUNT(*)
FROM kb_dex_decoded_events
WHERE json_extract(payload_json, '$.candleCandidate') = 1
) AS decoded_candle_candidate_count,
(
SELECT COUNT(*)
FROM kb_dex_decoded_events dde
LEFT JOIN kb_trade_events te ON te.decoded_event_id = dde.id
WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1
AND te.id IS NULL
) AS missing_trade_event_count,
(
SELECT COUNT(*)
FROM kb_dex_decoded_events dde
LEFT JOIN kb_trade_events te ON te.decoded_event_id = dde.id
WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1
AND te.id IS NULL
) AS decoded_trade_candidate_without_trade_event_count,
(
SELECT COUNT(*)
FROM kb_dex_decoded_events dde
LEFT JOIN kb_trade_events te ON te.decoded_event_id = dde.id
LEFT JOIN kb_chain_transactions ct ON ct.id = dde.transaction_id
WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1
AND te.id IS NULL
AND ct.id IS NOT NULL
AND ct.err_json IS NULL
) AS decoded_trade_candidate_without_trade_event_on_ok_transaction_count,
(
SELECT COUNT(*)
FROM kb_dex_decoded_events dde
LEFT JOIN kb_trade_events te ON te.decoded_event_id = dde.id
LEFT JOIN kb_chain_transactions ct ON ct.id = dde.transaction_id
WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1
AND te.id IS NULL
AND ct.id IS NOT NULL
AND ct.err_json IS NOT NULL
) AS decoded_trade_candidate_without_trade_event_on_failed_transaction_count,
(
SELECT COUNT(*)
FROM kb_dex_decoded_events dde
LEFT JOIN kb_trade_events te ON te.decoded_event_id = dde.id
LEFT JOIN kb_chain_transactions ct ON ct.id = dde.transaction_id
WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1
AND te.id IS NULL
AND ct.id IS NOT NULL
AND ct.err_json IS NULL
) AS actionable_missing_trade_event_count,
(
SELECT COUNT(*)
FROM kb_dex_decoded_events dde
LEFT JOIN kb_trade_events te ON te.decoded_event_id = dde.id
LEFT JOIN kb_chain_transactions ct ON ct.id = dde.transaction_id
WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1
AND te.id IS NULL
AND ct.id IS NOT NULL
AND ct.err_json IS NOT NULL
) AS ignored_failed_transaction_trade_candidate_count,
(
SELECT COUNT(*)
FROM kb_dex_decoded_events dde
LEFT JOIN kb_trade_events te ON te.decoded_event_id = dde.id
WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1
AND te.id IS NULL
AND (
(
json_extract(dde.payload_json, '$.baseAmountRaw') IS NULL
AND json_extract(dde.payload_json, '$.base_amount_raw') IS NULL
)
OR (
json_extract(dde.payload_json, '$.quoteAmountRaw') IS NULL
AND json_extract(dde.payload_json, '$.quote_amount_raw') IS NULL
)
)
) AS decoded_trade_candidate_without_amount_payload_count,
(SELECT COUNT(*) FROM kb_trade_events) AS trade_event_count,
(
SELECT COUNT(*)
FROM kb_trade_events
WHERE base_amount_raw IS NULL
OR quote_amount_raw IS NULL
OR price_quote_per_base IS NULL
OR CAST(base_amount_raw AS INTEGER) <= 0
OR CAST(quote_amount_raw AS INTEGER) <= 0
OR price_quote_per_base <= 0
) AS invalid_trade_event_count,
(SELECT COUNT(*) FROM kb_pair_candles) AS pair_candle_count,
(
SELECT COUNT(*)
FROM (
SELECT decoded_event_id
FROM kb_trade_events
WHERE decoded_event_id IS NOT NULL
GROUP BY decoded_event_id
HAVING COUNT(*) > 1
)
) AS duplicate_decoded_event_trade_count,
(
SELECT COUNT(*)
FROM (
SELECT signature, pair_id
FROM kb_trade_events
GROUP BY signature, pair_id
HAVING COUNT(*) > 1
)
) AS multi_trade_signature_pair_count,
(
SELECT COUNT(*)
FROM (
SELECT pair_id, timeframe_seconds, bucket_start_unix
FROM kb_pair_candles
GROUP BY pair_id, timeframe_seconds, bucket_start_unix
HAVING COUNT(*) > 1
)
) AS duplicate_candle_bucket_count,
(SELECT COUNT(*) FROM kb_tokens) AS token_count,
(
SELECT COUNT(*)
FROM kb_tokens
WHERE symbol IS NULL
OR symbol = ''
OR name IS NULL
OR name = ''
) AS token_metadata_missing_count,
(SELECT COUNT(*) FROM kb_pools) AS pool_count,
(SELECT COUNT(*) FROM kb_pairs) AS pair_count,
(
SELECT COUNT(*)
FROM (
SELECT pair.id
FROM kb_pairs pair
LEFT JOIN kb_trade_events te ON te.pair_id = pair.id
GROUP BY pair.id
HAVING COUNT(te.id) = 0
)
) AS pair_without_trade_count,
(
SELECT COUNT(*)
FROM (
SELECT pair.id
FROM kb_pairs pair
LEFT JOIN kb_pair_candles pc ON pc.pair_id = pair.id
GROUP BY pair.id
HAVING COUNT(pc.pair_id) = 0
)
) AS pair_without_candle_count
"#,
)
.fetch_one(pool)
.await;
let row = match row_result {
Ok(row) => row,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot read local pipeline diagnostic counters on sqlite: {}",
error
)));
},
};
return Ok(crate::KbLocalPipelineDiagnosticCountersDto {
transaction_count: row.transaction_count,
ok_transaction_count: row.ok_transaction_count,
failed_transaction_count: row.failed_transaction_count,
decoded_event_count: row.decoded_event_count,
decoded_trade_candidate_count: row.decoded_trade_candidate_count,
decoded_candle_candidate_count: row.decoded_candle_candidate_count,
missing_trade_event_count: row.missing_trade_event_count,
decoded_trade_candidate_without_trade_event_count: row
.decoded_trade_candidate_without_trade_event_count,
decoded_trade_candidate_without_trade_event_on_ok_transaction_count: row
.decoded_trade_candidate_without_trade_event_on_ok_transaction_count,
decoded_trade_candidate_without_trade_event_on_failed_transaction_count: row
.decoded_trade_candidate_without_trade_event_on_failed_transaction_count,
decoded_trade_candidate_without_amount_payload_count: row
.decoded_trade_candidate_without_amount_payload_count,
trade_event_count: row.trade_event_count,
invalid_trade_event_count: row.invalid_trade_event_count,
pair_candle_count: row.pair_candle_count,
duplicate_decoded_event_trade_count: row.duplicate_decoded_event_trade_count,
multi_trade_signature_pair_count: row.multi_trade_signature_pair_count,
duplicate_candle_bucket_count: row.duplicate_candle_bucket_count,
token_count: row.token_count,
token_metadata_missing_count: row.token_metadata_missing_count,
pool_count: row.pool_count,
pair_count: row.pair_count,
pair_without_trade_count: row.pair_without_trade_count,
pair_without_candle_count: row.pair_without_candle_count,
actionable_missing_trade_event_count: row.actionable_missing_trade_event_count,
ignored_failed_transaction_trade_candidate_count: row
.ignored_failed_transaction_trade_candidate_count,
});
},
}
}
/// Lists local DEX diagnostic summaries.
pub async fn list_local_dex_diagnostic_summaries(
database: &crate::KbDatabase,
) -> Result<std::vec::Vec<crate::KbLocalDexDiagnosticSummaryDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let rows_result = sqlx::query_as::<sqlx::Sqlite, crate::KbLocalDexDiagnosticSummaryRow>(
r#"
WITH decoded AS (
SELECT
protocol_name AS dex_code,
COUNT(*) AS decoded_event_count,
SUM(CASE WHEN json_extract(payload_json, '$.tradeCandidate') = 1 THEN 1 ELSE 0 END) AS decoded_trade_candidate_count,
SUM(CASE WHEN json_extract(payload_json, '$.candleCandidate') = 1 THEN 1 ELSE 0 END) AS decoded_candle_candidate_count
FROM kb_dex_decoded_events
GROUP BY protocol_name
),
dex_pool_pairs AS (
SELECT
d.code AS dex_code,
COUNT(DISTINCT p.id) AS pool_count,
COUNT(DISTINCT pair.id) AS pair_count
FROM kb_dexes d
LEFT JOIN kb_pools p ON p.dex_id = d.id
LEFT JOIN kb_pairs pair ON pair.pool_id = p.id
GROUP BY d.code
),
trades AS (
SELECT
d.code AS dex_code,
COUNT(te.id) AS trade_event_count
FROM kb_trade_events te
JOIN kb_pairs pair ON pair.id = te.pair_id
JOIN kb_pools p ON p.id = pair.pool_id
JOIN kb_dexes d ON d.id = p.dex_id
GROUP BY d.code
),
candles AS (
SELECT
d.code AS dex_code,
COUNT(pc.pair_id) AS pair_candle_count
FROM kb_pair_candles pc
JOIN kb_pairs pair ON pair.id = pc.pair_id
JOIN kb_pools p ON p.id = pair.pool_id
JOIN kb_dexes d ON d.id = p.dex_id
GROUP BY d.code
)
SELECT
COALESCE(dex_pool_pairs.dex_code, decoded.dex_code) AS dex_code,
COALESCE(dex_pool_pairs.pool_count, 0) AS pool_count,
COALESCE(dex_pool_pairs.pair_count, 0) AS pair_count,
COALESCE(decoded.decoded_event_count, 0) AS decoded_event_count,
COALESCE(decoded.decoded_trade_candidate_count, 0) AS decoded_trade_candidate_count,
COALESCE(decoded.decoded_candle_candidate_count, 0) AS decoded_candle_candidate_count,
COALESCE(trades.trade_event_count, 0) AS trade_event_count,
COALESCE(candles.pair_candle_count, 0) AS pair_candle_count
FROM dex_pool_pairs
LEFT JOIN decoded ON decoded.dex_code = dex_pool_pairs.dex_code
LEFT JOIN trades ON trades.dex_code = dex_pool_pairs.dex_code
LEFT JOIN candles ON candles.dex_code = dex_pool_pairs.dex_code
UNION
SELECT
decoded.dex_code AS dex_code,
0 AS pool_count,
0 AS pair_count,
decoded.decoded_event_count AS decoded_event_count,
decoded.decoded_trade_candidate_count AS decoded_trade_candidate_count,
decoded.decoded_candle_candidate_count AS decoded_candle_candidate_count,
0 AS trade_event_count,
0 AS pair_candle_count
FROM decoded
LEFT JOIN dex_pool_pairs ON dex_pool_pairs.dex_code = decoded.dex_code
WHERE dex_pool_pairs.dex_code IS NULL
ORDER BY dex_code
"#,
)
.fetch_all(pool)
.await;
let rows = match rows_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list local dex diagnostic summaries on sqlite: {}",
error
)));
},
};
let mut summaries = std::vec::Vec::new();
for row in rows {
summaries.push(crate::KbLocalDexDiagnosticSummaryDto {
dex_code: row.dex_code,
pool_count: row.pool_count,
pair_count: row.pair_count,
decoded_event_count: row.decoded_event_count,
decoded_trade_candidate_count: row.decoded_trade_candidate_count,
decoded_candle_candidate_count: row.decoded_candle_candidate_count,
trade_event_count: row.trade_event_count,
pair_candle_count: row.pair_candle_count,
});
}
return Ok(summaries);
},
}
}
/// Lists local pair diagnostic summaries.
pub async fn list_local_pair_diagnostic_summaries(
database: &crate::KbDatabase,
) -> Result<std::vec::Vec<crate::KbLocalPairDiagnosticSummaryDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let rows_result = sqlx::query_as::<sqlx::Sqlite, crate::KbLocalPairDiagnosticSummaryRow>(
r#"
SELECT
pair.id AS pair_id,
p.address AS pool_address,
d.code AS dex_code,
base_token.mint AS base_mint,
base_token.symbol AS base_symbol,
quote_token.mint AS quote_mint,
quote_token.symbol AS quote_symbol,
pair.symbol AS pair_symbol,
COUNT(DISTINCT dde.id) AS decoded_event_count,
COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.tradeCandidate') = 1 THEN dde.id END) AS decoded_trade_candidate_count,
COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.candleCandidate') = 1 THEN dde.id END) AS decoded_candle_candidate_count,
COUNT(DISTINCT te.id) AS trade_event_count,
COUNT(
DISTINCT CASE
WHEN te.id IS NOT NULL
AND (
te.base_amount_raw IS NULL
OR te.quote_amount_raw IS NULL
OR te.price_quote_per_base IS NULL
OR CAST(te.base_amount_raw AS INTEGER) <= 0
OR CAST(te.quote_amount_raw AS INTEGER) <= 0
OR te.price_quote_per_base <= 0
)
THEN te.id
END
) AS invalid_trade_event_count,
COUNT(DISTINCT pc.bucket_start_unix || ':' || pc.timeframe_seconds) AS pair_candle_count,
(
SELECT te_last.price_quote_per_base
FROM kb_trade_events te_last
WHERE te_last.pair_id = pair.id
ORDER BY te_last.id DESC
LIMIT 1
) AS last_price_quote_per_base
FROM kb_pairs pair
JOIN kb_pools p ON p.id = pair.pool_id
JOIN kb_dexes d ON d.id = p.dex_id
JOIN kb_tokens base_token ON base_token.id = pair.base_token_id
JOIN kb_tokens quote_token ON quote_token.id = pair.quote_token_id
LEFT JOIN kb_dex_decoded_events dde ON dde.pool_account = p.address
LEFT JOIN kb_trade_events te ON te.pair_id = pair.id
LEFT JOIN kb_pair_candles pc ON pc.pair_id = pair.id
GROUP BY
pair.id,
p.address,
d.code,
base_token.mint,
base_token.symbol,
quote_token.mint,
quote_token.symbol,
pair.symbol
ORDER BY pair.id
"#,
)
.fetch_all(pool)
.await;
let rows = match rows_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list local pair diagnostic summaries on sqlite: {}",
error
)));
},
};
let mut summaries = std::vec::Vec::new();
for row in rows {
summaries.push(crate::KbLocalPairDiagnosticSummaryDto {
pair_id: row.pair_id,
pool_address: row.pool_address,
dex_code: row.dex_code,
base_mint: row.base_mint,
base_symbol: row.base_symbol,
quote_mint: row.quote_mint,
quote_symbol: row.quote_symbol,
pair_symbol: row.pair_symbol,
decoded_event_count: row.decoded_event_count,
decoded_trade_candidate_count: row.decoded_trade_candidate_count,
decoded_candle_candidate_count: row.decoded_candle_candidate_count,
trade_event_count: row.trade_event_count,
invalid_trade_event_count: row.invalid_trade_event_count,
pair_candle_count: row.pair_candle_count,
last_price_quote_per_base: row.last_price_quote_per_base,
});
}
return Ok(summaries);
},
}
}
/// Lists local decoded-event diagnostic summaries.
pub async fn list_local_decoded_event_diagnostic_summaries(
database: &crate::KbDatabase,
) -> Result<std::vec::Vec<crate::KbLocalDecodedEventDiagnosticSummaryDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let rows_result =
sqlx::query_as::<sqlx::Sqlite, crate::KbLocalDecodedEventDiagnosticSummaryRow>(
r#"
SELECT
dde.protocol_name AS protocol_name,
dde.event_kind AS event_kind,
json_extract(dde.payload_json, '$.eventCategory') AS event_category,
json_extract(dde.payload_json, '$.tradeCandidate') AS trade_candidate,
json_extract(dde.payload_json, '$.candleCandidate') AS candle_candidate,
COUNT(dde.id) AS event_count,
COUNT(te.id) AS trade_event_count
FROM kb_dex_decoded_events dde
LEFT JOIN kb_trade_events te ON te.decoded_event_id = dde.id
GROUP BY
dde.protocol_name,
dde.event_kind,
event_category,
trade_candidate,
candle_candidate
ORDER BY
dde.protocol_name,
dde.event_kind
"#,
)
.fetch_all(pool)
.await;
let rows = match rows_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list local decoded event diagnostic summaries on sqlite: {}",
error
)));
},
};
let mut summaries = std::vec::Vec::new();
for row in rows {
summaries.push(crate::KbLocalDecodedEventDiagnosticSummaryDto {
protocol_name: row.protocol_name,
event_kind: row.event_kind,
event_category: row.event_category,
trade_candidate: kb_sqlite_bool_to_option(row.trade_candidate),
candle_candidate: kb_sqlite_bool_to_option(row.candle_candidate),
event_count: row.event_count,
trade_event_count: row.trade_event_count,
});
}
return Ok(summaries);
},
}
}
fn kb_sqlite_bool_to_option(value: std::option::Option<i64>) -> std::option::Option<bool> {
match value {
Some(0) => return Some(false),
Some(_) => return Some(true),
None => return None,
}
}
/// Lists samples of decoded trade candidates without linked trade event.
pub async fn list_local_missing_trade_event_diagnostic_samples(
database: &crate::KbDatabase,
limit: i64,
) -> Result<std::vec::Vec<crate::KbLocalMissingTradeEventDiagnosticSampleDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let rows_result =
sqlx::query_as::<sqlx::Sqlite, crate::KbLocalMissingTradeEventDiagnosticSampleRow>(
r#"
SELECT
dde.id AS decoded_event_id,
dde.transaction_id AS transaction_id,
ct.signature AS signature,
dde.protocol_name AS protocol_name,
dde.event_kind AS event_kind,
dde.pool_account AS pool_account,
CASE WHEN ct.err_json IS NOT NULL THEN 1 ELSE 0 END AS transaction_failed,
CASE
WHEN ct.err_json IS NOT NULL THEN 'failed_transaction'
WHEN (
json_extract(dde.payload_json, '$.baseAmountRaw') IS NULL
AND json_extract(dde.payload_json, '$.base_amount_raw') IS NULL
AND json_extract(dde.payload_json, '$.quoteAmountRaw') IS NULL
AND json_extract(dde.payload_json, '$.quote_amount_raw') IS NULL
AND json_extract(dde.payload_json, '$.priceQuotePerBase') IS NULL
AND json_extract(dde.payload_json, '$.price_quote_per_base') IS NULL
) THEN 'ok_transaction_without_amount_payload'
WHEN (
json_extract(dde.payload_json, '$.baseAmountRaw') IS NULL
AND json_extract(dde.payload_json, '$.base_amount_raw') IS NULL
) THEN 'ok_transaction_without_base_amount_payload'
WHEN (
json_extract(dde.payload_json, '$.quoteAmountRaw') IS NULL
AND json_extract(dde.payload_json, '$.quote_amount_raw') IS NULL
) THEN 'ok_transaction_without_quote_amount_payload'
WHEN (
json_extract(dde.payload_json, '$.priceQuotePerBase') IS NULL
AND json_extract(dde.payload_json, '$.price_quote_per_base') IS NULL
) THEN 'ok_transaction_without_price_payload'
ELSE 'ok_transaction_unclassified'
END AS reason,
CASE
WHEN json_extract(dde.payload_json, '$.baseAmountRaw') IS NOT NULL
OR json_extract(dde.payload_json, '$.base_amount_raw') IS NOT NULL
THEN 1
ELSE 0
END AS has_base_amount_payload,
CASE
WHEN json_extract(dde.payload_json, '$.quoteAmountRaw') IS NOT NULL
OR json_extract(dde.payload_json, '$.quote_amount_raw') IS NOT NULL
THEN 1
ELSE 0
END AS has_quote_amount_payload,
CASE
WHEN json_extract(dde.payload_json, '$.priceQuotePerBase') IS NOT NULL
OR json_extract(dde.payload_json, '$.price_quote_per_base') IS NOT NULL
THEN 1
ELSE 0
END AS has_price_payload
FROM kb_dex_decoded_events dde
LEFT JOIN kb_trade_events te ON te.decoded_event_id = dde.id
LEFT JOIN kb_chain_transactions ct ON ct.id = dde.transaction_id
WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1
AND te.id IS NULL
ORDER BY
transaction_failed ASC,
dde.protocol_name,
dde.event_kind,
dde.id
LIMIT ?
"#,
)
.bind(limit)
.fetch_all(pool)
.await;
let rows = match rows_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list missing trade event diagnostic samples on sqlite: {}",
error
)));
},
};
let mut samples = std::vec::Vec::new();
for row in rows {
samples.push(crate::KbLocalMissingTradeEventDiagnosticSampleDto {
decoded_event_id: row.decoded_event_id,
transaction_id: row.transaction_id,
signature: row.signature,
protocol_name: row.protocol_name,
event_kind: row.event_kind,
pool_account: row.pool_account,
transaction_failed: kb_sqlite_i64_to_bool(row.transaction_failed),
reason: row.reason,
has_base_amount_payload: kb_sqlite_i64_to_bool(row.has_base_amount_payload),
has_quote_amount_payload: kb_sqlite_i64_to_bool(row.has_quote_amount_payload),
has_price_payload: kb_sqlite_i64_to_bool(row.has_price_payload),
});
}
return Ok(samples);
},
}
}
/// Lists samples of duplicated trade rows by decoded event id.
pub async fn list_local_duplicate_decoded_event_trade_diagnostic_samples(
database: &crate::KbDatabase,
limit: i64,
) -> Result<
std::vec::Vec<crate::KbLocalDuplicateDecodedEventTradeDiagnosticSampleDto>,
crate::KbError,
> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let rows_result = sqlx::query_as::<
sqlx::Sqlite,
crate::KbLocalDuplicateDecodedEventTradeDiagnosticSampleRow,
>(
r#"
SELECT
te.decoded_event_id AS decoded_event_id,
dde.protocol_name AS protocol_name,
dde.event_kind AS event_kind,
dde.pool_account AS pool_account,
COUNT(te.id) AS trade_event_count,
GROUP_CONCAT(te.id) AS trade_event_ids,
GROUP_CONCAT(te.signature) AS signatures
FROM kb_trade_events te
LEFT JOIN kb_dex_decoded_events dde ON dde.id = te.decoded_event_id
WHERE te.decoded_event_id IS NOT NULL
GROUP BY
te.decoded_event_id,
dde.protocol_name,
dde.event_kind,
dde.pool_account
HAVING COUNT(te.id) > 1
ORDER BY trade_event_count DESC, te.decoded_event_id
LIMIT ?
"#,
)
.bind(limit)
.fetch_all(pool)
.await;
let rows = match rows_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list duplicate decoded event trade diagnostic samples on sqlite: {}",
error
)));
},
};
let mut samples = std::vec::Vec::new();
for row in rows {
samples.push(crate::KbLocalDuplicateDecodedEventTradeDiagnosticSampleDto {
decoded_event_id: row.decoded_event_id,
protocol_name: row.protocol_name,
event_kind: row.event_kind,
pool_account: row.pool_account,
trade_event_count: row.trade_event_count,
trade_event_ids: row.trade_event_ids,
signatures: row.signatures,
});
}
return Ok(samples);
},
}
}
/// Lists samples of multi-trade signature/pair groups.
pub async fn list_local_multi_trade_signature_pair_diagnostic_samples(
database: &crate::KbDatabase,
limit: i64,
) -> Result<std::vec::Vec<crate::KbLocalMultiTradeSignaturePairDiagnosticSampleDto>, crate::KbError>
{
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let rows_result = sqlx::query_as::<
sqlx::Sqlite,
crate::KbLocalMultiTradeSignaturePairDiagnosticSampleRow,
>(
r#"
SELECT
te.signature AS signature,
te.pair_id AS pair_id,
p.address AS pool_address,
d.code AS dex_code,
COUNT(te.id) AS trade_event_count,
COUNT(DISTINCT te.decoded_event_id) AS decoded_event_count,
GROUP_CONCAT(te.id) AS trade_event_ids,
GROUP_CONCAT(te.decoded_event_id) AS decoded_event_ids
FROM kb_trade_events te
LEFT JOIN kb_pairs pair ON pair.id = te.pair_id
LEFT JOIN kb_pools p ON p.id = pair.pool_id
LEFT JOIN kb_dexes d ON d.id = p.dex_id
GROUP BY
te.signature,
te.pair_id,
p.address,
d.code
HAVING COUNT(te.id) > 1
ORDER BY trade_event_count DESC, te.signature, te.pair_id
LIMIT ?
"#,
)
.bind(limit)
.fetch_all(pool)
.await;
let rows = match rows_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list multi-trade signature/pair diagnostic samples on sqlite: {}",
error
)));
},
};
let mut samples = std::vec::Vec::new();
for row in rows {
samples.push(crate::KbLocalMultiTradeSignaturePairDiagnosticSampleDto {
signature: row.signature,
pair_id: row.pair_id,
pool_address: row.pool_address,
dex_code: row.dex_code,
trade_event_count: row.trade_event_count,
decoded_event_count: row.decoded_event_count,
trade_event_ids: row.trade_event_ids,
decoded_event_ids: row.decoded_event_ids,
});
}
return Ok(samples);
},
}
}
/// Lists samples of pairs without trade events.
pub async fn list_local_pair_without_trade_diagnostic_samples(
database: &crate::KbDatabase,
limit: i64,
) -> Result<std::vec::Vec<crate::KbLocalPairGapDiagnosticSampleDto>, crate::KbError> {
return kb_list_local_pair_gap_diagnostic_samples(database, limit, true).await;
}
/// Lists samples of pairs without candles.
pub async fn list_local_pair_without_candle_diagnostic_samples(
database: &crate::KbDatabase,
limit: i64,
) -> Result<std::vec::Vec<crate::KbLocalPairGapDiagnosticSampleDto>, crate::KbError> {
return kb_list_local_pair_gap_diagnostic_samples(database, limit, false).await;
}
async fn kb_list_local_pair_gap_diagnostic_samples(
database: &crate::KbDatabase,
limit: i64,
without_trade: bool,
) -> Result<std::vec::Vec<crate::KbLocalPairGapDiagnosticSampleDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let having_clause = if without_trade {
"HAVING COUNT(DISTINCT te.id) = 0"
} else {
"HAVING COUNT(DISTINCT pc.bucket_start_unix || ':' || pc.timeframe_seconds) = 0"
};
let sql = format!(
r#"
SELECT
pair.id AS pair_id,
p.address AS pool_address,
d.code AS dex_code,
base_token.mint AS base_mint,
base_token.symbol AS base_symbol,
quote_token.mint AS quote_mint,
quote_token.symbol AS quote_symbol,
pair.symbol AS pair_symbol,
COUNT(DISTINCT dde.id) AS decoded_event_count,
COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.tradeCandidate') = 1 THEN dde.id END) AS decoded_trade_candidate_count,
COUNT(DISTINCT te.id) AS trade_event_count,
COUNT(DISTINCT pc.bucket_start_unix || ':' || pc.timeframe_seconds) AS pair_candle_count
FROM kb_pairs pair
JOIN kb_pools p ON p.id = pair.pool_id
JOIN kb_dexes d ON d.id = p.dex_id
JOIN kb_tokens base_token ON base_token.id = pair.base_token_id
JOIN kb_tokens quote_token ON quote_token.id = pair.quote_token_id
LEFT JOIN kb_dex_decoded_events dde ON dde.pool_account = p.address
LEFT JOIN kb_trade_events te ON te.pair_id = pair.id
LEFT JOIN kb_pair_candles pc ON pc.pair_id = pair.id
GROUP BY
pair.id,
p.address,
d.code,
base_token.mint,
base_token.symbol,
quote_token.mint,
quote_token.symbol,
pair.symbol
{}
ORDER BY decoded_trade_candidate_count DESC, pair.id
LIMIT ?
"#,
having_clause
);
let rows_result = sqlx::query_as::<
sqlx::Sqlite,
crate::KbLocalPairGapDiagnosticSampleRow,
>(sql.as_str())
.bind(limit)
.fetch_all(pool)
.await;
let rows = match rows_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list pair gap diagnostic samples on sqlite: {}",
error
)));
},
};
let mut samples = std::vec::Vec::new();
for row in rows {
samples.push(crate::KbLocalPairGapDiagnosticSampleDto {
pair_id: row.pair_id,
pool_address: row.pool_address,
dex_code: row.dex_code,
base_mint: row.base_mint,
base_symbol: row.base_symbol,
quote_mint: row.quote_mint,
quote_symbol: row.quote_symbol,
pair_symbol: row.pair_symbol,
decoded_event_count: row.decoded_event_count,
decoded_trade_candidate_count: row.decoded_trade_candidate_count,
trade_event_count: row.trade_event_count,
pair_candle_count: row.pair_candle_count,
});
}
return Ok(samples);
},
}
}
fn kb_sqlite_i64_to_bool(value: i64) -> bool {
return value != 0;
}