// file: kb_lib/src/db/queries/local_pipeline_diagnostics.rs //! Local pipeline diagnostics SQL queries. /// Returns global local-pipeline diagnostic counters. pub async fn query_local_pipeline_diagnostic_get_counters( database: &crate::Database, ) -> Result { match database.connection() { crate::DatabaseConnection::Sqlite(pool) => { let row_result = sqlx::query_as::( r#" SELECT (SELECT COUNT(*) FROM k_sol_chain_transactions) AS transaction_count, (SELECT COUNT(*) FROM k_sol_chain_transactions WHERE err_json IS NULL) AS ok_transaction_count, (SELECT COUNT(*) FROM k_sol_chain_transactions WHERE err_json IS NOT NULL) AS failed_transaction_count, (SELECT COUNT(*) FROM k_sol_dex_decoded_events) AS decoded_event_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events WHERE json_extract(payload_json, '$.tradeCandidate') = 1 ) AS decoded_trade_candidate_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events WHERE json_extract(payload_json, '$.candleCandidate') = 1 ) AS decoded_candle_candidate_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events WHERE COALESCE(json_extract(payload_json, '$.nonTradeUseful'), 0) = 1 OR COALESCE(json_extract(payload_json, '$.eventActionability'), '') = 'non_trade_useful' ) AS decoded_non_trade_useful_event_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events WHERE COALESCE(json_extract(payload_json, '$.eventActionability'), '') = 'non_actionable_trade' OR ( COALESCE(json_extract(payload_json, '$.eventActionability'), '') = '' AND COALESCE(json_extract(payload_json, '$.eventCategory'), '') = 'trade' AND COALESCE(json_extract(payload_json, '$.tradeCandidate'), 0) = 0 AND COALESCE(json_extract(payload_json, '$.transactionFailed'), 0) = 0 ) ) AS decoded_non_actionable_trade_event_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events WHERE COALESCE(json_extract(payload_json, '$.eventCategory'), 'unknown') = 'unknown' ) AS decoded_unknown_event_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON 
te.decoded_event_id = dde.id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL ) AS missing_trade_event_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL ) AS decoded_trade_candidate_without_trade_event_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL AND ct.id IS NOT NULL AND ct.err_json IS NULL AND dde.pool_account IS NOT NULL AND dde.token_a_mint IS NOT NULL AND dde.token_b_mint IS NOT NULL AND EXISTS ( SELECT 1 FROM k_sol_pools p JOIN k_sol_pairs pair ON pair.pool_id = p.id WHERE p.address = dde.pool_account ) ) AS decoded_trade_candidate_without_trade_event_on_ok_transaction_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL AND ct.id IS NOT NULL AND ct.err_json IS NOT NULL ) AS decoded_trade_candidate_without_trade_event_on_failed_transaction_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL AND ct.id IS NOT NULL AND ct.err_json IS NULL AND dde.pool_account IS NOT NULL AND dde.token_a_mint IS NOT NULL AND dde.token_b_mint IS NOT NULL AND EXISTS ( SELECT 1 FROM k_sol_pools p JOIN k_sol_pairs pair ON pair.pool_id = p.id WHERE p.address = dde.pool_account ) ) AS actionable_missing_trade_event_count, ( SELECT COUNT(*) FROM 
k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL AND ct.id IS NOT NULL AND ct.err_json IS NOT NULL ) AS ignored_failed_transaction_trade_candidate_count, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL AND ( ( json_extract(dde.payload_json, '$.baseAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.base_amount_raw') IS NULL ) OR ( json_extract(dde.payload_json, '$.quoteAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.quote_amount_raw') IS NULL ) ) ) AS decoded_trade_candidate_without_amount_payload_count, (SELECT COUNT(*) FROM k_sol_trade_events) AS trade_event_count, ( SELECT COUNT(*) FROM k_sol_trade_events WHERE base_amount_raw IS NULL OR quote_amount_raw IS NULL OR price_quote_per_base IS NULL OR CAST(base_amount_raw AS INTEGER) <= 0 OR CAST(quote_amount_raw AS INTEGER) <= 0 OR price_quote_per_base <= 0 ) AS invalid_trade_event_count, (SELECT COUNT(*) FROM k_sol_pair_candles) AS pair_candle_count, ( SELECT COUNT(*) FROM ( SELECT decoded_event_id FROM k_sol_trade_events WHERE decoded_event_id IS NOT NULL GROUP BY decoded_event_id HAVING COUNT(*) > 1 ) ) AS duplicate_decoded_event_trade_count, ( SELECT COUNT(*) FROM ( SELECT signature, pair_id FROM k_sol_trade_events GROUP BY signature, pair_id HAVING COUNT(*) > 1 ) ) AS multi_trade_signature_pair_count, ( SELECT COUNT(*) FROM ( SELECT pair_id, timeframe_seconds, bucket_start_unix FROM k_sol_pair_candles GROUP BY pair_id, timeframe_seconds, bucket_start_unix HAVING COUNT(*) > 1 ) ) AS duplicate_candle_bucket_count, (SELECT COUNT(*) FROM k_sol_tokens) AS token_count, ( SELECT COUNT(*) FROM k_sol_tokens WHERE symbol IS NULL OR symbol = '' OR name IS NULL OR name = '' ) 
AS token_metadata_missing_count, (SELECT COUNT(*) FROM k_sol_pools) AS pool_count, (SELECT COUNT(*) FROM k_sol_pairs) AS pair_count, ( SELECT COUNT(*) FROM ( SELECT pair.id FROM k_sol_pairs pair LEFT JOIN k_sol_trade_events te ON te.pair_id = pair.id GROUP BY pair.id HAVING COUNT(DISTINCT te.id) = 0 ) ) AS literal_pair_without_trade_count, ( SELECT COUNT(*) FROM ( SELECT pair.id FROM k_sol_pairs pair LEFT JOIN k_sol_pair_candles pc ON pc.pair_id = pair.id GROUP BY pair.id HAVING COUNT(DISTINCT pc.bucket_start_unix || ':' || pc.timeframe_seconds) = 0 ) ) AS literal_pair_without_candle_count, ( SELECT COUNT(DISTINCT pair_id) FROM k_sol_trade_events ) AS trade_materialized_pair_count, ( SELECT COUNT(DISTINCT pair_id) FROM k_sol_pair_candles ) AS candle_materialized_pair_count, ( SELECT COUNT(*) FROM ( SELECT pair.id FROM k_sol_pairs pair JOIN k_sol_pools p ON p.id = pair.pool_id JOIN k_sol_dex_decoded_events dde ON dde.pool_account = p.address JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND ct.err_json IS NULL GROUP BY pair.id ) ) AS actionable_pair_count, ( SELECT COUNT(DISTINCT timeframe_seconds) FROM k_sol_pair_candles ) AS candle_bucket_timeframe_count, ( SELECT COUNT(*) FROM ( SELECT pair.id FROM k_sol_pairs pair JOIN k_sol_pools p ON p.id = pair.pool_id JOIN k_sol_dex_decoded_events dde ON dde.pool_account = p.address LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL GROUP BY pair.id HAVING COUNT(DISTINCT CASE WHEN ct.id IS NOT NULL AND ct.err_json IS NULL AND dde.pool_account IS NOT NULL AND dde.token_a_mint IS NOT NULL AND dde.token_b_mint IS NOT NULL AND EXISTS ( SELECT 1 FROM k_sol_pools known_pool JOIN k_sol_pairs known_pair ON known_pair.pool_id = known_pool.id WHERE known_pool.address = dde.pool_account ) THEN dde.id 
END) = 0 AND COUNT(DISTINCT dde.id) > 0 ) ) AS non_actionable_pair_count, ( SELECT COUNT(*) FROM ( SELECT pair.id FROM k_sol_pairs pair JOIN k_sol_pools p ON p.id = pair.pool_id LEFT JOIN k_sol_dex_decoded_events dde ON dde.pool_account = p.address LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id LEFT JOIN k_sol_trade_events te ON te.pair_id = pair.id GROUP BY pair.id HAVING COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND ct.id IS NOT NULL AND ct.err_json IS NULL THEN dde.id END) > 0 AND COUNT(DISTINCT te.id) = 0 ) ) AS blocking_pair_without_trade_count, ( SELECT COUNT(*) FROM ( SELECT pair.id FROM k_sol_pairs pair JOIN k_sol_pools p ON p.id = pair.pool_id LEFT JOIN k_sol_dex_decoded_events dde ON dde.pool_account = p.address LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id LEFT JOIN k_sol_trade_events te ON te.pair_id = pair.id GROUP BY pair.id HAVING COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND ct.id IS NOT NULL AND ct.err_json IS NULL THEN dde.id END) > 0 AND COUNT(DISTINCT te.id) = 0 ) ) AS pair_without_trade_count, ( SELECT COUNT(*) FROM ( SELECT pair.id FROM k_sol_pairs pair JOIN k_sol_pools p ON p.id = pair.pool_id LEFT JOIN k_sol_dex_decoded_events dde ON dde.pool_account = p.address LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id LEFT JOIN k_sol_pair_candles pc ON pc.pair_id = pair.id GROUP BY pair.id HAVING COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.candleCandidate') = 1 AND ct.id IS NOT NULL AND ct.err_json IS NULL THEN dde.id END) > 0 AND COUNT(DISTINCT pc.bucket_start_unix || ':' || pc.timeframe_seconds) = 0 ) ) AS blocking_pair_without_candle_count, ( SELECT COUNT(*) FROM ( SELECT pair.id FROM k_sol_pairs pair JOIN k_sol_pools p ON p.id = pair.pool_id LEFT JOIN k_sol_dex_decoded_events dde ON dde.pool_account = p.address LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id LEFT JOIN 
k_sol_pair_candles pc ON pc.pair_id = pair.id GROUP BY pair.id HAVING COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.candleCandidate') = 1 AND ct.id IS NOT NULL AND ct.err_json IS NULL THEN dde.id END) > 0 AND COUNT(DISTINCT pc.bucket_start_unix || ':' || pc.timeframe_seconds) = 0 ) ) AS pair_without_candle_count "#, ) .fetch_one(pool) .await; let row = match row_result { Ok(row) => row, Err(error) => { return Err(crate::Error::Db(format!( "cannot read local pipeline diagnostic counters on sqlite: {}", error ))); }, }; return Ok(crate::LocalPipelineDiagnosticCountersDto { transaction_count: row.transaction_count, ok_transaction_count: row.ok_transaction_count, failed_transaction_count: row.failed_transaction_count, decoded_event_count: row.decoded_event_count, decoded_trade_candidate_count: row.decoded_trade_candidate_count, decoded_candle_candidate_count: row.decoded_candle_candidate_count, decoded_non_trade_useful_event_count: row.decoded_non_trade_useful_event_count, decoded_non_actionable_trade_event_count: row .decoded_non_actionable_trade_event_count, decoded_unknown_event_count: row.decoded_unknown_event_count, missing_trade_event_count: row.missing_trade_event_count, decoded_trade_candidate_without_trade_event_count: row .decoded_trade_candidate_without_trade_event_count, decoded_trade_candidate_without_trade_event_on_ok_transaction_count: row .decoded_trade_candidate_without_trade_event_on_ok_transaction_count, decoded_trade_candidate_without_trade_event_on_failed_transaction_count: row .decoded_trade_candidate_without_trade_event_on_failed_transaction_count, decoded_trade_candidate_without_amount_payload_count: row .decoded_trade_candidate_without_amount_payload_count, trade_event_count: row.trade_event_count, invalid_trade_event_count: row.invalid_trade_event_count, pair_candle_count: row.pair_candle_count, duplicate_decoded_event_trade_count: row.duplicate_decoded_event_trade_count, multi_trade_signature_pair_count: 
row.multi_trade_signature_pair_count, duplicate_candle_bucket_count: row.duplicate_candle_bucket_count, token_count: row.token_count, token_metadata_missing_count: row.token_metadata_missing_count, pool_count: row.pool_count, pair_count: row.pair_count, literal_pair_without_trade_count: row.literal_pair_without_trade_count, literal_pair_without_candle_count: row.literal_pair_without_candle_count, trade_materialized_pair_count: row.trade_materialized_pair_count, candle_materialized_pair_count: row.candle_materialized_pair_count, actionable_pair_count: row.actionable_pair_count, candle_bucket_timeframe_count: row.candle_bucket_timeframe_count, non_actionable_pair_count: row.non_actionable_pair_count, blocking_pair_without_trade_count: row.blocking_pair_without_trade_count, blocking_pair_without_candle_count: row.blocking_pair_without_candle_count, pair_without_trade_count: row.pair_without_trade_count, pair_without_candle_count: row.pair_without_candle_count, actionable_missing_trade_event_count: row.actionable_missing_trade_event_count, ignored_failed_transaction_trade_candidate_count: row .ignored_failed_transaction_trade_candidate_count, }); }, } } /// Lists local DEX diagnostic summaries. 
pub async fn query_local_pipeline_diagnostic_list_summaries( database: &crate::Database, ) -> Result, crate::Error> { match database.connection() { crate::DatabaseConnection::Sqlite(pool) => { let rows_result = sqlx::query_as::( r#" WITH decoded AS ( SELECT protocol_name AS dex_code, COUNT(*) AS decoded_event_count, SUM(CASE WHEN json_extract(payload_json, '$.tradeCandidate') = 1 THEN 1 ELSE 0 END) AS decoded_trade_candidate_count, SUM(CASE WHEN json_extract(payload_json, '$.candleCandidate') = 1 THEN 1 ELSE 0 END) AS decoded_candle_candidate_count FROM k_sol_dex_decoded_events GROUP BY protocol_name ), dex_pool_pairs AS ( SELECT d.code AS dex_code, COUNT(DISTINCT p.id) AS pool_count, COUNT(DISTINCT pair.id) AS pair_count FROM k_sol_dexes d LEFT JOIN k_sol_pools p ON p.dex_id = d.id LEFT JOIN k_sol_pairs pair ON pair.pool_id = p.id GROUP BY d.code ), trades AS ( SELECT d.code AS dex_code, COUNT(te.id) AS trade_event_count FROM k_sol_trade_events te JOIN k_sol_pairs pair ON pair.id = te.pair_id JOIN k_sol_pools p ON p.id = pair.pool_id JOIN k_sol_dexes d ON d.id = p.dex_id GROUP BY d.code ), candles AS ( SELECT d.code AS dex_code, COUNT(pc.pair_id) AS pair_candle_count FROM k_sol_pair_candles pc JOIN k_sol_pairs pair ON pair.id = pc.pair_id JOIN k_sol_pools p ON p.id = pair.pool_id JOIN k_sol_dexes d ON d.id = p.dex_id GROUP BY d.code ) SELECT COALESCE(dex_pool_pairs.dex_code, decoded.dex_code) AS dex_code, COALESCE(dex_pool_pairs.pool_count, 0) AS pool_count, COALESCE(dex_pool_pairs.pair_count, 0) AS pair_count, COALESCE(decoded.decoded_event_count, 0) AS decoded_event_count, COALESCE(decoded.decoded_trade_candidate_count, 0) AS decoded_trade_candidate_count, COALESCE(decoded.decoded_candle_candidate_count, 0) AS decoded_candle_candidate_count, COALESCE(trades.trade_event_count, 0) AS trade_event_count, COALESCE(candles.pair_candle_count, 0) AS pair_candle_count FROM dex_pool_pairs LEFT JOIN decoded ON decoded.dex_code = dex_pool_pairs.dex_code LEFT JOIN trades ON 
trades.dex_code = dex_pool_pairs.dex_code LEFT JOIN candles ON candles.dex_code = dex_pool_pairs.dex_code UNION SELECT decoded.dex_code AS dex_code, 0 AS pool_count, 0 AS pair_count, decoded.decoded_event_count AS decoded_event_count, decoded.decoded_trade_candidate_count AS decoded_trade_candidate_count, decoded.decoded_candle_candidate_count AS decoded_candle_candidate_count, 0 AS trade_event_count, 0 AS pair_candle_count FROM decoded LEFT JOIN dex_pool_pairs ON dex_pool_pairs.dex_code = decoded.dex_code WHERE dex_pool_pairs.dex_code IS NULL ORDER BY dex_code "#, ) .fetch_all(pool) .await; let rows = match rows_result { Ok(rows) => rows, Err(error) => { return Err(crate::Error::Db(format!( "cannot list local dex diagnostic summaries on sqlite: {}", error ))); }, }; let mut summaries = std::vec::Vec::new(); for row in rows { summaries.push(crate::LocalDexDiagnosticSummaryDto { dex_code: row.dex_code, pool_count: row.pool_count, pair_count: row.pair_count, decoded_event_count: row.decoded_event_count, decoded_trade_candidate_count: row.decoded_trade_candidate_count, decoded_candle_candidate_count: row.decoded_candle_candidate_count, trade_event_count: row.trade_event_count, pair_candle_count: row.pair_candle_count, }); } return Ok(summaries); }, } } /// Lists local pair diagnostic summaries. 
pub async fn query_local_pair_diagnostic_list_summaries( database: &crate::Database, ) -> Result, crate::Error> { match database.connection() { crate::DatabaseConnection::Sqlite(pool) => { let rows_result = sqlx::query_as::( r#" SELECT pair.id AS pair_id, p.address AS pool_address, d.code AS dex_code, base_token.mint AS base_mint, base_token.symbol AS base_symbol, quote_token.mint AS quote_mint, quote_token.symbol AS quote_symbol, COALESCE( pair.symbol, CASE WHEN base_token.symbol IS NOT NULL AND base_token.symbol != '' AND quote_token.symbol IS NOT NULL AND quote_token.symbol != '' THEN base_token.symbol || '/' || quote_token.symbol ELSE NULL END ) AS pair_symbol, COUNT(DISTINCT dde.id) AS decoded_event_count, COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.tradeCandidate') = 1 THEN dde.id END) AS decoded_trade_candidate_count, COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.candleCandidate') = 1 THEN dde.id END) AS decoded_candle_candidate_count, COUNT(DISTINCT te.id) AS trade_event_count, COUNT( DISTINCT CASE WHEN te.id IS NOT NULL AND ( te.base_amount_raw IS NULL OR te.quote_amount_raw IS NULL OR te.price_quote_per_base IS NULL OR CAST(te.base_amount_raw AS INTEGER) <= 0 OR CAST(te.quote_amount_raw AS INTEGER) <= 0 OR te.price_quote_per_base <= 0 ) THEN te.id END ) AS invalid_trade_event_count, COUNT(DISTINCT pc.bucket_start_unix || ':' || pc.timeframe_seconds) AS pair_candle_count, ( SELECT te_last.price_quote_per_base FROM k_sol_trade_events te_last WHERE te_last.pair_id = pair.id ORDER BY te_last.id DESC LIMIT 1 ) AS last_price_quote_per_base FROM k_sol_pairs pair JOIN k_sol_pools p ON p.id = pair.pool_id JOIN k_sol_dexes d ON d.id = p.dex_id JOIN k_sol_tokens base_token ON base_token.id = pair.base_token_id JOIN k_sol_tokens quote_token ON quote_token.id = pair.quote_token_id LEFT JOIN k_sol_dex_decoded_events dde ON dde.pool_account = p.address LEFT JOIN k_sol_trade_events te ON te.pair_id = pair.id LEFT JOIN k_sol_pair_candles pc ON 
pc.pair_id = pair.id GROUP BY pair.id, p.address, d.code, base_token.mint, base_token.symbol, quote_token.mint, quote_token.symbol, pair.symbol ORDER BY pair.id "#, ) .fetch_all(pool) .await; let rows = match rows_result { Ok(rows) => rows, Err(error) => { return Err(crate::Error::Db(format!( "cannot list local pair diagnostic summaries on sqlite: {}", error ))); }, }; let mut summaries = std::vec::Vec::new(); for row in rows { summaries.push(crate::LocalPairDiagnosticSummaryDto { pair_id: row.pair_id, pool_address: row.pool_address, dex_code: row.dex_code, base_mint: row.base_mint, base_symbol: row.base_symbol, quote_mint: row.quote_mint, quote_symbol: row.quote_symbol, pair_symbol: row.pair_symbol, decoded_event_count: row.decoded_event_count, decoded_trade_candidate_count: row.decoded_trade_candidate_count, decoded_candle_candidate_count: row.decoded_candle_candidate_count, trade_event_count: row.trade_event_count, invalid_trade_event_count: row.invalid_trade_event_count, pair_candle_count: row.pair_candle_count, last_price_quote_per_base: row.last_price_quote_per_base, }); } return Ok(summaries); }, } } /// Lists local pair materialization/actionability summaries. 
pub async fn query_local_pair_actionability_diagnostic_list_summaries( database: &crate::Database, ) -> Result, crate::Error> { match database.connection() { crate::DatabaseConnection::Sqlite(pool) => { let rows_result = sqlx::query_as::< sqlx::Sqlite, crate::db::dtos::LocalPairActionabilityDiagnosticSummaryRow, >( r#" WITH pair_state AS ( SELECT pair.id AS pair_id, COUNT(DISTINCT dde.id) AS decoded_event_count, COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.tradeCandidate') = 1 THEN dde.id END) AS decoded_trade_candidate_count, COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND ct.id IS NOT NULL AND ct.err_json IS NULL THEN dde.id END) AS actionable_trade_candidate_count, COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND ct.id IS NOT NULL AND ct.err_json IS NOT NULL THEN dde.id END) AS failed_trade_candidate_count, COUNT(DISTINCT te.id) AS trade_event_count, COUNT(DISTINCT pc.bucket_start_unix || ':' || pc.timeframe_seconds) AS pair_candle_count FROM k_sol_pairs pair JOIN k_sol_pools p ON p.id = pair.pool_id LEFT JOIN k_sol_dex_decoded_events dde ON dde.pool_account = p.address LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id LEFT JOIN k_sol_trade_events te ON te.pair_id = pair.id LEFT JOIN k_sol_pair_candles pc ON pc.pair_id = pair.id GROUP BY pair.id ), classified AS ( SELECT CASE WHEN trade_event_count > 0 THEN 'trade_materialized' WHEN actionable_trade_candidate_count > 0 THEN 'actionable_without_materialized_trade' WHEN failed_trade_candidate_count > 0 THEN 'failed_trade_candidate_only' WHEN decoded_trade_candidate_count > 0 THEN 'non_actionable_trade_candidate_only' WHEN decoded_event_count > 0 THEN 'decoded_without_trade_candidate' ELSE 'catalog_only' END AS pair_actionability, pair_id, decoded_event_count, decoded_trade_candidate_count, actionable_trade_candidate_count, failed_trade_candidate_count, trade_event_count, pair_candle_count FROM pair_state ) SELECT 
pair_actionability AS pair_actionability, COUNT(pair_id) AS pair_count, SUM(decoded_event_count) AS decoded_event_count, SUM(decoded_trade_candidate_count) AS decoded_trade_candidate_count, SUM(actionable_trade_candidate_count) AS actionable_trade_candidate_count, SUM(failed_trade_candidate_count) AS failed_trade_candidate_count, SUM(trade_event_count) AS trade_event_count, SUM(pair_candle_count) AS pair_candle_count FROM classified GROUP BY pair_actionability ORDER BY CASE pair_actionability WHEN 'trade_materialized' THEN 1 WHEN 'actionable_without_materialized_trade' THEN 2 WHEN 'failed_trade_candidate_only' THEN 3 WHEN 'non_actionable_trade_candidate_only' THEN 4 WHEN 'decoded_without_trade_candidate' THEN 5 ELSE 6 END, pair_actionability "#, ) .fetch_all(pool) .await; let rows = match rows_result { Ok(rows) => rows, Err(error) => { return Err(crate::Error::Db(format!( "cannot list local pair actionability diagnostic summaries on sqlite: {}", error ))); }, }; let mut summaries = std::vec::Vec::new(); for row in rows { summaries.push(crate::LocalPairActionabilityDiagnosticSummaryDto { pair_actionability: row.pair_actionability, pair_count: row.pair_count, decoded_event_count: row.decoded_event_count, decoded_trade_candidate_count: row.decoded_trade_candidate_count, actionable_trade_candidate_count: row.actionable_trade_candidate_count, failed_trade_candidate_count: row.failed_trade_candidate_count, trade_event_count: row.trade_event_count, pair_candle_count: row.pair_candle_count, }); } return Ok(summaries); }, } } /// Lists local decoded-event diagnostic summaries. 
pub async fn query_local_decoded_event_diagnostic_list_summaries( database: &crate::Database, ) -> Result, crate::Error> { match database.connection() { crate::DatabaseConnection::Sqlite(pool) => { let rows_result = sqlx::query_as::< sqlx::Sqlite, crate::db::dtos::LocalDecodedEventDiagnosticSummaryRow, >( r#" SELECT dde.protocol_name AS protocol_name, dde.event_kind AS event_kind, json_extract(dde.payload_json, '$.eventCategory') AS event_category, json_extract(dde.payload_json, '$.eventLifecycleKind') AS event_lifecycle_kind, json_extract(dde.payload_json, '$.eventActionability') AS event_actionability, json_extract(dde.payload_json, '$.nonTradeUseful') AS non_trade_useful, json_extract(dde.payload_json, '$.tradeCandidate') AS trade_candidate, json_extract(dde.payload_json, '$.candleCandidate') AS candle_candidate, COUNT(dde.id) AS event_count, COUNT(te.id) AS trade_event_count FROM k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id GROUP BY dde.protocol_name, dde.event_kind, event_category, event_lifecycle_kind, event_actionability, non_trade_useful, trade_candidate, candle_candidate ORDER BY dde.protocol_name, dde.event_kind "#, ) .fetch_all(pool) .await; let rows = match rows_result { Ok(rows) => rows, Err(error) => { return Err(crate::Error::Db(format!( "cannot list local decoded event diagnostic summaries on sqlite: {}", error ))); }, }; let mut summaries = std::vec::Vec::new(); for row in rows { summaries.push(crate::LocalDecodedEventDiagnosticSummaryDto { protocol_name: row.protocol_name, event_kind: row.event_kind, event_category: row.event_category, event_lifecycle_kind: row.event_lifecycle_kind, event_actionability: row.event_actionability, non_trade_useful: sqlite_bool_to_option(row.non_trade_useful), trade_candidate: sqlite_bool_to_option(row.trade_candidate), candle_candidate: sqlite_bool_to_option(row.candle_candidate), event_count: row.event_count, trade_event_count: row.trade_event_count, }); } return 
Ok(summaries); }, } } /// Lists local decoded-event classification diagnostic summaries. pub async fn query_local_event_classification_diagnostic_list_summaries( database: &crate::Database, ) -> Result, crate::Error> { match database.connection() { crate::DatabaseConnection::Sqlite(pool) => { let rows_result = sqlx::query_as::< sqlx::Sqlite, crate::db::dtos::LocalEventClassificationDiagnosticSummaryRow, >( r#" SELECT COALESCE(json_extract(dde.payload_json, '$.eventCategory'), 'unknown') AS event_category, COALESCE(json_extract(dde.payload_json, '$.eventLifecycleKind'), 'unknown') AS event_lifecycle_kind, COALESCE(json_extract(dde.payload_json, '$.eventActionability'), 'unknown') AS event_actionability, CASE WHEN COALESCE(json_extract(dde.payload_json, '$.nonTradeUseful'), 0) = 1 THEN 1 ELSE 0 END AS non_trade_useful, COUNT(dde.id) AS event_count, COUNT(CASE WHEN COALESCE(json_extract(dde.payload_json, '$.tradeCandidate'), 0) = 1 THEN dde.id END) AS decoded_trade_candidate_count, COUNT(CASE WHEN COALESCE(json_extract(dde.payload_json, '$.candleCandidate'), 0) = 1 THEN dde.id END) AS decoded_candle_candidate_count, COUNT(te.id) AS trade_event_count FROM k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id GROUP BY event_category, event_lifecycle_kind, event_actionability, non_trade_useful ORDER BY event_category, event_lifecycle_kind, event_actionability "#, ) .fetch_all(pool) .await; let rows = match rows_result { Ok(rows) => rows, Err(error) => { return Err(crate::Error::Db(format!( "cannot list local decoded event classification summaries on sqlite: {}", error ))); }, }; let mut summaries = std::vec::Vec::new(); for row in rows { summaries.push(crate::LocalEventClassificationDiagnosticSummaryDto { event_category: row.event_category, event_lifecycle_kind: row.event_lifecycle_kind, event_actionability: row.event_actionability, non_trade_useful: row.non_trade_useful != 0, event_count: row.event_count, 
decoded_trade_candidate_count: row.decoded_trade_candidate_count, decoded_candle_candidate_count: row.decoded_candle_candidate_count, trade_event_count: row.trade_event_count, }); } return Ok(summaries); }, } } /// Lists missing trade events grouped by diagnostic reason. pub async fn query_local_missing_trade_event_reason_list_summaries( database: &crate::Database, ) -> Result, crate::Error> { match database.connection() { crate::DatabaseConnection::Sqlite(pool) => { let rows_result = sqlx::query_as::< sqlx::Sqlite, crate::db::dtos::LocalMissingTradeEventReasonSummaryRow, >( r#" WITH missing AS ( SELECT CASE WHEN ct.err_json IS NOT NULL THEN 1 ELSE 0 END AS transaction_failed, CASE WHEN ct.id IS NOT NULL AND ct.err_json IS NULL AND dde.pool_account IS NOT NULL AND dde.token_a_mint IS NOT NULL AND dde.token_b_mint IS NOT NULL AND EXISTS ( SELECT 1 FROM k_sol_pools p JOIN k_sol_pairs pair ON pair.pool_id = p.id WHERE p.address = dde.pool_account ) THEN 1 ELSE 0 END AS actionable, CASE WHEN ct.err_json IS NOT NULL THEN 'failed_transaction' WHEN ct.id IS NULL THEN 'missing_transaction' WHEN dde.pool_account IS NULL THEN 'ok_transaction_without_pool_account' WHEN dde.token_a_mint IS NULL OR dde.token_b_mint IS NULL THEN 'ok_transaction_without_token_mints' WHEN NOT EXISTS ( SELECT 1 FROM k_sol_pools p JOIN k_sol_pairs pair ON pair.pool_id = p.id WHERE p.address = dde.pool_account ) THEN 'ok_transaction_without_known_pair' WHEN ( json_extract(dde.payload_json, '$.baseAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.base_amount_raw') IS NULL AND json_extract(dde.payload_json, '$.quoteAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.quote_amount_raw') IS NULL AND json_extract(dde.payload_json, '$.priceQuotePerBase') IS NULL AND json_extract(dde.payload_json, '$.price_quote_per_base') IS NULL ) THEN 'ok_transaction_without_amount_payload' WHEN ( json_extract(dde.payload_json, '$.baseAmountRaw') IS NULL AND json_extract(dde.payload_json, 
'$.base_amount_raw') IS NULL ) THEN 'ok_transaction_without_base_amount_payload' WHEN ( json_extract(dde.payload_json, '$.quoteAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.quote_amount_raw') IS NULL ) THEN 'ok_transaction_without_quote_amount_payload' WHEN ( json_extract(dde.payload_json, '$.priceQuotePerBase') IS NULL AND json_extract(dde.payload_json, '$.price_quote_per_base') IS NULL ) THEN 'ok_transaction_without_price_payload' ELSE 'ok_transaction_unclassified' END AS reason, CASE WHEN json_extract(dde.payload_json, '$.baseAmountRaw') IS NOT NULL OR json_extract(dde.payload_json, '$.base_amount_raw') IS NOT NULL THEN 1 ELSE 0 END AS has_base_amount_payload, CASE WHEN json_extract(dde.payload_json, '$.quoteAmountRaw') IS NOT NULL OR json_extract(dde.payload_json, '$.quote_amount_raw') IS NOT NULL THEN 1 ELSE 0 END AS has_quote_amount_payload, CASE WHEN json_extract(dde.payload_json, '$.priceQuotePerBase') IS NOT NULL OR json_extract(dde.payload_json, '$.price_quote_per_base') IS NOT NULL THEN 1 ELSE 0 END AS has_price_payload FROM k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL ) SELECT reason AS reason, transaction_failed AS transaction_failed, actionable AS actionable, COUNT(*) AS event_count, SUM(has_base_amount_payload) AS has_base_amount_payload_count, SUM(has_quote_amount_payload) AS has_quote_amount_payload_count, SUM(has_price_payload) AS has_price_payload_count FROM missing GROUP BY reason, transaction_failed, actionable ORDER BY actionable DESC, transaction_failed ASC, event_count DESC, reason "#, ) .fetch_all(pool) .await; let rows = match rows_result { Ok(rows) => rows, Err(error) => { return Err(crate::Error::Db(format!( "cannot list missing trade event reason summaries on sqlite: {}", error ))); }, }; let mut summaries = std::vec::Vec::new(); for 
row in rows {
                summaries.push(crate::LocalMissingTradeEventReasonSummaryDto {
                    reason: row.reason,
                    transaction_failed: sqlite_i64_to_bool(row.transaction_failed),
                    actionable: sqlite_i64_to_bool(row.actionable),
                    event_count: row.event_count,
                    has_base_amount_payload_count: row.has_base_amount_payload_count,
                    has_quote_amount_payload_count: row.has_quote_amount_payload_count,
                    has_price_payload_count: row.has_price_payload_count,
                });
            }
            return Ok(summaries);
        },
    }
}

/// Lists pair summaries for non-actionable missing trade events.
///
/// A "missing trade event" is a decoded event whose payload has
/// `$.tradeCandidate = 1` and that has no `k_sol_trade_events` row linked
/// through `decoded_event_id`. The `missing` CTE collects those events for every
/// pair whose pool address equals the event's `pool_account`, and tags each one
/// with:
///
/// - `actionable`: `1` when the transaction row exists with a NULL `err_json`,
///   the pool account and both token mints are non-NULL, and the pool address
///   resolves to a known pool that has a pair; `0` otherwise.
/// - `reason`: the first matching branch of the classification `CASE`
///   (`failed_transaction`, `missing_transaction`, ...).
///
/// The outer query groups by pair and keeps only pairs that have at least one
/// non-actionable missing event and no actionable one (see the `HAVING`
/// clause). Rows are ordered by `non_actionable_missing_trade_event_count`
/// descending, then by `pair_id`.
///
/// # Parameters
///
/// - `database`: database handle; only the `Sqlite` connection arm is matched.
/// - `limit`: bound to the query's `LIMIT ?` placeholder.
///   NOTE(review): the value is passed through unchecked; SQLite documents a
///   negative `LIMIT` as "no upper bound" -- confirm callers pass a positive
///   value.
///
/// # Errors
///
/// Returns `crate::Error::Db` carrying the sqlx error text when the query
/// fails.
///
/// # Review notes
///
/// - NOTE(review): the CTE inner-joins decoded events on
///   `dde.pool_account = p.address` with `p` already joined to a pair, so the
///   `ok_transaction_without_pool_account` and
///   `ok_transaction_without_known_pair` branches look unreachable in this
///   query -- confirm.
/// - NOTE(review): `ok_transaction_without_amount_payload_count` sums rows with
///   `actionable = 0` AND that reason, but the reason is only reachable when
///   every `actionable` condition holds, so this counter appears to always be
///   `0` -- confirm the intended condition.
pub async fn query_local_non_actionable_pair_diagnostic_list_summaries(
    database: &crate::Database,
    limit: i64,
) -> Result, crate::Error> {
    match database.connection() {
        crate::DatabaseConnection::Sqlite(pool) => {
            let rows_result = sqlx::query_as::<
                sqlx::Sqlite,
                crate::db::dtos::LocalNonActionablePairDiagnosticSummaryRow,
            >(
                r#" WITH missing AS ( SELECT pair.id AS pair_id, p.address AS pool_address, d.code AS dex_code, base_token.mint AS base_mint, base_token.symbol AS base_symbol, quote_token.mint AS quote_mint, quote_token.symbol AS quote_symbol, pair.symbol AS pair_symbol, dde.id AS decoded_event_id, CASE WHEN ct.id IS NOT NULL AND ct.err_json IS NULL AND dde.pool_account IS NOT NULL AND dde.token_a_mint IS NOT NULL AND dde.token_b_mint IS NOT NULL AND EXISTS ( SELECT 1 FROM k_sol_pools known_pool JOIN k_sol_pairs known_pair ON known_pair.pool_id = known_pool.id WHERE known_pool.address = dde.pool_account ) THEN 1 ELSE 0 END AS actionable, CASE WHEN ct.err_json IS NOT NULL THEN 'failed_transaction' WHEN ct.id IS NULL THEN 'missing_transaction' WHEN dde.pool_account IS NULL THEN 'ok_transaction_without_pool_account' WHEN dde.token_a_mint IS NULL OR dde.token_b_mint IS NULL THEN 'ok_transaction_without_token_mints' WHEN NOT EXISTS ( SELECT 1 FROM k_sol_pools known_pool JOIN k_sol_pairs known_pair ON known_pair.pool_id = known_pool.id WHERE known_pool.address = dde.pool_account ) THEN 'ok_transaction_without_known_pair' WHEN ( 
json_extract(dde.payload_json, '$.baseAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.base_amount_raw') IS NULL AND json_extract(dde.payload_json, '$.quoteAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.quote_amount_raw') IS NULL AND json_extract(dde.payload_json, '$.priceQuotePerBase') IS NULL AND json_extract(dde.payload_json, '$.price_quote_per_base') IS NULL ) THEN 'ok_transaction_without_amount_payload' WHEN ( json_extract(dde.payload_json, '$.baseAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.base_amount_raw') IS NULL ) THEN 'ok_transaction_without_base_amount_payload' WHEN ( json_extract(dde.payload_json, '$.quoteAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.quote_amount_raw') IS NULL ) THEN 'ok_transaction_without_quote_amount_payload' WHEN ( json_extract(dde.payload_json, '$.priceQuotePerBase') IS NULL AND json_extract(dde.payload_json, '$.price_quote_per_base') IS NULL ) THEN 'ok_transaction_without_price_payload' ELSE 'ok_transaction_unclassified' END AS reason FROM k_sol_pairs pair JOIN k_sol_pools p ON p.id = pair.pool_id JOIN k_sol_dexes d ON d.id = p.dex_id JOIN k_sol_tokens base_token ON base_token.id = pair.base_token_id JOIN k_sol_tokens quote_token ON quote_token.id = pair.quote_token_id JOIN k_sol_dex_decoded_events dde ON dde.pool_account = p.address LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL ) SELECT pair_id AS pair_id, pool_address AS pool_address, dex_code AS dex_code, base_mint AS base_mint, base_symbol AS base_symbol, quote_mint AS quote_mint, quote_symbol AS quote_symbol, pair_symbol AS pair_symbol, ( SELECT COUNT(*) FROM k_sol_dex_decoded_events dde_all WHERE dde_all.pool_account = missing.pool_address AND json_extract(dde_all.payload_json, '$.tradeCandidate') = 1 ) AS decoded_trade_candidate_count, COUNT(CASE WHEN actionable = 0 
THEN decoded_event_id END) AS non_actionable_missing_trade_event_count, SUM(CASE WHEN actionable = 0 AND reason = 'failed_transaction' THEN 1 ELSE 0 END) AS failed_transaction_candidate_count, SUM(CASE WHEN actionable = 0 AND reason = 'ok_transaction_without_token_mints' THEN 1 ELSE 0 END) AS ok_transaction_without_token_mints_count, SUM(CASE WHEN actionable = 0 AND reason = 'ok_transaction_without_amount_payload' THEN 1 ELSE 0 END) AS ok_transaction_without_amount_payload_count, GROUP_CONCAT(DISTINCT CASE WHEN actionable = 0 THEN reason END) AS reason_summary, ( SELECT COUNT(*) FROM k_sol_trade_events te_pair WHERE te_pair.pair_id = missing.pair_id ) AS trade_event_count, ( SELECT COUNT(*) FROM k_sol_pair_candles pc_pair WHERE pc_pair.pair_id = missing.pair_id ) AS pair_candle_count FROM missing GROUP BY pair_id, pool_address, dex_code, base_mint, base_symbol, quote_mint, quote_symbol, pair_symbol HAVING COUNT(CASE WHEN actionable = 0 THEN decoded_event_id END) > 0 AND COUNT(CASE WHEN actionable = 1 THEN decoded_event_id END) = 0 ORDER BY non_actionable_missing_trade_event_count DESC, pair_id LIMIT ? 
"#,
            )
            // `limit` fills the single `LIMIT ?` placeholder at the end of the query.
            .bind(limit)
            .fetch_all(pool)
            .await;
            // Convert the sqlx failure into the crate-level database error, keeping
            // the driver's message as context.
            let rows = match rows_result {
                Ok(rows) => rows,
                Err(error) => {
                    return Err(crate::Error::Db(format!(
                        "cannot list non-actionable pair diagnostic summaries on sqlite: {}",
                        error
                    )));
                },
            };
            // Field-for-field copy from the row struct into the DTO; no value is
            // transformed on the way.
            let mut summaries = std::vec::Vec::new();
            for row in rows {
                summaries.push(crate::LocalNonActionablePairDiagnosticSummaryDto {
                    pair_id: row.pair_id,
                    pool_address: row.pool_address,
                    dex_code: row.dex_code,
                    base_mint: row.base_mint,
                    base_symbol: row.base_symbol,
                    quote_mint: row.quote_mint,
                    quote_symbol: row.quote_symbol,
                    pair_symbol: row.pair_symbol,
                    decoded_trade_candidate_count: row.decoded_trade_candidate_count,
                    non_actionable_missing_trade_event_count: row
                        .non_actionable_missing_trade_event_count,
                    failed_transaction_candidate_count: row.failed_transaction_candidate_count,
                    ok_transaction_without_token_mints_count: row
                        .ok_transaction_without_token_mints_count,
                    ok_transaction_without_amount_payload_count: row
                        .ok_transaction_without_amount_payload_count,
                    reason_summary: row.reason_summary,
                    trade_event_count: row.trade_event_count,
                    pair_candle_count: row.pair_candle_count,
                });
            }
            return Ok(summaries);
        },
    }
}

/// Lists samples of decoded trade candidates without linked trade event.
///
/// Selects decoded events whose payload has `$.tradeCandidate = 1` and for
/// which the `LEFT JOIN` on `k_sol_trade_events` finds no row
/// (`te.id IS NULL`). Each sample carries:
///
/// - `transaction_failed`: `1` when the joined transaction has a non-NULL
///   `err_json`. A missing transaction row yields `0` here and is reported
///   through the `missing_transaction` reason instead.
/// - `actionable`: `1` when the transaction exists without error, the pool
///   account and both token mints are non-NULL, and the pool address resolves
///   to a known pool that has a pair.
/// - `reason`: the first matching branch of the classification `CASE`.
/// - `has_*_payload`: whether the payload carries the base amount, quote amount
///   and price under either the camelCase or the snake_case key.
///
/// Rows are ordered by `actionable` descending, `transaction_failed` ascending,
/// then protocol name, event kind and decoded event id. The integer flags are
/// converted to `bool` with `sqlite_i64_to_bool`.
///
/// # Parameters
///
/// - `database`: database handle; only the `Sqlite` connection arm is matched.
/// - `limit`: bound to the query's `LIMIT ?` placeholder.
///   NOTE(review): passed through unchecked; SQLite documents a negative
///   `LIMIT` as "no upper bound" -- confirm callers pass a positive value.
///
/// # Errors
///
/// Returns `crate::Error::Db` carrying the sqlx error text when the query
/// fails.
pub async fn query_local_missing_trade_event_diagnostic_list_samples(
    database: &crate::Database,
    limit: i64,
) -> Result, crate::Error> {
    match database.connection() {
        crate::DatabaseConnection::Sqlite(pool) => {
            let rows_result = sqlx::query_as::(
                r#" SELECT dde.id AS decoded_event_id, dde.transaction_id AS transaction_id, ct.signature AS signature, dde.protocol_name AS protocol_name, dde.event_kind AS event_kind, dde.pool_account AS pool_account, CASE WHEN ct.err_json IS NOT NULL THEN 1 ELSE 0 END AS transaction_failed, CASE WHEN ct.id IS NOT NULL AND ct.err_json IS NULL AND dde.pool_account IS NOT NULL AND dde.token_a_mint IS NOT NULL AND dde.token_b_mint IS NOT NULL AND EXISTS ( SELECT 1 FROM k_sol_pools p JOIN k_sol_pairs pair ON pair.pool_id = p.id WHERE p.address = dde.pool_account ) THEN 1 ELSE 0 END AS actionable, CASE WHEN ct.err_json IS NOT NULL THEN 'failed_transaction' WHEN ct.id IS NULL THEN 'missing_transaction' WHEN dde.pool_account IS NULL THEN 'ok_transaction_without_pool_account' WHEN dde.token_a_mint IS NULL OR dde.token_b_mint IS NULL THEN 'ok_transaction_without_token_mints' WHEN NOT EXISTS ( SELECT 1 FROM k_sol_pools p JOIN k_sol_pairs pair ON pair.pool_id = p.id WHERE p.address = dde.pool_account ) THEN 'ok_transaction_without_known_pair' WHEN ( json_extract(dde.payload_json, '$.baseAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.base_amount_raw') IS NULL AND json_extract(dde.payload_json, '$.quoteAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.quote_amount_raw') IS NULL AND json_extract(dde.payload_json, '$.priceQuotePerBase') IS NULL AND json_extract(dde.payload_json, '$.price_quote_per_base') IS NULL ) THEN 'ok_transaction_without_amount_payload' WHEN ( json_extract(dde.payload_json, '$.baseAmountRaw') IS NULL AND json_extract(dde.payload_json, '$.base_amount_raw') IS NULL ) THEN 'ok_transaction_without_base_amount_payload' WHEN ( json_extract(dde.payload_json, '$.quoteAmountRaw') IS NULL AND 
json_extract(dde.payload_json, '$.quote_amount_raw') IS NULL ) THEN 'ok_transaction_without_quote_amount_payload' WHEN ( json_extract(dde.payload_json, '$.priceQuotePerBase') IS NULL AND json_extract(dde.payload_json, '$.price_quote_per_base') IS NULL ) THEN 'ok_transaction_without_price_payload' ELSE 'ok_transaction_unclassified' END AS reason, CASE WHEN json_extract(dde.payload_json, '$.baseAmountRaw') IS NOT NULL OR json_extract(dde.payload_json, '$.base_amount_raw') IS NOT NULL THEN 1 ELSE 0 END AS has_base_amount_payload, CASE WHEN json_extract(dde.payload_json, '$.quoteAmountRaw') IS NOT NULL OR json_extract(dde.payload_json, '$.quote_amount_raw') IS NOT NULL THEN 1 ELSE 0 END AS has_quote_amount_payload, CASE WHEN json_extract(dde.payload_json, '$.priceQuotePerBase') IS NOT NULL OR json_extract(dde.payload_json, '$.price_quote_per_base') IS NOT NULL THEN 1 ELSE 0 END AS has_price_payload FROM k_sol_dex_decoded_events dde LEFT JOIN k_sol_trade_events te ON te.decoded_event_id = dde.id LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id WHERE json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND te.id IS NULL ORDER BY actionable DESC, transaction_failed ASC, dde.protocol_name, dde.event_kind, dde.id LIMIT ? 
"#,
            )
            // `limit` fills the single `LIMIT ?` placeholder.
            .bind(limit)
            .fetch_all(pool)
            .await;
            // Convert the sqlx failure into the crate-level database error.
            let rows = match rows_result {
                Ok(rows) => rows,
                Err(error) => {
                    return Err(crate::Error::Db(format!(
                        "cannot list missing trade event diagnostic samples on sqlite: {}",
                        error
                    )));
                },
            };
            let mut samples = std::vec::Vec::new();
            for row in rows {
                samples.push(crate::LocalMissingTradeEventDiagnosticSampleDto {
                    decoded_event_id: row.decoded_event_id,
                    transaction_id: row.transaction_id,
                    signature: row.signature,
                    protocol_name: row.protocol_name,
                    event_kind: row.event_kind,
                    pool_account: row.pool_account,
                    // The SQL emits these flags as 0/1 integers.
                    transaction_failed: sqlite_i64_to_bool(row.transaction_failed),
                    actionable: sqlite_i64_to_bool(row.actionable),
                    reason: row.reason,
                    has_base_amount_payload: sqlite_i64_to_bool(row.has_base_amount_payload),
                    has_quote_amount_payload: sqlite_i64_to_bool(row.has_quote_amount_payload),
                    has_price_payload: sqlite_i64_to_bool(row.has_price_payload),
                });
            }
            return Ok(samples);
        },
    }
}

/// Lists samples of duplicated trade rows by decoded event id.
///
/// Groups `k_sol_trade_events` rows that have a non-NULL `decoded_event_id` by
/// that id and keeps the groups containing more than one trade event. Protocol
/// name, event kind and pool account come from a `LEFT JOIN` on the decoded
/// event. The ids and signatures of the grouped trade events are returned as
/// `GROUP_CONCAT` strings. Rows are ordered by `trade_event_count` descending,
/// then by `decoded_event_id`.
///
/// # Parameters
///
/// - `database`: database handle; only the `Sqlite` connection arm is matched.
/// - `limit`: bound to the query's `LIMIT ?` placeholder.
///
/// # Errors
///
/// Returns `crate::Error::Db` carrying the sqlx error text when the query
/// fails.
pub async fn query_local_duplicate_decoded_event_trade_diagnostic_list_samples(
    database: &crate::Database,
    limit: i64,
) -> Result, crate::Error> {
    match database.connection() {
        crate::DatabaseConnection::Sqlite(pool) => {
            let rows_result = sqlx::query_as::<
                sqlx::Sqlite,
                crate::db::dtos::LocalDuplicateDecodedEventTradeDiagnosticSampleRow,
            >(
                r#" SELECT te.decoded_event_id AS decoded_event_id, dde.protocol_name AS protocol_name, dde.event_kind AS event_kind, dde.pool_account AS pool_account, COUNT(te.id) AS trade_event_count, GROUP_CONCAT(te.id) AS trade_event_ids, GROUP_CONCAT(te.signature) AS signatures FROM k_sol_trade_events te LEFT JOIN k_sol_dex_decoded_events dde ON dde.id = te.decoded_event_id WHERE te.decoded_event_id IS NOT NULL GROUP BY te.decoded_event_id, dde.protocol_name, dde.event_kind, dde.pool_account HAVING COUNT(te.id) > 1 ORDER BY trade_event_count DESC, te.decoded_event_id LIMIT ? 
"#,
            )
            // `limit` fills the single `LIMIT ?` placeholder.
            .bind(limit)
            .fetch_all(pool)
            .await;
            // Convert the sqlx failure into the crate-level database error.
            let rows = match rows_result {
                Ok(rows) => rows,
                Err(error) => {
                    return Err(crate::Error::Db(format!(
                        "cannot list duplicate decoded event trade diagnostic samples on sqlite: {}",
                        error
                    )));
                },
            };
            // Field-for-field copy from the row struct into the DTO.
            let mut samples = std::vec::Vec::new();
            for row in rows {
                samples.push(crate::LocalDuplicateDecodedEventTradeDiagnosticSampleDto {
                    decoded_event_id: row.decoded_event_id,
                    protocol_name: row.protocol_name,
                    event_kind: row.event_kind,
                    pool_account: row.pool_account,
                    trade_event_count: row.trade_event_count,
                    trade_event_ids: row.trade_event_ids,
                    signatures: row.signatures,
                });
            }
            return Ok(samples);
        },
    }
}

/// Lists samples of multi-trade signature/pair groups.
///
/// Groups `k_sol_trade_events` rows by signature and pair id (plus the pool
/// address and dex code reached through `LEFT JOIN`s) and keeps the groups with
/// more than one trade event. `decoded_event_count` is the number of distinct
/// decoded event ids in the group, so it can be compared with
/// `trade_event_count`. Ids are returned as `GROUP_CONCAT` strings. Rows are
/// ordered by `trade_event_count` descending, then signature and pair id.
///
/// # Parameters
///
/// - `database`: database handle; only the `Sqlite` connection arm is matched.
/// - `limit`: bound to the query's `LIMIT ?` placeholder.
///
/// # Errors
///
/// Returns `crate::Error::Db` carrying the sqlx error text when the query
/// fails.
pub async fn query_local_multi_trade_signature_pair_diagnostic_list_samples(
    database: &crate::Database,
    limit: i64,
) -> Result, crate::Error> {
    match database.connection() {
        crate::DatabaseConnection::Sqlite(pool) => {
            let rows_result = sqlx::query_as::<
                sqlx::Sqlite,
                crate::db::dtos::LocalMultiTradeSignaturePairDiagnosticSampleRow,
            >(
                r#" SELECT te.signature AS signature, te.pair_id AS pair_id, p.address AS pool_address, d.code AS dex_code, COUNT(te.id) AS trade_event_count, COUNT(DISTINCT te.decoded_event_id) AS decoded_event_count, GROUP_CONCAT(te.id) AS trade_event_ids, GROUP_CONCAT(te.decoded_event_id) AS decoded_event_ids FROM k_sol_trade_events te LEFT JOIN k_sol_pairs pair ON pair.id = te.pair_id LEFT JOIN k_sol_pools p ON p.id = pair.pool_id LEFT JOIN k_sol_dexes d ON d.id = p.dex_id GROUP BY te.signature, te.pair_id, p.address, d.code HAVING COUNT(te.id) > 1 ORDER BY trade_event_count DESC, te.signature, te.pair_id LIMIT ? 
"#,
            )
            // `limit` fills the single `LIMIT ?` placeholder.
            .bind(limit)
            .fetch_all(pool)
            .await;
            // Convert the sqlx failure into the crate-level database error.
            let rows = match rows_result {
                Ok(rows) => rows,
                Err(error) => {
                    return Err(crate::Error::Db(format!(
                        "cannot list multi-trade signature/pair diagnostic samples on sqlite: {}",
                        error
                    )));
                },
            };
            // Field-for-field copy from the row struct into the DTO.
            let mut samples = std::vec::Vec::new();
            for row in rows {
                samples.push(crate::LocalMultiTradeSignaturePairDiagnosticSampleDto {
                    signature: row.signature,
                    pair_id: row.pair_id,
                    pool_address: row.pool_address,
                    dex_code: row.dex_code,
                    trade_event_count: row.trade_event_count,
                    decoded_event_count: row.decoded_event_count,
                    trade_event_ids: row.trade_event_ids,
                    decoded_event_ids: row.decoded_event_ids,
                });
            }
            return Ok(samples);
        },
    }
}

/// Lists samples of pairs without trade events.
///
/// Thin wrapper that calls `list_local_pair_gap_diagnostic_samples` with
/// `without_trade = true`; see that function for the selection rules and the
/// error it returns.
pub async fn query_local_pair_without_trade_diagnostic_list_samples(
    database: &crate::Database,
    limit: i64,
) -> Result, crate::Error> {
    return list_local_pair_gap_diagnostic_samples(database, limit, true).await;
}

/// Lists samples of pairs without candles.
///
/// Thin wrapper that calls `list_local_pair_gap_diagnostic_samples` with
/// `without_trade = false`; see that function for the selection rules and the
/// error it returns.
pub async fn query_local_pair_without_candle_diagnostic_list_samples(
    database: &crate::Database,
    limit: i64,
) -> Result, crate::Error> {
    return list_local_pair_gap_diagnostic_samples(database, limit, false).await;
}

/// Shared implementation behind the "pair without trade" and "pair without
/// candle" diagnostics.
///
/// Builds one per-pair aggregate query and appends one of two fixed `HAVING`
/// clauses:
///
/// - `without_trade == true`: the pair has at least one decoded event with
///   `$.tradeCandidate = 1` on an existing transaction whose `err_json` is
///   NULL, and no trade event at all.
/// - `without_trade == false`: the pair has at least one decoded event with
///   `$.candleCandidate = 1` on an existing transaction whose `err_json` is
///   NULL, and no candle at all.
///
/// Decoded events, trade events and candles are all `LEFT JOIN`ed onto the same
/// pair, which multiplies rows, so every count uses `COUNT(DISTINCT ...)`.
/// Candles are counted by the distinct
/// `bucket_start_unix || ':' || timeframe_seconds` combination. `pair_symbol`
/// falls back to `base/quote` when `pair.symbol` is NULL and both token symbols
/// are non-empty. In both modes rows are ordered by
/// `decoded_trade_candidate_count` descending, then by pair id.
///
/// # Parameters
///
/// - `database`: database handle; only the `Sqlite` connection arm is matched.
/// - `limit`: bound to the query's `LIMIT ?` placeholder.
/// - `without_trade`: selects which `HAVING` clause is used (see above).
///
/// # Errors
///
/// Returns `crate::Error::Db` carrying the sqlx error text when the query
/// fails.
async fn list_local_pair_gap_diagnostic_samples(
    database: &crate::Database,
    limit: i64,
    without_trade: bool,
) -> Result, crate::Error> {
    match database.connection() {
        crate::DatabaseConnection::Sqlite(pool) => {
            // Both alternatives are string literals defined right here, so the
            // `format!` below never interpolates caller-supplied text.
            let having_clause = if without_trade {
                r#" HAVING COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.tradeCandidate') = 1 AND ct.id IS NOT NULL AND ct.err_json IS NULL THEN dde.id END) > 0 AND COUNT(DISTINCT te.id) = 0 "#
            } else {
                r#" HAVING COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.candleCandidate') = 1 AND ct.id IS NOT NULL AND ct.err_json IS NULL THEN dde.id END) > 0 AND COUNT(DISTINCT pc.bucket_start_unix || ':' || pc.timeframe_seconds) = 0 "#
            };
            // The `{}` placeholder sits between GROUP BY and ORDER BY and
            // receives the selected HAVING clause.
            let sql = format!(
                r#" SELECT pair.id AS pair_id, p.address AS pool_address, d.code AS dex_code, 
base_token.mint AS base_mint, base_token.symbol AS base_symbol, quote_token.mint AS quote_mint, quote_token.symbol AS quote_symbol, COALESCE( pair.symbol, CASE WHEN base_token.symbol IS NOT NULL AND base_token.symbol != '' AND quote_token.symbol IS NOT NULL AND quote_token.symbol != '' THEN base_token.symbol || '/' || quote_token.symbol ELSE NULL END ) AS pair_symbol, COUNT(DISTINCT dde.id) AS decoded_event_count, COUNT(DISTINCT CASE WHEN json_extract(dde.payload_json, '$.tradeCandidate') = 1 THEN dde.id END) AS decoded_trade_candidate_count, COUNT(DISTINCT te.id) AS trade_event_count, COUNT(DISTINCT pc.bucket_start_unix || ':' || pc.timeframe_seconds) AS pair_candle_count FROM k_sol_pairs pair JOIN k_sol_pools p ON p.id = pair.pool_id JOIN k_sol_dexes d ON d.id = p.dex_id JOIN k_sol_tokens base_token ON base_token.id = pair.base_token_id JOIN k_sol_tokens quote_token ON quote_token.id = pair.quote_token_id LEFT JOIN k_sol_dex_decoded_events dde ON dde.pool_account = p.address LEFT JOIN k_sol_chain_transactions ct ON ct.id = dde.transaction_id LEFT JOIN k_sol_trade_events te ON te.pair_id = pair.id LEFT JOIN k_sol_pair_candles pc ON pc.pair_id = pair.id GROUP BY pair.id, p.address, d.code, base_token.mint, base_token.symbol, quote_token.mint, quote_token.symbol, pair.symbol {} ORDER BY decoded_trade_candidate_count DESC, pair.id LIMIT ? 
"#,
                having_clause
            );
            let rows_result = sqlx::query_as::<
                sqlx::Sqlite,
                crate::db::dtos::LocalPairGapDiagnosticSampleRow,
            >(sql.as_str())
            // `limit` fills the single `LIMIT ?` placeholder.
            .bind(limit)
            .fetch_all(pool)
            .await;
            // Convert the sqlx failure into the crate-level database error. The
            // message is the same for both modes.
            let rows = match rows_result {
                Ok(rows) => rows,
                Err(error) => {
                    return Err(crate::Error::Db(format!(
                        "cannot list pair gap diagnostic samples on sqlite: {}",
                        error
                    )));
                },
            };
            // Field-for-field copy from the row struct into the DTO.
            let mut samples = std::vec::Vec::new();
            for row in rows {
                samples.push(crate::LocalPairGapDiagnosticSampleDto {
                    pair_id: row.pair_id,
                    pool_address: row.pool_address,
                    dex_code: row.dex_code,
                    base_mint: row.base_mint,
                    base_symbol: row.base_symbol,
                    quote_mint: row.quote_mint,
                    quote_symbol: row.quote_symbol,
                    pair_symbol: row.pair_symbol,
                    decoded_event_count: row.decoded_event_count,
                    decoded_trade_candidate_count: row.decoded_trade_candidate_count,
                    trade_event_count: row.trade_event_count,
                    pair_candle_count: row.pair_candle_count,
                });
            }
            return Ok(samples);
        },
    }
}

/// Converts an optional SQLite integer flag into an optional boolean.
///
/// `Some(0)` becomes `Some(false)`, any other `Some(_)` becomes `Some(true)`,
/// and `None` stays `None`.
fn sqlite_bool_to_option(value: std::option::Option) -> std::option::Option {
    match value {
        Some(0) => return Some(false),
        Some(_) => return Some(true),
        None => return None,
    }
}

/// Converts a SQLite integer flag into a boolean: any non-zero value is `true`.
fn sqlite_i64_to_bool(value: i64) -> bool {
    return value != 0;
}