This commit is contained in:
2026-05-20 23:57:15 +02:00
parent fad7ec5107
commit 62831a0abe
56 changed files with 6603 additions and 114 deletions

View File

@@ -0,0 +1,548 @@
// file: kb_lib/src/db/queries/local_dex_corpus_search.rs
//! Queries for local DEX corpus searches used by Demo3.
/// Returns aggregate counts for a local DEX corpus search.
pub async fn query_local_dex_corpus_search_get_summary(
database: &crate::Database,
request: &crate::LocalDexCorpusSearchRequestDto,
) -> Result<crate::LocalDexCorpusSearchSummaryDto, crate::Error> {
match database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let dex_code = normalized_filter_text(request.dex_code.as_deref());
let program_id = normalized_filter_text(request.program_id.as_deref());
let pool_address = normalized_filter_text(request.pool_address.as_deref());
let token_mint = normalized_filter_text(request.token_mint.as_deref());
let signature = normalized_filter_text(request.signature.as_deref());
let pool_address_like = like_filter_text(pool_address.as_str());
let token_mint_like = like_filter_text(token_mint.as_str());
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::db::dtos::LocalDexCorpusSearchSummaryRow>(
r#"
SELECT
COUNT(DISTINCT tx.id) AS transaction_count,
COUNT(DISTINCT ix.id) AS instruction_count,
COUNT(DISTINCT dde.id) AS decoded_event_count,
COUNT(DISTINCT pool.id) AS pool_count,
COUNT(DISTINCT pair.id) AS pair_count,
COUNT(DISTINCT te.id) AS trade_event_count,
COUNT(DISTINCT candle.id) AS pair_candle_count,
COUNT(DISTINCT pc.id) AS protocol_candidate_count
FROM k_sol_chain_transactions tx
LEFT JOIN k_sol_chain_instructions ix
ON ix.transaction_id = tx.id
LEFT JOIN k_sol_dex_decoded_events dde
ON dde.transaction_id = tx.id
LEFT JOIN k_sol_pools pool
ON pool.address = dde.pool_account
LEFT JOIN k_sol_pairs pair
ON pair.pool_id = pool.id
LEFT JOIN k_sol_dexes dex
ON dex.id = pair.dex_id
LEFT JOIN k_sol_tokens base_token
ON base_token.id = pair.base_token_id
LEFT JOIN k_sol_tokens quote_token
ON quote_token.id = pair.quote_token_id
LEFT JOIN k_sol_trade_events te
ON te.transaction_id = tx.id
LEFT JOIN k_sol_pair_candles candle
ON candle.pair_id = pair.id
LEFT JOIN k_sol_protocol_candidates pc
ON pc.transaction_id = tx.id
WHERE
(NULLIF(TRIM(?), '') IS NULL
OR dex.code = ?
OR dde.protocol_name = ?
OR pc.candidate_protocol = ?
OR pc.candidate_surface = ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR ix.program_id = ?
OR dde.program_id = ?
OR pc.program_id = ?)
AND (? IS NULL OR pair.id = ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR pool.address = ?
OR dde.pool_account = ?
OR ix.accounts_json LIKE ?
OR pc.evidence_json LIKE ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR base_token.mint = ?
OR quote_token.mint = ?
OR dde.token_a_mint = ?
OR dde.token_b_mint = ?
OR ix.accounts_json LIKE ?
OR tx.transaction_json LIKE ?
OR pc.evidence_json LIKE ?)
AND (NULLIF(TRIM(?), '') IS NULL OR tx.signature = ?)
"#,
)
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(request.pair_id)
.bind(request.pair_id)
.bind(pool_address.clone())
.bind(pool_address.clone())
.bind(pool_address.clone())
.bind(pool_address_like.clone())
.bind(pool_address_like.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint_like.clone())
.bind(token_mint_like.clone())
.bind(token_mint_like.clone())
.bind(signature.clone())
.bind(signature.clone())
.fetch_one(pool)
.await;
let row = match query_result {
Ok(row) => row,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot query local DEX corpus search summary on sqlite: {}",
error
)));
},
};
return Ok(crate::LocalDexCorpusSearchSummaryDto::from(row));
},
}
}
/// Lists matching transaction samples for a local DEX corpus search.
pub async fn query_local_dex_corpus_search_list_transaction_samples(
database: &crate::Database,
request: &crate::LocalDexCorpusSearchRequestDto,
) -> Result<std::vec::Vec<crate::LocalDexCorpusTransactionSampleDto>, crate::Error> {
let limit = normalized_limit(request.limit);
if limit == 0 {
return Ok(std::vec::Vec::new());
}
match database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let dex_code = normalized_filter_text(request.dex_code.as_deref());
let program_id = normalized_filter_text(request.program_id.as_deref());
let pool_address = normalized_filter_text(request.pool_address.as_deref());
let token_mint = normalized_filter_text(request.token_mint.as_deref());
let signature = normalized_filter_text(request.signature.as_deref());
let pool_address_like = like_filter_text(pool_address.as_str());
let token_mint_like = like_filter_text(token_mint.as_str());
let query_result = sqlx::query_as::<
sqlx::Sqlite,
crate::db::dtos::LocalDexCorpusTransactionSampleRow,
>(
r#"
SELECT
tx.id AS transaction_id,
tx.signature AS signature,
tx.slot AS slot,
CASE WHEN tx.err_json IS NULL THEN 0 ELSE 1 END AS failed,
(
SELECT COUNT(*)
FROM k_sol_chain_instructions count_ix
WHERE count_ix.transaction_id = tx.id
) AS instruction_count,
(
SELECT COUNT(*)
FROM k_sol_dex_decoded_events count_dde
WHERE count_dde.transaction_id = tx.id
) AS decoded_event_count,
(
SELECT COUNT(*)
FROM k_sol_trade_events count_te
WHERE count_te.transaction_id = tx.id
) AS trade_event_count,
COALESCE((
SELECT GROUP_CONCAT(DISTINCT program_ix.program_id)
FROM k_sol_chain_instructions program_ix
WHERE program_ix.transaction_id = tx.id
AND program_ix.program_id IS NOT NULL
), '') AS program_ids_csv
FROM k_sol_chain_transactions tx
LEFT JOIN k_sol_chain_instructions ix
ON ix.transaction_id = tx.id
LEFT JOIN k_sol_dex_decoded_events dde
ON dde.transaction_id = tx.id
LEFT JOIN k_sol_pools pool
ON pool.address = dde.pool_account
LEFT JOIN k_sol_pairs pair
ON pair.pool_id = pool.id
LEFT JOIN k_sol_dexes dex
ON dex.id = pair.dex_id
LEFT JOIN k_sol_tokens base_token
ON base_token.id = pair.base_token_id
LEFT JOIN k_sol_tokens quote_token
ON quote_token.id = pair.quote_token_id
LEFT JOIN k_sol_protocol_candidates pc
ON pc.transaction_id = tx.id
WHERE
(NULLIF(TRIM(?), '') IS NULL
OR dex.code = ?
OR dde.protocol_name = ?
OR pc.candidate_protocol = ?
OR pc.candidate_surface = ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR ix.program_id = ?
OR dde.program_id = ?
OR pc.program_id = ?)
AND (? IS NULL OR pair.id = ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR pool.address = ?
OR dde.pool_account = ?
OR ix.accounts_json LIKE ?
OR pc.evidence_json LIKE ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR base_token.mint = ?
OR quote_token.mint = ?
OR dde.token_a_mint = ?
OR dde.token_b_mint = ?
OR ix.accounts_json LIKE ?
OR tx.transaction_json LIKE ?
OR pc.evidence_json LIKE ?)
AND (NULLIF(TRIM(?), '') IS NULL OR tx.signature = ?)
GROUP BY tx.id
ORDER BY tx.slot DESC, tx.id DESC
LIMIT ?
"#,
)
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(request.pair_id)
.bind(request.pair_id)
.bind(pool_address.clone())
.bind(pool_address.clone())
.bind(pool_address.clone())
.bind(pool_address_like.clone())
.bind(pool_address_like.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint_like.clone())
.bind(token_mint_like.clone())
.bind(token_mint_like.clone())
.bind(signature.clone())
.bind(signature.clone())
.bind(i64::from(limit))
.fetch_all(pool)
.await;
let rows = match query_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot list local DEX corpus transaction samples on sqlite: {}",
error
)));
},
};
let mut dtos = std::vec::Vec::new();
for row in rows {
dtos.push(crate::LocalDexCorpusTransactionSampleDto::from(row));
}
return Ok(dtos);
},
}
}
/// Lists matching pool/pair samples for a local DEX corpus search.
pub async fn query_local_dex_corpus_search_list_pool_pair_samples(
database: &crate::Database,
request: &crate::LocalDexCorpusSearchRequestDto,
) -> Result<std::vec::Vec<crate::LocalDexCorpusPoolPairSampleDto>, crate::Error> {
let limit = normalized_limit(request.limit);
if limit == 0 {
return Ok(std::vec::Vec::new());
}
match database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let dex_code = normalized_filter_text(request.dex_code.as_deref());
let program_id = normalized_filter_text(request.program_id.as_deref());
let pool_address = normalized_filter_text(request.pool_address.as_deref());
let token_mint = normalized_filter_text(request.token_mint.as_deref());
let signature = normalized_filter_text(request.signature.as_deref());
let pool_address_like = like_filter_text(pool_address.as_str());
let token_mint_like = like_filter_text(token_mint.as_str());
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::db::dtos::LocalDexCorpusPoolPairSampleRow>(
r#"
SELECT
pool.id AS pool_id,
pool.address AS pool_address,
pair.id AS pair_id,
dex.code AS dex_code,
pair.symbol AS pair_symbol,
base_token.mint AS base_mint,
base_token.symbol AS base_symbol,
quote_token.mint AS quote_mint,
quote_token.symbol AS quote_symbol,
COUNT(DISTINCT dde.id) AS decoded_event_count,
COUNT(DISTINCT te.id) AS trade_event_count,
COUNT(DISTINCT candle.id) AS pair_candle_count,
MAX(COALESCE(te.signature, tx.signature, pc.signature)) AS latest_signature
FROM k_sol_pools pool
LEFT JOIN k_sol_pairs pair
ON pair.pool_id = pool.id
LEFT JOIN k_sol_dexes dex
ON dex.id = pool.dex_id
LEFT JOIN k_sol_tokens base_token
ON base_token.id = pair.base_token_id
LEFT JOIN k_sol_tokens quote_token
ON quote_token.id = pair.quote_token_id
LEFT JOIN k_sol_dex_decoded_events dde
ON dde.pool_account = pool.address
LEFT JOIN k_sol_chain_transactions tx
ON tx.id = dde.transaction_id
LEFT JOIN k_sol_chain_instructions ix
ON ix.transaction_id = tx.id
LEFT JOIN k_sol_trade_events te
ON te.pair_id = pair.id
LEFT JOIN k_sol_pair_candles candle
ON candle.pair_id = pair.id
LEFT JOIN k_sol_protocol_candidates pc
ON pc.transaction_id = tx.id
WHERE
(NULLIF(TRIM(?), '') IS NULL
OR dex.code = ?
OR dde.protocol_name = ?
OR pc.candidate_protocol = ?
OR pc.candidate_surface = ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR ix.program_id = ?
OR dde.program_id = ?
OR pc.program_id = ?)
AND (? IS NULL OR pair.id = ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR pool.address = ?
OR dde.pool_account = ?
OR ix.accounts_json LIKE ?
OR pc.evidence_json LIKE ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR base_token.mint = ?
OR quote_token.mint = ?
OR dde.token_a_mint = ?
OR dde.token_b_mint = ?
OR ix.accounts_json LIKE ?
OR tx.transaction_json LIKE ?
OR pc.evidence_json LIKE ?)
AND (NULLIF(TRIM(?), '') IS NULL OR tx.signature = ? OR te.signature = ? OR pc.signature = ?)
GROUP BY pool.id, pair.id
ORDER BY MAX(COALESCE(tx.slot, te.slot, pc.slot)) DESC, pool.id DESC
LIMIT ?
"#,
)
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(request.pair_id)
.bind(request.pair_id)
.bind(pool_address.clone())
.bind(pool_address.clone())
.bind(pool_address.clone())
.bind(pool_address_like.clone())
.bind(pool_address_like.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint_like.clone())
.bind(token_mint_like.clone())
.bind(token_mint_like.clone())
.bind(signature.clone())
.bind(signature.clone())
.bind(signature.clone())
.bind(signature.clone())
.bind(i64::from(limit))
.fetch_all(pool)
.await;
let rows = match query_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot list local DEX corpus pool/pair samples on sqlite: {}",
error
)));
},
};
let mut dtos = std::vec::Vec::new();
for row in rows {
dtos.push(crate::LocalDexCorpusPoolPairSampleDto::from(row));
}
return Ok(dtos);
},
}
}
/// Lists matching decoded-event samples for a local DEX corpus search.
pub async fn query_local_dex_corpus_search_list_decoded_event_samples(
database: &crate::Database,
request: &crate::LocalDexCorpusSearchRequestDto,
) -> Result<std::vec::Vec<crate::LocalDexCorpusDecodedEventSampleDto>, crate::Error> {
let limit = normalized_limit(request.limit);
if limit == 0 {
return Ok(std::vec::Vec::new());
}
match database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let dex_code = normalized_filter_text(request.dex_code.as_deref());
let program_id = normalized_filter_text(request.program_id.as_deref());
let pool_address = normalized_filter_text(request.pool_address.as_deref());
let token_mint = normalized_filter_text(request.token_mint.as_deref());
let signature = normalized_filter_text(request.signature.as_deref());
let pool_address_like = like_filter_text(pool_address.as_str());
let token_mint_like = like_filter_text(token_mint.as_str());
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::db::dtos::LocalDexCorpusDecodedEventSampleRow>(
r#"
SELECT
dde.id AS decoded_event_id,
tx.id AS transaction_id,
tx.signature AS signature,
tx.slot AS slot,
dde.protocol_name AS protocol_name,
dde.program_id AS program_id,
dde.event_kind AS event_kind,
dde.pool_account AS pool_account,
dde.token_a_mint AS token_a_mint,
dde.token_b_mint AS token_b_mint,
json_extract(dde.payload_json, '$.eventCategory') AS event_category,
json_extract(dde.payload_json, '$.eventLifecycleKind') AS event_lifecycle_kind,
json_extract(dde.payload_json, '$.eventActionability') AS event_actionability,
json_extract(dde.payload_json, '$.tradeCandidate') AS trade_candidate,
json_extract(dde.payload_json, '$.candleCandidate') AS candle_candidate
FROM k_sol_dex_decoded_events dde
JOIN k_sol_chain_transactions tx
ON tx.id = dde.transaction_id
LEFT JOIN k_sol_chain_instructions ix
ON ix.transaction_id = tx.id
LEFT JOIN k_sol_pools pool
ON pool.address = dde.pool_account
LEFT JOIN k_sol_pairs pair
ON pair.pool_id = pool.id
LEFT JOIN k_sol_dexes dex
ON dex.id = pair.dex_id
LEFT JOIN k_sol_tokens base_token
ON base_token.id = pair.base_token_id
LEFT JOIN k_sol_tokens quote_token
ON quote_token.id = pair.quote_token_id
LEFT JOIN k_sol_protocol_candidates pc
ON pc.transaction_id = tx.id
WHERE
(NULLIF(TRIM(?), '') IS NULL
OR dex.code = ?
OR dde.protocol_name = ?
OR pc.candidate_protocol = ?
OR pc.candidate_surface = ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR ix.program_id = ?
OR dde.program_id = ?
OR pc.program_id = ?)
AND (? IS NULL OR pair.id = ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR pool.address = ?
OR dde.pool_account = ?
OR ix.accounts_json LIKE ?
OR pc.evidence_json LIKE ?)
AND (NULLIF(TRIM(?), '') IS NULL
OR base_token.mint = ?
OR quote_token.mint = ?
OR dde.token_a_mint = ?
OR dde.token_b_mint = ?
OR ix.accounts_json LIKE ?
OR tx.transaction_json LIKE ?
OR pc.evidence_json LIKE ?)
AND (NULLIF(TRIM(?), '') IS NULL OR tx.signature = ? OR pc.signature = ?)
GROUP BY dde.id
ORDER BY tx.slot DESC, dde.id DESC
LIMIT ?
"#,
)
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(dex_code.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(program_id.clone())
.bind(request.pair_id)
.bind(request.pair_id)
.bind(pool_address.clone())
.bind(pool_address.clone())
.bind(pool_address.clone())
.bind(pool_address_like.clone())
.bind(pool_address_like.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint.clone())
.bind(token_mint_like.clone())
.bind(token_mint_like.clone())
.bind(token_mint_like.clone())
.bind(signature.clone())
.bind(signature.clone())
.bind(signature.clone())
.bind(i64::from(limit))
.fetch_all(pool)
.await;
let rows = match query_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot list local DEX corpus decoded-event samples on sqlite: {}",
error
)));
},
};
let mut dtos = std::vec::Vec::new();
for row in rows {
dtos.push(crate::LocalDexCorpusDecodedEventSampleDto::from(row));
}
return Ok(dtos);
},
}
}
fn normalized_limit(limit: u32) -> u32 {
if limit > 200 {
return 200;
}
return limit;
}
fn normalized_filter_text(value: std::option::Option<&str>) -> std::string::String {
let value = match value {
Some(value) => value.trim(),
None => return std::string::String::new(),
};
return value.to_string();
}
fn like_filter_text(value: &str) -> std::string::String {
if value.is_empty() {
return std::string::String::new();
}
return format!("%{}%", value);
}

View File

@@ -584,6 +584,80 @@ ORDER BY dex_code
}
}
/// Lists observed Raydium program instruction diagnostics.
pub async fn query_local_raydium_program_instruction_diagnostic_list_summaries(
database: &crate::Database,
) -> Result<
std::vec::Vec<crate::LocalRaydiumProgramInstructionDiagnosticSummaryDto>,
crate::Error,
> {
match database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let rows_result = sqlx::query_as::<
sqlx::Sqlite,
crate::db::dtos::LocalRaydiumProgramInstructionDiagnosticSummaryRow,
>(
r#"
SELECT
ix.program_id AS program_id,
COUNT(ix.id) AS instruction_count,
COUNT(DISTINCT tx.signature) AS transaction_count,
MAX(tx.slot) AS latest_slot,
(
SELECT tx_latest.signature
FROM k_sol_chain_instructions ix_latest
JOIN k_sol_chain_transactions tx_latest
ON tx_latest.id = ix_latest.transaction_id
WHERE ix_latest.program_id = ix.program_id
ORDER BY
tx_latest.slot DESC,
tx_latest.id DESC,
ix_latest.instruction_index DESC,
COALESCE(ix_latest.inner_instruction_index, -1) DESC
LIMIT 1
) AS latest_signature
FROM k_sol_chain_instructions ix
JOIN k_sol_chain_transactions tx
ON tx.id = ix.transaction_id
WHERE ix.program_id IN (
'CPMMoo8L3F4NbTegBCKVNunggL7H1ZpdTHKxQB5qKP1C',
'CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK',
'675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8',
'5quBtoiQqxF9Jv6KYKctB59NT3gtJD2Y65kdnB1Uev3h',
'routeUGWgWzqBWFcrCfv8tritsqukccJPu3q5GPP3xS',
'LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj'
)
GROUP BY ix.program_id
ORDER BY transaction_count DESC, instruction_count DESC, program_id
"#,
)
.fetch_all(pool)
.await;
let rows = match rows_result {
Ok(rows) => rows,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot list local Raydium program instruction diagnostics on sqlite: {}",
error
)));
},
};
let mut summaries = std::vec::Vec::new();
for row in rows {
summaries.push(crate::LocalRaydiumProgramInstructionDiagnosticSummaryDto {
program_id: row.program_id,
instruction_count: row.instruction_count,
transaction_count: row.transaction_count,
latest_slot: row.latest_slot,
latest_signature: row.latest_signature,
});
}
return Ok(summaries);
},
}
}
/// Lists local pair diagnostic summaries.
pub async fn query_local_pair_diagnostic_list_summaries(
database: &crate::Database,