0.7.1 for Real ! - it was 0.7.0 before, not 0.7.1 !!

This commit is contained in:
2026-04-26 11:44:58 +02:00
parent 60b8841895
commit ac5bf10af6
26 changed files with 2560 additions and 388 deletions

View File

@@ -31,3 +31,4 @@
0.6.5 - Ajout de ws_manager.rs pour lorchestration multi-clients WebSocket, le bus dévénements unifié et le branchement centralisé du relais de détection
0.6.6 - Ajout de la fenêtre Demo Ws Manager dans kb_app pour piloter plusieurs WsClient, visualiser le snapshot consolidé, tester le démarrage/arrêt par rôle et valider le flux unifié de WsEvent
0.7.0 - Ajout du socle de résolution transactionnelle orientée DEX : relais WS vers file de résolution, récupération getTransaction via HttpEndpointPool et persistance des résolutions dans les observations/signaux
0.7.1 - Ajout du modèle transactionnel enrichi : tables slots/transactions/instructions, requêtes daccès et projection structurée des transactions résolues

View File

@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
version = "0.7.0"
version = "0.7.1"
edition = "2024"
license = "MIT"
repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot"

View File

@@ -457,14 +457,14 @@ Réalisé :
- préparation du futur modèle transactionnel enrichi sans bloquer les flux temps réel.
### 6.033. Version `0.7.1` — Modèle transactionnel Solana enrichi
Objectif : préparer un modèle interne plus riche, inspiré dune vision `slot -> signature -> instruction`.
Réalisé :
À faire :
- préparer les structures et tables permettant de relier blocs/slots, signatures et instructions,
- distinguer clairement transaction, instruction principale et éventuelles inner instructions,
- conserver la possibilité de relier plus tard un pool, un token ou un wallet à une signature fondatrice,
- préparer lhistorique transactionnel nécessaire aux futurs décodeurs DEX.
- ajout des tables techniques `kb_chain_slots`, `kb_chain_transactions` et `kb_chain_instructions`,
- distinction claire entre slot, transaction résolue et instructions normalisées,
- support des instructions principales et inner instructions,
- ajout des entités, DTOs et requêtes associées,
- ajout dun service de projection pour transformer une transaction JSON-RPC résolue en modèle transactionnel interne,
- ajout des tests de roundtrip et de projection.
### 6.034. Version `0.7.2` — Décodeurs DEX spécifiques par programme et version
Objectif : remplacer les heuristiques ponctuelles par de vrais décodeurs Rust dédiés.
@@ -656,10 +656,9 @@ Le projet doit maintenir au minimum :
## 12. Priorité immédiate
La priorité immédiate est désormais la suivante :
1. démarrer la version `0.7.1` avec le modèle transactionnel Solana enrichi,
2. préparer les structures et tables reliant slots, signatures et instructions,
3. distinguer clairement transaction principale et inner instructions,
4. préparer lhistorique transactionnel exploitable par les futurs décodeurs DEX,
5. conserver le découplage entre transport, résolution transactionnelle, détection métier et stockage,
6. préparer ensuite la version `0.7.2` pour les décodeurs DEX spécifiques par programme et version.
1. brancher automatiquement `tx_resolution.rs` vers `KbTransactionModelService`,
2. stabiliser la chaîne complète `WS -> résolution HTTP -> projection transactionnelle`,
3. démarrer la version `0.7.2` avec les décodeurs DEX spécifiques par programme et version,
4. introduire les premières règles de décodage dédiées à Raydium / Pump.fun / PumpSwap,
5. conserver le découplage entre transport, résolution transactionnelle, projection et décodage métier,
6. préparer ensuite lenrichissement des objets métier DEX à partir du modèle transactionnel.

View File

@@ -427,60 +427,122 @@ async fn kb_execute_demo_ws_subscribe(
let method = request.method.trim();
let mode = request.mode.trim();
if method == "account" {
let target = kb_required_target(request, "account pubkey")?;
let target_result = kb_required_target(request, "account pubkey");
let target = match target_result {
Ok(target) => target,
Err(error) => return Err(error),
};
if mode == "typed" {
let config = kb_parse_optional_json_typed::<
let config_result = kb_parse_optional_json_typed::<
solana_rpc_client_api::config::RpcAccountInfoConfig,
>(&request.config_json, "account typed config")?;
>(&request.config_json, "account typed config");
let config = match config_result {
Ok(config) => config,
Err(error) => return Err(error),
};
let result = client.account_subscribe_typed(target, config).await;
return result.map_err(|error| format!("account typed subscribe failed: {error}"));
}
let config = kb_parse_optional_json_value(&request.config_json, "account raw config")?;
let config_result =
kb_parse_optional_json_value(&request.config_json, "account raw config");
let config = match config_result {
Ok(config) => config,
Err(error) => return Err(error),
};
let result = client.account_subscribe_raw(target, config).await;
return result.map_err(|error| format!("account raw subscribe failed: {error}"));
}
if method == "block" {
if mode == "typed" {
let filter = kb_parse_required_json_typed::<
let filter_result = kb_parse_required_json_typed::<
solana_rpc_client_api::config::RpcBlockSubscribeFilter,
>(&request.filter_json, "block typed filter")?;
let config = kb_parse_optional_json_typed::<
>(&request.filter_json, "block typed filter");
let filter = match filter_result {
Ok(filter) => filter,
Err(error) => return Err(error),
};
let config_result = kb_parse_optional_json_typed::<
solana_rpc_client_api::config::RpcBlockSubscribeConfig,
>(&request.config_json, "block typed config")?;
>(&request.config_json, "block typed config");
let config = match config_result {
Ok(config) => config,
Err(error) => return Err(error),
};
let result = client.block_subscribe_typed(filter, config).await;
return result.map_err(|error| format!("block typed subscribe failed: {error}"));
}
let filter = kb_parse_required_json_value(&request.filter_json, "block raw filter")?;
let config = kb_parse_optional_json_value(&request.config_json, "block raw config")?;
let filter_result =
kb_parse_required_json_value(&request.filter_json, "block raw filter");
let filter = match filter_result {
Ok(filter) => filter,
Err(error) => return Err(error),
};
let config_result =
kb_parse_optional_json_value(&request.config_json, "block raw config");
let config = match config_result {
Ok(config) => config,
Err(error) => return Err(error),
};
let result = client.block_subscribe_raw(filter, config).await;
return result.map_err(|error| format!("block raw subscribe failed: {error}"));
}
if method == "logs" {
if mode == "typed" {
let filter = kb_parse_required_json_typed::<
let filter_result = kb_parse_required_json_typed::<
solana_rpc_client_api::config::RpcTransactionLogsFilter,
>(&request.filter_json, "logs typed filter")?;
let config = kb_parse_optional_json_typed::<
>(&request.filter_json, "logs typed filter");
let filter = match filter_result {
Ok(filter) => filter,
Err(error) => return Err(error),
};
let config_result = kb_parse_optional_json_typed::<
solana_rpc_client_api::config::RpcTransactionLogsConfig,
>(&request.config_json, "logs typed config")?;
>(&request.config_json, "logs typed config");
let config = match config_result {
Ok(config) => config,
Err(error) => return Err(error),
};
let result = client.logs_subscribe_typed(filter, config).await;
return result.map_err(|error| format!("logs typed subscribe failed: {error}"));
}
let filter = kb_parse_required_json_value(&request.filter_json, "logs raw filter")?;
let config = kb_parse_optional_json_value(&request.config_json, "logs raw config")?;
let filter_result =
kb_parse_required_json_value(&request.filter_json, "logs raw filter");
let filter = match filter_result {
Ok(filter) => filter,
Err(error) => return Err(error),
};
let config_result =
kb_parse_optional_json_value(&request.config_json, "logs raw config");
let config = match config_result {
Ok(config) => config,
Err(error) => return Err(error),
};
let result = client.logs_subscribe_raw(filter, config).await;
return result.map_err(|error| format!("logs raw subscribe failed: {error}"));
}
if method == "program" {
let target = kb_required_target(request, "program id")?;
let target_result = kb_required_target(request, "program id");
let target = match target_result {
Ok(target) => target,
Err(error) => return Err(error),
};
if mode == "typed" {
let config = kb_parse_optional_json_typed::<
let config_result = kb_parse_optional_json_typed::<
solana_rpc_client_api::config::RpcProgramAccountsConfig,
>(&request.config_json, "program typed config")?;
>(&request.config_json, "program typed config");
let config = match config_result {
Ok(config) => config,
Err(error) => return Err(error),
};
let result = client.program_subscribe_typed(target, config).await;
return result.map_err(|error| format!("program typed subscribe failed: {error}"));
}
let config = kb_parse_optional_json_value(&request.config_json, "program raw config")?;
let config_result =
kb_parse_optional_json_value(&request.config_json, "program raw config");
let config = match config_result {
Ok(config) => config,
Err(error) => return Err(error),
};
let result = client.program_subscribe_raw(target, config).await;
return result.map_err(|error| format!("program raw subscribe failed: {error}"));
}
@@ -489,15 +551,28 @@ async fn kb_execute_demo_ws_subscribe(
return result.map_err(|error| format!("root subscribe failed: {error}"));
}
if method == "signature" {
let target = kb_required_target(request, "signature")?;
let target_result = kb_required_target(request, "signature");
let target = match target_result {
Ok(target) => target,
Err(error) => return Err(error),
};
if mode == "typed" {
let config = kb_parse_optional_json_typed::<
let config_result = kb_parse_optional_json_typed::<
solana_rpc_client_api::config::RpcSignatureSubscribeConfig,
>(&request.config_json, "signature typed config")?;
>(&request.config_json, "signature typed config");
let config = match config_result {
Ok(config) => config,
Err(error) => return Err(error),
};
let result = client.signature_subscribe_typed(target, config).await;
return result.map_err(|error| format!("signature typed subscribe failed: {error}"));
}
let config = kb_parse_optional_json_value(&request.config_json, "signature raw config")?;
let config_result =
kb_parse_optional_json_value(&request.config_json, "signature raw config");
let config = match config_result {
Ok(config) => config,
Err(error) => return Err(error),
};
let result = client.signature_subscribe_raw(target, config).await;
return result.map_err(|error| format!("signature raw subscribe failed: {error}"));
}

View File

@@ -14,7 +14,7 @@ mod demo_ws;
mod demo_ws_manager;
mod splash;
pub use crate::splash::SplashOrder;
pub use splash::SplashOrder;
use tauri::Emitter;
use tauri::Manager;

View File

@@ -13,91 +13,107 @@ mod schema;
mod sqlite;
mod types;
pub use crate::db::connection::KbDatabase;
pub use crate::db::connection::KbDatabaseConnection;
pub use crate::db::dtos::KbAnalysisSignalDto;
pub use crate::db::dtos::KbDbMetadataDto;
pub use crate::db::dtos::KbDbRuntimeEventDto;
pub use crate::db::dtos::KbDexDto;
pub use crate::db::dtos::KbKnownHttpEndpointDto;
pub use crate::db::dtos::KbKnownWsEndpointDto;
pub use crate::db::dtos::KbLiquidityEventDto;
pub use crate::db::dtos::KbObservedTokenDto;
pub use crate::db::dtos::KbOnchainObservationDto;
pub use crate::db::dtos::KbPairDto;
pub use crate::db::dtos::KbPoolDto;
pub use crate::db::dtos::KbPoolListingDto;
pub use crate::db::dtos::KbPoolTokenDto;
pub use crate::db::dtos::KbSwapDto;
pub use crate::db::dtos::KbTokenBurnEventDto;
pub use crate::db::dtos::KbTokenDto;
pub use crate::db::dtos::KbTokenMintEventDto;
pub use crate::db::entities::KbAnalysisSignalEntity;
pub use crate::db::entities::KbDbMetadataEntity;
pub use crate::db::entities::KbDbRuntimeEventEntity;
pub use crate::db::entities::KbDexEntity;
pub use crate::db::entities::KbKnownHttpEndpointEntity;
pub use crate::db::entities::KbKnownWsEndpointEntity;
pub use crate::db::entities::KbLiquidityEventEntity;
pub use crate::db::entities::KbObservedTokenEntity;
pub use crate::db::entities::KbOnchainObservationEntity;
pub use crate::db::entities::KbPairEntity;
pub use crate::db::entities::KbPoolEntity;
pub use crate::db::entities::KbPoolListingEntity;
pub use crate::db::entities::KbPoolTokenEntity;
pub use crate::db::entities::KbSwapEntity;
pub use crate::db::entities::KbTokenBurnEventEntity;
pub use crate::db::entities::KbTokenEntity;
pub use crate::db::entities::KbTokenMintEventEntity;
pub use crate::db::queries::get_db_metadata;
pub use crate::db::queries::get_known_http_endpoint;
pub use crate::db::queries::get_known_ws_endpoint;
pub use crate::db::queries::get_observed_token_by_mint;
pub use crate::db::queries::get_token_by_mint;
pub use crate::db::queries::insert_analysis_signal;
pub use crate::db::queries::insert_db_runtime_event;
pub use crate::db::queries::insert_onchain_observation;
pub use crate::db::queries::list_db_metadata;
pub use crate::db::queries::list_dexes;
pub use crate::db::queries::list_known_http_endpoints;
pub use crate::db::queries::list_known_ws_endpoints;
pub use crate::db::queries::list_observed_tokens;
pub use crate::db::queries::list_recent_analysis_signals;
pub use crate::db::queries::list_recent_db_runtime_events;
pub use crate::db::queries::list_recent_liquidity_events;
pub use crate::db::queries::list_recent_onchain_observations;
pub use crate::db::queries::list_recent_swaps;
pub use crate::db::queries::list_recent_token_burn_events;
pub use crate::db::queries::list_recent_token_mint_events;
pub use crate::db::queries::upsert_db_metadata;
pub use crate::db::queries::upsert_dex;
pub use crate::db::queries::upsert_known_http_endpoint;
pub use crate::db::queries::upsert_known_ws_endpoint;
pub use crate::db::queries::upsert_liquidity_event;
pub use crate::db::queries::upsert_observed_token;
pub use crate::db::queries::upsert_pair;
pub use crate::db::queries::upsert_pool;
pub use crate::db::queries::upsert_pool_listing;
pub use crate::db::queries::upsert_pool_token;
pub use crate::db::queries::upsert_swap;
pub use crate::db::queries::upsert_token;
pub use crate::db::queries::upsert_token_burn_event;
pub use crate::db::queries::upsert_token_mint_event;
pub use crate::db::queries::get_dex_by_code;
pub use crate::db::queries::get_pair_by_pool_id;
pub use crate::db::queries::get_pool_by_address;
pub use crate::db::queries::get_pool_listing_by_pool_id;
pub use crate::db::queries::list_pairs;
pub use crate::db::queries::list_pool_listings;
pub use crate::db::queries::list_pool_tokens_by_pool_id;
pub use crate::db::queries::list_pools;
pub use crate::db::types::KbAnalysisSignalSeverity;
pub use crate::db::types::KbDatabaseBackend;
pub use crate::db::types::KbDbRuntimeEventLevel;
pub use crate::db::types::KbLiquidityEventKind;
pub use crate::db::types::KbObservationSourceKind;
pub use crate::db::types::KbObservedTokenStatus;
pub use crate::db::types::KbPoolKind;
pub use crate::db::types::KbPoolStatus;
pub use crate::db::types::KbPoolTokenRole;
pub use crate::db::types::KbSwapTradeSide;
pub use connection::KbDatabase;
pub use connection::KbDatabaseConnection;
pub use dtos::KbAnalysisSignalDto;
pub use dtos::KbDbMetadataDto;
pub use dtos::KbDbRuntimeEventDto;
pub use dtos::KbDexDto;
pub use dtos::KbKnownHttpEndpointDto;
pub use dtos::KbKnownWsEndpointDto;
pub use dtos::KbLiquidityEventDto;
pub use dtos::KbObservedTokenDto;
pub use dtos::KbOnchainObservationDto;
pub use dtos::KbPairDto;
pub use dtos::KbPoolDto;
pub use dtos::KbPoolListingDto;
pub use dtos::KbPoolTokenDto;
pub use dtos::KbSwapDto;
pub use dtos::KbTokenBurnEventDto;
pub use dtos::KbTokenDto;
pub use dtos::KbTokenMintEventDto;
pub use dtos::KbChainInstructionDto;
pub use dtos::KbChainSlotDto;
pub use dtos::KbChainTransactionDto;
pub use entities::KbAnalysisSignalEntity;
pub use entities::KbDbMetadataEntity;
pub use entities::KbDbRuntimeEventEntity;
pub use entities::KbDexEntity;
pub use entities::KbKnownHttpEndpointEntity;
pub use entities::KbKnownWsEndpointEntity;
pub use entities::KbLiquidityEventEntity;
pub use entities::KbObservedTokenEntity;
pub use entities::KbOnchainObservationEntity;
pub use entities::KbPairEntity;
pub use entities::KbPoolEntity;
pub use entities::KbPoolListingEntity;
pub use entities::KbPoolTokenEntity;
pub use entities::KbSwapEntity;
pub use entities::KbTokenBurnEventEntity;
pub use entities::KbTokenEntity;
pub use entities::KbTokenMintEventEntity;
pub use entities::KbChainInstructionEntity;
pub use entities::KbChainSlotEntity;
pub use entities::KbChainTransactionEntity;
pub use queries::get_db_metadata;
pub use queries::get_known_http_endpoint;
pub use queries::get_known_ws_endpoint;
pub use queries::get_observed_token_by_mint;
pub use queries::get_token_by_mint;
pub use queries::insert_analysis_signal;
pub use queries::insert_db_runtime_event;
pub use queries::insert_onchain_observation;
pub use queries::list_db_metadata;
pub use queries::list_dexes;
pub use queries::list_known_http_endpoints;
pub use queries::list_known_ws_endpoints;
pub use queries::list_observed_tokens;
pub use queries::list_recent_analysis_signals;
pub use queries::list_recent_db_runtime_events;
pub use queries::list_recent_liquidity_events;
pub use queries::list_recent_onchain_observations;
pub use queries::list_recent_swaps;
pub use queries::list_recent_token_burn_events;
pub use queries::list_recent_token_mint_events;
pub use queries::upsert_db_metadata;
pub use queries::upsert_dex;
pub use queries::upsert_known_http_endpoint;
pub use queries::upsert_known_ws_endpoint;
pub use queries::upsert_liquidity_event;
pub use queries::upsert_observed_token;
pub use queries::upsert_pair;
pub use queries::upsert_pool;
pub use queries::upsert_pool_listing;
pub use queries::upsert_pool_token;
pub use queries::upsert_swap;
pub use queries::upsert_token;
pub use queries::upsert_token_burn_event;
pub use queries::upsert_token_mint_event;
pub use queries::get_dex_by_code;
pub use queries::get_pair_by_pool_id;
pub use queries::get_pool_by_address;
pub use queries::get_pool_listing_by_pool_id;
pub use queries::list_pairs;
pub use queries::list_pool_listings;
pub use queries::list_pool_tokens_by_pool_id;
pub use queries::list_pools;
pub use queries::delete_chain_instructions_by_transaction_id;
pub use queries::get_chain_slot;
pub use queries::get_chain_transaction_by_signature;
pub use queries::insert_chain_instruction;
pub use queries::list_chain_instructions_by_transaction_id;
pub use queries::list_recent_chain_slots;
pub use queries::list_recent_chain_transactions;
pub use queries::upsert_chain_slot;
pub use queries::upsert_chain_transaction;
pub use types::KbAnalysisSignalSeverity;
pub use types::KbDatabaseBackend;
pub use types::KbDbRuntimeEventLevel;
pub use types::KbLiquidityEventKind;
pub use types::KbObservationSourceKind;
pub use types::KbObservedTokenStatus;
pub use types::KbPoolKind;
pub use types::KbPoolStatus;
pub use types::KbPoolTokenRole;
pub use types::KbSwapTradeSide;

View File

@@ -19,21 +19,27 @@ mod swap;
mod token;
mod token_burn_event;
mod token_mint_event;
mod chain_instruction;
mod chain_slot;
mod chain_transaction;
pub use crate::db::dtos::analysis_signal::KbAnalysisSignalDto;
pub use crate::db::dtos::db_metadata::KbDbMetadataDto;
pub use crate::db::dtos::db_runtime_event::KbDbRuntimeEventDto;
pub use crate::db::dtos::dex::KbDexDto;
pub use crate::db::dtos::known_http_endpoint::KbKnownHttpEndpointDto;
pub use crate::db::dtos::known_ws_endpoint::KbKnownWsEndpointDto;
pub use crate::db::dtos::liquidity_event::KbLiquidityEventDto;
pub use crate::db::dtos::observed_token::KbObservedTokenDto;
pub use crate::db::dtos::onchain_observation::KbOnchainObservationDto;
pub use crate::db::dtos::pair::KbPairDto;
pub use crate::db::dtos::pool::KbPoolDto;
pub use crate::db::dtos::pool_listing::KbPoolListingDto;
pub use crate::db::dtos::pool_token::KbPoolTokenDto;
pub use crate::db::dtos::swap::KbSwapDto;
pub use crate::db::dtos::token::KbTokenDto;
pub use crate::db::dtos::token_burn_event::KbTokenBurnEventDto;
pub use crate::db::dtos::token_mint_event::KbTokenMintEventDto;
pub use analysis_signal::KbAnalysisSignalDto;
pub use db_metadata::KbDbMetadataDto;
pub use db_runtime_event::KbDbRuntimeEventDto;
pub use dex::KbDexDto;
pub use known_http_endpoint::KbKnownHttpEndpointDto;
pub use known_ws_endpoint::KbKnownWsEndpointDto;
pub use liquidity_event::KbLiquidityEventDto;
pub use observed_token::KbObservedTokenDto;
pub use onchain_observation::KbOnchainObservationDto;
pub use pair::KbPairDto;
pub use pool::KbPoolDto;
pub use pool_listing::KbPoolListingDto;
pub use pool_token::KbPoolTokenDto;
pub use swap::KbSwapDto;
pub use token::KbTokenDto;
pub use token_burn_event::KbTokenBurnEventDto;
pub use token_mint_event::KbTokenMintEventDto;
pub use chain_instruction::KbChainInstructionDto;
pub use chain_slot::KbChainSlotDto;
pub use chain_transaction::KbChainTransactionDto;

View File

@@ -0,0 +1,140 @@
// file: kb_lib/src/db/dtos/chain_instruction.rs
//! Application-facing normalized chain instruction DTO.
/// Application-facing normalized chain instruction DTO.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbChainInstructionDto {
/// Optional numeric primary key.
pub id: std::option::Option<i64>,
/// Parent transaction id.
pub transaction_id: i64,
/// Optional parent instruction id for inner instructions.
pub parent_instruction_id: std::option::Option<i64>,
/// Outer instruction index.
pub instruction_index: u32,
/// Optional inner instruction index.
pub inner_instruction_index: std::option::Option<u32>,
/// Optional program id.
pub program_id: std::option::Option<std::string::String>,
/// Optional program name.
pub program_name: std::option::Option<std::string::String>,
/// Optional stack height.
pub stack_height: std::option::Option<u32>,
/// Serialized accounts JSON array.
pub accounts_json: std::string::String,
/// Optional serialized data JSON.
pub data_json: std::option::Option<std::string::String>,
/// Optional parsed type.
pub parsed_type: std::option::Option<std::string::String>,
/// Optional serialized parsed JSON.
pub parsed_json: std::option::Option<std::string::String>,
/// Creation timestamp.
pub created_at: chrono::DateTime<chrono::Utc>,
}
impl KbChainInstructionDto {
/// Creates a new chain instruction DTO.
#[allow(clippy::too_many_arguments)]
pub fn new(
transaction_id: i64,
parent_instruction_id: std::option::Option<i64>,
instruction_index: u32,
inner_instruction_index: std::option::Option<u32>,
program_id: std::option::Option<std::string::String>,
program_name: std::option::Option<std::string::String>,
stack_height: std::option::Option<u32>,
accounts_json: std::string::String,
data_json: std::option::Option<std::string::String>,
parsed_type: std::option::Option<std::string::String>,
parsed_json: std::option::Option<std::string::String>,
) -> Self {
Self {
id: None,
transaction_id,
parent_instruction_id,
instruction_index,
inner_instruction_index,
program_id,
program_name,
stack_height,
accounts_json,
data_json,
parsed_type,
parsed_json,
created_at: chrono::Utc::now(),
}
}
}
impl TryFrom<crate::KbChainInstructionEntity> for KbChainInstructionDto {
type Error = crate::KbError;
fn try_from(entity: crate::KbChainInstructionEntity) -> Result<Self, Self::Error> {
let instruction_index_result = u32::try_from(entity.instruction_index);
let instruction_index = match instruction_index_result {
Ok(instruction_index) => instruction_index,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert chain instruction instruction_index '{}' to u32: {}",
entity.instruction_index, error
)));
}
};
let inner_instruction_index = match entity.inner_instruction_index {
Some(inner_instruction_index) => {
let inner_instruction_index_result = u32::try_from(inner_instruction_index);
match inner_instruction_index_result {
Ok(inner_instruction_index) => Some(inner_instruction_index),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert chain instruction inner_instruction_index '{}' to u32: {}",
inner_instruction_index, error
)));
}
}
}
None => None,
};
let stack_height = match entity.stack_height {
Some(stack_height) => {
let stack_height_result = u32::try_from(stack_height);
match stack_height_result {
Ok(stack_height) => Some(stack_height),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert chain instruction stack_height '{}' to u32: {}",
stack_height, error
)));
}
}
}
None => None,
};
let created_at_result = chrono::DateTime::parse_from_rfc3339(&entity.created_at);
let created_at = match created_at_result {
Ok(created_at) => created_at.with_timezone(&chrono::Utc),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot parse chain instruction created_at '{}': {}",
entity.created_at, error
)));
}
};
Ok(Self {
id: Some(entity.id),
transaction_id: entity.transaction_id,
parent_instruction_id: entity.parent_instruction_id,
instruction_index,
inner_instruction_index,
program_id: entity.program_id,
program_name: entity.program_name,
stack_height,
accounts_json: entity.accounts_json,
data_json: entity.data_json,
parsed_type: entity.parsed_type,
parsed_json: entity.parsed_json,
created_at,
})
}
}

View File

@@ -0,0 +1,95 @@
// file: kb_lib/src/db/dtos/chain_slot.rs
//! Application-facing normalized chain slot DTO.
/// Application-facing normalized chain slot DTO.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbChainSlotDto {
/// Slot number.
pub slot: u64,
/// Optional parent slot number.
pub parent_slot: std::option::Option<u64>,
/// Optional block time in unix seconds.
pub block_time_unix: std::option::Option<i64>,
/// Creation timestamp.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Update timestamp.
pub updated_at: chrono::DateTime<chrono::Utc>,
}
impl KbChainSlotDto {
/// Creates a new chain slot DTO.
pub fn new(
slot: u64,
parent_slot: std::option::Option<u64>,
block_time_unix: std::option::Option<i64>,
) -> Self {
let now = chrono::Utc::now();
Self {
slot,
parent_slot,
block_time_unix,
created_at: now,
updated_at: now,
}
}
}
impl TryFrom<crate::KbChainSlotEntity> for KbChainSlotDto {
type Error = crate::KbError;
fn try_from(entity: crate::KbChainSlotEntity) -> Result<Self, Self::Error> {
let slot_result = u64::try_from(entity.slot);
let slot = match slot_result {
Ok(slot) => slot,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert chain slot '{}' to u64: {}",
entity.slot, error
)));
}
};
let parent_slot = match entity.parent_slot {
Some(parent_slot) => {
let parent_slot_result = u64::try_from(parent_slot);
match parent_slot_result {
Ok(parent_slot) => Some(parent_slot),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert chain parent_slot '{}' to u64: {}",
parent_slot, error
)));
}
}
}
None => None,
};
let created_at_result = chrono::DateTime::parse_from_rfc3339(&entity.created_at);
let created_at = match created_at_result {
Ok(created_at) => created_at.with_timezone(&chrono::Utc),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot parse chain slot created_at '{}': {}",
entity.created_at, error
)));
}
};
let updated_at_result = chrono::DateTime::parse_from_rfc3339(&entity.updated_at);
let updated_at = match updated_at_result {
Ok(updated_at) => updated_at.with_timezone(&chrono::Utc),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot parse chain slot updated_at '{}': {}",
entity.updated_at, error
)));
}
};
Ok(Self {
slot,
parent_slot,
block_time_unix: entity.block_time_unix,
created_at,
updated_at,
})
}
}

View File

@@ -0,0 +1,115 @@
// file: kb_lib/src/db/dtos/chain_transaction.rs
//! Application-facing normalized chain transaction DTO.
/// Application-facing normalized chain transaction DTO.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct KbChainTransactionDto {
/// Optional numeric primary key.
pub id: std::option::Option<i64>,
/// Transaction signature.
pub signature: std::string::String,
/// Optional slot number.
pub slot: std::option::Option<u64>,
/// Optional block time in unix seconds.
pub block_time_unix: std::option::Option<i64>,
/// Optional source endpoint name.
pub source_endpoint_name: std::option::Option<std::string::String>,
/// Optional version text.
pub version_text: std::option::Option<std::string::String>,
/// Optional serialized transaction error JSON.
pub err_json: std::option::Option<std::string::String>,
/// Optional serialized meta JSON.
pub meta_json: std::option::Option<std::string::String>,
/// Serialized full transaction JSON.
pub transaction_json: std::string::String,
/// Creation timestamp.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Update timestamp.
pub updated_at: chrono::DateTime<chrono::Utc>,
}
impl KbChainTransactionDto {
/// Creates a new chain transaction DTO.
#[allow(clippy::too_many_arguments)]
pub fn new(
signature: std::string::String,
slot: std::option::Option<u64>,
block_time_unix: std::option::Option<i64>,
source_endpoint_name: std::option::Option<std::string::String>,
version_text: std::option::Option<std::string::String>,
err_json: std::option::Option<std::string::String>,
meta_json: std::option::Option<std::string::String>,
transaction_json: std::string::String,
) -> Self {
let now = chrono::Utc::now();
Self {
id: None,
signature,
slot,
block_time_unix,
source_endpoint_name,
version_text,
err_json,
meta_json,
transaction_json,
created_at: now,
updated_at: now,
}
}
}
impl TryFrom<crate::KbChainTransactionEntity> for KbChainTransactionDto {
type Error = crate::KbError;
fn try_from(entity: crate::KbChainTransactionEntity) -> Result<Self, Self::Error> {
let slot = match entity.slot {
Some(slot) => {
let slot_result = u64::try_from(slot);
match slot_result {
Ok(slot) => Some(slot),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert chain transaction slot '{}' to u64: {}",
slot, error
)));
}
}
}
None => None,
};
let created_at_result = chrono::DateTime::parse_from_rfc3339(&entity.created_at);
let created_at = match created_at_result {
Ok(created_at) => created_at.with_timezone(&chrono::Utc),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot parse chain transaction created_at '{}': {}",
entity.created_at, error
)));
}
};
let updated_at_result = chrono::DateTime::parse_from_rfc3339(&entity.updated_at);
let updated_at = match updated_at_result {
Ok(updated_at) => updated_at.with_timezone(&chrono::Utc),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot parse chain transaction updated_at '{}': {}",
entity.updated_at, error
)));
}
};
Ok(Self {
id: Some(entity.id),
signature: entity.signature,
slot,
block_time_unix: entity.block_time_unix,
source_endpoint_name: entity.source_endpoint_name,
version_text: entity.version_text,
err_json: entity.err_json,
meta_json: entity.meta_json,
transaction_json: entity.transaction_json,
created_at,
updated_at,
})
}
}

View File

@@ -21,21 +21,27 @@ mod swap;
mod token;
mod token_burn_event;
mod token_mint_event;
mod chain_instruction;
mod chain_slot;
mod chain_transaction;
pub use crate::db::entities::analysis_signal::KbAnalysisSignalEntity;
pub use crate::db::entities::db_metadata::KbDbMetadataEntity;
pub use crate::db::entities::db_runtime_event::KbDbRuntimeEventEntity;
pub use crate::db::entities::dex::KbDexEntity;
pub use crate::db::entities::known_http_endpoint::KbKnownHttpEndpointEntity;
pub use crate::db::entities::known_ws_endpoint::KbKnownWsEndpointEntity;
pub use crate::db::entities::liquidity_event::KbLiquidityEventEntity;
pub use crate::db::entities::observed_token::KbObservedTokenEntity;
pub use crate::db::entities::onchain_observation::KbOnchainObservationEntity;
pub use crate::db::entities::pair::KbPairEntity;
pub use crate::db::entities::pool::KbPoolEntity;
pub use crate::db::entities::pool_listing::KbPoolListingEntity;
pub use crate::db::entities::pool_token::KbPoolTokenEntity;
pub use crate::db::entities::swap::KbSwapEntity;
pub use crate::db::entities::token::KbTokenEntity;
pub use crate::db::entities::token_burn_event::KbTokenBurnEventEntity;
pub use crate::db::entities::token_mint_event::KbTokenMintEventEntity;
pub use analysis_signal::KbAnalysisSignalEntity;
pub use db_metadata::KbDbMetadataEntity;
pub use db_runtime_event::KbDbRuntimeEventEntity;
pub use dex::KbDexEntity;
pub use known_http_endpoint::KbKnownHttpEndpointEntity;
pub use known_ws_endpoint::KbKnownWsEndpointEntity;
pub use liquidity_event::KbLiquidityEventEntity;
pub use observed_token::KbObservedTokenEntity;
pub use onchain_observation::KbOnchainObservationEntity;
pub use pair::KbPairEntity;
pub use pool::KbPoolEntity;
pub use pool_listing::KbPoolListingEntity;
pub use pool_token::KbPoolTokenEntity;
pub use swap::KbSwapEntity;
pub use token::KbTokenEntity;
pub use token_burn_event::KbTokenBurnEventEntity;
pub use token_mint_event::KbTokenMintEventEntity;
pub use chain_instruction::KbChainInstructionEntity;
pub use chain_slot::KbChainSlotEntity;
pub use chain_transaction::KbChainTransactionEntity;

View File

@@ -0,0 +1,34 @@
// file: kb_lib/src/db/entities/chain_instruction.rs
//! Database entity for one normalized Solana instruction row.
/// Persisted Solana instruction row.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)]
pub struct KbChainInstructionEntity {
/// Internal row id.
pub id: i64,
/// Parent transaction id.
pub transaction_id: i64,
/// Optional parent instruction id for inner instructions.
pub parent_instruction_id: std::option::Option<i64>,
/// Top-level instruction index.
pub instruction_index: i64,
/// Optional inner instruction index.
pub inner_instruction_index: std::option::Option<i64>,
/// Optional program id.
pub program_id: std::option::Option<std::string::String>,
/// Optional program name.
pub program_name: std::option::Option<std::string::String>,
/// Optional stack height.
pub stack_height: std::option::Option<i64>,
/// Accounts JSON array.
pub accounts_json: std::string::String,
/// Optional data JSON.
pub data_json: std::option::Option<std::string::String>,
/// Optional parsed type.
pub parsed_type: std::option::Option<std::string::String>,
/// Optional parsed payload JSON.
pub parsed_json: std::option::Option<std::string::String>,
/// Creation timestamp.
pub created_at: std::string::String,
}

View File

@@ -0,0 +1,19 @@
// file: kb_lib/src/db/entities/chain_slot.rs
//! Database entity for one observed Solana slot.
/// Persisted Solana slot row.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)]
pub struct KbChainSlotEntity {
/// Slot number.
pub slot: i64,
/// Optional parent slot number.
pub parent_slot: std::option::Option<i64>,
/// Optional block time in unix seconds.
pub block_time_unix: std::option::Option<i64>,
/// Creation timestamp.
pub created_at: std::string::String,
/// Update timestamp.
pub updated_at: std::string::String,
}

View File

@@ -0,0 +1,30 @@
// file: kb_lib/src/db/entities/chain_transaction.rs
//! Database entity for one resolved Solana transaction.
/// Persisted Solana transaction row.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)]
pub struct KbChainTransactionEntity {
/// Internal row id.
pub id: i64,
/// Transaction signature.
pub signature: std::string::String,
/// Optional slot number.
pub slot: std::option::Option<i64>,
/// Optional block time in unix seconds.
pub block_time_unix: std::option::Option<i64>,
/// Optional logical endpoint name that resolved the transaction.
pub source_endpoint_name: std::option::Option<std::string::String>,
/// Optional version text.
pub version_text: std::option::Option<std::string::String>,
/// Optional transaction error JSON.
pub err_json: std::option::Option<std::string::String>,
/// Optional transaction meta JSON.
pub meta_json: std::option::Option<std::string::String>,
/// Full transaction JSON.
pub transaction_json: std::string::String,
/// Creation timestamp.
pub created_at: std::string::String,
/// Update timestamp.
pub updated_at: std::string::String,
}

View File

@@ -23,46 +23,58 @@ mod swap;
mod token;
mod token_burn_event;
mod token_mint_event;
mod chain_instruction;
mod chain_slot;
mod chain_transaction;
pub use crate::db::queries::analysis_signal::insert_analysis_signal;
pub use crate::db::queries::analysis_signal::list_recent_analysis_signals;
pub use crate::db::queries::db_metadata::get_db_metadata;
pub use crate::db::queries::db_metadata::list_db_metadata;
pub use crate::db::queries::db_metadata::upsert_db_metadata;
pub use crate::db::queries::db_runtime_event::insert_db_runtime_event;
pub use crate::db::queries::db_runtime_event::list_recent_db_runtime_events;
pub use crate::db::queries::dex::get_dex_by_code;
pub use crate::db::queries::dex::list_dexes;
pub use crate::db::queries::dex::upsert_dex;
pub use crate::db::queries::known_http_endpoint::get_known_http_endpoint;
pub use crate::db::queries::known_http_endpoint::list_known_http_endpoints;
pub use crate::db::queries::known_http_endpoint::upsert_known_http_endpoint;
pub use crate::db::queries::known_ws_endpoint::get_known_ws_endpoint;
pub use crate::db::queries::known_ws_endpoint::list_known_ws_endpoints;
pub use crate::db::queries::known_ws_endpoint::upsert_known_ws_endpoint;
pub use crate::db::queries::liquidity_event::list_recent_liquidity_events;
pub use crate::db::queries::liquidity_event::upsert_liquidity_event;
pub use crate::db::queries::observed_token::get_observed_token_by_mint;
pub use crate::db::queries::observed_token::list_observed_tokens;
pub use crate::db::queries::observed_token::upsert_observed_token;
pub use crate::db::queries::onchain_observation::insert_onchain_observation;
pub use crate::db::queries::onchain_observation::list_recent_onchain_observations;
pub use crate::db::queries::pair::get_pair_by_pool_id;
pub use crate::db::queries::pair::list_pairs;
pub use crate::db::queries::pair::upsert_pair;
pub use crate::db::queries::pool::get_pool_by_address;
pub use crate::db::queries::pool::list_pools;
pub use crate::db::queries::pool::upsert_pool;
pub use crate::db::queries::pool_listing::get_pool_listing_by_pool_id;
pub use crate::db::queries::pool_listing::list_pool_listings;
pub use crate::db::queries::pool_listing::upsert_pool_listing;
pub use crate::db::queries::pool_token::list_pool_tokens_by_pool_id;
pub use crate::db::queries::pool_token::upsert_pool_token;
pub use crate::db::queries::swap::list_recent_swaps;
pub use crate::db::queries::swap::upsert_swap;
pub use crate::db::queries::token::get_token_by_mint;
pub use crate::db::queries::token::upsert_token;
pub use crate::db::queries::token_burn_event::list_recent_token_burn_events;
pub use crate::db::queries::token_burn_event::upsert_token_burn_event;
pub use crate::db::queries::token_mint_event::list_recent_token_mint_events;
pub use crate::db::queries::token_mint_event::upsert_token_mint_event;
pub use analysis_signal::insert_analysis_signal;
pub use analysis_signal::list_recent_analysis_signals;
pub use db_metadata::get_db_metadata;
pub use db_metadata::list_db_metadata;
pub use db_metadata::upsert_db_metadata;
pub use db_runtime_event::insert_db_runtime_event;
pub use db_runtime_event::list_recent_db_runtime_events;
pub use dex::get_dex_by_code;
pub use dex::list_dexes;
pub use dex::upsert_dex;
pub use known_http_endpoint::get_known_http_endpoint;
pub use known_http_endpoint::list_known_http_endpoints;
pub use known_http_endpoint::upsert_known_http_endpoint;
pub use known_ws_endpoint::get_known_ws_endpoint;
pub use known_ws_endpoint::list_known_ws_endpoints;
pub use known_ws_endpoint::upsert_known_ws_endpoint;
pub use liquidity_event::list_recent_liquidity_events;
pub use liquidity_event::upsert_liquidity_event;
pub use observed_token::get_observed_token_by_mint;
pub use observed_token::list_observed_tokens;
pub use observed_token::upsert_observed_token;
pub use onchain_observation::insert_onchain_observation;
pub use onchain_observation::list_recent_onchain_observations;
pub use pair::get_pair_by_pool_id;
pub use pair::list_pairs;
pub use pair::upsert_pair;
pub use pool::get_pool_by_address;
pub use pool::list_pools;
pub use pool::upsert_pool;
pub use pool_listing::get_pool_listing_by_pool_id;
pub use pool_listing::list_pool_listings;
pub use pool_listing::upsert_pool_listing;
pub use pool_token::list_pool_tokens_by_pool_id;
pub use pool_token::upsert_pool_token;
pub use swap::list_recent_swaps;
pub use swap::upsert_swap;
pub use token::get_token_by_mint;
pub use token::upsert_token;
pub use token_burn_event::list_recent_token_burn_events;
pub use token_burn_event::upsert_token_burn_event;
pub use token_mint_event::list_recent_token_mint_events;
pub use token_mint_event::upsert_token_mint_event;
pub use chain_instruction::list_chain_instructions_by_transaction_id;
pub use chain_instruction::insert_chain_instruction;
pub use chain_instruction::delete_chain_instructions_by_transaction_id;
pub use chain_slot::list_recent_chain_slots;
pub use chain_slot::upsert_chain_slot;
pub use chain_slot::get_chain_slot;
pub use chain_transaction::list_recent_chain_transactions;
pub use chain_transaction::upsert_chain_transaction;
pub use chain_transaction::get_chain_transaction_by_signature;

View File

@@ -0,0 +1,246 @@
// file: kb_lib/src/db/queries/chain_instruction.rs
//! Queries for `kb_chain_instructions`.
/// Inserts one normalized chain instruction row.
pub async fn insert_chain_instruction(
database: &crate::KbDatabase,
dto: &crate::KbChainInstructionDto,
) -> Result<i64, crate::KbError> {
let instruction_index_result = i64::from(dto.instruction_index);
let inner_instruction_index = match dto.inner_instruction_index {
Some(inner_instruction_index) => Some(i64::from(inner_instruction_index)),
None => None,
};
let stack_height = match dto.stack_height {
Some(stack_height) => Some(i64::from(stack_height)),
None => None,
};
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query(
r#"
INSERT INTO kb_chain_instructions (
transaction_id,
parent_instruction_id,
instruction_index,
inner_instruction_index,
program_id,
program_name,
stack_height,
accounts_json,
data_json,
parsed_type,
parsed_json,
created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(dto.transaction_id)
.bind(dto.parent_instruction_id)
.bind(instruction_index_result)
.bind(inner_instruction_index)
.bind(dto.program_id.clone())
.bind(dto.program_name.clone())
.bind(stack_height)
.bind(dto.accounts_json.clone())
.bind(dto.data_json.clone())
.bind(dto.parsed_type.clone())
.bind(dto.parsed_json.clone())
.bind(dto.created_at.to_rfc3339())
.execute(pool)
.await;
let insert_result = match query_result {
Ok(insert_result) => insert_result,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot insert kb_chain_instructions on sqlite: {}",
error
)));
}
};
Ok(insert_result.last_insert_rowid())
}
}
}
/// Lists instructions for one transaction ordered from outer to inner.
pub async fn list_chain_instructions_by_transaction_id(
database: &crate::KbDatabase,
transaction_id: i64,
) -> Result<std::vec::Vec<crate::KbChainInstructionDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbChainInstructionEntity>(
r#"
SELECT
id,
transaction_id,
parent_instruction_id,
instruction_index,
inner_instruction_index,
program_id,
program_name,
stack_height,
accounts_json,
data_json,
parsed_type,
parsed_json,
created_at
FROM kb_chain_instructions
WHERE transaction_id = ?
ORDER BY instruction_index ASC, inner_instruction_index ASC, id ASC
"#,
)
.bind(transaction_id)
.fetch_all(pool)
.await;
let entities = match query_result {
Ok(entities) => entities,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list kb_chain_instructions for transaction_id '{}' on sqlite: {}",
transaction_id, error
)));
}
};
let mut dtos = std::vec::Vec::new();
for entity in entities {
let dto_result = crate::KbChainInstructionDto::try_from(entity);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
dtos.push(dto);
}
Ok(dtos)
}
}
}
/// Deletes all instructions for one transaction id.
pub async fn delete_chain_instructions_by_transaction_id(
database: &crate::KbDatabase,
transaction_id: i64,
) -> Result<(), crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query(
r#"
DELETE FROM kb_chain_instructions
WHERE transaction_id = ?
"#,
)
.bind(transaction_id)
.execute(pool)
.await;
if let Err(error) = query_result {
return Err(crate::KbError::Db(format!(
"cannot delete kb_chain_instructions for transaction_id '{}' on sqlite: {}",
transaction_id, error
)));
}
Ok(())
}
}
}
#[cfg(test)]
mod tests {
async fn make_database() -> crate::KbDatabase {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("chain_instruction.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed")
}
async fn make_transaction(database: &crate::KbDatabase) -> i64 {
let dto = crate::KbChainTransactionDto::new(
"sig-chain-instruction-1".to_string(),
None,
None,
Some("mainnet_public_http".to_string()),
Some("legacy".to_string()),
None,
None,
r#"{"transaction":{"message":{"instructions":[]}}}"#.to_string(),
);
crate::upsert_chain_transaction(database, &dto)
.await
.expect("chain transaction upsert must succeed")
}
#[tokio::test]
async fn chain_instruction_insert_list_delete_works() {
let database = make_database().await;
let transaction_id = make_transaction(&database).await;
let outer_dto = crate::KbChainInstructionDto::new(
transaction_id,
None,
0,
None,
Some(crate::SPL_TOKEN_PROGRAM_ID.to_string()),
Some("spl-token".to_string()),
Some(1),
r#"["AccountA","AccountB"]"#.to_string(),
Some(r#""raw-data-outer""#.to_string()),
Some("transfer".to_string()),
Some(r#"{"type":"transfer","info":{"amount":"10"}}"#.to_string()),
);
let outer_instruction_id = crate::insert_chain_instruction(&database, &outer_dto)
.await
.expect("outer instruction insert must succeed");
assert!(outer_instruction_id > 0);
let inner_dto = crate::KbChainInstructionDto::new(
transaction_id,
Some(outer_instruction_id),
0,
Some(0),
Some(crate::SPL_TOKEN_PROGRAM_ID.to_string()),
Some("spl-token".to_string()),
Some(2),
r#"["InnerA","InnerB"]"#.to_string(),
Some(r#""raw-data-inner""#.to_string()),
Some("mintTo".to_string()),
Some(r#"{"type":"mintTo","info":{"amount":"5"}}"#.to_string()),
);
let inner_instruction_id = crate::insert_chain_instruction(&database, &inner_dto)
.await
.expect("inner instruction insert must succeed");
assert!(inner_instruction_id > outer_instruction_id);
let listed = crate::list_chain_instructions_by_transaction_id(&database, transaction_id)
.await
.expect("chain instruction list must succeed");
assert_eq!(listed.len(), 2);
assert_eq!(listed[0].parent_instruction_id, None);
assert_eq!(listed[0].instruction_index, 0);
assert_eq!(listed[0].inner_instruction_index, None);
assert_eq!(listed[0].parsed_type, Some("transfer".to_string()));
assert_eq!(listed[1].parent_instruction_id, Some(outer_instruction_id));
assert_eq!(listed[1].instruction_index, 0);
assert_eq!(listed[1].inner_instruction_index, Some(0));
assert_eq!(listed[1].parsed_type, Some("mintTo".to_string()));
crate::delete_chain_instructions_by_transaction_id(&database, transaction_id)
.await
.expect("chain instruction delete must succeed");
let listed_after_delete =
crate::list_chain_instructions_by_transaction_id(&database, transaction_id)
.await
.expect("chain instruction list after delete must succeed");
assert_eq!(listed_after_delete.len(), 0);
}
}

View File

@@ -0,0 +1,221 @@
// file: kb_lib/src/db/queries/chain_slot.rs
//! Queries for `kb_chain_slots`.
/// Inserts or updates one normalized chain slot row.
pub async fn upsert_chain_slot(
database: &crate::KbDatabase,
dto: &crate::KbChainSlotDto,
) -> Result<u64, crate::KbError> {
let slot_result = i64::try_from(dto.slot);
let slot = match slot_result {
Ok(slot) => slot,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert chain slot '{}' to i64: {}",
dto.slot, error
)));
}
};
let parent_slot = match dto.parent_slot {
Some(parent_slot) => {
let parent_slot_result = i64::try_from(parent_slot);
match parent_slot_result {
Ok(parent_slot) => Some(parent_slot),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert chain parent_slot '{}' to i64: {}",
parent_slot, error
)));
}
}
}
None => None,
};
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query(
r#"
INSERT INTO kb_chain_slots (
slot,
parent_slot,
block_time_unix,
created_at,
updated_at
)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(slot) DO UPDATE SET
parent_slot = excluded.parent_slot,
block_time_unix = excluded.block_time_unix,
updated_at = excluded.updated_at
"#,
)
.bind(slot)
.bind(parent_slot)
.bind(dto.block_time_unix)
.bind(dto.created_at.to_rfc3339())
.bind(dto.updated_at.to_rfc3339())
.execute(pool)
.await;
if let Err(error) = query_result {
return Err(crate::KbError::Db(format!(
"cannot upsert kb_chain_slots on sqlite: {}",
error
)));
}
Ok(dto.slot)
}
}
}
/// Reads one chain slot row by slot number.
pub async fn get_chain_slot(
database: &crate::KbDatabase,
slot: u64,
) -> Result<std::option::Option<crate::KbChainSlotDto>, crate::KbError> {
let slot_result = i64::try_from(slot);
let slot_i64 = match slot_result {
Ok(slot_i64) => slot_i64,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert requested chain slot '{}' to i64: {}",
slot, error
)));
}
};
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbChainSlotEntity>(
r#"
SELECT
slot,
parent_slot,
block_time_unix,
created_at,
updated_at
FROM kb_chain_slots
WHERE slot = ?
LIMIT 1
"#,
)
.bind(slot_i64)
.fetch_optional(pool)
.await;
let entity_option = match query_result {
Ok(entity_option) => entity_option,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot fetch kb_chain_slots for slot '{}' on sqlite: {}",
slot, error
)));
}
};
match entity_option {
Some(entity) => {
let dto_result = crate::KbChainSlotDto::try_from(entity);
match dto_result {
Ok(dto) => Ok(Some(dto)),
Err(error) => Err(error),
}
}
None => Ok(None),
}
}
}
}
/// Lists recent chain slots ordered from newest to oldest.
pub async fn list_recent_chain_slots(
database: &crate::KbDatabase,
limit: u32,
) -> Result<std::vec::Vec<crate::KbChainSlotDto>, crate::KbError> {
if limit == 0 {
return Ok(std::vec::Vec::new());
}
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbChainSlotEntity>(
r#"
SELECT
slot,
parent_slot,
block_time_unix,
created_at,
updated_at
FROM kb_chain_slots
ORDER BY slot DESC
LIMIT ?
"#,
)
.bind(i64::from(limit))
.fetch_all(pool)
.await;
let entities = match query_result {
Ok(entities) => entities,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list kb_chain_slots on sqlite: {}",
error
)));
}
};
let mut dtos = std::vec::Vec::new();
for entity in entities {
let dto_result = crate::KbChainSlotDto::try_from(entity);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
dtos.push(dto);
}
Ok(dtos)
}
}
}
#[cfg(test)]
mod tests {
async fn make_database() -> crate::KbDatabase {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("chain_slot.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed")
}
#[tokio::test]
async fn chain_slot_roundtrip_works() {
let database = make_database().await;
let dto = crate::KbChainSlotDto::new(424242, Some(424241), Some(1_777_777_777));
let slot = crate::upsert_chain_slot(&database, &dto)
.await
.expect("chain slot upsert must succeed");
assert_eq!(slot, 424242);
let fetched = crate::get_chain_slot(&database, 424242)
.await
.expect("chain slot fetch must succeed")
.expect("chain slot must exist");
assert_eq!(fetched.slot, 424242);
assert_eq!(fetched.parent_slot, Some(424241));
assert_eq!(fetched.block_time_unix, Some(1_777_777_777));
let listed = crate::list_recent_chain_slots(&database, 10)
.await
.expect("chain slot list must succeed");
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].slot, 424242);
assert_eq!(listed[0].parent_slot, Some(424241));
assert_eq!(listed[0].block_time_unix, Some(1_777_777_777));
}
}

View File

@@ -0,0 +1,265 @@
// file: kb_lib/src/db/queries/chain_transaction.rs
//! Queries for `kb_chain_transactions`.
/// Inserts or updates one normalized chain transaction row.
pub async fn upsert_chain_transaction(
database: &crate::KbDatabase,
dto: &crate::KbChainTransactionDto,
) -> Result<i64, crate::KbError> {
let slot_i64 = match dto.slot {
Some(slot) => {
let slot_result = i64::try_from(slot);
match slot_result {
Ok(slot) => Some(slot),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert chain transaction slot '{}' to i64: {}",
slot, error
)));
}
}
}
None => None,
};
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query(
r#"
INSERT INTO kb_chain_transactions (
signature,
slot,
block_time_unix,
source_endpoint_name,
version_text,
err_json,
meta_json,
transaction_json,
created_at,
updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(signature) DO UPDATE SET
slot = excluded.slot,
block_time_unix = excluded.block_time_unix,
source_endpoint_name = excluded.source_endpoint_name,
version_text = excluded.version_text,
err_json = excluded.err_json,
meta_json = excluded.meta_json,
transaction_json = excluded.transaction_json,
updated_at = excluded.updated_at
"#,
)
.bind(dto.signature.clone())
.bind(slot_i64)
.bind(dto.block_time_unix)
.bind(dto.source_endpoint_name.clone())
.bind(dto.version_text.clone())
.bind(dto.err_json.clone())
.bind(dto.meta_json.clone())
.bind(dto.transaction_json.clone())
.bind(dto.created_at.to_rfc3339())
.bind(dto.updated_at.to_rfc3339())
.execute(pool)
.await;
if let Err(error) = query_result {
return Err(crate::KbError::Db(format!(
"cannot upsert kb_chain_transactions on sqlite: {}",
error
)));
}
let id_result = sqlx::query_scalar::<sqlx::Sqlite, i64>(
r#"
SELECT id
FROM kb_chain_transactions
WHERE signature = ?
LIMIT 1
"#,
)
.bind(dto.signature.clone())
.fetch_one(pool)
.await;
match id_result {
Ok(id) => Ok(id),
Err(error) => Err(crate::KbError::Db(format!(
"cannot fetch kb_chain_transactions id for signature '{}' on sqlite: {}",
dto.signature, error
))),
}
}
}
}
/// Reads one chain transaction row by signature.
pub async fn get_chain_transaction_by_signature(
database: &crate::KbDatabase,
signature: &str,
) -> Result<std::option::Option<crate::KbChainTransactionDto>, crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbChainTransactionEntity>(
r#"
SELECT
id,
signature,
slot,
block_time_unix,
source_endpoint_name,
version_text,
err_json,
meta_json,
transaction_json,
created_at,
updated_at
FROM kb_chain_transactions
WHERE signature = ?
LIMIT 1
"#,
)
.bind(signature.to_string())
.fetch_optional(pool)
.await;
let entity_option = match query_result {
Ok(entity_option) => entity_option,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot fetch kb_chain_transactions for signature '{}' on sqlite: {}",
signature, error
)));
}
};
match entity_option {
Some(entity) => {
let dto_result = crate::KbChainTransactionDto::try_from(entity);
match dto_result {
Ok(dto) => Ok(Some(dto)),
Err(error) => Err(error),
}
}
None => Ok(None),
}
}
}
}
/// Lists recent chain transactions ordered from newest to oldest.
pub async fn list_recent_chain_transactions(
database: &crate::KbDatabase,
limit: u32,
) -> Result<std::vec::Vec<crate::KbChainTransactionDto>, crate::KbError> {
if limit == 0 {
return Ok(std::vec::Vec::new());
}
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbChainTransactionEntity>(
r#"
SELECT
id,
signature,
slot,
block_time_unix,
source_endpoint_name,
version_text,
err_json,
meta_json,
transaction_json,
created_at,
updated_at
FROM kb_chain_transactions
ORDER BY id DESC
LIMIT ?
"#,
)
.bind(i64::from(limit))
.fetch_all(pool)
.await;
let entities = match query_result {
Ok(entities) => entities,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list kb_chain_transactions on sqlite: {}",
error
)));
}
};
let mut dtos = std::vec::Vec::new();
for entity in entities {
let dto_result = crate::KbChainTransactionDto::try_from(entity);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
dtos.push(dto);
}
Ok(dtos)
}
}
}
#[cfg(test)]
mod tests {
async fn make_database() -> crate::KbDatabase {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("chain_transaction.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed")
}
#[tokio::test]
async fn chain_transaction_roundtrip_works() {
let database = make_database().await;
let slot_dto = crate::KbChainSlotDto::new(515151, Some(515150), Some(1_700_000_001));
crate::upsert_chain_slot(&database, &slot_dto)
.await
.expect("chain slot upsert must succeed");
let dto = crate::KbChainTransactionDto::new(
"sig-chain-transaction-1".to_string(),
Some(515151),
Some(1_700_000_001),
Some("helius_primary_http".to_string()),
Some("0".to_string()),
None,
Some(r#"{"fee":5000}"#.to_string()),
r#"{"slot":515151,"transaction":{"message":{"instructions":[]}}}"#.to_string(),
);
let transaction_id = crate::upsert_chain_transaction(&database, &dto)
.await
.expect("chain transaction upsert must succeed");
assert!(transaction_id > 0);
let fetched =
crate::get_chain_transaction_by_signature(&database, "sig-chain-transaction-1")
.await
.expect("chain transaction fetch must succeed")
.expect("chain transaction must exist");
assert_eq!(fetched.id, Some(transaction_id));
assert_eq!(fetched.signature, "sig-chain-transaction-1");
assert_eq!(fetched.slot, Some(515151));
assert_eq!(fetched.block_time_unix, Some(1_700_000_001));
assert_eq!(
fetched.source_endpoint_name,
Some("helius_primary_http".to_string())
);
assert_eq!(fetched.version_text, Some("0".to_string()));
assert_eq!(fetched.meta_json, Some(r#"{"fee":5000}"#.to_string()));
let listed = crate::list_recent_chain_transactions(&database, 10)
.await
.expect("chain transaction list must succeed");
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].signature, "sig-chain-transaction-1");
assert_eq!(listed[0].slot, Some(515151));
}
}

View File

@@ -190,10 +190,35 @@ pub(crate) async fn ensure_schema(database: &crate::KbDatabase) -> Result<(), cr
if let Err(error) = result {
return Err(error);
}
let result = create_kb_chain_slots_table(pool).await;
if let Err(error) = result {
return Err(error);
}
let result = create_kb_chain_transactions_table(pool).await;
if let Err(error) = result {
return Err(error);
}
let result = create_kb_idx_chain_transactions_slot(pool).await;
if let Err(error) = result {
return Err(error);
}
let result = create_kb_chain_instructions_table(pool).await;
if let Err(error) = result {
return Err(error);
}
let result = create_kb_idx_chain_instructions_transaction_id(pool).await;
if let Err(error) = result {
return Err(error);
}
let result = create_kb_idx_chain_instructions_program_id(pool).await;
if let Err(error) = result {
return Err(error);
}
let result = update_schema_version_metadata(database).await;
if let Err(error) = result {
return Err(error);
}
Ok(())
}
}
@@ -1029,6 +1054,126 @@ ON kb_token_burn_events (executed_at)
.await
}
/// Creates `kb_chain_slots`.
async fn create_kb_chain_slots_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> {
execute_sqlite_schema_statement(
pool,
"create_kb_chain_slots_table",
r#"
CREATE TABLE IF NOT EXISTS kb_chain_slots (
slot INTEGER NOT NULL PRIMARY KEY,
parent_slot INTEGER NULL,
block_time_unix INTEGER NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"#,
)
.await
}
/// Creates `kb_chain_transactions`.
async fn create_kb_chain_transactions_table(
pool: &sqlx::SqlitePool,
) -> Result<(), crate::KbError> {
execute_sqlite_schema_statement(
pool,
"create_kb_chain_transactions_table",
r#"
CREATE TABLE IF NOT EXISTS kb_chain_transactions (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
signature TEXT NOT NULL UNIQUE,
slot INTEGER NULL,
block_time_unix INTEGER NULL,
source_endpoint_name TEXT NULL,
version_text TEXT NULL,
err_json TEXT NULL,
meta_json TEXT NULL,
transaction_json TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(slot) REFERENCES kb_chain_slots(slot)
)
"#,
)
.await
}
/// Creates index on `kb_chain_transactions(slot)`.
async fn create_kb_idx_chain_transactions_slot(
pool: &sqlx::SqlitePool,
) -> Result<(), crate::KbError> {
execute_sqlite_schema_statement(
pool,
"create_kb_idx_chain_transactions_slot",
r#"
CREATE INDEX IF NOT EXISTS kb_idx_chain_transactions_slot
ON kb_chain_transactions (slot)
"#,
)
.await
}
/// Creates `kb_chain_instructions`.
async fn create_kb_chain_instructions_table(
pool: &sqlx::SqlitePool,
) -> Result<(), crate::KbError> {
execute_sqlite_schema_statement(
pool,
"create_kb_chain_instructions_table",
r#"
CREATE TABLE IF NOT EXISTS kb_chain_instructions (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
transaction_id INTEGER NOT NULL,
parent_instruction_id INTEGER NULL,
instruction_index INTEGER NOT NULL,
inner_instruction_index INTEGER NULL,
program_id TEXT NULL,
program_name TEXT NULL,
stack_height INTEGER NULL,
accounts_json TEXT NOT NULL,
data_json TEXT NULL,
parsed_type TEXT NULL,
parsed_json TEXT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(transaction_id) REFERENCES kb_chain_transactions(id),
FOREIGN KEY(parent_instruction_id) REFERENCES kb_chain_instructions(id)
)
"#,
)
.await
}
/// Creates index on `kb_chain_instructions(transaction_id)`.
async fn create_kb_idx_chain_instructions_transaction_id(
pool: &sqlx::SqlitePool,
) -> Result<(), crate::KbError> {
execute_sqlite_schema_statement(
pool,
"create_kb_idx_chain_instructions_transaction_id",
r#"
CREATE INDEX IF NOT EXISTS kb_idx_chain_instructions_transaction_id
ON kb_chain_instructions (transaction_id)
"#,
)
.await
}
/// Creates index on `kb_chain_instructions(program_id)`.
async fn create_kb_idx_chain_instructions_program_id(
pool: &sqlx::SqlitePool,
) -> Result<(), crate::KbError> {
execute_sqlite_schema_statement(
pool,
"create_kb_idx_chain_instructions_program_id",
r#"
CREATE INDEX IF NOT EXISTS kb_idx_chain_instructions_program_id
ON kb_chain_instructions (program_id)
"#,
)
.await
}
/// Updates the persisted schema version metadata entry.
async fn update_schema_version_metadata(
database: &crate::KbDatabase,

View File

@@ -13,13 +13,13 @@ mod pool_token_role;
mod runtime_event_level;
mod swap_trade_side;
pub use crate::db::types::analysis_signal_severity::KbAnalysisSignalSeverity;
pub use crate::db::types::database_backend::KbDatabaseBackend;
pub use crate::db::types::liquidity_event_kind::KbLiquidityEventKind;
pub use crate::db::types::observation_source_kind::KbObservationSourceKind;
pub use crate::db::types::observed_token_status::KbObservedTokenStatus;
pub use crate::db::types::pool_kind::KbPoolKind;
pub use crate::db::types::pool_status::KbPoolStatus;
pub use crate::db::types::pool_token_role::KbPoolTokenRole;
pub use crate::db::types::runtime_event_level::KbDbRuntimeEventLevel;
pub use crate::db::types::swap_trade_side::KbSwapTradeSide;
pub use analysis_signal_severity::KbAnalysisSignalSeverity;
pub use database_backend::KbDatabaseBackend;
pub use liquidity_event_kind::KbLiquidityEventKind;
pub use observation_source_kind::KbObservationSourceKind;
pub use observed_token_status::KbObservedTokenStatus;
pub use pool_kind::KbPoolKind;
pub use pool_status::KbPoolStatus;
pub use pool_token_role::KbPoolTokenRole;
pub use runtime_event_level::KbDbRuntimeEventLevel;
pub use swap_trade_side::KbSwapTradeSide;

View File

@@ -11,15 +11,15 @@ mod solana_ws;
mod types;
mod ws_relay;
pub use crate::detect::service::KbDetectionPersistenceService;
pub use crate::detect::solana_ws::KbSolanaWsDetectionOutcome;
pub use crate::detect::solana_ws::KbSolanaWsDetectionService;
pub use crate::detect::types::KbDetectionObservationInput;
pub use crate::detect::types::KbDetectionPoolCandidateInput;
pub use crate::detect::types::KbDetectionPoolCandidateResult;
pub use crate::detect::types::KbDetectionSignalInput;
pub use crate::detect::types::KbDetectionTokenCandidateInput;
pub use crate::detect::types::KbDetectionTokenCandidateResult;
pub use crate::detect::ws_relay::KbWsDetectionNotificationEnvelope;
pub use crate::detect::ws_relay::KbWsDetectionRelay;
pub use crate::detect::ws_relay::KbWsDetectionRelayStats;
pub use service::KbDetectionPersistenceService;
pub use solana_ws::KbSolanaWsDetectionOutcome;
pub use solana_ws::KbSolanaWsDetectionService;
pub use types::KbDetectionObservationInput;
pub use types::KbDetectionPoolCandidateInput;
pub use types::KbDetectionPoolCandidateResult;
pub use types::KbDetectionSignalInput;
pub use types::KbDetectionTokenCandidateInput;
pub use types::KbDetectionTokenCandidateResult;
pub use ws_relay::KbWsDetectionNotificationEnvelope;
pub use ws_relay::KbWsDetectionRelay;
pub use ws_relay::KbWsDetectionRelayStats;

View File

@@ -22,156 +22,173 @@ mod http_pool;
mod db;
mod detect;
mod tx_resolution;
mod tx_model;
pub use crate::constants::*;
pub use crate::error::KbError;
pub use crate::config::KbAppConfig;
pub use crate::config::KbConfig;
pub use crate::config::KbDataConfig;
pub use crate::config::KbHttpEndpointConfig;
pub use crate::config::KbLoggingConfig;
pub use crate::config::KbSolanaConfig;
pub use crate::config::KbWsEndpointConfig;
pub use crate::config::KbDatabaseConfig;
pub use crate::config::KbSqliteDatabaseConfig;
pub use crate::tracing::KbTracingGuard;
pub use crate::tracing::init_tracing;
pub use crate::types::KbConnectionState;
pub use crate::ws_client::WsClient;
pub use crate::ws_client::WsEvent;
pub use crate::ws_client::WsOutgoingMessage;
pub use crate::ws_client::WsSubscriptionInfo;
pub use crate::json_rpc_ws::KbJsonRpcWsErrorObject;
pub use crate::json_rpc_ws::KbJsonRpcWsErrorResponse;
pub use crate::json_rpc_ws::KbJsonRpcWsIncomingMessage;
pub use crate::json_rpc_ws::KbJsonRpcWsNotification;
pub use crate::json_rpc_ws::KbJsonRpcWsNotificationParams;
pub use crate::json_rpc_ws::KbJsonRpcWsRequest;
pub use crate::json_rpc_ws::KbJsonRpcWsSuccessResponse;
pub use crate::json_rpc_ws::kb_is_probable_json_rpc_object_text;
pub use crate::json_rpc_ws::parse_kb_json_rpc_ws_incoming_text;
pub use crate::json_rpc_ws::parse_kb_json_rpc_ws_incoming_value;
pub use crate::solana_pubsub_ws::KbSolanaWsTypedNotification;
pub use crate::solana_pubsub_ws::parse_kb_solana_ws_typed_notification;
pub use crate::solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event;
pub use crate::ws_manager::WsManagedEndpointSnapshot;
pub use crate::ws_manager::WsManager;
pub use crate::ws_manager::WsManagerSnapshot;
pub use crate::http_client::HttpClient;
pub use crate::http_client::KbJsonRpcHttpErrorObject;
pub use crate::http_client::KbJsonRpcHttpErrorResponse;
pub use crate::http_client::KbJsonRpcHttpRequest;
pub use crate::http_client::KbJsonRpcHttpResponse;
pub use crate::http_client::KbJsonRpcHttpSuccessResponse;
pub use crate::http_client::KbHttpEndpointStatus;
pub use crate::http_client::KbHttpMethodClass;
pub use crate::http_client::parse_kb_json_rpc_http_response_text;
pub use crate::http_client::parse_kb_json_rpc_http_response_value;
pub use crate::http_pool::HttpEndpointPool;
pub use crate::http_pool::KbHttpPoolClientSnapshot;
pub use crate::db::KbDatabase;
pub use crate::db::KbDatabaseBackend;
pub use crate::db::KbDatabaseConnection;
pub use crate::db::KbDbMetadataDto;
pub use crate::db::KbDbMetadataEntity;
pub use crate::db::get_db_metadata;
pub use crate::db::list_db_metadata;
pub use crate::db::upsert_db_metadata;
pub use crate::db::KbDbRuntimeEventDto;
pub use crate::db::KbDbRuntimeEventEntity;
pub use crate::db::KbDbRuntimeEventLevel;
pub use crate::db::KbKnownHttpEndpointDto;
pub use crate::db::KbKnownHttpEndpointEntity;
pub use crate::db::KbKnownWsEndpointDto;
pub use crate::db::KbKnownWsEndpointEntity;
pub use crate::db::KbObservedTokenDto;
pub use crate::db::KbObservedTokenEntity;
pub use crate::db::KbObservedTokenStatus;
pub use crate::db::KbAnalysisSignalDto;
pub use crate::db::KbAnalysisSignalEntity;
pub use crate::db::KbAnalysisSignalSeverity;
pub use crate::db::KbObservationSourceKind;
pub use crate::db::KbOnchainObservationDto;
pub use crate::db::KbOnchainObservationEntity;
pub use crate::db::KbDexDto;
pub use crate::db::KbDexEntity;
pub use crate::db::KbPairDto;
pub use crate::db::KbPairEntity;
pub use crate::db::KbPoolDto;
pub use crate::db::KbPoolEntity;
pub use crate::db::KbPoolKind;
pub use crate::db::KbPoolListingDto;
pub use crate::db::KbPoolListingEntity;
pub use crate::db::KbPoolStatus;
pub use crate::db::KbPoolTokenDto;
pub use crate::db::KbPoolTokenEntity;
pub use crate::db::KbPoolTokenRole;
pub use crate::db::KbTokenDto;
pub use crate::db::KbTokenEntity;
pub use crate::db::KbLiquidityEventDto;
pub use crate::db::KbLiquidityEventEntity;
pub use crate::db::KbLiquidityEventKind;
pub use crate::db::KbSwapDto;
pub use crate::db::KbSwapEntity;
pub use crate::db::KbSwapTradeSide;
pub use crate::db::KbTokenBurnEventDto;
pub use crate::db::KbTokenBurnEventEntity;
pub use crate::db::KbTokenMintEventDto;
pub use crate::db::KbTokenMintEventEntity;
pub use crate::db::list_recent_liquidity_events;
pub use crate::db::list_recent_swaps;
pub use crate::db::list_recent_token_burn_events;
pub use crate::db::list_recent_token_mint_events;
pub use crate::db::upsert_liquidity_event;
pub use crate::db::upsert_swap;
pub use crate::db::upsert_token_burn_event;
pub use crate::db::upsert_token_mint_event;
pub use crate::db::get_token_by_mint;
pub use crate::db::list_dexes;
pub use crate::db::upsert_dex;
pub use crate::db::upsert_pair;
pub use crate::db::upsert_pool;
pub use crate::db::upsert_pool_listing;
pub use crate::db::upsert_pool_token;
pub use crate::db::upsert_token;
pub use crate::db::insert_analysis_signal;
pub use crate::db::insert_onchain_observation;
pub use crate::db::list_recent_analysis_signals;
pub use crate::db::list_recent_onchain_observations;
pub use crate::db::get_observed_token_by_mint;
pub use crate::db::list_observed_tokens;
pub use crate::db::upsert_observed_token;
pub use crate::db::get_known_http_endpoint;
pub use crate::db::get_known_ws_endpoint;
pub use crate::db::insert_db_runtime_event;
pub use crate::db::list_known_http_endpoints;
pub use crate::db::list_known_ws_endpoints;
pub use crate::db::list_recent_db_runtime_events;
pub use crate::db::upsert_known_http_endpoint;
pub use crate::db::upsert_known_ws_endpoint;
pub use crate::db::get_dex_by_code;
pub use crate::db::get_pair_by_pool_id;
pub use crate::db::get_pool_by_address;
pub use crate::db::get_pool_listing_by_pool_id;
pub use crate::db::list_pairs;
pub use crate::db::list_pool_listings;
pub use crate::db::list_pool_tokens_by_pool_id;
pub use crate::db::list_pools;
pub use crate::detect::KbDetectionObservationInput;
pub use crate::detect::KbDetectionPersistenceService;
pub use crate::detect::KbDetectionSignalInput;
pub use crate::detect::KbDetectionTokenCandidateInput;
pub use crate::detect::KbDetectionTokenCandidateResult;
pub use crate::detect::KbSolanaWsDetectionOutcome;
pub use crate::detect::KbSolanaWsDetectionService;
pub use crate::detect::KbWsDetectionNotificationEnvelope;
pub use crate::detect::KbWsDetectionRelay;
pub use crate::detect::KbWsDetectionRelayStats;
pub use crate::detect::KbDetectionPoolCandidateInput;
pub use crate::detect::KbDetectionPoolCandidateResult;
pub use crate::tx_resolution::KbTransactionResolutionOutcome;
pub use crate::tx_resolution::KbTransactionResolutionRequest;
pub use crate::tx_resolution::KbTransactionResolutionService;
pub use crate::tx_resolution::KbWsTransactionResolutionEnvelope;
pub use crate::tx_resolution::KbWsTransactionResolutionRelay;
pub use crate::tx_resolution::KbWsTransactionResolutionRelayStats;
pub use constants::*;
pub use error::KbError;
pub use config::KbAppConfig;
pub use config::KbConfig;
pub use config::KbDataConfig;
pub use config::KbHttpEndpointConfig;
pub use config::KbLoggingConfig;
pub use config::KbSolanaConfig;
pub use config::KbWsEndpointConfig;
pub use config::KbDatabaseConfig;
pub use config::KbSqliteDatabaseConfig;
pub use tracing::KbTracingGuard;
pub use tracing::init_tracing;
pub use types::KbConnectionState;
pub use ws_client::WsClient;
pub use ws_client::WsEvent;
pub use ws_client::WsOutgoingMessage;
pub use ws_client::WsSubscriptionInfo;
pub use json_rpc_ws::KbJsonRpcWsErrorObject;
pub use json_rpc_ws::KbJsonRpcWsErrorResponse;
pub use json_rpc_ws::KbJsonRpcWsIncomingMessage;
pub use json_rpc_ws::KbJsonRpcWsNotification;
pub use json_rpc_ws::KbJsonRpcWsNotificationParams;
pub use json_rpc_ws::KbJsonRpcWsRequest;
pub use json_rpc_ws::KbJsonRpcWsSuccessResponse;
pub use json_rpc_ws::kb_is_probable_json_rpc_object_text;
pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_text;
pub use json_rpc_ws::parse_kb_json_rpc_ws_incoming_value;
pub use solana_pubsub_ws::KbSolanaWsTypedNotification;
pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification;
pub use solana_pubsub_ws::parse_kb_solana_ws_typed_notification_from_event;
pub use ws_manager::WsManagedEndpointSnapshot;
pub use ws_manager::WsManager;
pub use ws_manager::WsManagerSnapshot;
pub use http_client::HttpClient;
pub use http_client::KbJsonRpcHttpErrorObject;
pub use http_client::KbJsonRpcHttpErrorResponse;
pub use http_client::KbJsonRpcHttpRequest;
pub use http_client::KbJsonRpcHttpResponse;
pub use http_client::KbJsonRpcHttpSuccessResponse;
pub use http_client::KbHttpEndpointStatus;
pub use http_client::KbHttpMethodClass;
pub use http_client::parse_kb_json_rpc_http_response_text;
pub use http_client::parse_kb_json_rpc_http_response_value;
pub use http_pool::HttpEndpointPool;
pub use http_pool::KbHttpPoolClientSnapshot;
pub use db::KbDatabase;
pub use db::KbDatabaseBackend;
pub use db::KbDatabaseConnection;
pub use db::KbDbMetadataDto;
pub use db::KbDbMetadataEntity;
pub use db::get_db_metadata;
pub use db::list_db_metadata;
pub use db::upsert_db_metadata;
pub use db::KbDbRuntimeEventDto;
pub use db::KbDbRuntimeEventEntity;
pub use db::KbDbRuntimeEventLevel;
pub use db::KbKnownHttpEndpointDto;
pub use db::KbKnownHttpEndpointEntity;
pub use db::KbKnownWsEndpointDto;
pub use db::KbKnownWsEndpointEntity;
pub use db::KbObservedTokenDto;
pub use db::KbObservedTokenEntity;
pub use db::KbObservedTokenStatus;
pub use db::KbAnalysisSignalDto;
pub use db::KbAnalysisSignalEntity;
pub use db::KbAnalysisSignalSeverity;
pub use db::KbObservationSourceKind;
pub use db::KbOnchainObservationDto;
pub use db::KbOnchainObservationEntity;
pub use db::KbDexDto;
pub use db::KbDexEntity;
pub use db::KbPairDto;
pub use db::KbPairEntity;
pub use db::KbPoolDto;
pub use db::KbPoolEntity;
pub use db::KbPoolKind;
pub use db::KbPoolListingDto;
pub use db::KbPoolListingEntity;
pub use db::KbPoolStatus;
pub use db::KbPoolTokenDto;
pub use db::KbPoolTokenEntity;
pub use db::KbPoolTokenRole;
pub use db::KbTokenDto;
pub use db::KbTokenEntity;
pub use db::KbLiquidityEventDto;
pub use db::KbLiquidityEventEntity;
pub use db::KbLiquidityEventKind;
pub use db::KbSwapDto;
pub use db::KbSwapEntity;
pub use db::KbSwapTradeSide;
pub use db::KbTokenBurnEventDto;
pub use db::KbTokenBurnEventEntity;
pub use db::KbTokenMintEventDto;
pub use db::KbTokenMintEventEntity;
pub use db::KbChainInstructionDto;
pub use db::KbChainSlotDto;
pub use db::KbChainTransactionDto;
pub use db::KbChainInstructionEntity;
pub use db::KbChainSlotEntity;
pub use db::KbChainTransactionEntity;
pub use db::delete_chain_instructions_by_transaction_id;
pub use db::get_chain_slot;
pub use db::get_chain_transaction_by_signature;
pub use db::insert_chain_instruction;
pub use db::list_chain_instructions_by_transaction_id;
pub use db::list_recent_chain_slots;
pub use db::list_recent_chain_transactions;
pub use db::upsert_chain_slot;
pub use db::upsert_chain_transaction;
pub use db::list_recent_liquidity_events;
pub use db::list_recent_swaps;
pub use db::list_recent_token_burn_events;
pub use db::list_recent_token_mint_events;
pub use db::upsert_liquidity_event;
pub use db::upsert_swap;
pub use db::upsert_token_burn_event;
pub use db::upsert_token_mint_event;
pub use db::get_token_by_mint;
pub use db::list_dexes;
pub use db::upsert_dex;
pub use db::upsert_pair;
pub use db::upsert_pool;
pub use db::upsert_pool_listing;
pub use db::upsert_pool_token;
pub use db::upsert_token;
pub use db::insert_analysis_signal;
pub use db::insert_onchain_observation;
pub use db::list_recent_analysis_signals;
pub use db::list_recent_onchain_observations;
pub use db::get_observed_token_by_mint;
pub use db::list_observed_tokens;
pub use db::upsert_observed_token;
pub use db::get_known_http_endpoint;
pub use db::get_known_ws_endpoint;
pub use db::insert_db_runtime_event;
pub use db::list_known_http_endpoints;
pub use db::list_known_ws_endpoints;
pub use db::list_recent_db_runtime_events;
pub use db::upsert_known_http_endpoint;
pub use db::upsert_known_ws_endpoint;
pub use db::get_dex_by_code;
pub use db::get_pair_by_pool_id;
pub use db::get_pool_by_address;
pub use db::get_pool_listing_by_pool_id;
pub use db::list_pairs;
pub use db::list_pool_listings;
pub use db::list_pool_tokens_by_pool_id;
pub use db::list_pools;
pub use detect::KbDetectionObservationInput;
pub use detect::KbDetectionPersistenceService;
pub use detect::KbDetectionSignalInput;
pub use detect::KbDetectionTokenCandidateInput;
pub use detect::KbDetectionTokenCandidateResult;
pub use detect::KbSolanaWsDetectionOutcome;
pub use detect::KbSolanaWsDetectionService;
pub use detect::KbWsDetectionNotificationEnvelope;
pub use detect::KbWsDetectionRelay;
pub use detect::KbWsDetectionRelayStats;
pub use detect::KbDetectionPoolCandidateInput;
pub use detect::KbDetectionPoolCandidateResult;
pub use tx_resolution::KbTransactionResolutionOutcome;
pub use tx_resolution::KbTransactionResolutionRequest;
pub use tx_resolution::KbTransactionResolutionService;
pub use tx_resolution::KbWsTransactionResolutionEnvelope;
pub use tx_resolution::KbWsTransactionResolutionRelay;
pub use tx_resolution::KbWsTransactionResolutionRelayStats;
pub use tx_model::KbTransactionModelService;

695
kb_lib/src/tx_model.rs Normal file
View File

@@ -0,0 +1,695 @@
// file: kb_lib/src/tx_model.rs
//! Projection of resolved transactions into normalized internal DB tables.
/// Service projecting resolved transaction JSON into internal chain tables.
#[derive(Debug, Clone)]
pub struct KbTransactionModelService {
database: std::sync::Arc<crate::KbDatabase>,
}
impl KbTransactionModelService {
/// Creates one transaction model projection service.
pub fn new(database: std::sync::Arc<crate::KbDatabase>) -> Self {
Self { database }
}
/// Projects one resolved transaction JSON into slots / transactions / instructions tables.
pub async fn persist_resolved_transaction(
&self,
signature: &str,
source_endpoint_name: std::option::Option<std::string::String>,
resolved_transaction: &serde_json::Value,
) -> Result<i64, crate::KbError> {
let slot_i64 = resolved_transaction
.get("slot")
.and_then(serde_json::Value::as_i64);
let slot_u64 = match slot_i64 {
Some(slot_i64) => {
let convert_result = u64::try_from(slot_i64);
match convert_result {
Ok(slot_u64) => Some(slot_u64),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert resolved transaction slot '{}' to u64: {}",
slot_i64, error
)));
}
}
}
None => None,
};
let block_time_unix = resolved_transaction
.get("blockTime")
.and_then(serde_json::Value::as_i64);
if let Some(slot_value) = slot_u64 {
let slot_dto = crate::KbChainSlotDto::new(slot_value, None, block_time_unix);
let upsert_result = crate::upsert_chain_slot(self.database.as_ref(), &slot_dto).await;
if let Err(error) = upsert_result {
return Err(error);
}
}
let version_text = kb_extract_version_text(resolved_transaction);
let err_json = kb_extract_meta_err_json(resolved_transaction);
let meta_json = kb_extract_meta_json(resolved_transaction);
let transaction_json_result = serde_json::to_string(resolved_transaction);
let transaction_json = match transaction_json_result {
Ok(transaction_json) => transaction_json,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize resolved transaction '{}': {}",
signature, error
)));
}
};
let transaction_dto = crate::KbChainTransactionDto::new(
signature.to_string(),
slot_u64,
block_time_unix,
source_endpoint_name,
version_text,
err_json,
meta_json,
transaction_json,
);
let transaction_id_result =
crate::upsert_chain_transaction(self.database.as_ref(), &transaction_dto).await;
let transaction_id = match transaction_id_result {
Ok(transaction_id) => transaction_id,
Err(error) => return Err(error),
};
let delete_result = crate::delete_chain_instructions_by_transaction_id(
self.database.as_ref(),
transaction_id,
)
.await;
if let Err(error) = delete_result {
return Err(error);
}
let outer_instructions = kb_extract_outer_instructions(resolved_transaction);
let mut outer_instruction_ids = std::collections::BTreeMap::<u32, i64>::new();
for (index, instruction) in outer_instructions.iter().enumerate() {
let index_result = u32::try_from(index);
let instruction_index = match index_result {
Ok(instruction_index) => instruction_index,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert outer instruction index '{}' to u32: {}",
index, error
)));
}
};
let dto_result = kb_build_instruction_dto(
transaction_id,
None,
instruction_index,
None,
instruction,
);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
let instruction_id_result =
crate::insert_chain_instruction(self.database.as_ref(), &dto).await;
let instruction_id = match instruction_id_result {
Ok(instruction_id) => instruction_id,
Err(error) => return Err(error),
};
outer_instruction_ids.insert(instruction_index, instruction_id);
}
let inner_instruction_groups = kb_extract_inner_instruction_groups(resolved_transaction);
for group in &inner_instruction_groups {
let outer_index = match group.index {
Some(outer_index) => outer_index,
None => continue,
};
let parent_instruction_id_option = outer_instruction_ids.get(&outer_index).cloned();
let parent_instruction_id = match parent_instruction_id_option {
Some(parent_instruction_id) => parent_instruction_id,
None => continue,
};
for (inner_index, inner_instruction) in group.instructions.iter().enumerate() {
let inner_index_result = u32::try_from(inner_index);
let inner_instruction_index = match inner_index_result {
Ok(inner_instruction_index) => inner_instruction_index,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert inner instruction index '{}' to u32: {}",
inner_index, error
)));
}
};
let dto_result = kb_build_instruction_dto(
transaction_id,
Some(parent_instruction_id),
outer_index,
Some(inner_instruction_index),
inner_instruction,
);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
let insert_result =
crate::insert_chain_instruction(self.database.as_ref(), &dto).await;
if let Err(error) = insert_result {
return Err(error);
}
}
}
Ok(transaction_id)
}
}
#[derive(Debug, Clone)]
struct KbInnerInstructionGroup {
index: std::option::Option<u32>,
instructions: std::vec::Vec<serde_json::Value>,
}
fn kb_extract_version_text(
resolved_transaction: &serde_json::Value,
) -> std::option::Option<std::string::String> {
let version_value_option = resolved_transaction.get("version");
let version_value = match version_value_option {
Some(version_value) => version_value,
None => return None,
};
if let Some(version_text) = version_value.as_str() {
return Some(version_text.to_string());
}
if let Some(version_number) = version_value.as_i64() {
return Some(version_number.to_string());
}
None
}
fn kb_extract_meta_json(
resolved_transaction: &serde_json::Value,
) -> std::option::Option<std::string::String> {
let meta_option = resolved_transaction.get("meta");
let meta = match meta_option {
Some(meta) => meta,
None => return None,
};
let serialize_result = serde_json::to_string(meta);
match serialize_result {
Ok(json_text) => Some(json_text),
Err(_) => None,
}
}
fn kb_extract_meta_err_json(
resolved_transaction: &serde_json::Value,
) -> std::option::Option<std::string::String> {
let meta_option = resolved_transaction.get("meta");
let meta = match meta_option {
Some(meta) => meta,
None => return None,
};
let err_option = meta.get("err");
let err = match err_option {
Some(err) => err,
None => return None,
};
if err.is_null() {
return None;
}
let serialize_result = serde_json::to_string(err);
match serialize_result {
Ok(json_text) => Some(json_text),
Err(_) => None,
}
}
fn kb_extract_outer_instructions(
resolved_transaction: &serde_json::Value,
) -> std::vec::Vec<serde_json::Value> {
let mut instructions = std::vec::Vec::new();
let transaction_option = resolved_transaction.get("transaction");
let transaction = match transaction_option {
Some(transaction) => transaction,
None => return instructions,
};
let message_option = transaction.get("message");
let message = match message_option {
Some(message) => message,
None => return instructions,
};
let instructions_option = message.get("instructions");
let instructions_value = match instructions_option {
Some(instructions_value) => instructions_value,
None => return instructions,
};
let array_option = instructions_value.as_array();
let array = match array_option {
Some(array) => array,
None => return instructions,
};
for instruction in array {
instructions.push(instruction.clone());
}
instructions
}
fn kb_extract_inner_instruction_groups(
resolved_transaction: &serde_json::Value,
) -> std::vec::Vec<KbInnerInstructionGroup> {
let mut groups = std::vec::Vec::new();
let meta_option = resolved_transaction.get("meta");
let meta = match meta_option {
Some(meta) => meta,
None => return groups,
};
let inner_option = meta.get("innerInstructions");
let inner_value = match inner_option {
Some(inner_value) => inner_value,
None => return groups,
};
let array_option = inner_value.as_array();
let array = match array_option {
Some(array) => array,
None => return groups,
};
for group in array {
let index = match group.get("index").and_then(serde_json::Value::as_i64) {
Some(index_i64) => {
let convert_result = u32::try_from(index_i64);
match convert_result {
Ok(index_u32) => Some(index_u32),
Err(_) => None,
}
}
None => None,
};
let mut instructions = std::vec::Vec::new();
let instructions_option = group.get("instructions");
if let Some(instructions_value) = instructions_option {
if let Some(instructions_array) = instructions_value.as_array() {
for instruction in instructions_array {
instructions.push(instruction.clone());
}
}
}
groups.push(KbInnerInstructionGroup {
index,
instructions,
});
}
groups
}
fn kb_build_instruction_dto(
transaction_id: i64,
parent_instruction_id: std::option::Option<i64>,
instruction_index: u32,
inner_instruction_index: std::option::Option<u32>,
instruction: &serde_json::Value,
) -> Result<crate::KbChainInstructionDto, crate::KbError> {
let program_id = instruction
.get("programId")
.and_then(serde_json::Value::as_str)
.map(std::string::ToString::to_string);
let program_name = instruction
.get("program")
.and_then(serde_json::Value::as_str)
.map(std::string::ToString::to_string);
let stack_height = match instruction
.get("stackHeight")
.and_then(serde_json::Value::as_i64)
{
Some(stack_height_i64) => {
let stack_height_result = u32::try_from(stack_height_i64);
match stack_height_result {
Ok(stack_height) => Some(stack_height),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert instruction stack_height '{}' to u32: {}",
stack_height_i64, error
)));
}
}
}
None => None,
};
let accounts_json = if let Some(accounts) = instruction.get("accounts") {
let serialize_result = serde_json::to_string(accounts);
match serialize_result {
Ok(text) => text,
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize instruction accounts json: {}",
error
)));
}
}
} else {
"[]".to_string()
};
let data_json = match instruction.get("data") {
Some(data_value) => {
let serialize_result = serde_json::to_string(data_value);
match serialize_result {
Ok(text) => Some(text),
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize instruction data json: {}",
error
)));
}
}
}
None => None,
};
let parsed_type = instruction
.get("parsed")
.and_then(|parsed| parsed.get("type"))
.and_then(serde_json::Value::as_str)
.map(std::string::ToString::to_string);
let parsed_json = match instruction.get("parsed") {
Some(parsed_value) => {
let serialize_result = serde_json::to_string(parsed_value);
match serialize_result {
Ok(text) => Some(text),
Err(error) => {
return Err(crate::KbError::Json(format!(
"cannot serialize instruction parsed json: {}",
error
)));
}
}
}
None => None,
};
Ok(crate::KbChainInstructionDto::new(
transaction_id,
parent_instruction_id,
instruction_index,
inner_instruction_index,
program_id,
program_name,
stack_height,
accounts_json,
data_json,
parsed_type,
parsed_json,
))
}
#[cfg(test)]
mod tests {
async fn make_database() -> std::sync::Arc<crate::KbDatabase> {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("tx_model.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
let database = crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed");
std::sync::Arc::new(database)
}
#[tokio::test]
async fn persist_resolved_transaction_projects_outer_instructions() {
let database = make_database().await;
let service = crate::KbTransactionModelService::new(database.clone());
let resolved_transaction = serde_json::json!({
"slot": 700001,
"blockTime": 1777000001,
"version": 0,
"transaction": {
"message": {
"instructions": [
{
"programId": crate::SPL_TOKEN_PROGRAM_ID,
"program": "spl-token",
"stackHeight": 1,
"accounts": ["A111", "B222"],
"data": "raw-ix-0",
"parsed": {
"type": "transfer",
"info": { "amount": "100" }
}
},
{
"programId": "11111111111111111111111111111111",
"program": "system",
"stackHeight": 1,
"accounts": ["C333", "D444"],
"data": "raw-ix-1"
}
]
}
},
"meta": {
"err": null,
"fee": 5000
}
});
let transaction_id = service
.persist_resolved_transaction(
"sig-model-outer-1",
Some("helius_primary_http".to_string()),
&resolved_transaction,
)
.await
.expect("persist_resolved_transaction must succeed");
assert!(transaction_id > 0);
let slot = crate::get_chain_slot(database.as_ref(), 700001)
.await
.expect("chain slot fetch must succeed")
.expect("chain slot must exist");
assert_eq!(slot.slot, 700001);
assert_eq!(slot.block_time_unix, Some(1777000001));
let transaction =
crate::get_chain_transaction_by_signature(database.as_ref(), "sig-model-outer-1")
.await
.expect("chain transaction fetch must succeed")
.expect("chain transaction must exist");
assert_eq!(transaction.id, Some(transaction_id));
assert_eq!(transaction.slot, Some(700001));
assert_eq!(
transaction.source_endpoint_name,
Some("helius_primary_http".to_string())
);
assert_eq!(transaction.version_text, Some("0".to_string()));
let instructions =
crate::list_chain_instructions_by_transaction_id(database.as_ref(), transaction_id)
.await
.expect("chain instructions list must succeed");
assert_eq!(instructions.len(), 2);
assert_eq!(instructions[0].instruction_index, 0);
assert_eq!(instructions[0].inner_instruction_index, None);
assert_eq!(instructions[0].program_name, Some("spl-token".to_string()));
assert_eq!(instructions[0].parsed_type, Some("transfer".to_string()));
assert_eq!(instructions[1].instruction_index, 1);
assert_eq!(instructions[1].inner_instruction_index, None);
assert_eq!(instructions[1].program_name, Some("system".to_string()));
assert_eq!(instructions[1].parsed_type, None);
}
#[tokio::test]
async fn persist_resolved_transaction_projects_inner_instructions() {
let database = make_database().await;
let service = crate::KbTransactionModelService::new(database.clone());
let resolved_transaction = serde_json::json!({
"slot": 700002,
"blockTime": 1777000002,
"version": 0,
"transaction": {
"message": {
"instructions": [
{
"programId": "OuterProgram1111111111111111111111111111111",
"program": "outer-program",
"stackHeight": 1,
"accounts": ["PARENT1"],
"data": "raw-parent",
"parsed": {
"type": "outerInstruction",
"info": { "k": "v" }
}
}
]
}
},
"meta": {
"err": null,
"innerInstructions": [
{
"index": 0,
"instructions": [
{
"programId": crate::SPL_TOKEN_PROGRAM_ID,
"program": "spl-token",
"stackHeight": 2,
"accounts": ["INNER1", "INNER2"],
"data": "raw-inner-0",
"parsed": {
"type": "mintTo",
"info": { "amount": "5" }
}
},
{
"programId": crate::SPL_TOKEN_PROGRAM_ID,
"program": "spl-token",
"stackHeight": 2,
"accounts": ["INNER3", "INNER4"],
"data": "raw-inner-1",
"parsed": {
"type": "burn",
"info": { "amount": "2" }
}
}
]
}
]
}
});
let transaction_id = service
.persist_resolved_transaction(
"sig-model-inner-1",
Some("mainnet_public_http".to_string()),
&resolved_transaction,
)
.await
.expect("persist_resolved_transaction must succeed");
let instructions =
crate::list_chain_instructions_by_transaction_id(database.as_ref(), transaction_id)
.await
.expect("chain instructions list must succeed");
assert_eq!(instructions.len(), 3);
let parent = &instructions[0];
assert_eq!(parent.parent_instruction_id, None);
assert_eq!(parent.instruction_index, 0);
assert_eq!(parent.inner_instruction_index, None);
let inner_a = &instructions[1];
assert_eq!(inner_a.parent_instruction_id, parent.id);
assert_eq!(inner_a.instruction_index, 0);
assert_eq!(inner_a.inner_instruction_index, Some(0));
assert_eq!(inner_a.parsed_type, Some("mintTo".to_string()));
let inner_b = &instructions[2];
assert_eq!(inner_b.parent_instruction_id, parent.id);
assert_eq!(inner_b.instruction_index, 0);
assert_eq!(inner_b.inner_instruction_index, Some(1));
assert_eq!(inner_b.parsed_type, Some("burn".to_string()));
}
#[tokio::test]
async fn persist_resolved_transaction_replaces_existing_projection() {
let database = make_database().await;
let service = crate::KbTransactionModelService::new(database.clone());
let first_resolved_transaction = serde_json::json!({
"slot": 700003,
"blockTime": 1777000003,
"version": 0,
"transaction": {
"message": {
"instructions": [
{
"programId": "FirstProgram1111111111111111111111111111111",
"program": "first-program",
"stackHeight": 1,
"accounts": ["A1"],
"data": "raw-a1"
},
{
"programId": "SecondProgram111111111111111111111111111111",
"program": "second-program",
"stackHeight": 1,
"accounts": ["A2"],
"data": "raw-a2"
}
]
}
},
"meta": {
"err": null
}
});
let transaction_id_first = service
.persist_resolved_transaction(
"sig-model-replace-1",
Some("helius_primary_http".to_string()),
&first_resolved_transaction,
)
.await
.expect("first projection must succeed");
let first_instructions = crate::list_chain_instructions_by_transaction_id(
database.as_ref(),
transaction_id_first,
)
.await
.expect("first chain instructions list must succeed");
assert_eq!(first_instructions.len(), 2);
let second_resolved_transaction = serde_json::json!({
"slot": 700003,
"blockTime": 1777000004,
"version": 0,
"transaction": {
"message": {
"instructions": [
{
"programId": "OnlyProgram11111111111111111111111111111111",
"program": "only-program",
"stackHeight": 1,
"accounts": ["B1"],
"data": "raw-b1",
"parsed": {
"type": "singleInstruction",
"info": { "flag": true }
}
}
]
}
},
"meta": {
"err": null
}
});
let transaction_id_second = service
.persist_resolved_transaction(
"sig-model-replace-1",
Some("helius_primary_http".to_string()),
&second_resolved_transaction,
)
.await
.expect("second projection must succeed");
assert_eq!(transaction_id_first, transaction_id_second);
let second_instructions = crate::list_chain_instructions_by_transaction_id(
database.as_ref(),
transaction_id_second,
)
.await
.expect("second chain instructions list must succeed");
assert_eq!(second_instructions.len(), 1);
assert_eq!(second_instructions[0].instruction_index, 0);
assert_eq!(
second_instructions[0].program_name,
Some("only-program".to_string())
);
assert_eq!(
second_instructions[0].parsed_type,
Some("singleInstruction".to_string())
);
let transaction =
crate::get_chain_transaction_by_signature(database.as_ref(), "sig-model-replace-1")
.await
.expect("transaction fetch must succeed")
.expect("transaction must exist");
assert_eq!(transaction.block_time_unix, Some(1777000004));
}
}

View File

@@ -98,6 +98,7 @@ pub struct KbWsTransactionResolutionRelayStats {
pub struct KbTransactionResolutionService {
http_pool: std::sync::Arc<crate::HttpEndpointPool>,
persistence: crate::KbDetectionPersistenceService,
transaction_model: crate::KbTransactionModelService,
http_role: std::string::String,
resolved_signatures:
std::sync::Arc<tokio::sync::Mutex<std::collections::HashSet<std::string::String>>>,
@@ -107,12 +108,15 @@ impl KbTransactionResolutionService {
/// Creates a new transaction resolution service.
pub fn new(
http_pool: std::sync::Arc<crate::HttpEndpointPool>,
persistence: crate::KbDetectionPersistenceService,
database: std::sync::Arc<crate::KbDatabase>,
http_role: std::string::String,
) -> Self {
let persistence = crate::KbDetectionPersistenceService::new(database.clone());
let transaction_model = crate::KbTransactionModelService::new(database);
Self {
http_pool,
persistence,
transaction_model,
http_role,
resolved_signatures: std::sync::Arc::new(tokio::sync::Mutex::new(
std::collections::HashSet::new(),
@@ -274,12 +278,25 @@ impl KbTransactionResolutionService {
.get("slot")
.and_then(serde_json::Value::as_u64)
.or(request.slot_hint);
let projection_result = self
.transaction_model
.persist_resolved_transaction(
request.signature.as_str(),
request.source_endpoint_name.clone(),
&transaction_value,
)
.await;
let projected_transaction_id = match projection_result {
Ok(projected_transaction_id) => projected_transaction_id,
Err(error) => return Err(error),
};
let payload = serde_json::json!({
"status": "resolved",
"signature": request.signature.clone(),
"triggerMethod": request.trigger_method.clone(),
"sourceEndpointName": request.source_endpoint_name.clone(),
"slotHint": request.slot_hint,
"projectedTransactionId": projected_transaction_id,
"triggerPayload": request.trigger_payload.clone(),
"transaction": transaction_value
});

View File

@@ -1463,13 +1463,23 @@ fn kb_build_pending_json_rpc_request(
request: &crate::KbJsonRpcWsRequest,
) -> std::option::Option<WsPendingJsonRpcRequest> {
let request_id_option = kb_json_value_to_u64(&request.id);
let request_id = request_id_option?;
let request_id = match request_id_option {
Some(request_id) => request_id,
None => return None,
};
if kb_is_subscribe_method(&request.method) {
let notification_method_option =
kb_infer_notification_method_from_subscribe(&request.method);
let unsubscribe_method_option = kb_infer_unsubscribe_method_from_subscribe(&request.method);
let notification_method = notification_method_option?;
let unsubscribe_method = unsubscribe_method_option?;
let notification_method = match notification_method_option {
Some(notification_method) => notification_method,
None => return None,
};
let unsubscribe_method_option =
kb_infer_unsubscribe_method_from_subscribe(&request.method);
let unsubscribe_method = match unsubscribe_method_option {
Some(unsubscribe_method) => unsubscribe_method,
None => return None,
};
return Some(WsPendingJsonRpcRequest {
request_id,
method: request.method.clone(),
@@ -1482,9 +1492,15 @@ fn kb_build_pending_json_rpc_request(
}
if kb_is_unsubscribe_method(&request.method) {
let first_param_option = request.params.first();
let first_param = first_param_option?;
let first_param = match first_param_option {
Some(first_param) => first_param,
None => return None,
};
let subscription_id_option = first_param.as_u64();
let subscription_id = subscription_id_option?;
let subscription_id = match subscription_id_option {
Some(subscription_id) => subscription_id,
None => return None,
};
return Some(WsPendingJsonRpcRequest {
request_id,
method: request.method.clone(),

View File

@@ -469,9 +469,11 @@ impl WsManager {
));
}
}
let persistence = crate::KbDetectionPersistenceService::new(database);
let resolver =
crate::KbTransactionResolutionService::new(http_pool, persistence, http_role);
let resolver = crate::KbTransactionResolutionService::new(
http_pool,
database,
http_role,
);
let relay = crate::KbWsTransactionResolutionRelay::new(resolver);
let (sender, receiver) = crate::KbWsTransactionResolutionRelay::channel(queue_capacity);
let relay_task = relay.spawn(receiver);