From ac5bf10af645b6e96a9cddd535dad9ae24d8f45b Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Sun, 26 Apr 2026 11:44:58 +0200 Subject: [PATCH] 0.7.1 for Real ! - it was 0.7.0 before, not 0.7.1 !! --- CHANGELOG.md | 1 + Cargo.toml | 2 +- ROADMAP.md | 27 +- kb_app/src/demo_ws.rs | 123 +++- kb_app/src/lib.rs | 2 +- kb_lib/src/db.rs | 192 +++--- kb_lib/src/db/dtos.rs | 40 +- kb_lib/src/db/dtos/chain_instruction.rs | 140 ++++ kb_lib/src/db/dtos/chain_slot.rs | 95 +++ kb_lib/src/db/dtos/chain_transaction.rs | 115 ++++ kb_lib/src/db/entities.rs | 40 +- kb_lib/src/db/entities/chain_instruction.rs | 34 + kb_lib/src/db/entities/chain_slot.rs | 19 + kb_lib/src/db/entities/chain_transaction.rs | 30 + kb_lib/src/db/queries.rs | 96 +-- kb_lib/src/db/queries/chain_instruction.rs | 246 +++++++ kb_lib/src/db/queries/chain_slot.rs | 221 +++++++ kb_lib/src/db/queries/chain_transaction.rs | 265 ++++++++ kb_lib/src/db/schema.rs | 145 ++++ kb_lib/src/db/types.rs | 20 +- kb_lib/src/detect.rs | 24 +- kb_lib/src/lib.rs | 321 ++++----- kb_lib/src/tx_model.rs | 695 ++++++++++++++++++++ kb_lib/src/tx_resolution.rs | 19 +- kb_lib/src/ws_client.rs | 28 +- kb_lib/src/ws_manager.rs | 8 +- 26 files changed, 2560 insertions(+), 388 deletions(-) create mode 100644 kb_lib/src/db/dtos/chain_instruction.rs create mode 100644 kb_lib/src/db/dtos/chain_slot.rs create mode 100644 kb_lib/src/db/dtos/chain_transaction.rs create mode 100644 kb_lib/src/db/entities/chain_instruction.rs create mode 100644 kb_lib/src/db/entities/chain_slot.rs create mode 100644 kb_lib/src/db/entities/chain_transaction.rs create mode 100644 kb_lib/src/db/queries/chain_instruction.rs create mode 100644 kb_lib/src/db/queries/chain_slot.rs create mode 100644 kb_lib/src/db/queries/chain_transaction.rs create mode 100644 kb_lib/src/tx_model.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ab9f7b..8e85482 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,3 +31,4 @@ 0.6.5 - Ajout de ws_manager.rs pour l’orchestration multi-clients WebSocket, le bus d’événements unifié et le branchement centralisé du relais de détection 0.6.6 - Ajout de la fenêtre Demo Ws Manager dans kb_app pour piloter plusieurs WsClient, visualiser le snapshot consolidé, tester le démarrage/arrêt par rôle et valider le flux unifié de WsEvent 0.7.0 - Ajout du socle de résolution transactionnelle orientée DEX : relais WS vers file de résolution, récupération getTransaction via HttpEndpointPool et persistance des résolutions dans les observations/signaux +0.7.1 - Ajout du modèle transactionnel enrichi : tables slots/transactions/instructions, requêtes d’accès et projection structurée des transactions résolues diff --git a/Cargo.toml b/Cargo.toml index 120c162..2e58e28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.7.0" +version = "0.7.1" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index 185e448..5c9da5a 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -457,14 +457,14 @@ Réalisé : - préparation du futur modèle transactionnel enrichi sans bloquer les flux temps réel. ### 6.033. Version `0.7.1` — Modèle transactionnel Solana enrichi -Objectif : préparer un modèle interne plus riche, inspiré d’une vision `slot -> signature -> instruction`. +Réalisé : -À faire : - -- préparer les structures et tables permettant de relier blocs/slots, signatures et instructions, -- distinguer clairement transaction, instruction principale et éventuelles inner instructions, -- conserver la possibilité de relier plus tard un pool, un token ou un wallet à une signature fondatrice, -- préparer l’historique transactionnel nécessaire aux futurs décodeurs DEX. +- ajout des tables techniques `kb_chain_slots`, `kb_chain_transactions` et `kb_chain_instructions`, +- distinction claire entre slot, transaction résolue et instructions normalisées, +- support des instructions principales et inner instructions, +- ajout des entités, DTOs et requêtes associées, +- ajout d’un service de projection pour transformer une transaction JSON-RPC résolue en modèle transactionnel interne, +- ajout des tests de roundtrip et de projection. ### 6.034. Version `0.7.2` — Décodeurs DEX spécifiques par programme et version Objectif : remplacer les heuristiques ponctuelles par de vrais décodeurs Rust dédiés. @@ -656,10 +656,9 @@ Le projet doit maintenir au minimum : ## 12. Priorité immédiate La priorité immédiate est désormais la suivante : -1. démarrer la version `0.7.1` avec le modèle transactionnel Solana enrichi, -2. préparer les structures et tables reliant slots, signatures et instructions, -3. distinguer clairement transaction principale et inner instructions, -4. préparer l’historique transactionnel exploitable par les futurs décodeurs DEX, -5. conserver le découplage entre transport, résolution transactionnelle, détection métier et stockage, -6. préparer ensuite la version `0.7.2` pour les décodeurs DEX spécifiques par programme et version. - +1. brancher automatiquement `tx_resolution.rs` vers `KbTransactionModelService`, +2. stabiliser la chaîne complète `WS -> résolution HTTP -> projection transactionnelle`, +3. démarrer la version `0.7.2` avec les décodeurs DEX spécifiques par programme et version, +4. introduire les premières règles de décodage dédiées à Raydium / Pump.fun / PumpSwap, +5. conserver le découplage entre transport, résolution transactionnelle, projection et décodage métier, +6. préparer ensuite l’enrichissement des objets métier DEX à partir du modèle transactionnel. diff --git a/kb_app/src/demo_ws.rs b/kb_app/src/demo_ws.rs index b13d85c..cbe72ff 100644 --- a/kb_app/src/demo_ws.rs +++ b/kb_app/src/demo_ws.rs @@ -427,60 +427,122 @@ async fn kb_execute_demo_ws_subscribe( let method = request.method.trim(); let mode = request.mode.trim(); if method == "account" { - let target = kb_required_target(request, "account pubkey")?; + let target_result = kb_required_target(request, "account pubkey"); + let target = match target_result { + Ok(target) => target, + Err(error) => return Err(error), + }; if mode == "typed" { - let config = kb_parse_optional_json_typed::< + let config_result = kb_parse_optional_json_typed::< solana_rpc_client_api::config::RpcAccountInfoConfig, - >(&request.config_json, "account typed config")?; + >(&request.config_json, "account typed config"); + let config = match config_result { + Ok(config) => config, + Err(error) => return Err(error), + }; let result = client.account_subscribe_typed(target, config).await; return result.map_err(|error| format!("account typed subscribe failed: {error}")); } - let config = kb_parse_optional_json_value(&request.config_json, "account raw config")?; + let config_result = + kb_parse_optional_json_value(&request.config_json, "account raw config"); + let config = match config_result { + Ok(config) => config, + Err(error) => return Err(error), + }; let result = client.account_subscribe_raw(target, config).await; return result.map_err(|error| format!("account raw subscribe failed: {error}")); } if method == "block" { if mode == "typed" { - let filter = kb_parse_required_json_typed::< + let filter_result = kb_parse_required_json_typed::< solana_rpc_client_api::config::RpcBlockSubscribeFilter, - >(&request.filter_json, "block typed filter")?; - let config = kb_parse_optional_json_typed::< + >(&request.filter_json, "block typed filter"); + let filter = match filter_result { + Ok(filter) => filter, + Err(error) => return Err(error), + }; + let config_result = kb_parse_optional_json_typed::< solana_rpc_client_api::config::RpcBlockSubscribeConfig, - >(&request.config_json, "block typed config")?; + >(&request.config_json, "block typed config"); + let config = match config_result { + Ok(config) => config, + Err(error) => return Err(error), + }; let result = client.block_subscribe_typed(filter, config).await; return result.map_err(|error| format!("block typed subscribe failed: {error}")); } - let filter = kb_parse_required_json_value(&request.filter_json, "block raw filter")?; - let config = kb_parse_optional_json_value(&request.config_json, "block raw config")?; + let filter_result = + kb_parse_required_json_value(&request.filter_json, "block raw filter"); + let filter = match filter_result { + Ok(filter) => filter, + Err(error) => return Err(error), + }; + let config_result = + kb_parse_optional_json_value(&request.config_json, "block raw config"); + let config = match config_result { + Ok(config) => config, + Err(error) => return Err(error), + }; let result = client.block_subscribe_raw(filter, config).await; return result.map_err(|error| format!("block raw subscribe failed: {error}")); } if method == "logs" { if mode == "typed" { - let filter = kb_parse_required_json_typed::< + let filter_result = kb_parse_required_json_typed::< solana_rpc_client_api::config::RpcTransactionLogsFilter, - >(&request.filter_json, "logs typed filter")?; - let config = kb_parse_optional_json_typed::< + >(&request.filter_json, "logs typed filter"); + let filter = match filter_result { + Ok(filter) => filter, + Err(error) => return Err(error), + }; + let config_result = kb_parse_optional_json_typed::< solana_rpc_client_api::config::RpcTransactionLogsConfig, - >(&request.config_json, "logs typed config")?; + >(&request.config_json, "logs typed config"); + let config = match config_result { + Ok(config) => config, + Err(error) => return Err(error), + }; let result = client.logs_subscribe_typed(filter, config).await; return result.map_err(|error| format!("logs typed subscribe failed: {error}")); } - let filter = kb_parse_required_json_value(&request.filter_json, "logs raw filter")?; - let config = kb_parse_optional_json_value(&request.config_json, "logs raw config")?; + let filter_result = + kb_parse_required_json_value(&request.filter_json, "logs raw filter"); + let filter = match filter_result { + Ok(filter) => filter, + Err(error) => return Err(error), + }; + let config_result = + kb_parse_optional_json_value(&request.config_json, "logs raw config"); + let config = match config_result { + Ok(config) => config, + Err(error) => return Err(error), + }; let result = client.logs_subscribe_raw(filter, config).await; return result.map_err(|error| format!("logs raw subscribe failed: {error}")); } if method == "program" { - let target = kb_required_target(request, "program id")?; + let target_result = kb_required_target(request, "program id"); + let target = match target_result { + Ok(target) => target, + Err(error) => return Err(error), + }; if mode == "typed" { - let config = kb_parse_optional_json_typed::< + let config_result = kb_parse_optional_json_typed::< solana_rpc_client_api::config::RpcProgramAccountsConfig, - >(&request.config_json, "program typed config")?; + >(&request.config_json, "program typed config"); + let config = match config_result { + Ok(config) => config, + Err(error) => return Err(error), + }; let result = client.program_subscribe_typed(target, config).await; return result.map_err(|error| format!("program typed subscribe failed: {error}")); } - let config = kb_parse_optional_json_value(&request.config_json, "program raw config")?; + let config_result = + kb_parse_optional_json_value(&request.config_json, "program raw config"); + let config = match config_result { + Ok(config) => config, + Err(error) => return Err(error), + }; let result = client.program_subscribe_raw(target, config).await; return result.map_err(|error| format!("program raw subscribe failed: {error}")); } @@ -489,15 +551,28 @@ async fn kb_execute_demo_ws_subscribe( return result.map_err(|error| format!("root subscribe failed: {error}")); } if method == "signature" { - let target = kb_required_target(request, "signature")?; + let target_result = kb_required_target(request, "signature"); + let target = match target_result { + Ok(target) => target, + Err(error) => return Err(error), + }; if mode == "typed" { - let config = kb_parse_optional_json_typed::< + let config_result = kb_parse_optional_json_typed::< solana_rpc_client_api::config::RpcSignatureSubscribeConfig, - >(&request.config_json, "signature typed config")?; + >(&request.config_json, "signature typed config"); + let config = match config_result { + Ok(config) => config, + Err(error) => return Err(error), + }; let result = client.signature_subscribe_typed(target, config).await; return result.map_err(|error| format!("signature typed subscribe failed: {error}")); } - let config = kb_parse_optional_json_value(&request.config_json, "signature raw config")?; + let config_result = + kb_parse_optional_json_value(&request.config_json, "signature raw config"); + let config = match config_result { + Ok(config) => config, + Err(error) => return Err(error), + }; let result = client.signature_subscribe_raw(target, config).await; return result.map_err(|error| format!("signature raw subscribe failed: {error}")); } diff --git a/kb_app/src/lib.rs b/kb_app/src/lib.rs index c37969a..cc83ad9 100644 --- a/kb_app/src/lib.rs +++ b/kb_app/src/lib.rs @@ -14,7 +14,7 @@ mod demo_ws; mod demo_ws_manager; mod splash; -pub use crate::splash::SplashOrder; +pub use splash::SplashOrder; use tauri::Emitter; use tauri::Manager; diff --git a/kb_lib/src/db.rs b/kb_lib/src/db.rs index f7861e3..c9c9119 100644 --- a/kb_lib/src/db.rs +++ b/kb_lib/src/db.rs @@ -13,91 +13,107 @@ mod schema; mod sqlite; mod types; -pub use crate::db::connection::KbDatabase; -pub use crate::db::connection::KbDatabaseConnection; -pub use crate::db::dtos::KbAnalysisSignalDto; -pub use crate::db::dtos::KbDbMetadataDto; -pub use crate::db::dtos::KbDbRuntimeEventDto; -pub use crate::db::dtos::KbDexDto; -pub use crate::db::dtos::KbKnownHttpEndpointDto; -pub use crate::db::dtos::KbKnownWsEndpointDto; -pub use crate::db::dtos::KbLiquidityEventDto; -pub use crate::db::dtos::KbObservedTokenDto; -pub use crate::db::dtos::KbOnchainObservationDto; -pub use crate::db::dtos::KbPairDto; -pub use crate::db::dtos::KbPoolDto; -pub use crate::db::dtos::KbPoolListingDto; -pub use crate::db::dtos::KbPoolTokenDto; -pub use crate::db::dtos::KbSwapDto; -pub use crate::db::dtos::KbTokenBurnEventDto; -pub use crate::db::dtos::KbTokenDto; -pub use crate::db::dtos::KbTokenMintEventDto; -pub use crate::db::entities::KbAnalysisSignalEntity; -pub use crate::db::entities::KbDbMetadataEntity; -pub use crate::db::entities::KbDbRuntimeEventEntity; -pub use crate::db::entities::KbDexEntity; -pub use crate::db::entities::KbKnownHttpEndpointEntity; -pub use crate::db::entities::KbKnownWsEndpointEntity; -pub use crate::db::entities::KbLiquidityEventEntity; -pub use crate::db::entities::KbObservedTokenEntity; -pub use crate::db::entities::KbOnchainObservationEntity; -pub use crate::db::entities::KbPairEntity; -pub use crate::db::entities::KbPoolEntity; -pub use crate::db::entities::KbPoolListingEntity; -pub use crate::db::entities::KbPoolTokenEntity; -pub use crate::db::entities::KbSwapEntity; -pub use crate::db::entities::KbTokenBurnEventEntity; -pub use crate::db::entities::KbTokenEntity; -pub use crate::db::entities::KbTokenMintEventEntity; -pub use crate::db::queries::get_db_metadata; -pub use crate::db::queries::get_known_http_endpoint; -pub use crate::db::queries::get_known_ws_endpoint; -pub use crate::db::queries::get_observed_token_by_mint; -pub use crate::db::queries::get_token_by_mint; -pub use crate::db::queries::insert_analysis_signal; -pub use crate::db::queries::insert_db_runtime_event; -pub use crate::db::queries::insert_onchain_observation; -pub use crate::db::queries::list_db_metadata; -pub use crate::db::queries::list_dexes; -pub use crate::db::queries::list_known_http_endpoints; -pub use crate::db::queries::list_known_ws_endpoints; -pub use crate::db::queries::list_observed_tokens; -pub use crate::db::queries::list_recent_analysis_signals; -pub use crate::db::queries::list_recent_db_runtime_events; -pub use crate::db::queries::list_recent_liquidity_events; -pub use crate::db::queries::list_recent_onchain_observations; -pub use crate::db::queries::list_recent_swaps; -pub use crate::db::queries::list_recent_token_burn_events; -pub use crate::db::queries::list_recent_token_mint_events; -pub use crate::db::queries::upsert_db_metadata; -pub use crate::db::queries::upsert_dex; -pub use crate::db::queries::upsert_known_http_endpoint; -pub use crate::db::queries::upsert_known_ws_endpoint; -pub use crate::db::queries::upsert_liquidity_event; -pub use crate::db::queries::upsert_observed_token; -pub use crate::db::queries::upsert_pair; -pub use crate::db::queries::upsert_pool; -pub use crate::db::queries::upsert_pool_listing; -pub use crate::db::queries::upsert_pool_token; -pub use crate::db::queries::upsert_swap; -pub use crate::db::queries::upsert_token; -pub use crate::db::queries::upsert_token_burn_event; -pub use crate::db::queries::upsert_token_mint_event; -pub use crate::db::queries::get_dex_by_code; -pub use crate::db::queries::get_pair_by_pool_id; -pub use crate::db::queries::get_pool_by_address; -pub use crate::db::queries::get_pool_listing_by_pool_id; -pub use crate::db::queries::list_pairs; -pub use crate::db::queries::list_pool_listings; -pub use crate::db::queries::list_pool_tokens_by_pool_id; -pub use crate::db::queries::list_pools; -pub use crate::db::types::KbAnalysisSignalSeverity; -pub use crate::db::types::KbDatabaseBackend; -pub use crate::db::types::KbDbRuntimeEventLevel; -pub use crate::db::types::KbLiquidityEventKind; -pub use crate::db::types::KbObservationSourceKind; -pub use crate::db::types::KbObservedTokenStatus; -pub use crate::db::types::KbPoolKind; -pub use crate::db::types::KbPoolStatus; -pub use crate::db::types::KbPoolTokenRole; -pub use crate::db::types::KbSwapTradeSide; +pub use connection::KbDatabase; +pub use connection::KbDatabaseConnection; +pub use dtos::KbAnalysisSignalDto; +pub use dtos::KbDbMetadataDto; +pub use dtos::KbDbRuntimeEventDto; +pub use dtos::KbDexDto; +pub use dtos::KbKnownHttpEndpointDto; +pub use dtos::KbKnownWsEndpointDto; +pub use dtos::KbLiquidityEventDto; +pub use dtos::KbObservedTokenDto; +pub use dtos::KbOnchainObservationDto; +pub use dtos::KbPairDto; +pub use dtos::KbPoolDto; +pub use dtos::KbPoolListingDto; +pub use dtos::KbPoolTokenDto; +pub use dtos::KbSwapDto; +pub use dtos::KbTokenBurnEventDto; +pub use dtos::KbTokenDto; +pub use dtos::KbTokenMintEventDto; +pub use dtos::KbChainInstructionDto; +pub use dtos::KbChainSlotDto; +pub use dtos::KbChainTransactionDto; +pub use entities::KbAnalysisSignalEntity; +pub use entities::KbDbMetadataEntity; +pub use entities::KbDbRuntimeEventEntity; +pub use entities::KbDexEntity; +pub use entities::KbKnownHttpEndpointEntity; +pub use entities::KbKnownWsEndpointEntity; +pub use entities::KbLiquidityEventEntity; +pub use entities::KbObservedTokenEntity; +pub use entities::KbOnchainObservationEntity; +pub use entities::KbPairEntity; +pub use entities::KbPoolEntity; +pub use entities::KbPoolListingEntity; +pub use entities::KbPoolTokenEntity; +pub use entities::KbSwapEntity; +pub use entities::KbTokenBurnEventEntity; +pub use entities::KbTokenEntity; +pub use entities::KbTokenMintEventEntity; +pub use entities::KbChainInstructionEntity; +pub use entities::KbChainSlotEntity; +pub use entities::KbChainTransactionEntity; + +pub use queries::get_db_metadata; +pub use queries::get_known_http_endpoint; +pub use queries::get_known_ws_endpoint; +pub use queries::get_observed_token_by_mint; +pub use queries::get_token_by_mint; +pub use queries::insert_analysis_signal; +pub use queries::insert_db_runtime_event; +pub use queries::insert_onchain_observation; +pub use queries::list_db_metadata; +pub use queries::list_dexes; +pub use queries::list_known_http_endpoints; +pub use queries::list_known_ws_endpoints; +pub use queries::list_observed_tokens; +pub use queries::list_recent_analysis_signals; +pub use queries::list_recent_db_runtime_events; +pub use queries::list_recent_liquidity_events; +pub use queries::list_recent_onchain_observations; +pub use queries::list_recent_swaps; +pub use queries::list_recent_token_burn_events; +pub use queries::list_recent_token_mint_events; +pub use queries::upsert_db_metadata; +pub use queries::upsert_dex; +pub use queries::upsert_known_http_endpoint; +pub use queries::upsert_known_ws_endpoint; +pub use queries::upsert_liquidity_event; +pub use queries::upsert_observed_token; +pub use queries::upsert_pair; +pub use queries::upsert_pool; +pub use queries::upsert_pool_listing; +pub use queries::upsert_pool_token; +pub use queries::upsert_swap; +pub use queries::upsert_token; +pub use queries::upsert_token_burn_event; +pub use queries::upsert_token_mint_event; +pub use queries::get_dex_by_code; +pub use queries::get_pair_by_pool_id; +pub use queries::get_pool_by_address; +pub use queries::get_pool_listing_by_pool_id; +pub use queries::list_pairs; +pub use queries::list_pool_listings; +pub use queries::list_pool_tokens_by_pool_id; +pub use queries::list_pools; +pub use queries::delete_chain_instructions_by_transaction_id; +pub use queries::get_chain_slot; +pub use queries::get_chain_transaction_by_signature; +pub use queries::insert_chain_instruction; +pub use queries::list_chain_instructions_by_transaction_id; +pub use queries::list_recent_chain_slots; +pub use queries::list_recent_chain_transactions; +pub use queries::upsert_chain_slot; +pub use queries::upsert_chain_transaction; +pub use types::KbAnalysisSignalSeverity; +pub use types::KbDatabaseBackend; +pub use types::KbDbRuntimeEventLevel; +pub use types::KbLiquidityEventKind; +pub use types::KbObservationSourceKind; +pub use types::KbObservedTokenStatus; +pub use types::KbPoolKind; +pub use types::KbPoolStatus; +pub use types::KbPoolTokenRole; +pub use types::KbSwapTradeSide; diff --git a/kb_lib/src/db/dtos.rs b/kb_lib/src/db/dtos.rs index 5f5e82c..45441ba 100644 --- a/kb_lib/src/db/dtos.rs +++ b/kb_lib/src/db/dtos.rs @@ -19,21 +19,27 @@ mod swap; mod token; mod token_burn_event; mod token_mint_event; +mod chain_instruction; +mod chain_slot; +mod chain_transaction; -pub use crate::db::dtos::analysis_signal::KbAnalysisSignalDto; -pub use crate::db::dtos::db_metadata::KbDbMetadataDto; -pub use crate::db::dtos::db_runtime_event::KbDbRuntimeEventDto; -pub use crate::db::dtos::dex::KbDexDto; -pub use crate::db::dtos::known_http_endpoint::KbKnownHttpEndpointDto; -pub use crate::db::dtos::known_ws_endpoint::KbKnownWsEndpointDto; -pub use crate::db::dtos::liquidity_event::KbLiquidityEventDto; -pub use crate::db::dtos::observed_token::KbObservedTokenDto; -pub use crate::db::dtos::onchain_observation::KbOnchainObservationDto; -pub use crate::db::dtos::pair::KbPairDto; -pub use crate::db::dtos::pool::KbPoolDto; -pub use crate::db::dtos::pool_listing::KbPoolListingDto; -pub use crate::db::dtos::pool_token::KbPoolTokenDto; -pub use crate::db::dtos::swap::KbSwapDto; -pub use crate::db::dtos::token::KbTokenDto; -pub use crate::db::dtos::token_burn_event::KbTokenBurnEventDto; -pub use crate::db::dtos::token_mint_event::KbTokenMintEventDto; +pub use analysis_signal::KbAnalysisSignalDto; +pub use db_metadata::KbDbMetadataDto; +pub use db_runtime_event::KbDbRuntimeEventDto; +pub use dex::KbDexDto; +pub use known_http_endpoint::KbKnownHttpEndpointDto; +pub use known_ws_endpoint::KbKnownWsEndpointDto; +pub use liquidity_event::KbLiquidityEventDto; +pub use observed_token::KbObservedTokenDto; +pub use onchain_observation::KbOnchainObservationDto; +pub use pair::KbPairDto; +pub use pool::KbPoolDto; +pub use pool_listing::KbPoolListingDto; +pub use pool_token::KbPoolTokenDto; +pub use swap::KbSwapDto; +pub use token::KbTokenDto; +pub use token_burn_event::KbTokenBurnEventDto; +pub use token_mint_event::KbTokenMintEventDto; +pub use chain_instruction::KbChainInstructionDto; +pub use chain_slot::KbChainSlotDto; +pub use chain_transaction::KbChainTransactionDto; diff --git a/kb_lib/src/db/dtos/chain_instruction.rs b/kb_lib/src/db/dtos/chain_instruction.rs new file mode 100644 index 0000000..f0cdbc4 --- /dev/null +++ b/kb_lib/src/db/dtos/chain_instruction.rs @@ -0,0 +1,140 @@ +// file: kb_lib/src/db/dtos/chain_instruction.rs + +//! Application-facing normalized chain instruction DTO. + +/// Application-facing normalized chain instruction DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbChainInstructionDto { + /// Optional numeric primary key. + pub id: std::option::Option, + /// Parent transaction id. + pub transaction_id: i64, + /// Optional parent instruction id for inner instructions. + pub parent_instruction_id: std::option::Option, + /// Outer instruction index. + pub instruction_index: u32, + /// Optional inner instruction index. + pub inner_instruction_index: std::option::Option, + /// Optional program id. + pub program_id: std::option::Option, + /// Optional program name. + pub program_name: std::option::Option, + /// Optional stack height. + pub stack_height: std::option::Option, + /// Serialized accounts JSON array. + pub accounts_json: std::string::String, + /// Optional serialized data JSON. + pub data_json: std::option::Option, + /// Optional parsed type. + pub parsed_type: std::option::Option, + /// Optional serialized parsed JSON. + pub parsed_json: std::option::Option, + /// Creation timestamp. + pub created_at: chrono::DateTime, +} + +impl KbChainInstructionDto { + /// Creates a new chain instruction DTO. + #[allow(clippy::too_many_arguments)] + pub fn new( + transaction_id: i64, + parent_instruction_id: std::option::Option, + instruction_index: u32, + inner_instruction_index: std::option::Option, + program_id: std::option::Option, + program_name: std::option::Option, + stack_height: std::option::Option, + accounts_json: std::string::String, + data_json: std::option::Option, + parsed_type: std::option::Option, + parsed_json: std::option::Option, + ) -> Self { + Self { + id: None, + transaction_id, + parent_instruction_id, + instruction_index, + inner_instruction_index, + program_id, + program_name, + stack_height, + accounts_json, + data_json, + parsed_type, + parsed_json, + created_at: chrono::Utc::now(), + } + } +} + +impl TryFrom for KbChainInstructionDto { + type Error = crate::KbError; + + fn try_from(entity: crate::KbChainInstructionEntity) -> Result { + let instruction_index_result = u32::try_from(entity.instruction_index); + let instruction_index = match instruction_index_result { + Ok(instruction_index) => instruction_index, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert chain instruction instruction_index '{}' to u32: {}", + entity.instruction_index, error + ))); + } + }; + let inner_instruction_index = match entity.inner_instruction_index { + Some(inner_instruction_index) => { + let inner_instruction_index_result = u32::try_from(inner_instruction_index); + match inner_instruction_index_result { + Ok(inner_instruction_index) => Some(inner_instruction_index), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert chain instruction inner_instruction_index '{}' to u32: {}", + inner_instruction_index, error + ))); + } + } + } + None => None, + }; + let stack_height = match entity.stack_height { + Some(stack_height) => { + let stack_height_result = u32::try_from(stack_height); + match stack_height_result { + Ok(stack_height) => Some(stack_height), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert chain instruction stack_height '{}' to u32: {}", + stack_height, error + ))); + } + } + } + None => None, + }; + let created_at_result = chrono::DateTime::parse_from_rfc3339(&entity.created_at); + let created_at = match created_at_result { + Ok(created_at) => created_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse chain instruction created_at '{}': {}", + entity.created_at, error + ))); + } + }; + Ok(Self { + id: Some(entity.id), + transaction_id: entity.transaction_id, + parent_instruction_id: entity.parent_instruction_id, + instruction_index, + inner_instruction_index, + program_id: entity.program_id, + program_name: entity.program_name, + stack_height, + accounts_json: entity.accounts_json, + data_json: entity.data_json, + parsed_type: entity.parsed_type, + parsed_json: entity.parsed_json, + created_at, + }) + } +} diff --git a/kb_lib/src/db/dtos/chain_slot.rs b/kb_lib/src/db/dtos/chain_slot.rs new file mode 100644 index 0000000..e8bf863 --- /dev/null +++ b/kb_lib/src/db/dtos/chain_slot.rs @@ -0,0 +1,95 @@ +// file: kb_lib/src/db/dtos/chain_slot.rs + +//! Application-facing normalized chain slot DTO. + +/// Application-facing normalized chain slot DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbChainSlotDto { + /// Slot number. + pub slot: u64, + /// Optional parent slot number. + pub parent_slot: std::option::Option, + /// Optional block time in unix seconds. + pub block_time_unix: std::option::Option, + /// Creation timestamp. + pub created_at: chrono::DateTime, + /// Update timestamp. + pub updated_at: chrono::DateTime, +} + +impl KbChainSlotDto { + /// Creates a new chain slot DTO. + pub fn new( + slot: u64, + parent_slot: std::option::Option, + block_time_unix: std::option::Option, + ) -> Self { + let now = chrono::Utc::now(); + Self { + slot, + parent_slot, + block_time_unix, + created_at: now, + updated_at: now, + } + } +} + +impl TryFrom for KbChainSlotDto { + type Error = crate::KbError; + + fn try_from(entity: crate::KbChainSlotEntity) -> Result { + let slot_result = u64::try_from(entity.slot); + let slot = match slot_result { + Ok(slot) => slot, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert chain slot '{}' to u64: {}", + entity.slot, error + ))); + } + }; + let parent_slot = match entity.parent_slot { + Some(parent_slot) => { + let parent_slot_result = u64::try_from(parent_slot); + match parent_slot_result { + Ok(parent_slot) => Some(parent_slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert chain parent_slot '{}' to u64: {}", + parent_slot, error + ))); + } + } + } + None => None, + }; + let created_at_result = chrono::DateTime::parse_from_rfc3339(&entity.created_at); + let created_at = match created_at_result { + Ok(created_at) => created_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse chain slot created_at '{}': {}", + entity.created_at, error + ))); + } + }; + let updated_at_result = chrono::DateTime::parse_from_rfc3339(&entity.updated_at); + let updated_at = match updated_at_result { + Ok(updated_at) => updated_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse chain slot updated_at '{}': {}", + entity.updated_at, error + ))); + } + }; + Ok(Self { + slot, + parent_slot, + block_time_unix: entity.block_time_unix, + created_at, + updated_at, + }) + } +} diff --git a/kb_lib/src/db/dtos/chain_transaction.rs b/kb_lib/src/db/dtos/chain_transaction.rs new file mode 100644 index 0000000..8152927 --- /dev/null +++ b/kb_lib/src/db/dtos/chain_transaction.rs @@ -0,0 +1,115 @@ +// file: kb_lib/src/db/dtos/chain_transaction.rs + +//! Application-facing normalized chain transaction DTO. + +/// Application-facing normalized chain transaction DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbChainTransactionDto { + /// Optional numeric primary key. + pub id: std::option::Option, + /// Transaction signature. + pub signature: std::string::String, + /// Optional slot number. + pub slot: std::option::Option, + /// Optional block time in unix seconds. + pub block_time_unix: std::option::Option, + /// Optional source endpoint name. + pub source_endpoint_name: std::option::Option, + /// Optional version text. + pub version_text: std::option::Option, + /// Optional serialized transaction error JSON. + pub err_json: std::option::Option, + /// Optional serialized meta JSON. + pub meta_json: std::option::Option, + /// Serialized full transaction JSON. + pub transaction_json: std::string::String, + /// Creation timestamp. + pub created_at: chrono::DateTime, + /// Update timestamp. + pub updated_at: chrono::DateTime, +} + +impl KbChainTransactionDto { + /// Creates a new chain transaction DTO. + #[allow(clippy::too_many_arguments)] + pub fn new( + signature: std::string::String, + slot: std::option::Option, + block_time_unix: std::option::Option, + source_endpoint_name: std::option::Option, + version_text: std::option::Option, + err_json: std::option::Option, + meta_json: std::option::Option, + transaction_json: std::string::String, + ) -> Self { + let now = chrono::Utc::now(); + Self { + id: None, + signature, + slot, + block_time_unix, + source_endpoint_name, + version_text, + err_json, + meta_json, + transaction_json, + created_at: now, + updated_at: now, + } + } +} + +impl TryFrom for KbChainTransactionDto { + type Error = crate::KbError; + + fn try_from(entity: crate::KbChainTransactionEntity) -> Result { + let slot = match entity.slot { + Some(slot) => { + let slot_result = u64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert chain transaction slot '{}' to u64: {}", + slot, error + ))); + } + } + } + None => None, + }; + let created_at_result = chrono::DateTime::parse_from_rfc3339(&entity.created_at); + let created_at = match created_at_result { + Ok(created_at) => created_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse chain transaction created_at '{}': {}", + entity.created_at, error + ))); + } + }; + let updated_at_result = chrono::DateTime::parse_from_rfc3339(&entity.updated_at); + let updated_at = match updated_at_result { + Ok(updated_at) => updated_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse chain transaction updated_at '{}': {}", + entity.updated_at, error + ))); + } + }; + Ok(Self { + id: Some(entity.id), + signature: entity.signature, + slot, + block_time_unix: entity.block_time_unix, + source_endpoint_name: entity.source_endpoint_name, + version_text: entity.version_text, + err_json: entity.err_json, + meta_json: entity.meta_json, + transaction_json: entity.transaction_json, + created_at, + updated_at, + }) + } +} diff --git a/kb_lib/src/db/entities.rs b/kb_lib/src/db/entities.rs index da4a988..586132e 100644 --- a/kb_lib/src/db/entities.rs +++ b/kb_lib/src/db/entities.rs @@ -21,21 +21,27 @@ mod swap; mod token; mod token_burn_event; mod token_mint_event; +mod chain_instruction; +mod chain_slot; +mod chain_transaction; -pub use crate::db::entities::analysis_signal::KbAnalysisSignalEntity; -pub use crate::db::entities::db_metadata::KbDbMetadataEntity; -pub use crate::db::entities::db_runtime_event::KbDbRuntimeEventEntity; -pub use crate::db::entities::dex::KbDexEntity; -pub use crate::db::entities::known_http_endpoint::KbKnownHttpEndpointEntity; -pub use crate::db::entities::known_ws_endpoint::KbKnownWsEndpointEntity; -pub use crate::db::entities::liquidity_event::KbLiquidityEventEntity; -pub use crate::db::entities::observed_token::KbObservedTokenEntity; -pub use crate::db::entities::onchain_observation::KbOnchainObservationEntity; -pub use crate::db::entities::pair::KbPairEntity; -pub use crate::db::entities::pool::KbPoolEntity; -pub use crate::db::entities::pool_listing::KbPoolListingEntity; -pub use crate::db::entities::pool_token::KbPoolTokenEntity; -pub use crate::db::entities::swap::KbSwapEntity; -pub use crate::db::entities::token::KbTokenEntity; -pub use crate::db::entities::token_burn_event::KbTokenBurnEventEntity; -pub use crate::db::entities::token_mint_event::KbTokenMintEventEntity; +pub use analysis_signal::KbAnalysisSignalEntity; +pub use db_metadata::KbDbMetadataEntity; +pub use db_runtime_event::KbDbRuntimeEventEntity; +pub use dex::KbDexEntity; +pub use known_http_endpoint::KbKnownHttpEndpointEntity; +pub use known_ws_endpoint::KbKnownWsEndpointEntity; +pub use liquidity_event::KbLiquidityEventEntity; +pub use observed_token::KbObservedTokenEntity; +pub use onchain_observation::KbOnchainObservationEntity; +pub use pair::KbPairEntity; +pub use pool::KbPoolEntity; +pub use pool_listing::KbPoolListingEntity; +pub use pool_token::KbPoolTokenEntity; +pub use swap::KbSwapEntity; +pub use token::KbTokenEntity; +pub use token_burn_event::KbTokenBurnEventEntity; +pub use token_mint_event::KbTokenMintEventEntity; +pub use chain_instruction::KbChainInstructionEntity; +pub use chain_slot::KbChainSlotEntity; +pub use chain_transaction::KbChainTransactionEntity; diff --git a/kb_lib/src/db/entities/chain_instruction.rs b/kb_lib/src/db/entities/chain_instruction.rs new file mode 100644 index 0000000..968562c --- /dev/null +++ b/kb_lib/src/db/entities/chain_instruction.rs @@ -0,0 +1,34 @@ +// file: kb_lib/src/db/entities/chain_instruction.rs + +//! Database entity for one normalized Solana instruction row. + +/// Persisted Solana instruction row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbChainInstructionEntity { + /// Internal row id. + pub id: i64, + /// Parent transaction id. + pub transaction_id: i64, + /// Optional parent instruction id for inner instructions. + pub parent_instruction_id: std::option::Option, + /// Top-level instruction index. + pub instruction_index: i64, + /// Optional inner instruction index. + pub inner_instruction_index: std::option::Option, + /// Optional program id. + pub program_id: std::option::Option, + /// Optional program name. + pub program_name: std::option::Option, + /// Optional stack height. + pub stack_height: std::option::Option, + /// Accounts JSON array. + pub accounts_json: std::string::String, + /// Optional data JSON. + pub data_json: std::option::Option, + /// Optional parsed type. + pub parsed_type: std::option::Option, + /// Optional parsed payload JSON. + pub parsed_json: std::option::Option, + /// Creation timestamp. + pub created_at: std::string::String, +} diff --git a/kb_lib/src/db/entities/chain_slot.rs b/kb_lib/src/db/entities/chain_slot.rs new file mode 100644 index 0000000..f589722 --- /dev/null +++ b/kb_lib/src/db/entities/chain_slot.rs @@ -0,0 +1,19 @@ + +// file: kb_lib/src/db/entities/chain_slot.rs + +//! Database entity for one observed Solana slot. + +/// Persisted Solana slot row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbChainSlotEntity { + /// Slot number. + pub slot: i64, + /// Optional parent slot number. + pub parent_slot: std::option::Option, + /// Optional block time in unix seconds. + pub block_time_unix: std::option::Option, + /// Creation timestamp. + pub created_at: std::string::String, + /// Update timestamp. + pub updated_at: std::string::String, +} diff --git a/kb_lib/src/db/entities/chain_transaction.rs b/kb_lib/src/db/entities/chain_transaction.rs new file mode 100644 index 0000000..020e105 --- /dev/null +++ b/kb_lib/src/db/entities/chain_transaction.rs @@ -0,0 +1,30 @@ +// file: kb_lib/src/db/entities/chain_transaction.rs + +//! Database entity for one resolved Solana transaction. + +/// Persisted Solana transaction row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbChainTransactionEntity { + /// Internal row id. + pub id: i64, + /// Transaction signature. + pub signature: std::string::String, + /// Optional slot number. + pub slot: std::option::Option, + /// Optional block time in unix seconds. + pub block_time_unix: std::option::Option, + /// Optional logical endpoint name that resolved the transaction. + pub source_endpoint_name: std::option::Option, + /// Optional version text. + pub version_text: std::option::Option, + /// Optional transaction error JSON. + pub err_json: std::option::Option, + /// Optional transaction meta JSON. + pub meta_json: std::option::Option, + /// Full transaction JSON. + pub transaction_json: std::string::String, + /// Creation timestamp. + pub created_at: std::string::String, + /// Update timestamp. + pub updated_at: std::string::String, +} diff --git a/kb_lib/src/db/queries.rs b/kb_lib/src/db/queries.rs index 65d5c06..d55f656 100644 --- a/kb_lib/src/db/queries.rs +++ b/kb_lib/src/db/queries.rs @@ -23,46 +23,58 @@ mod swap; mod token; mod token_burn_event; mod token_mint_event; +mod chain_instruction; +mod chain_slot; +mod chain_transaction; -pub use crate::db::queries::analysis_signal::insert_analysis_signal; -pub use crate::db::queries::analysis_signal::list_recent_analysis_signals; -pub use crate::db::queries::db_metadata::get_db_metadata; -pub use crate::db::queries::db_metadata::list_db_metadata; -pub use crate::db::queries::db_metadata::upsert_db_metadata; -pub use crate::db::queries::db_runtime_event::insert_db_runtime_event; -pub use crate::db::queries::db_runtime_event::list_recent_db_runtime_events; -pub use crate::db::queries::dex::get_dex_by_code; -pub use crate::db::queries::dex::list_dexes; -pub use crate::db::queries::dex::upsert_dex; -pub use crate::db::queries::known_http_endpoint::get_known_http_endpoint; -pub use crate::db::queries::known_http_endpoint::list_known_http_endpoints; -pub use crate::db::queries::known_http_endpoint::upsert_known_http_endpoint; -pub use crate::db::queries::known_ws_endpoint::get_known_ws_endpoint; -pub use crate::db::queries::known_ws_endpoint::list_known_ws_endpoints; -pub use crate::db::queries::known_ws_endpoint::upsert_known_ws_endpoint; -pub use crate::db::queries::liquidity_event::list_recent_liquidity_events; -pub use crate::db::queries::liquidity_event::upsert_liquidity_event; -pub use crate::db::queries::observed_token::get_observed_token_by_mint; -pub use crate::db::queries::observed_token::list_observed_tokens; -pub use crate::db::queries::observed_token::upsert_observed_token; -pub use crate::db::queries::onchain_observation::insert_onchain_observation; -pub use crate::db::queries::onchain_observation::list_recent_onchain_observations; -pub use crate::db::queries::pair::get_pair_by_pool_id; -pub use crate::db::queries::pair::list_pairs; -pub use crate::db::queries::pair::upsert_pair; -pub use crate::db::queries::pool::get_pool_by_address; -pub use crate::db::queries::pool::list_pools; -pub use crate::db::queries::pool::upsert_pool; -pub use crate::db::queries::pool_listing::get_pool_listing_by_pool_id; -pub use crate::db::queries::pool_listing::list_pool_listings; -pub use crate::db::queries::pool_listing::upsert_pool_listing; -pub use crate::db::queries::pool_token::list_pool_tokens_by_pool_id; -pub use crate::db::queries::pool_token::upsert_pool_token; -pub use crate::db::queries::swap::list_recent_swaps; -pub use crate::db::queries::swap::upsert_swap; -pub use crate::db::queries::token::get_token_by_mint; -pub use crate::db::queries::token::upsert_token; -pub use crate::db::queries::token_burn_event::list_recent_token_burn_events; -pub use crate::db::queries::token_burn_event::upsert_token_burn_event; -pub use crate::db::queries::token_mint_event::list_recent_token_mint_events; -pub use crate::db::queries::token_mint_event::upsert_token_mint_event; +pub use analysis_signal::insert_analysis_signal; +pub use analysis_signal::list_recent_analysis_signals; +pub use db_metadata::get_db_metadata; +pub use db_metadata::list_db_metadata; +pub use db_metadata::upsert_db_metadata; +pub use db_runtime_event::insert_db_runtime_event; +pub use db_runtime_event::list_recent_db_runtime_events; +pub use dex::get_dex_by_code; +pub use dex::list_dexes; +pub use dex::upsert_dex; +pub use known_http_endpoint::get_known_http_endpoint; +pub use known_http_endpoint::list_known_http_endpoints; +pub use known_http_endpoint::upsert_known_http_endpoint; +pub use known_ws_endpoint::get_known_ws_endpoint; +pub use known_ws_endpoint::list_known_ws_endpoints; +pub use known_ws_endpoint::upsert_known_ws_endpoint; +pub use liquidity_event::list_recent_liquidity_events; +pub use liquidity_event::upsert_liquidity_event; +pub use observed_token::get_observed_token_by_mint; +pub use observed_token::list_observed_tokens; +pub use observed_token::upsert_observed_token; +pub use onchain_observation::insert_onchain_observation; +pub use onchain_observation::list_recent_onchain_observations; +pub use pair::get_pair_by_pool_id; +pub use pair::list_pairs; +pub use pair::upsert_pair; +pub use pool::get_pool_by_address; +pub use pool::list_pools; +pub use pool::upsert_pool; +pub use pool_listing::get_pool_listing_by_pool_id; +pub use pool_listing::list_pool_listings; +pub use pool_listing::upsert_pool_listing; +pub use pool_token::list_pool_tokens_by_pool_id; +pub use pool_token::upsert_pool_token; +pub use swap::list_recent_swaps; +pub use swap::upsert_swap; +pub use token::get_token_by_mint; +pub use token::upsert_token; +pub use token_burn_event::list_recent_token_burn_events; +pub use token_burn_event::upsert_token_burn_event; +pub use token_mint_event::list_recent_token_mint_events; +pub use token_mint_event::upsert_token_mint_event; +pub use chain_instruction::list_chain_instructions_by_transaction_id; +pub use chain_instruction::insert_chain_instruction; +pub use chain_instruction::delete_chain_instructions_by_transaction_id; +pub use chain_slot::list_recent_chain_slots; +pub use chain_slot::upsert_chain_slot; +pub use chain_slot::get_chain_slot; +pub use chain_transaction::list_recent_chain_transactions; +pub use chain_transaction::upsert_chain_transaction; +pub use chain_transaction::get_chain_transaction_by_signature; diff --git a/kb_lib/src/db/queries/chain_instruction.rs b/kb_lib/src/db/queries/chain_instruction.rs new file mode 100644 index 0000000..d17e4e5 --- /dev/null +++ b/kb_lib/src/db/queries/chain_instruction.rs @@ -0,0 +1,246 @@ +// file: kb_lib/src/db/queries/chain_instruction.rs + +//! Queries for `kb_chain_instructions`. + +/// Inserts one normalized chain instruction row. +pub async fn insert_chain_instruction( + database: &crate::KbDatabase, + dto: &crate::KbChainInstructionDto, +) -> Result { + let instruction_index_result = i64::from(dto.instruction_index); + let inner_instruction_index = match dto.inner_instruction_index { + Some(inner_instruction_index) => Some(i64::from(inner_instruction_index)), + None => None, + }; + let stack_height = match dto.stack_height { + Some(stack_height) => Some(i64::from(stack_height)), + None => None, + }; + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_chain_instructions ( + transaction_id, + parent_instruction_id, + instruction_index, + inner_instruction_index, + program_id, + program_name, + stack_height, + accounts_json, + data_json, + parsed_type, + parsed_json, + created_at +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(dto.transaction_id) + .bind(dto.parent_instruction_id) + .bind(instruction_index_result) + .bind(inner_instruction_index) + .bind(dto.program_id.clone()) + .bind(dto.program_name.clone()) + .bind(stack_height) + .bind(dto.accounts_json.clone()) + .bind(dto.data_json.clone()) + .bind(dto.parsed_type.clone()) + .bind(dto.parsed_json.clone()) + .bind(dto.created_at.to_rfc3339()) + .execute(pool) + .await; + let insert_result = match query_result { + Ok(insert_result) => insert_result, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot insert kb_chain_instructions on sqlite: {}", + error + ))); + } + }; + Ok(insert_result.last_insert_rowid()) + } + } +} + +/// Lists instructions for one transaction ordered from outer to inner. +pub async fn list_chain_instructions_by_transaction_id( + database: &crate::KbDatabase, + transaction_id: i64, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + transaction_id, + parent_instruction_id, + instruction_index, + inner_instruction_index, + program_id, + program_name, + stack_height, + accounts_json, + data_json, + parsed_type, + parsed_json, + created_at +FROM kb_chain_instructions +WHERE transaction_id = ? +ORDER BY instruction_index ASC, inner_instruction_index ASC, id ASC + "#, + ) + .bind(transaction_id) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_chain_instructions for transaction_id '{}' on sqlite: {}", + transaction_id, error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbChainInstructionDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + +/// Deletes all instructions for one transaction id. +pub async fn delete_chain_instructions_by_transaction_id( + database: &crate::KbDatabase, + transaction_id: i64, +) -> Result<(), crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +DELETE FROM kb_chain_instructions +WHERE transaction_id = ? + "#, + ) + .bind(transaction_id) + .execute(pool) + .await; + if let Err(error) = query_result { + return Err(crate::KbError::Db(format!( + "cannot delete kb_chain_instructions for transaction_id '{}' on sqlite: {}", + transaction_id, error + ))); + } + Ok(()) + } + } +} + +#[cfg(test)] +mod tests { + async fn make_database() -> crate::KbDatabase { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("chain_instruction.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed") + } + + async fn make_transaction(database: &crate::KbDatabase) -> i64 { + let dto = crate::KbChainTransactionDto::new( + "sig-chain-instruction-1".to_string(), + None, + None, + Some("mainnet_public_http".to_string()), + Some("legacy".to_string()), + None, + None, + r#"{"transaction":{"message":{"instructions":[]}}}"#.to_string(), + ); + crate::upsert_chain_transaction(database, &dto) + .await + .expect("chain transaction upsert must succeed") + } + + #[tokio::test] + async fn chain_instruction_insert_list_delete_works() { + let database = make_database().await; + let transaction_id = make_transaction(&database).await; + let outer_dto = crate::KbChainInstructionDto::new( + transaction_id, + None, + 0, + None, + Some(crate::SPL_TOKEN_PROGRAM_ID.to_string()), + Some("spl-token".to_string()), + Some(1), + r#"["AccountA","AccountB"]"#.to_string(), + Some(r#""raw-data-outer""#.to_string()), + Some("transfer".to_string()), + Some(r#"{"type":"transfer","info":{"amount":"10"}}"#.to_string()), + ); + let outer_instruction_id = crate::insert_chain_instruction(&database, &outer_dto) + .await + .expect("outer instruction insert must succeed"); + assert!(outer_instruction_id > 0); + let inner_dto = crate::KbChainInstructionDto::new( + transaction_id, + Some(outer_instruction_id), + 0, + Some(0), + Some(crate::SPL_TOKEN_PROGRAM_ID.to_string()), + Some("spl-token".to_string()), + Some(2), + r#"["InnerA","InnerB"]"#.to_string(), + Some(r#""raw-data-inner""#.to_string()), + Some("mintTo".to_string()), + Some(r#"{"type":"mintTo","info":{"amount":"5"}}"#.to_string()), + ); + let inner_instruction_id = crate::insert_chain_instruction(&database, &inner_dto) + .await + .expect("inner instruction insert must succeed"); + assert!(inner_instruction_id > outer_instruction_id); + let listed = crate::list_chain_instructions_by_transaction_id(&database, transaction_id) + .await + .expect("chain instruction list must succeed"); + assert_eq!(listed.len(), 2); + assert_eq!(listed[0].parent_instruction_id, None); + assert_eq!(listed[0].instruction_index, 0); + assert_eq!(listed[0].inner_instruction_index, None); + assert_eq!(listed[0].parsed_type, Some("transfer".to_string())); + assert_eq!(listed[1].parent_instruction_id, Some(outer_instruction_id)); + assert_eq!(listed[1].instruction_index, 0); + assert_eq!(listed[1].inner_instruction_index, Some(0)); + assert_eq!(listed[1].parsed_type, Some("mintTo".to_string())); + crate::delete_chain_instructions_by_transaction_id(&database, transaction_id) + .await + .expect("chain instruction delete must succeed"); + let listed_after_delete = + crate::list_chain_instructions_by_transaction_id(&database, transaction_id) + .await + .expect("chain instruction list after delete must succeed"); + assert_eq!(listed_after_delete.len(), 0); + } +} diff --git a/kb_lib/src/db/queries/chain_slot.rs b/kb_lib/src/db/queries/chain_slot.rs new file mode 100644 index 0000000..403e59d --- /dev/null +++ b/kb_lib/src/db/queries/chain_slot.rs @@ -0,0 +1,221 @@ +// file: kb_lib/src/db/queries/chain_slot.rs + +//! Queries for `kb_chain_slots`. + +/// Inserts or updates one normalized chain slot row. +pub async fn upsert_chain_slot( + database: &crate::KbDatabase, + dto: &crate::KbChainSlotDto, +) -> Result { + let slot_result = i64::try_from(dto.slot); + let slot = match slot_result { + Ok(slot) => slot, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert chain slot '{}' to i64: {}", + dto.slot, error + ))); + } + }; + let parent_slot = match dto.parent_slot { + Some(parent_slot) => { + let parent_slot_result = i64::try_from(parent_slot); + match parent_slot_result { + Ok(parent_slot) => Some(parent_slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert chain parent_slot '{}' to i64: {}", + parent_slot, error + ))); + } + } + } + None => None, + }; + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_chain_slots ( + slot, + parent_slot, + block_time_unix, + created_at, + updated_at +) +VALUES (?, ?, ?, ?, ?) +ON CONFLICT(slot) DO UPDATE SET + parent_slot = excluded.parent_slot, + block_time_unix = excluded.block_time_unix, + updated_at = excluded.updated_at + "#, + ) + .bind(slot) + .bind(parent_slot) + .bind(dto.block_time_unix) + .bind(dto.created_at.to_rfc3339()) + .bind(dto.updated_at.to_rfc3339()) + .execute(pool) + .await; + if let Err(error) = query_result { + return Err(crate::KbError::Db(format!( + "cannot upsert kb_chain_slots on sqlite: {}", + error + ))); + } + Ok(dto.slot) + } + } +} + +/// Reads one chain slot row by slot number. +pub async fn get_chain_slot( + database: &crate::KbDatabase, + slot: u64, +) -> Result, crate::KbError> { + let slot_result = i64::try_from(slot); + let slot_i64 = match slot_result { + Ok(slot_i64) => slot_i64, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert requested chain slot '{}' to i64: {}", + slot, error + ))); + } + }; + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + slot, + parent_slot, + block_time_unix, + created_at, + updated_at +FROM kb_chain_slots +WHERE slot = ? +LIMIT 1 + "#, + ) + .bind(slot_i64) + .fetch_optional(pool) + .await; + let entity_option = match query_result { + Ok(entity_option) => entity_option, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot fetch kb_chain_slots for slot '{}' on sqlite: {}", + slot, error + ))); + } + }; + match entity_option { + Some(entity) => { + let dto_result = crate::KbChainSlotDto::try_from(entity); + match dto_result { + Ok(dto) => Ok(Some(dto)), + Err(error) => Err(error), + } + } + None => Ok(None), + } + } + } +} + +/// Lists recent chain slots ordered from newest to oldest. +pub async fn list_recent_chain_slots( + database: &crate::KbDatabase, + limit: u32, +) -> Result, crate::KbError> { + if limit == 0 { + return Ok(std::vec::Vec::new()); + } + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + slot, + parent_slot, + block_time_unix, + created_at, + updated_at +FROM kb_chain_slots +ORDER BY slot DESC +LIMIT ? + "#, + ) + .bind(i64::from(limit)) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_chain_slots on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbChainSlotDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + +#[cfg(test)] +mod tests { + async fn make_database() -> crate::KbDatabase { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("chain_slot.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed") + } + + #[tokio::test] + async fn chain_slot_roundtrip_works() { + let database = make_database().await; + let dto = crate::KbChainSlotDto::new(424242, Some(424241), Some(1_777_777_777)); + let slot = crate::upsert_chain_slot(&database, &dto) + .await + .expect("chain slot upsert must succeed"); + assert_eq!(slot, 424242); + let fetched = crate::get_chain_slot(&database, 424242) + .await + .expect("chain slot fetch must succeed") + .expect("chain slot must exist"); + assert_eq!(fetched.slot, 424242); + assert_eq!(fetched.parent_slot, Some(424241)); + assert_eq!(fetched.block_time_unix, Some(1_777_777_777)); + let listed = crate::list_recent_chain_slots(&database, 10) + .await + .expect("chain slot list must succeed"); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].slot, 424242); + assert_eq!(listed[0].parent_slot, Some(424241)); + assert_eq!(listed[0].block_time_unix, Some(1_777_777_777)); + } +} diff --git a/kb_lib/src/db/queries/chain_transaction.rs b/kb_lib/src/db/queries/chain_transaction.rs new file mode 100644 index 0000000..c2a99e0 --- /dev/null +++ b/kb_lib/src/db/queries/chain_transaction.rs @@ -0,0 +1,265 @@ +// file: kb_lib/src/db/queries/chain_transaction.rs + +//! Queries for `kb_chain_transactions`. + +/// Inserts or updates one normalized chain transaction row. +pub async fn upsert_chain_transaction( + database: &crate::KbDatabase, + dto: &crate::KbChainTransactionDto, +) -> Result { + let slot_i64 = match dto.slot { + Some(slot) => { + let slot_result = i64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert chain transaction slot '{}' to i64: {}", + slot, error + ))); + } + } + } + None => None, + }; + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_chain_transactions ( + signature, + slot, + block_time_unix, + source_endpoint_name, + version_text, + err_json, + meta_json, + transaction_json, + created_at, + updated_at +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(signature) DO UPDATE SET + slot = excluded.slot, + block_time_unix = excluded.block_time_unix, + source_endpoint_name = excluded.source_endpoint_name, + version_text = excluded.version_text, + err_json = excluded.err_json, + meta_json = excluded.meta_json, + transaction_json = excluded.transaction_json, + updated_at = excluded.updated_at + "#, + ) + .bind(dto.signature.clone()) + .bind(slot_i64) + .bind(dto.block_time_unix) + .bind(dto.source_endpoint_name.clone()) + .bind(dto.version_text.clone()) + .bind(dto.err_json.clone()) + .bind(dto.meta_json.clone()) + .bind(dto.transaction_json.clone()) + .bind(dto.created_at.to_rfc3339()) + .bind(dto.updated_at.to_rfc3339()) + .execute(pool) + .await; + if let Err(error) = query_result { + return Err(crate::KbError::Db(format!( + "cannot upsert kb_chain_transactions on sqlite: {}", + error + ))); + } + let id_result = sqlx::query_scalar::( + r#" +SELECT id +FROM kb_chain_transactions +WHERE signature = ? +LIMIT 1 + "#, + ) + .bind(dto.signature.clone()) + .fetch_one(pool) + .await; + match id_result { + Ok(id) => Ok(id), + Err(error) => Err(crate::KbError::Db(format!( + "cannot fetch kb_chain_transactions id for signature '{}' on sqlite: {}", + dto.signature, error + ))), + } + } + } +} + +/// Reads one chain transaction row by signature. +pub async fn get_chain_transaction_by_signature( + database: &crate::KbDatabase, + signature: &str, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + signature, + slot, + block_time_unix, + source_endpoint_name, + version_text, + err_json, + meta_json, + transaction_json, + created_at, + updated_at +FROM kb_chain_transactions +WHERE signature = ? +LIMIT 1 + "#, + ) + .bind(signature.to_string()) + .fetch_optional(pool) + .await; + let entity_option = match query_result { + Ok(entity_option) => entity_option, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot fetch kb_chain_transactions for signature '{}' on sqlite: {}", + signature, error + ))); + } + }; + match entity_option { + Some(entity) => { + let dto_result = crate::KbChainTransactionDto::try_from(entity); + match dto_result { + Ok(dto) => Ok(Some(dto)), + Err(error) => Err(error), + } + } + None => Ok(None), + } + } + } +} + +/// Lists recent chain transactions ordered from newest to oldest. +pub async fn list_recent_chain_transactions( + database: &crate::KbDatabase, + limit: u32, +) -> Result, crate::KbError> { + if limit == 0 { + return Ok(std::vec::Vec::new()); + } + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + signature, + slot, + block_time_unix, + source_endpoint_name, + version_text, + err_json, + meta_json, + transaction_json, + created_at, + updated_at +FROM kb_chain_transactions +ORDER BY id DESC +LIMIT ? + "#, + ) + .bind(i64::from(limit)) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_chain_transactions on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbChainTransactionDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + +#[cfg(test)] +mod tests { + async fn make_database() -> crate::KbDatabase { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("chain_transaction.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed") + } + + #[tokio::test] + async fn chain_transaction_roundtrip_works() { + let database = make_database().await; + let slot_dto = crate::KbChainSlotDto::new(515151, Some(515150), Some(1_700_000_001)); + crate::upsert_chain_slot(&database, &slot_dto) + .await + .expect("chain slot upsert must succeed"); + let dto = crate::KbChainTransactionDto::new( + "sig-chain-transaction-1".to_string(), + Some(515151), + Some(1_700_000_001), + Some("helius_primary_http".to_string()), + Some("0".to_string()), + None, + Some(r#"{"fee":5000}"#.to_string()), + r#"{"slot":515151,"transaction":{"message":{"instructions":[]}}}"#.to_string(), + ); + let transaction_id = crate::upsert_chain_transaction(&database, &dto) + .await + .expect("chain transaction upsert must succeed"); + assert!(transaction_id > 0); + let fetched = + crate::get_chain_transaction_by_signature(&database, "sig-chain-transaction-1") + .await + .expect("chain transaction fetch must succeed") + .expect("chain transaction must exist"); + assert_eq!(fetched.id, Some(transaction_id)); + assert_eq!(fetched.signature, "sig-chain-transaction-1"); + assert_eq!(fetched.slot, Some(515151)); + assert_eq!(fetched.block_time_unix, Some(1_700_000_001)); + assert_eq!( + fetched.source_endpoint_name, + Some("helius_primary_http".to_string()) + ); + assert_eq!(fetched.version_text, Some("0".to_string())); + assert_eq!(fetched.meta_json, Some(r#"{"fee":5000}"#.to_string())); + let listed = crate::list_recent_chain_transactions(&database, 10) + .await + .expect("chain transaction list must succeed"); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].signature, "sig-chain-transaction-1"); + assert_eq!(listed[0].slot, Some(515151)); + } +} diff --git a/kb_lib/src/db/schema.rs b/kb_lib/src/db/schema.rs index 2068f29..05e1886 100644 --- a/kb_lib/src/db/schema.rs +++ b/kb_lib/src/db/schema.rs @@ -190,10 +190,35 @@ pub(crate) async fn ensure_schema(database: &crate::KbDatabase) -> Result<(), cr if let Err(error) = result { return Err(error); } + let result = create_kb_chain_slots_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_chain_transactions_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_chain_transactions_slot(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_chain_instructions_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_chain_instructions_transaction_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_chain_instructions_program_id(pool).await; + if let Err(error) = result { + return Err(error); + } let result = update_schema_version_metadata(database).await; if let Err(error) = result { return Err(error); } + Ok(()) } } @@ -1029,6 +1054,126 @@ ON kb_token_burn_events (executed_at) .await } +/// Creates `kb_chain_slots`. +async fn create_kb_chain_slots_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_chain_slots_table", + r#" +CREATE TABLE IF NOT EXISTS kb_chain_slots ( + slot INTEGER NOT NULL PRIMARY KEY, + parent_slot INTEGER NULL, + block_time_unix INTEGER NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL +) + "#, + ) + .await +} + +/// Creates `kb_chain_transactions`. +async fn create_kb_chain_transactions_table( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_chain_transactions_table", + r#" +CREATE TABLE IF NOT EXISTS kb_chain_transactions ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + signature TEXT NOT NULL UNIQUE, + slot INTEGER NULL, + block_time_unix INTEGER NULL, + source_endpoint_name TEXT NULL, + version_text TEXT NULL, + err_json TEXT NULL, + meta_json TEXT NULL, + transaction_json TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(slot) REFERENCES kb_chain_slots(slot) +) + "#, + ) + .await +} + +/// Creates index on `kb_chain_transactions(slot)`. +async fn create_kb_idx_chain_transactions_slot( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_chain_transactions_slot", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_chain_transactions_slot +ON kb_chain_transactions (slot) + "#, + ) + .await +} + +/// Creates `kb_chain_instructions`. +async fn create_kb_chain_instructions_table( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_chain_instructions_table", + r#" +CREATE TABLE IF NOT EXISTS kb_chain_instructions ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + transaction_id INTEGER NOT NULL, + parent_instruction_id INTEGER NULL, + instruction_index INTEGER NOT NULL, + inner_instruction_index INTEGER NULL, + program_id TEXT NULL, + program_name TEXT NULL, + stack_height INTEGER NULL, + accounts_json TEXT NOT NULL, + data_json TEXT NULL, + parsed_type TEXT NULL, + parsed_json TEXT NULL, + created_at TEXT NOT NULL, + FOREIGN KEY(transaction_id) REFERENCES kb_chain_transactions(id), + FOREIGN KEY(parent_instruction_id) REFERENCES kb_chain_instructions(id) +) + "#, + ) + .await +} + +/// Creates index on `kb_chain_instructions(transaction_id)`. +async fn create_kb_idx_chain_instructions_transaction_id( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_chain_instructions_transaction_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_chain_instructions_transaction_id +ON kb_chain_instructions (transaction_id) + "#, + ) + .await +} + +/// Creates index on `kb_chain_instructions(program_id)`. +async fn create_kb_idx_chain_instructions_program_id( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_chain_instructions_program_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_chain_instructions_program_id +ON kb_chain_instructions (program_id) + "#, + ) + .await +} + /// Updates the persisted schema version metadata entry. async fn update_schema_version_metadata( database: &crate::KbDatabase, diff --git a/kb_lib/src/db/types.rs b/kb_lib/src/db/types.rs index a429406..c1b6709 100644 --- a/kb_lib/src/db/types.rs +++ b/kb_lib/src/db/types.rs @@ -13,13 +13,13 @@ mod pool_token_role; mod runtime_event_level; mod swap_trade_side; -pub use crate::db::types::analysis_signal_severity::KbAnalysisSignalSeverity; -pub use crate::db::types::database_backend::KbDatabaseBackend; -pub use crate::db::types::liquidity_event_kind::KbLiquidityEventKind; -pub use crate::db::types::observation_source_kind::KbObservationSourceKind; -pub use crate::db::types::observed_token_status::KbObservedTokenStatus; -pub use crate::db::types::pool_kind::KbPoolKind; -pub use crate::db::types::pool_status::KbPoolStatus; -pub use crate::db::types::pool_token_role::KbPoolTokenRole; -pub use crate::db::types::runtime_event_level::KbDbRuntimeEventLevel; -pub use crate::db::types::swap_trade_side::KbSwapTradeSide; +pub use analysis_signal_severity::KbAnalysisSignalSeverity; +pub use database_backend::KbDatabaseBackend; +pub use liquidity_event_kind::KbLiquidityEventKind; +pub use observation_source_kind::KbObservationSourceKind; +pub use observed_token_status::KbObservedTokenStatus; +pub use pool_kind::KbPoolKind; +pub use pool_status::KbPoolStatus; +pub use pool_token_role::KbPoolTokenRole; +pub use runtime_event_level::KbDbRuntimeEventLevel; +pub use swap_trade_side::KbSwapTradeSide; diff --git a/kb_lib/src/detect.rs b/kb_lib/src/detect.rs index 0ac8b00..a7cf22f 100644 --- a/kb_lib/src/detect.rs +++ b/kb_lib/src/detect.rs @@ -11,15 +11,15 @@ mod solana_ws; mod types; mod ws_relay; -pub use crate::detect::service::KbDetectionPersistenceService; -pub use crate::detect::solana_ws::KbSolanaWsDetectionOutcome; -pub use crate::detect::solana_ws::KbSolanaWsDetectionService; -pub use crate::detect::types::KbDetectionObservationInput; -pub use crate::detect::types::KbDetectionPoolCandidateInput; -pub use crate::detect::types::KbDetectionPoolCandidateResult; -pub use crate::detect::types::KbDetectionSignalInput; -pub use crate::detect::types::KbDetectionTokenCandidateInput; -pub use crate::detect::types::KbDetectionTokenCandidateResult; -pub use crate::detect::ws_relay::KbWsDetectionNotificationEnvelope; -pub use crate::detect::ws_relay::KbWsDetectionRelay; -pub use crate::detect::ws_relay::KbWsDetectionRelayStats; +pub use service::KbDetectionPersistenceService; +pub use solana_ws::KbSolanaWsDetectionOutcome; +pub use solana_ws::KbSolanaWsDetectionService; +pub use types::KbDetectionObservationInput; +pub use types::KbDetectionPoolCandidateInput; +pub use types::KbDetectionPoolCandidateResult; +pub use types::KbDetectionSignalInput; +pub use types::KbDetectionTokenCandidateInput; +pub use types::KbDetectionTokenCandidateResult; +pub use ws_relay::KbWsDetectionNotificationEnvelope; +pub use ws_relay::KbWsDetectionRelay; +pub use ws_relay::KbWsDetectionRelayStats; diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index b9989fc..6e82ddc 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -22,156 +22,173 @@ mod http_pool; mod db; mod detect; mod tx_resolution; +mod tx_model; -pub use crate::constants::*; -pub use crate::error::KbError; -pub use crate::config::KbAppConfig; -pub use crate::config::KbConfig; -pub use crate::config::KbDataConfig; -pub use crate::config::KbHttpEndpointConfig; -pub use crate::config::KbLoggingConfig; -pub use crate::config::KbSolanaConfig; -pub use crate::config::KbWsEndpointConfig; -pub use crate::config::KbDatabaseConfig; -pub use crate::config::KbSqliteDatabaseConfig; -pub use crate::tracing::KbTracingGuard; -pub use crate::tracing::init_tracing; -pub use crate::types::KbConnectionState; -pub use crate::ws_client::WsClient; -pub use crate::ws_client::WsEvent; -pub use crate::ws_client::WsOutgoingMessage; -pub use crate::ws_client::WsSubscriptionInfo; -pub use crate::json_rpc_ws::KbJsonRpcWsErrorObject; -pub use crate::json_rpc_ws::KbJsonRpcWsErrorResponse; -pub use crate::json_rpc_ws::KbJsonRpcWsIncomingMessage; -pub use crate::json_rpc_ws::KbJsonRpcWsNotification; -pub use crate::json_rpc_ws::KbJsonRpcWsNotificationParams; -pub use crate::json_rpc_ws::KbJsonRpcWsRequest; -pub use crate::json_rpc_ws::KbJsonRpcWsSuccessResponse; -pub use crate::json_rpc_ws::kb_is_probable_json_rpc_object_text; -pub use crate::json_rpc_ws::parse_kb_json_rpc_ws_incoming_text; -pub use crate::json_rpc_ws::parse_kb_json_rpc_ws_incoming_value; -pub use crate::solana_pubsub_ws::KbSolanaWsTypedNotification; -pub use crate::solana_pubsub_ws::parse_kb_solana_ws_typed_notification; -pub use crate::solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event; -pub use crate::ws_manager::WsManagedEndpointSnapshot; -pub use crate::ws_manager::WsManager; -pub use crate::ws_manager::WsManagerSnapshot; -pub use crate::http_client::HttpClient; -pub use crate::http_client::KbJsonRpcHttpErrorObject; -pub use crate::http_client::KbJsonRpcHttpErrorResponse; -pub use crate::http_client::KbJsonRpcHttpRequest; -pub use crate::http_client::KbJsonRpcHttpResponse; -pub use crate::http_client::KbJsonRpcHttpSuccessResponse; -pub use crate::http_client::KbHttpEndpointStatus; -pub use crate::http_client::KbHttpMethodClass; -pub use crate::http_client::parse_kb_json_rpc_http_response_text; -pub use crate::http_client::parse_kb_json_rpc_http_response_value; -pub use crate::http_pool::HttpEndpointPool; -pub use crate::http_pool::KbHttpPoolClientSnapshot; -pub use crate::db::KbDatabase; -pub use crate::db::KbDatabaseBackend; -pub use crate::db::KbDatabaseConnection; -pub use crate::db::KbDbMetadataDto; -pub use crate::db::KbDbMetadataEntity; -pub use crate::db::get_db_metadata; -pub use crate::db::list_db_metadata; -pub use crate::db::upsert_db_metadata; -pub use crate::db::KbDbRuntimeEventDto; -pub use crate::db::KbDbRuntimeEventEntity; -pub use crate::db::KbDbRuntimeEventLevel; -pub use crate::db::KbKnownHttpEndpointDto; -pub use crate::db::KbKnownHttpEndpointEntity; -pub use crate::db::KbKnownWsEndpointDto; -pub use crate::db::KbKnownWsEndpointEntity; -pub use crate::db::KbObservedTokenDto; -pub use crate::db::KbObservedTokenEntity; -pub use crate::db::KbObservedTokenStatus; -pub use crate::db::KbAnalysisSignalDto; -pub use crate::db::KbAnalysisSignalEntity; -pub use crate::db::KbAnalysisSignalSeverity; -pub use crate::db::KbObservationSourceKind; -pub use crate::db::KbOnchainObservationDto; -pub use crate::db::KbOnchainObservationEntity; -pub use crate::db::KbDexDto; -pub use crate::db::KbDexEntity; -pub use crate::db::KbPairDto; -pub use crate::db::KbPairEntity; -pub use crate::db::KbPoolDto; -pub use crate::db::KbPoolEntity; -pub use crate::db::KbPoolKind; -pub use crate::db::KbPoolListingDto; -pub use crate::db::KbPoolListingEntity; -pub use crate::db::KbPoolStatus; -pub use crate::db::KbPoolTokenDto; -pub use crate::db::KbPoolTokenEntity; -pub use crate::db::KbPoolTokenRole; -pub use crate::db::KbTokenDto; -pub use crate::db::KbTokenEntity; -pub use crate::db::KbLiquidityEventDto; -pub use crate::db::KbLiquidityEventEntity; -pub use crate::db::KbLiquidityEventKind; -pub use crate::db::KbSwapDto; -pub use crate::db::KbSwapEntity; -pub use crate::db::KbSwapTradeSide; -pub use crate::db::KbTokenBurnEventDto; -pub use crate::db::KbTokenBurnEventEntity; -pub use crate::db::KbTokenMintEventDto; -pub use crate::db::KbTokenMintEventEntity; -pub use crate::db::list_recent_liquidity_events; -pub use crate::db::list_recent_swaps; -pub use crate::db::list_recent_token_burn_events; -pub use crate::db::list_recent_token_mint_events; -pub use crate::db::upsert_liquidity_event; -pub use crate::db::upsert_swap; -pub use crate::db::upsert_token_burn_event; -pub use crate::db::upsert_token_mint_event; -pub use crate::db::get_token_by_mint; -pub use crate::db::list_dexes; -pub use crate::db::upsert_dex; -pub use crate::db::upsert_pair; -pub use crate::db::upsert_pool; -pub use crate::db::upsert_pool_listing; -pub use crate::db::upsert_pool_token; -pub use crate::db::upsert_token; -pub use crate::db::insert_analysis_signal; -pub use crate::db::insert_onchain_observation; -pub use crate::db::list_recent_analysis_signals; -pub use crate::db::list_recent_onchain_observations; -pub use crate::db::get_observed_token_by_mint; -pub use crate::db::list_observed_tokens; -pub use crate::db::upsert_observed_token; -pub use crate::db::get_known_http_endpoint; -pub use crate::db::get_known_ws_endpoint; -pub use crate::db::insert_db_runtime_event; -pub use crate::db::list_known_http_endpoints; -pub use crate::db::list_known_ws_endpoints; -pub use crate::db::list_recent_db_runtime_events; -pub use crate::db::upsert_known_http_endpoint; -pub use crate::db::upsert_known_ws_endpoint; -pub use crate::db::get_dex_by_code; -pub use crate::db::get_pair_by_pool_id; -pub use crate::db::get_pool_by_address; -pub use crate::db::get_pool_listing_by_pool_id; -pub use crate::db::list_pairs; -pub use crate::db::list_pool_listings; -pub use crate::db::list_pool_tokens_by_pool_id; -pub use crate::db::list_pools; -pub use crate::detect::KbDetectionObservationInput; -pub use crate::detect::KbDetectionPersistenceService; -pub use crate::detect::KbDetectionSignalInput; -pub use crate::detect::KbDetectionTokenCandidateInput; -pub use crate::detect::KbDetectionTokenCandidateResult; -pub use crate::detect::KbSolanaWsDetectionOutcome; -pub use crate::detect::KbSolanaWsDetectionService; -pub use crate::detect::KbWsDetectionNotificationEnvelope; -pub use crate::detect::KbWsDetectionRelay; -pub use crate::detect::KbWsDetectionRelayStats; -pub use crate::detect::KbDetectionPoolCandidateInput; -pub use crate::detect::KbDetectionPoolCandidateResult; -pub use crate::tx_resolution::KbTransactionResolutionOutcome; -pub use crate::tx_resolution::KbTransactionResolutionRequest; -pub use crate::tx_resolution::KbTransactionResolutionService; -pub use crate::tx_resolution::KbWsTransactionResolutionEnvelope; -pub use crate::tx_resolution::KbWsTransactionResolutionRelay; -pub use crate::tx_resolution::KbWsTransactionResolutionRelayStats; +pub use constants::*; +pub use error::KbError; +pub use config::KbAppConfig; +pub use config::KbConfig; +pub use config::KbDataConfig; +pub use config::KbHttpEndpointConfig; +pub use config::KbLoggingConfig; +pub use config::KbSolanaConfig; +pub use config::KbWsEndpointConfig; +pub use config::KbDatabaseConfig; +pub use config::KbSqliteDatabaseConfig; +pub use tracing::KbTracingGuard; +pub use tracing::init_tracing; +pub use types::KbConnectionState; +pub use ws_client::WsClient; +pub use ws_client::WsEvent; +pub use ws_client::WsOutgoingMessage; +pub use ws_client::WsSubscriptionInfo; +pub use json_rpc_ws::KbJsonRpcWsErrorObject; +pub use json_rpc_ws::KbJsonRpcWsErrorResponse; +pub use json_rpc_ws::KbJsonRpcWsIncomingMessage; +pub use json_rpc_ws::KbJsonRpcWsNotification; +pub use json_rpc_ws::KbJsonRpcWsNotificationParams; +pub use json_rpc_ws::KbJsonRpcWsRequest; +pub use json_rpc_ws::KbJsonRpcWsSuccessResponse; +pub use json_rpc_ws::kb_is_probable_json_rpc_object_text; +pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text; +pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value; +pub use solana_pubsub_ws::KbSolanaWsTypedNotification; +pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification; +pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event; +pub use ws_manager::WsManagedEndpointSnapshot; +pub use ws_manager::WsManager; +pub use ws_manager::WsManagerSnapshot; +pub use http_client::HttpClient; +pub use http_client::KbJsonRpcHttpErrorObject; +pub use http_client::KbJsonRpcHttpErrorResponse; +pub use http_client::KbJsonRpcHttpRequest; +pub use http_client::KbJsonRpcHttpResponse; +pub use http_client::KbJsonRpcHttpSuccessResponse; +pub use http_client::KbHttpEndpointStatus; +pub use http_client::KbHttpMethodClass; +pub use http_client::parse_kb_json_rpc_http_response_text; +pub use http_client::parse_kb_json_rpc_http_response_value; +pub use http_pool::HttpEndpointPool; +pub use http_pool::KbHttpPoolClientSnapshot; +pub use db::KbDatabase; +pub use db::KbDatabaseBackend; +pub use db::KbDatabaseConnection; +pub use db::KbDbMetadataDto; +pub use db::KbDbMetadataEntity; +pub use db::get_db_metadata; +pub use db::list_db_metadata; +pub use db::upsert_db_metadata; +pub use db::KbDbRuntimeEventDto; +pub use db::KbDbRuntimeEventEntity; +pub use db::KbDbRuntimeEventLevel; +pub use db::KbKnownHttpEndpointDto; +pub use db::KbKnownHttpEndpointEntity; +pub use db::KbKnownWsEndpointDto; +pub use db::KbKnownWsEndpointEntity; +pub use db::KbObservedTokenDto; +pub use db::KbObservedTokenEntity; +pub use db::KbObservedTokenStatus; +pub use db::KbAnalysisSignalDto; +pub use db::KbAnalysisSignalEntity; +pub use db::KbAnalysisSignalSeverity; +pub use db::KbObservationSourceKind; +pub use db::KbOnchainObservationDto; +pub use db::KbOnchainObservationEntity; +pub use db::KbDexDto; +pub use db::KbDexEntity; +pub use db::KbPairDto; +pub use db::KbPairEntity; +pub use db::KbPoolDto; +pub use db::KbPoolEntity; +pub use db::KbPoolKind; +pub use db::KbPoolListingDto; +pub use db::KbPoolListingEntity; +pub use db::KbPoolStatus; +pub use db::KbPoolTokenDto; +pub use db::KbPoolTokenEntity; +pub use db::KbPoolTokenRole; +pub use db::KbTokenDto; +pub use db::KbTokenEntity; +pub use db::KbLiquidityEventDto; +pub use db::KbLiquidityEventEntity; +pub use db::KbLiquidityEventKind; +pub use db::KbSwapDto; +pub use db::KbSwapEntity; +pub use db::KbSwapTradeSide; +pub use db::KbTokenBurnEventDto; +pub use db::KbTokenBurnEventEntity; +pub use db::KbTokenMintEventDto; +pub use db::KbTokenMintEventEntity; +pub use db::KbChainInstructionDto; +pub use db::KbChainSlotDto; +pub use db::KbChainTransactionDto; +pub use db::KbChainInstructionEntity; +pub use db::KbChainSlotEntity; +pub use db::KbChainTransactionEntity; +pub use db::delete_chain_instructions_by_transaction_id; +pub use db::get_chain_slot; +pub use db::get_chain_transaction_by_signature; +pub use db::insert_chain_instruction; +pub use db::list_chain_instructions_by_transaction_id; +pub use db::list_recent_chain_slots; +pub use db::list_recent_chain_transactions; +pub use db::upsert_chain_slot; +pub use db::upsert_chain_transaction; +pub use db::list_recent_liquidity_events; +pub use db::list_recent_swaps; +pub use db::list_recent_token_burn_events; +pub use db::list_recent_token_mint_events; +pub use db::upsert_liquidity_event; +pub use db::upsert_swap; +pub use db::upsert_token_burn_event; +pub use db::upsert_token_mint_event; +pub use db::get_token_by_mint; +pub use db::list_dexes; +pub use db::upsert_dex; +pub use db::upsert_pair; +pub use db::upsert_pool; +pub use db::upsert_pool_listing; +pub use db::upsert_pool_token; +pub use db::upsert_token; +pub use db::insert_analysis_signal; +pub use db::insert_onchain_observation; +pub use db::list_recent_analysis_signals; +pub use db::list_recent_onchain_observations; +pub use db::get_observed_token_by_mint; +pub use db::list_observed_tokens; +pub use db::upsert_observed_token; +pub use db::get_known_http_endpoint; +pub use db::get_known_ws_endpoint; +pub use db::insert_db_runtime_event; +pub use db::list_known_http_endpoints; +pub use db::list_known_ws_endpoints; +pub use db::list_recent_db_runtime_events; +pub use db::upsert_known_http_endpoint; +pub use db::upsert_known_ws_endpoint; +pub use db::get_dex_by_code; +pub use db::get_pair_by_pool_id; +pub use db::get_pool_by_address; +pub use db::get_pool_listing_by_pool_id; +pub use db::list_pairs; +pub use db::list_pool_listings; +pub use db::list_pool_tokens_by_pool_id; +pub use db::list_pools; +pub use detect::KbDetectionObservationInput; +pub use detect::KbDetectionPersistenceService; +pub use detect::KbDetectionSignalInput; +pub use detect::KbDetectionTokenCandidateInput; +pub use detect::KbDetectionTokenCandidateResult; +pub use detect::KbSolanaWsDetectionOutcome; +pub use detect::KbSolanaWsDetectionService; +pub use detect::KbWsDetectionNotificationEnvelope; +pub use detect::KbWsDetectionRelay; +pub use detect::KbWsDetectionRelayStats; +pub use detect::KbDetectionPoolCandidateInput; +pub use detect::KbDetectionPoolCandidateResult; +pub use tx_resolution::KbTransactionResolutionOutcome; +pub use tx_resolution::KbTransactionResolutionRequest; +pub use tx_resolution::KbTransactionResolutionService; +pub use tx_resolution::KbWsTransactionResolutionEnvelope; +pub use tx_resolution::KbWsTransactionResolutionRelay; +pub use tx_resolution::KbWsTransactionResolutionRelayStats; +pub use tx_model::KbTransactionModelService; diff --git a/kb_lib/src/tx_model.rs b/kb_lib/src/tx_model.rs new file mode 100644 index 0000000..ca793e3 --- /dev/null +++ b/kb_lib/src/tx_model.rs @@ -0,0 +1,695 @@ +// file: kb_lib/src/tx_model.rs + +//! Projection of resolved transactions into normalized internal DB tables. + +/// Service projecting resolved transaction JSON into internal chain tables. +#[derive(Debug, Clone)] +pub struct KbTransactionModelService { + database: std::sync::Arc, +} + +impl KbTransactionModelService { + /// Creates one transaction model projection service. + pub fn new(database: std::sync::Arc) -> Self { + Self { database } + } + + /// Projects one resolved transaction JSON into slots / transactions / instructions tables. + pub async fn persist_resolved_transaction( + &self, + signature: &str, + source_endpoint_name: std::option::Option, + resolved_transaction: &serde_json::Value, + ) -> Result { + let slot_i64 = resolved_transaction + .get("slot") + .and_then(serde_json::Value::as_i64); + let slot_u64 = match slot_i64 { + Some(slot_i64) => { + let convert_result = u64::try_from(slot_i64); + match convert_result { + Ok(slot_u64) => Some(slot_u64), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert resolved transaction slot '{}' to u64: {}", + slot_i64, error + ))); + } + } + } + None => None, + }; + let block_time_unix = resolved_transaction + .get("blockTime") + .and_then(serde_json::Value::as_i64); + if let Some(slot_value) = slot_u64 { + let slot_dto = crate::KbChainSlotDto::new(slot_value, None, block_time_unix); + let upsert_result = crate::upsert_chain_slot(self.database.as_ref(), &slot_dto).await; + if let Err(error) = upsert_result { + return Err(error); + } + } + let version_text = kb_extract_version_text(resolved_transaction); + let err_json = kb_extract_meta_err_json(resolved_transaction); + let meta_json = kb_extract_meta_json(resolved_transaction); + let transaction_json_result = serde_json::to_string(resolved_transaction); + let transaction_json = match transaction_json_result { + Ok(transaction_json) => transaction_json, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot serialize resolved transaction '{}': {}", + signature, error + ))); + } + }; + let transaction_dto = crate::KbChainTransactionDto::new( + signature.to_string(), + slot_u64, + block_time_unix, + source_endpoint_name, + version_text, + err_json, + meta_json, + transaction_json, + ); + let transaction_id_result = + crate::upsert_chain_transaction(self.database.as_ref(), &transaction_dto).await; + let transaction_id = match transaction_id_result { + Ok(transaction_id) => transaction_id, + Err(error) => return Err(error), + }; + let delete_result = crate::delete_chain_instructions_by_transaction_id( + self.database.as_ref(), + transaction_id, + ) + .await; + if let Err(error) = delete_result { + return Err(error); + } + let outer_instructions = kb_extract_outer_instructions(resolved_transaction); + let mut outer_instruction_ids = std::collections::BTreeMap::::new(); + for (index, instruction) in outer_instructions.iter().enumerate() { + let index_result = u32::try_from(index); + let instruction_index = match index_result { + Ok(instruction_index) => instruction_index, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert outer instruction index '{}' to u32: {}", + index, error + ))); + } + }; + let dto_result = kb_build_instruction_dto( + transaction_id, + None, + instruction_index, + None, + instruction, + ); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + let instruction_id_result = + crate::insert_chain_instruction(self.database.as_ref(), &dto).await; + let instruction_id = match instruction_id_result { + Ok(instruction_id) => instruction_id, + Err(error) => return Err(error), + }; + outer_instruction_ids.insert(instruction_index, instruction_id); + } + let inner_instruction_groups = kb_extract_inner_instruction_groups(resolved_transaction); + for group in &inner_instruction_groups { + let outer_index = match group.index { + Some(outer_index) => outer_index, + None => continue, + }; + let parent_instruction_id_option = outer_instruction_ids.get(&outer_index).cloned(); + let parent_instruction_id = match parent_instruction_id_option { + Some(parent_instruction_id) => parent_instruction_id, + None => continue, + }; + for (inner_index, inner_instruction) in group.instructions.iter().enumerate() { + let inner_index_result = u32::try_from(inner_index); + let inner_instruction_index = match inner_index_result { + Ok(inner_instruction_index) => inner_instruction_index, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert inner instruction index '{}' to u32: {}", + inner_index, error + ))); + } + }; + let dto_result = kb_build_instruction_dto( + transaction_id, + Some(parent_instruction_id), + outer_index, + Some(inner_instruction_index), + inner_instruction, + ); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + let insert_result = + crate::insert_chain_instruction(self.database.as_ref(), &dto).await; + if let Err(error) = insert_result { + return Err(error); + } + } + } + Ok(transaction_id) + } +} + +#[derive(Debug, Clone)] +struct KbInnerInstructionGroup { + index: std::option::Option, + instructions: std::vec::Vec, +} + +fn kb_extract_version_text( + resolved_transaction: &serde_json::Value, +) -> std::option::Option { + let version_value_option = resolved_transaction.get("version"); + let version_value = match version_value_option { + Some(version_value) => version_value, + None => return None, + }; + if let Some(version_text) = version_value.as_str() { + return Some(version_text.to_string()); + } + if let Some(version_number) = version_value.as_i64() { + return Some(version_number.to_string()); + } + + None +} + +fn kb_extract_meta_json( + resolved_transaction: &serde_json::Value, +) -> std::option::Option { + let meta_option = resolved_transaction.get("meta"); + let meta = match meta_option { + Some(meta) => meta, + None => return None, + }; + let serialize_result = serde_json::to_string(meta); + match serialize_result { + Ok(json_text) => Some(json_text), + Err(_) => None, + } +} + +fn kb_extract_meta_err_json( + resolved_transaction: &serde_json::Value, +) -> std::option::Option { + let meta_option = resolved_transaction.get("meta"); + let meta = match meta_option { + Some(meta) => meta, + None => return None, + }; + let err_option = meta.get("err"); + let err = match err_option { + Some(err) => err, + None => return None, + }; + if err.is_null() { + return None; + } + let serialize_result = serde_json::to_string(err); + match serialize_result { + Ok(json_text) => Some(json_text), + Err(_) => None, + } +} + +fn kb_extract_outer_instructions( + resolved_transaction: &serde_json::Value, +) -> std::vec::Vec { + let mut instructions = std::vec::Vec::new(); + let transaction_option = resolved_transaction.get("transaction"); + let transaction = match transaction_option { + Some(transaction) => transaction, + None => return instructions, + }; + let message_option = transaction.get("message"); + let message = match message_option { + Some(message) => message, + None => return instructions, + }; + let instructions_option = message.get("instructions"); + let instructions_value = match instructions_option { + Some(instructions_value) => instructions_value, + None => return instructions, + }; + let array_option = instructions_value.as_array(); + let array = match array_option { + Some(array) => array, + None => return instructions, + }; + for instruction in array { + instructions.push(instruction.clone()); + } + instructions +} + +fn kb_extract_inner_instruction_groups( + resolved_transaction: &serde_json::Value, +) -> std::vec::Vec { + let mut groups = std::vec::Vec::new(); + + let meta_option = resolved_transaction.get("meta"); + let meta = match meta_option { + Some(meta) => meta, + None => return groups, + }; + let inner_option = meta.get("innerInstructions"); + let inner_value = match inner_option { + Some(inner_value) => inner_value, + None => return groups, + }; + let array_option = inner_value.as_array(); + let array = match array_option { + Some(array) => array, + None => return groups, + }; + for group in array { + let index = match group.get("index").and_then(serde_json::Value::as_i64) { + Some(index_i64) => { + let convert_result = u32::try_from(index_i64); + match convert_result { + Ok(index_u32) => Some(index_u32), + Err(_) => None, + } + } + None => None, + }; + let mut instructions = std::vec::Vec::new(); + let instructions_option = group.get("instructions"); + if let Some(instructions_value) = instructions_option { + if let Some(instructions_array) = instructions_value.as_array() { + for instruction in instructions_array { + instructions.push(instruction.clone()); + } + } + } + groups.push(KbInnerInstructionGroup { + index, + instructions, + }); + } + groups +} + +fn kb_build_instruction_dto( + transaction_id: i64, + parent_instruction_id: std::option::Option, + instruction_index: u32, + inner_instruction_index: std::option::Option, + instruction: &serde_json::Value, +) -> Result { + let program_id = instruction + .get("programId") + .and_then(serde_json::Value::as_str) + .map(std::string::ToString::to_string); + let program_name = instruction + .get("program") + .and_then(serde_json::Value::as_str) + .map(std::string::ToString::to_string); + let stack_height = match instruction + .get("stackHeight") + .and_then(serde_json::Value::as_i64) + { + Some(stack_height_i64) => { + let stack_height_result = u32::try_from(stack_height_i64); + match stack_height_result { + Ok(stack_height) => Some(stack_height), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert instruction stack_height '{}' to u32: {}", + stack_height_i64, error + ))); + } + } + } + None => None, + }; + let accounts_json = if let Some(accounts) = instruction.get("accounts") { + let serialize_result = serde_json::to_string(accounts); + match serialize_result { + Ok(text) => text, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot serialize instruction accounts json: {}", + error + ))); + } + } + } else { + "[]".to_string() + }; + let data_json = match instruction.get("data") { + Some(data_value) => { + let serialize_result = serde_json::to_string(data_value); + match serialize_result { + Ok(text) => Some(text), + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot serialize instruction data json: {}", + error + ))); + } + } + } + None => None, + }; + let parsed_type = instruction + .get("parsed") + .and_then(|parsed| parsed.get("type")) + .and_then(serde_json::Value::as_str) + .map(std::string::ToString::to_string); + let parsed_json = match instruction.get("parsed") { + Some(parsed_value) => { + let serialize_result = serde_json::to_string(parsed_value); + match serialize_result { + Ok(text) => Some(text), + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot serialize instruction parsed json: {}", + error + ))); + } + } + } + None => None, + }; + Ok(crate::KbChainInstructionDto::new( + transaction_id, + parent_instruction_id, + instruction_index, + inner_instruction_index, + program_id, + program_name, + stack_height, + accounts_json, + data_json, + parsed_type, + parsed_json, + )) +} + +#[cfg(test)] +mod tests { + async fn make_database() -> std::sync::Arc { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("tx_model.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database = crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed"); + std::sync::Arc::new(database) + } + + #[tokio::test] + async fn persist_resolved_transaction_projects_outer_instructions() { + let database = make_database().await; + let service = crate::KbTransactionModelService::new(database.clone()); + let resolved_transaction = serde_json::json!({ + "slot": 700001, + "blockTime": 1777000001, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": crate::SPL_TOKEN_PROGRAM_ID, + "program": "spl-token", + "stackHeight": 1, + "accounts": ["A111", "B222"], + "data": "raw-ix-0", + "parsed": { + "type": "transfer", + "info": { "amount": "100" } + } + }, + { + "programId": "11111111111111111111111111111111", + "program": "system", + "stackHeight": 1, + "accounts": ["C333", "D444"], + "data": "raw-ix-1" + } + ] + } + }, + "meta": { + "err": null, + "fee": 5000 + } + }); + let transaction_id = service + .persist_resolved_transaction( + "sig-model-outer-1", + Some("helius_primary_http".to_string()), + &resolved_transaction, + ) + .await + .expect("persist_resolved_transaction must succeed"); + assert!(transaction_id > 0); + let slot = crate::get_chain_slot(database.as_ref(), 700001) + .await + .expect("chain slot fetch must succeed") + .expect("chain slot must exist"); + assert_eq!(slot.slot, 700001); + assert_eq!(slot.block_time_unix, Some(1777000001)); + let transaction = + crate::get_chain_transaction_by_signature(database.as_ref(), "sig-model-outer-1") + .await + .expect("chain transaction fetch must succeed") + .expect("chain transaction must exist"); + assert_eq!(transaction.id, Some(transaction_id)); + assert_eq!(transaction.slot, Some(700001)); + assert_eq!( + transaction.source_endpoint_name, + Some("helius_primary_http".to_string()) + ); + assert_eq!(transaction.version_text, Some("0".to_string())); + let instructions = + crate::list_chain_instructions_by_transaction_id(database.as_ref(), transaction_id) + .await + .expect("chain instructions list must succeed"); + assert_eq!(instructions.len(), 2); + assert_eq!(instructions[0].instruction_index, 0); + assert_eq!(instructions[0].inner_instruction_index, None); + assert_eq!(instructions[0].program_name, Some("spl-token".to_string())); + assert_eq!(instructions[0].parsed_type, Some("transfer".to_string())); + assert_eq!(instructions[1].instruction_index, 1); + assert_eq!(instructions[1].inner_instruction_index, None); + assert_eq!(instructions[1].program_name, Some("system".to_string())); + assert_eq!(instructions[1].parsed_type, None); + } + + #[tokio::test] + async fn persist_resolved_transaction_projects_inner_instructions() { + let database = make_database().await; + let service = crate::KbTransactionModelService::new(database.clone()); + let resolved_transaction = serde_json::json!({ + "slot": 700002, + "blockTime": 1777000002, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": "OuterProgram1111111111111111111111111111111", + "program": "outer-program", + "stackHeight": 1, + "accounts": ["PARENT1"], + "data": "raw-parent", + "parsed": { + "type": "outerInstruction", + "info": { "k": "v" } + } + } + ] + } + }, + "meta": { + "err": null, + "innerInstructions": [ + { + "index": 0, + "instructions": [ + { + "programId": crate::SPL_TOKEN_PROGRAM_ID, + "program": "spl-token", + "stackHeight": 2, + "accounts": ["INNER1", "INNER2"], + "data": "raw-inner-0", + "parsed": { + "type": "mintTo", + "info": { "amount": "5" } + } + }, + { + "programId": crate::SPL_TOKEN_PROGRAM_ID, + "program": "spl-token", + "stackHeight": 2, + "accounts": ["INNER3", "INNER4"], + "data": "raw-inner-1", + "parsed": { + "type": "burn", + "info": { "amount": "2" } + } + } + ] + } + ] + } + }); + let transaction_id = service + .persist_resolved_transaction( + "sig-model-inner-1", + Some("mainnet_public_http".to_string()), + &resolved_transaction, + ) + .await + .expect("persist_resolved_transaction must succeed"); + let instructions = + crate::list_chain_instructions_by_transaction_id(database.as_ref(), transaction_id) + .await + .expect("chain instructions list must succeed"); + assert_eq!(instructions.len(), 3); + let parent = &instructions[0]; + assert_eq!(parent.parent_instruction_id, None); + assert_eq!(parent.instruction_index, 0); + assert_eq!(parent.inner_instruction_index, None); + let inner_a = &instructions[1]; + assert_eq!(inner_a.parent_instruction_id, parent.id); + assert_eq!(inner_a.instruction_index, 0); + assert_eq!(inner_a.inner_instruction_index, Some(0)); + assert_eq!(inner_a.parsed_type, Some("mintTo".to_string())); + let inner_b = &instructions[2]; + assert_eq!(inner_b.parent_instruction_id, parent.id); + assert_eq!(inner_b.instruction_index, 0); + assert_eq!(inner_b.inner_instruction_index, Some(1)); + assert_eq!(inner_b.parsed_type, Some("burn".to_string())); + } + + #[tokio::test] + async fn persist_resolved_transaction_replaces_existing_projection() { + let database = make_database().await; + let service = crate::KbTransactionModelService::new(database.clone()); + let first_resolved_transaction = serde_json::json!({ + "slot": 700003, + "blockTime": 1777000003, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": "FirstProgram1111111111111111111111111111111", + "program": "first-program", + "stackHeight": 1, + "accounts": ["A1"], + "data": "raw-a1" + }, + { + "programId": "SecondProgram111111111111111111111111111111", + "program": "second-program", + "stackHeight": 1, + "accounts": ["A2"], + "data": "raw-a2" + } + ] + } + }, + "meta": { + "err": null + } + }); + let transaction_id_first = service + .persist_resolved_transaction( + "sig-model-replace-1", + Some("helius_primary_http".to_string()), + &first_resolved_transaction, + ) + .await + .expect("first projection must succeed"); + let first_instructions = crate::list_chain_instructions_by_transaction_id( + database.as_ref(), + transaction_id_first, + ) + .await + .expect("first chain instructions list must succeed"); + assert_eq!(first_instructions.len(), 2); + let second_resolved_transaction = serde_json::json!({ + "slot": 700003, + "blockTime": 1777000004, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": "OnlyProgram11111111111111111111111111111111", + "program": "only-program", + "stackHeight": 1, + "accounts": ["B1"], + "data": "raw-b1", + "parsed": { + "type": "singleInstruction", + "info": { "flag": true } + } + } + ] + } + }, + "meta": { + "err": null + } + }); + let transaction_id_second = service + .persist_resolved_transaction( + "sig-model-replace-1", + Some("helius_primary_http".to_string()), + &second_resolved_transaction, + ) + .await + .expect("second projection must succeed"); + assert_eq!(transaction_id_first, transaction_id_second); + let second_instructions = crate::list_chain_instructions_by_transaction_id( + database.as_ref(), + transaction_id_second, + ) + .await + .expect("second chain instructions list must succeed"); + assert_eq!(second_instructions.len(), 1); + assert_eq!(second_instructions[0].instruction_index, 0); + assert_eq!( + second_instructions[0].program_name, + Some("only-program".to_string()) + ); + assert_eq!( + second_instructions[0].parsed_type, + Some("singleInstruction".to_string()) + ); + let transaction = + crate::get_chain_transaction_by_signature(database.as_ref(), "sig-model-replace-1") + .await + .expect("transaction fetch must succeed") + .expect("transaction must exist"); + assert_eq!(transaction.block_time_unix, Some(1777000004)); + } +} diff --git a/kb_lib/src/tx_resolution.rs b/kb_lib/src/tx_resolution.rs index 0a72271..b918bc0 100644 --- a/kb_lib/src/tx_resolution.rs +++ b/kb_lib/src/tx_resolution.rs @@ -98,6 +98,7 @@ pub struct KbWsTransactionResolutionRelayStats { pub struct KbTransactionResolutionService { http_pool: std::sync::Arc, persistence: crate::KbDetectionPersistenceService, + transaction_model: crate::KbTransactionModelService, http_role: std::string::String, resolved_signatures: std::sync::Arc>>, @@ -107,12 +108,15 @@ impl KbTransactionResolutionService { /// Creates a new transaction resolution service. pub fn new( http_pool: std::sync::Arc, - persistence: crate::KbDetectionPersistenceService, + database: std::sync::Arc, http_role: std::string::String, ) -> Self { + let persistence = crate::KbDetectionPersistenceService::new(database.clone()); + let transaction_model = crate::KbTransactionModelService::new(database); Self { http_pool, persistence, + transaction_model, http_role, resolved_signatures: std::sync::Arc::new(tokio::sync::Mutex::new( std::collections::HashSet::new(), @@ -274,12 +278,25 @@ impl KbTransactionResolutionService { .get("slot") .and_then(serde_json::Value::as_u64) .or(request.slot_hint); + let projection_result = self + .transaction_model + .persist_resolved_transaction( + request.signature.as_str(), + request.source_endpoint_name.clone(), + &transaction_value, + ) + .await; + let projected_transaction_id = match projection_result { + Ok(projected_transaction_id) => projected_transaction_id, + Err(error) => return Err(error), + }; let payload = serde_json::json!({ "status": "resolved", "signature": request.signature.clone(), "triggerMethod": request.trigger_method.clone(), "sourceEndpointName": request.source_endpoint_name.clone(), "slotHint": request.slot_hint, + "projectedTransactionId": projected_transaction_id, "triggerPayload": request.trigger_payload.clone(), "transaction": transaction_value }); diff --git a/kb_lib/src/ws_client.rs b/kb_lib/src/ws_client.rs index 5aa1b52..654b3c3 100644 --- a/kb_lib/src/ws_client.rs +++ b/kb_lib/src/ws_client.rs @@ -1463,13 +1463,23 @@ fn kb_build_pending_json_rpc_request( request: &crate::KbJsonRpcWsRequest, ) -> std::option::Option { let request_id_option = kb_json_value_to_u64(&request.id); - let request_id = request_id_option?; + let request_id = match request_id_option { + Some(request_id) => request_id, + None => return None, + }; if kb_is_subscribe_method(&request.method) { let notification_method_option = kb_infer_notification_method_from_subscribe(&request.method); - let unsubscribe_method_option = kb_infer_unsubscribe_method_from_subscribe(&request.method); - let notification_method = notification_method_option?; - let unsubscribe_method = unsubscribe_method_option?; + let notification_method = match notification_method_option { + Some(notification_method) => notification_method, + None => return None, + }; + let unsubscribe_method_option = + kb_infer_unsubscribe_method_from_subscribe(&request.method); + let unsubscribe_method = match unsubscribe_method_option { + Some(unsubscribe_method) => unsubscribe_method, + None => return None, + }; return Some(WsPendingJsonRpcRequest { request_id, method: request.method.clone(), @@ -1482,9 +1492,15 @@ fn kb_build_pending_json_rpc_request( } if kb_is_unsubscribe_method(&request.method) { let first_param_option = request.params.first(); - let first_param = first_param_option?; + let first_param = match first_param_option { + Some(first_param) => first_param, + None => return None, + }; let subscription_id_option = first_param.as_u64(); - let subscription_id = subscription_id_option?; + let subscription_id = match subscription_id_option { + Some(subscription_id) => subscription_id, + None => return None, + }; return Some(WsPendingJsonRpcRequest { request_id, method: request.method.clone(), diff --git a/kb_lib/src/ws_manager.rs b/kb_lib/src/ws_manager.rs index 36263ad..d9ef516 100644 --- a/kb_lib/src/ws_manager.rs +++ b/kb_lib/src/ws_manager.rs @@ -469,9 +469,11 @@ impl WsManager { )); } } - let persistence = crate::KbDetectionPersistenceService::new(database); - let resolver = - crate::KbTransactionResolutionService::new(http_pool, persistence, http_role); + let resolver = crate::KbTransactionResolutionService::new( + http_pool, + database, + http_role, + ); let relay = crate::KbWsTransactionResolutionRelay::new(resolver); let (sender, receiver) = crate::KbWsTransactionResolutionRelay::channel(queue_capacity); let relay_task = relay.spawn(receiver);