From 081758995a581deda42cca481b10cf31d2e77122 Mon Sep 17 00:00:00 2001 From: SinuS Von SifriduS Date: Sun, 26 Apr 2026 12:34:23 +0200 Subject: [PATCH] 0.7.2 --- CHANGELOG.md | 3 +- Cargo.toml | 2 +- ROADMAP.md | 27 +- kb_lib/src/db.rs | 6 +- kb_lib/src/db/dtos.rs | 2 + kb_lib/src/db/dtos/dex_decoded_event.rs | 100 ++++++ kb_lib/src/db/entities.rs | 2 + kb_lib/src/db/entities/dex_decoded_event.rs | 34 ++ kb_lib/src/db/queries.rs | 4 + kb_lib/src/db/queries/dex_decoded_event.rs | 322 +++++++++++++++++ kb_lib/src/db/schema.rs | 105 ++++-- kb_lib/src/dex.rs | 10 + kb_lib/src/dex/raydium_amm_v4.rs | 333 ++++++++++++++++++ kb_lib/src/dex_decode.rs | 361 ++++++++++++++++++++ kb_lib/src/lib.rs | 12 + kb_lib/src/tx_resolution.rs | 16 +- 16 files changed, 1302 insertions(+), 37 deletions(-) create mode 100644 kb_lib/src/db/dtos/dex_decoded_event.rs create mode 100644 kb_lib/src/db/entities/dex_decoded_event.rs create mode 100644 kb_lib/src/db/queries/dex_decoded_event.rs create mode 100644 kb_lib/src/dex.rs create mode 100644 kb_lib/src/dex/raydium_amm_v4.rs create mode 100644 kb_lib/src/dex_decode.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e85482..867412a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,4 +31,5 @@ 0.6.5 - Ajout de ws_manager.rs pour l’orchestration multi-clients WebSocket, le bus d’événements unifié et le branchement centralisé du relais de détection 0.6.6 - Ajout de la fenêtre Demo Ws Manager dans kb_app pour piloter plusieurs WsClient, visualiser le snapshot consolidé, tester le démarrage/arrêt par rôle et valider le flux unifié de WsEvent 0.7.0 - Ajout du socle de résolution transactionnelle orientée DEX : relais WS vers file de résolution, récupération getTransaction via HttpEndpointPool et persistance des résolutions dans les observations/signaux -0.7.1 - Ajout du modèle transactionnel enrichi : tables slots/transactions/instructions, requêtes d’accès et projection structurée des transactions résolues +0.7.1 - Ajout du modèle transactionnel enrichi : tables slots/transactions/instructions, requêtes d’accès et projection structurée des transactions résolues +0.7.2 - Ajout du premier décodeur DEX spécifique Raydium AmmV4 / initialize2, persistance des événements DEX décodés et branchement automatique du décodage après résolution/projection transactionnelle diff --git a/Cargo.toml b/Cargo.toml index 2e58e28..4f48cb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.7.1" +version = "0.7.2" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index 5c9da5a..e4bed5c 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -467,14 +467,15 @@ Réalisé : - ajout des tests de roundtrip et de projection. ### 6.034. Version `0.7.2` — Décodeurs DEX spécifiques par programme et version -Objectif : remplacer les heuristiques ponctuelles par de vrais décodeurs Rust dédiés. +Réalisé : -À faire : - -- introduire des règles spécifiques à chaque DEX / version de programme, -- détecter les instructions utiles à la création de pools, paires et évènements de liquidité, -- encapsuler les index de comptes et les motifs de logs propres à chaque protocole, -- prévoir des décodeurs séparés au minimum pour Raydium, Pump.fun / PumpSwap, Meteora, puis les autres cibles. +- ajout d’un premier décodeur transactionnel spécifique Raydium AmmV4 / initialize2, +- lecture combinée du `transaction_json` et des instructions projetées, +- extraction des comptes utiles à l’initialisation du pool, +- persistance des événements DEX décodés dans une table dédiée, +- émission d’observations et de signaux dérivés du décodage DEX, +- branchement automatique du décodage DEX depuis le pipeline de résolution transactionnelle, +- préparation de la future détection métier pool / pair / listing. ### 6.035. Version `0.7.3` — Détection des nouveaux pools et paires via logs + transaction Objectif : détecter rapidement les nouvelles paires/pools à partir des flux RPC et des transactions enrichies. @@ -656,9 +657,9 @@ Le projet doit maintenir au minimum : ## 12. Priorité immédiate La priorité immédiate est désormais la suivante : -1. brancher automatiquement `tx_resolution.rs` vers `KbTransactionModelService`, -2. stabiliser la chaîne complète `WS -> résolution HTTP -> projection transactionnelle`, -3. démarrer la version `0.7.2` avec les décodeurs DEX spécifiques par programme et version, -4. introduire les premières règles de décodage dédiées à Raydium / Pump.fun / PumpSwap, -5. conserver le découplage entre transport, résolution transactionnelle, projection et décodage métier, -6. préparer ensuite l’enrichissement des objets métier DEX à partir du modèle transactionnel. +1. démarrer la version `0.7.3` avec la détection métier à partir des événements DEX décodés, +2. transformer les événements DEX normalisés en objets métier de type pool, pair et listing, +3. identifier la première apparition locale d’un pool ou d’une paire, +4. relier les détections métier aux tokens et paires déjà connus ou à créer, +5. conserver le découplage entre résolution transactionnelle, projection, décodage DEX et détection métier, +6. préparer ensuite la version `0.7.4` pour l’enrichissement des événements de liquidité, de swaps et de suivi d’activité. diff --git a/kb_lib/src/db.rs b/kb_lib/src/db.rs index c9c9119..db2c7c3 100644 --- a/kb_lib/src/db.rs +++ b/kb_lib/src/db.rs @@ -35,6 +35,7 @@ pub use dtos::KbTokenMintEventDto; pub use dtos::KbChainInstructionDto; pub use dtos::KbChainSlotDto; pub use dtos::KbChainTransactionDto; +pub use dtos::KbDexDecodedEventDto; pub use entities::KbAnalysisSignalEntity; pub use entities::KbDbMetadataEntity; pub use entities::KbDbRuntimeEventEntity; @@ -55,7 +56,7 @@ pub use entities::KbTokenMintEventEntity; pub use entities::KbChainInstructionEntity; pub use entities::KbChainSlotEntity; pub use entities::KbChainTransactionEntity; - +pub use entities::KbDexDecodedEventEntity; pub use queries::get_db_metadata; pub use queries::get_known_http_endpoint; pub use queries::get_known_ws_endpoint; @@ -107,6 +108,9 @@ pub use queries::list_recent_chain_slots; pub use queries::list_recent_chain_transactions; pub use queries::upsert_chain_slot; pub use queries::upsert_chain_transaction; +pub use queries::get_dex_decoded_event_by_key; +pub use queries::list_dex_decoded_events_by_transaction_id; +pub use queries::upsert_dex_decoded_event; pub use types::KbAnalysisSignalSeverity; pub use types::KbDatabaseBackend; pub use types::KbDbRuntimeEventLevel; diff --git a/kb_lib/src/db/dtos.rs b/kb_lib/src/db/dtos.rs index 45441ba..b580619 100644 --- a/kb_lib/src/db/dtos.rs +++ b/kb_lib/src/db/dtos.rs @@ -22,6 +22,7 @@ mod token_mint_event; mod chain_instruction; mod chain_slot; mod chain_transaction; +mod dex_decoded_event; pub use analysis_signal::KbAnalysisSignalDto; pub use db_metadata::KbDbMetadataDto; @@ -43,3 +44,4 @@ pub use token_mint_event::KbTokenMintEventDto; pub use chain_instruction::KbChainInstructionDto; pub use chain_slot::KbChainSlotDto; pub use chain_transaction::KbChainTransactionDto; +pub use dex_decoded_event::KbDexDecodedEventDto; diff --git a/kb_lib/src/db/dtos/dex_decoded_event.rs b/kb_lib/src/db/dtos/dex_decoded_event.rs new file mode 100644 index 0000000..48a0b69 --- /dev/null +++ b/kb_lib/src/db/dtos/dex_decoded_event.rs @@ -0,0 +1,100 @@ +// file: kb_lib/src/db/dtos/dex_decoded_event.rs + +//! Application-facing decoded DEX event DTO. + +/// Application-facing decoded DEX event DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbDexDecodedEventDto { + /// Optional numeric primary key. + pub id: std::option::Option, + /// Parent transaction id. + pub transaction_id: i64, + /// Optional parent instruction id. + pub instruction_id: std::option::Option, + /// Decoded protocol name. + pub protocol_name: std::string::String, + /// Program id that produced the decoded event. + pub program_id: std::string::String, + /// Event kind. + pub event_kind: std::string::String, + /// Optional decoded pool account. + pub pool_account: std::option::Option, + /// Optional decoded lp mint. + pub lp_mint: std::option::Option, + /// Optional decoded token A mint. + pub token_a_mint: std::option::Option, + /// Optional decoded token B mint. + pub token_b_mint: std::option::Option, + /// Optional decoded market account. + pub market_account: std::option::Option, + /// Serialized decoded payload. + pub payload_json: std::string::String, + /// Creation timestamp. + pub created_at: chrono::DateTime, +} + +impl KbDexDecodedEventDto { + /// Creates a new decoded DEX event DTO. + #[allow(clippy::too_many_arguments)] + pub fn new( + transaction_id: i64, + instruction_id: std::option::Option, + protocol_name: std::string::String, + program_id: std::string::String, + event_kind: std::string::String, + pool_account: std::option::Option, + lp_mint: std::option::Option, + token_a_mint: std::option::Option, + token_b_mint: std::option::Option, + market_account: std::option::Option, + payload_json: std::string::String, + ) -> Self { + Self { + id: None, + transaction_id, + instruction_id, + protocol_name, + program_id, + event_kind, + pool_account, + lp_mint, + token_a_mint, + token_b_mint, + market_account, + payload_json, + created_at: chrono::Utc::now(), + } + } +} + +impl TryFrom for KbDexDecodedEventDto { + type Error = crate::KbError; + + fn try_from(entity: crate::KbDexDecodedEventEntity) -> Result { + let created_at_result = chrono::DateTime::parse_from_rfc3339(&entity.created_at); + let created_at = match created_at_result { + Ok(created_at) => created_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse dex decoded event created_at '{}': {}", + entity.created_at, error + ))); + } + }; + Ok(Self { + id: Some(entity.id), + transaction_id: entity.transaction_id, + instruction_id: entity.instruction_id, + protocol_name: entity.protocol_name, + program_id: entity.program_id, + event_kind: entity.event_kind, + pool_account: entity.pool_account, + lp_mint: entity.lp_mint, + token_a_mint: entity.token_a_mint, + token_b_mint: entity.token_b_mint, + market_account: entity.market_account, + payload_json: entity.payload_json, + created_at, + }) + } +} diff --git a/kb_lib/src/db/entities.rs b/kb_lib/src/db/entities.rs index 586132e..c73ea3b 100644 --- a/kb_lib/src/db/entities.rs +++ b/kb_lib/src/db/entities.rs @@ -24,6 +24,7 @@ mod token_mint_event; mod chain_instruction; mod chain_slot; mod chain_transaction; +mod dex_decoded_event; pub use analysis_signal::KbAnalysisSignalEntity; pub use db_metadata::KbDbMetadataEntity; @@ -45,3 +46,4 @@ pub use token_mint_event::KbTokenMintEventEntity; pub use chain_instruction::KbChainInstructionEntity; pub use chain_slot::KbChainSlotEntity; pub use chain_transaction::KbChainTransactionEntity; +pub use dex_decoded_event::KbDexDecodedEventEntity; diff --git a/kb_lib/src/db/entities/dex_decoded_event.rs b/kb_lib/src/db/entities/dex_decoded_event.rs new file mode 100644 index 0000000..30cea09 --- /dev/null +++ b/kb_lib/src/db/entities/dex_decoded_event.rs @@ -0,0 +1,34 @@ +// file: kb_lib/src/db/entities/dex_decoded_event.rs + +//! Database entity for one decoded DEX event. + +/// Persisted decoded DEX event row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbDexDecodedEventEntity { + /// Internal row id. + pub id: i64, + /// Parent transaction id. + pub transaction_id: i64, + /// Optional parent instruction id. + pub instruction_id: std::option::Option, + /// Decoded protocol name. + pub protocol_name: std::string::String, + /// Program id that produced the decoded event. + pub program_id: std::string::String, + /// Event kind. + pub event_kind: std::string::String, + /// Optional decoded pool account. + pub pool_account: std::option::Option, + /// Optional decoded lp mint. + pub lp_mint: std::option::Option, + /// Optional decoded token A mint. + pub token_a_mint: std::option::Option, + /// Optional decoded token B mint. + pub token_b_mint: std::option::Option, + /// Optional decoded market account. + pub market_account: std::option::Option, + /// Serialized decoded payload. + pub payload_json: std::string::String, + /// Creation timestamp. + pub created_at: std::string::String, +} diff --git a/kb_lib/src/db/queries.rs b/kb_lib/src/db/queries.rs index d55f656..50abfcc 100644 --- a/kb_lib/src/db/queries.rs +++ b/kb_lib/src/db/queries.rs @@ -26,6 +26,7 @@ mod token_mint_event; mod chain_instruction; mod chain_slot; mod chain_transaction; +mod dex_decoded_event; pub use analysis_signal::insert_analysis_signal; pub use analysis_signal::list_recent_analysis_signals; @@ -78,3 +79,6 @@ pub use chain_slot::get_chain_slot; pub use chain_transaction::list_recent_chain_transactions; pub use chain_transaction::upsert_chain_transaction; pub use chain_transaction::get_chain_transaction_by_signature; +pub use dex_decoded_event::get_dex_decoded_event_by_key; +pub use dex_decoded_event::list_dex_decoded_events_by_transaction_id; +pub use dex_decoded_event::upsert_dex_decoded_event; diff --git a/kb_lib/src/db/queries/dex_decoded_event.rs b/kb_lib/src/db/queries/dex_decoded_event.rs new file mode 100644 index 0000000..c65e172 --- /dev/null +++ b/kb_lib/src/db/queries/dex_decoded_event.rs @@ -0,0 +1,322 @@ +// file: kb_lib/src/db/queries/dex_decoded_event.rs + +//! Queries for `kb_dex_decoded_events`. + +/// Inserts or updates one decoded DEX event row. +pub async fn upsert_dex_decoded_event( + database: &crate::KbDatabase, + dto: &crate::KbDexDecodedEventDto, +) -> Result { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_dex_decoded_events ( + transaction_id, + instruction_id, + protocol_name, + program_id, + event_kind, + pool_account, + lp_mint, + token_a_mint, + token_b_mint, + market_account, + payload_json, + created_at +) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(transaction_id, instruction_id, event_kind) DO UPDATE SET + protocol_name = excluded.protocol_name, + program_id = excluded.program_id, + pool_account = excluded.pool_account, + lp_mint = excluded.lp_mint, + token_a_mint = excluded.token_a_mint, + token_b_mint = excluded.token_b_mint, + market_account = excluded.market_account, + payload_json = excluded.payload_json + "#, + ) + .bind(dto.transaction_id) + .bind(dto.instruction_id) + .bind(dto.protocol_name.clone()) + .bind(dto.program_id.clone()) + .bind(dto.event_kind.clone()) + .bind(dto.pool_account.clone()) + .bind(dto.lp_mint.clone()) + .bind(dto.token_a_mint.clone()) + .bind(dto.token_b_mint.clone()) + .bind(dto.market_account.clone()) + .bind(dto.payload_json.clone()) + .bind(dto.created_at.to_rfc3339()) + .execute(pool) + .await; + if let Err(error) = query_result { + return Err(crate::KbError::Db(format!( + "cannot upsert kb_dex_decoded_events on sqlite: {}", + error + ))); + } + let id_result = sqlx::query_scalar::( + r#" +SELECT id +FROM kb_dex_decoded_events +WHERE transaction_id = ? + AND ( + (instruction_id IS NULL AND ? IS NULL) + OR instruction_id = ? + ) + AND event_kind = ? +LIMIT 1 + "#, + ) + .bind(dto.transaction_id) + .bind(dto.instruction_id) + .bind(dto.instruction_id) + .bind(dto.event_kind.clone()) + .fetch_one(pool) + .await; + match id_result { + Ok(id) => Ok(id), + Err(error) => Err(crate::KbError::Db(format!( + "cannot fetch kb_dex_decoded_events id on sqlite: {}", + error + ))), + } + } + } +} + +/// Reads one decoded DEX event by its natural key. +pub async fn get_dex_decoded_event_by_key( + database: &crate::KbDatabase, + transaction_id: i64, + instruction_id: std::option::Option, + event_kind: &str, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + transaction_id, + instruction_id, + protocol_name, + program_id, + event_kind, + pool_account, + lp_mint, + token_a_mint, + token_b_mint, + market_account, + payload_json, + created_at +FROM kb_dex_decoded_events +WHERE transaction_id = ? + AND ( + (instruction_id IS NULL AND ? IS NULL) + OR instruction_id = ? + ) + AND event_kind = ? +LIMIT 1 + "#, + ) + .bind(transaction_id) + .bind(instruction_id) + .bind(instruction_id) + .bind(event_kind.to_string()) + .fetch_optional(pool) + .await; + let entity_option = match query_result { + Ok(entity_option) => entity_option, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot fetch kb_dex_decoded_events on sqlite: {}", + error + ))); + } + }; + match entity_option { + Some(entity) => { + let dto_result = crate::KbDexDecodedEventDto::try_from(entity); + match dto_result { + Ok(dto) => Ok(Some(dto)), + Err(error) => Err(error), + } + } + None => Ok(None), + } + } + } +} + +/// Lists decoded DEX events for one transaction. +pub async fn list_dex_decoded_events_by_transaction_id( + database: &crate::KbDatabase, + transaction_id: i64, +) -> Result, crate::KbError> { + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + transaction_id, + instruction_id, + protocol_name, + program_id, + event_kind, + pool_account, + lp_mint, + token_a_mint, + token_b_mint, + market_account, + payload_json, + created_at +FROM kb_dex_decoded_events +WHERE transaction_id = ? +ORDER BY id ASC + "#, + ) + .bind(transaction_id) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list kb_dex_decoded_events on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbDexDecodedEventDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + +#[cfg(test)] +mod tests { + async fn make_database() -> crate::KbDatabase { + let tempdir_result = tempfile::tempdir(); + let tempdir = match tempdir_result { + Ok(tempdir) => tempdir, + Err(error) => panic!("tempdir must succeed: {}", error), + }; + let database_path = tempdir.path().join("dex_decoded_event.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database_result = crate::KbDatabase::connect_and_initialize(&config).await; + match database_result { + Ok(database) => database, + Err(error) => panic!("database init must succeed: {}", error), + } + } + + #[tokio::test] + async fn dex_decoded_event_roundtrip_works() { + let database = make_database().await; + let transaction_dto = crate::KbChainTransactionDto::new( + "sig-dex-event-test-1".to_string(), + None, + None, + Some("helius_primary_http".to_string()), + Some("0".to_string()), + None, + None, + r#"{"transaction":{"message":{"instructions":[]}}}"#.to_string(), + ); + let transaction_id_result = + crate::upsert_chain_transaction(&database, &transaction_dto).await; + let transaction_id = match transaction_id_result { + Ok(transaction_id) => transaction_id, + Err(error) => panic!("transaction upsert must succeed: {}", error), + }; + let instruction_dto = crate::KbChainInstructionDto::new( + transaction_id, + None, + 0, + None, + Some(crate::KB_RAYDIUM_AMM_V4_PROGRAM_ID.to_string()), + Some("raydium-amm-v4".to_string()), + Some(1), + r#"["Account0","Pool111","Lp111","TokenA111","TokenB111"]"#.to_string(), + None, + None, + None, + ); + let instruction_id_result = + crate::insert_chain_instruction(&database, &instruction_dto).await; + let instruction_id = match instruction_id_result { + Ok(instruction_id) => instruction_id, + Err(error) => panic!("instruction insert must succeed: {}", error), + }; + let dto = crate::KbDexDecodedEventDto::new( + transaction_id, + Some(instruction_id), + "raydium_amm_v4".to_string(), + crate::KB_RAYDIUM_AMM_V4_PROGRAM_ID.to_string(), + "raydium_amm_v4.initialize2_pool".to_string(), + Some("Pool111".to_string()), + Some("Lp111".to_string()), + Some("TokenA111".to_string()), + Some("TokenB111".to_string()), + Some("Market111".to_string()), + r#"{"k":"v"}"#.to_string(), + ); + let event_id_result = crate::upsert_dex_decoded_event(&database, &dto).await; + let event_id = match event_id_result { + Ok(event_id) => event_id, + Err(error) => panic!("event upsert must succeed: {}", error), + }; + assert!(event_id > 0); + let fetched_result = crate::get_dex_decoded_event_by_key( + &database, + transaction_id, + Some(instruction_id), + "raydium_amm_v4.initialize2_pool", + ) + .await; + let fetched_option = match fetched_result { + Ok(fetched_option) => fetched_option, + Err(error) => panic!("event fetch must succeed: {}", error), + }; + let fetched = match fetched_option { + Some(fetched) => fetched, + None => panic!("event must exist"), + }; + assert_eq!(fetched.id, Some(event_id)); + assert_eq!(fetched.transaction_id, transaction_id); + assert_eq!(fetched.instruction_id, Some(instruction_id)); + assert_eq!(fetched.protocol_name, "raydium_amm_v4"); + assert_eq!(fetched.pool_account, Some("Pool111".to_string())); + let listed_result = + crate::list_dex_decoded_events_by_transaction_id(&database, transaction_id).await; + let listed = match listed_result { + Ok(listed) => listed, + Err(error) => panic!("event list must succeed: {}", error), + }; + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].event_kind, "raydium_amm_v4.initialize2_pool"); + } +} diff --git a/kb_lib/src/db/schema.rs b/kb_lib/src/db/schema.rs index 05e1886..90b1037 100644 --- a/kb_lib/src/db/schema.rs +++ b/kb_lib/src/db/schema.rs @@ -218,7 +218,18 @@ pub(crate) async fn ensure_schema(database: &crate::KbDatabase) -> Result<(), cr if let Err(error) = result { return Err(error); } - + let result = create_kb_dex_decoded_events_table(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_idx_dex_decoded_events_transaction_id(pool).await; + if let Err(error) = result { + return Err(error); + } + let result = create_kb_uq_dex_decoded_events_transaction_instruction_event(pool).await; + if let Err(error) = result { + return Err(error); + } Ok(()) } } @@ -240,6 +251,21 @@ async fn execute_sqlite_schema_statement( } } +/// Updates the persisted schema version metadata entry. +async fn update_schema_version_metadata( + database: &crate::KbDatabase, +) -> Result<(), crate::KbError> { + let schema_version = crate::KbDbMetadataDto::new( + "schema_version".to_string(), + env!("CARGO_PKG_VERSION").to_string(), + ); + let upsert_result = crate::upsert_db_metadata(database, &schema_version).await; + match upsert_result { + Ok(_) => Ok(()), + Err(error) => Err(error), + } +} + /// Creates `kb_db_metadata`. async fn create_kb_db_metadata_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { execute_sqlite_schema_statement( @@ -1073,9 +1099,7 @@ CREATE TABLE IF NOT EXISTS kb_chain_slots ( } /// Creates `kb_chain_transactions`. -async fn create_kb_chain_transactions_table( - pool: &sqlx::SqlitePool, -) -> Result<(), crate::KbError> { +async fn create_kb_chain_transactions_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { execute_sqlite_schema_statement( pool, "create_kb_chain_transactions_table", @@ -1115,9 +1139,7 @@ ON kb_chain_transactions (slot) } /// Creates `kb_chain_instructions`. -async fn create_kb_chain_instructions_table( - pool: &sqlx::SqlitePool, -) -> Result<(), crate::KbError> { +async fn create_kb_chain_instructions_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { execute_sqlite_schema_statement( pool, "create_kb_chain_instructions_table", @@ -1174,17 +1196,60 @@ ON kb_chain_instructions (program_id) .await } -/// Updates the persisted schema version metadata entry. -async fn update_schema_version_metadata( - database: &crate::KbDatabase, -) -> Result<(), crate::KbError> { - let schema_version = crate::KbDbMetadataDto::new( - "schema_version".to_string(), - env!("CARGO_PKG_VERSION").to_string(), - ); - let upsert_result = crate::upsert_db_metadata(database, &schema_version).await; - match upsert_result { - Ok(_) => Ok(()), - Err(error) => Err(error), - } +/// Creates `kb_dex_decoded_events`. +async fn create_kb_dex_decoded_events_table(pool: &sqlx::SqlitePool) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_dex_decoded_events_table", + r#" +CREATE TABLE IF NOT EXISTS kb_dex_decoded_events ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + transaction_id INTEGER NOT NULL, + instruction_id INTEGER NULL, + protocol_name TEXT NOT NULL, + program_id TEXT NOT NULL, + event_kind TEXT NOT NULL, + pool_account TEXT NULL, + lp_mint TEXT NULL, + token_a_mint TEXT NULL, + token_b_mint TEXT NULL, + market_account TEXT NULL, + payload_json TEXT NOT NULL, + created_at TEXT NOT NULL, + FOREIGN KEY(transaction_id) REFERENCES kb_chain_transactions(id), + FOREIGN KEY(instruction_id) REFERENCES kb_chain_instructions(id) +) + "#, + ) + .await +} + +/// Creates index on `kb_dex_decoded_events(transaction_id)`. +async fn create_kb_idx_dex_decoded_events_transaction_id( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_idx_dex_decoded_events_transaction_id", + r#" +CREATE INDEX IF NOT EXISTS kb_idx_dex_decoded_events_transaction_id +ON kb_dex_decoded_events (transaction_id) + "#, + ) + .await +} + +/// Creates unique index on `(transaction_id, instruction_id, event_kind)`. +async fn create_kb_uq_dex_decoded_events_transaction_instruction_event( + pool: &sqlx::SqlitePool, +) -> Result<(), crate::KbError> { + execute_sqlite_schema_statement( + pool, + "create_kb_uq_dex_decoded_events_transaction_instruction_event", + r#" +CREATE UNIQUE INDEX IF NOT EXISTS kb_uq_dex_decoded_events_transaction_instruction_event +ON kb_dex_decoded_events (transaction_id, instruction_id, event_kind) + "#, + ) + .await } diff --git a/kb_lib/src/dex.rs b/kb_lib/src/dex.rs new file mode 100644 index 0000000..06ee6ec --- /dev/null +++ b/kb_lib/src/dex.rs @@ -0,0 +1,10 @@ +// file: kb_lib/src/dex.rs + +//! DEX-specific transaction decoders. + +mod raydium_amm_v4; + +pub use raydium_amm_v4::KB_RAYDIUM_AMM_V4_PROGRAM_ID; +pub use raydium_amm_v4::KbRaydiumAmmV4DecodedEvent; +pub use raydium_amm_v4::KbRaydiumAmmV4Decoder; +pub use raydium_amm_v4::KbRaydiumAmmV4Initialize2PoolDecoded; diff --git a/kb_lib/src/dex/raydium_amm_v4.rs b/kb_lib/src/dex/raydium_amm_v4.rs new file mode 100644 index 0000000..5429e97 --- /dev/null +++ b/kb_lib/src/dex/raydium_amm_v4.rs @@ -0,0 +1,333 @@ +// file: kb_lib/src/dex/raydium_amm_v4.rs + +//! Raydium AmmV4 transaction decoder. + +/// Raydium AmmV4 program id. +pub const KB_RAYDIUM_AMM_V4_PROGRAM_ID: &str = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; + +/// Decoded Raydium AmmV4 initialize2 pool event. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbRaydiumAmmV4Initialize2PoolDecoded { + /// Parent transaction id. + pub transaction_id: i64, + /// Parent instruction id. + pub instruction_id: i64, + /// Transaction signature. + pub signature: std::string::String, + /// Program id. + pub program_id: std::string::String, + /// Optional pool account. + pub pool_account: std::option::Option, + /// Optional lp mint. + pub lp_mint: std::option::Option, + /// Optional token A mint. + pub token_a_mint: std::option::Option, + /// Optional token B mint. + pub token_b_mint: std::option::Option, + /// Optional market account. + pub market_account: std::option::Option, + /// Decoded payload. + pub payload_json: serde_json::Value, +} + +/// Decoded Raydium AmmV4 event. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum KbRaydiumAmmV4DecodedEvent { + /// `initialize2` pool creation-like event. + Initialize2Pool(KbRaydiumAmmV4Initialize2PoolDecoded), +} + +/// Raydium AmmV4 decoder. +#[derive(Debug, Clone, Default)] +pub struct KbRaydiumAmmV4Decoder; + +impl KbRaydiumAmmV4Decoder { + /// Creates a new decoder. + pub fn new() -> Self { + Self + } + + /// Decodes one projected transaction into zero or more Raydium AmmV4 events. + pub fn decode_transaction( + &self, + transaction: &crate::KbChainTransactionDto, + instructions: &[crate::KbChainInstructionDto], + ) -> Result, crate::KbError> { + let transaction_id_option = transaction.id; + let transaction_id = match transaction_id_option { + Some(transaction_id) => transaction_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "chain transaction '{}' has no internal id", + transaction.signature + ))); + } + }; + let transaction_json_result = + serde_json::from_str::(transaction.transaction_json.as_str()); + let transaction_json = match transaction_json_result { + Ok(transaction_json) => transaction_json, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse transaction_json for signature '{}': {}", + transaction.signature, error + ))); + } + }; + let log_messages = kb_extract_log_messages(&transaction_json); + let has_initialize2_log = kb_log_messages_contain_initialize2(&log_messages); + let mut decoded_events = std::vec::Vec::new(); + for instruction in instructions { + if instruction.parent_instruction_id.is_some() { + continue; + } + let program_id_option = &instruction.program_id; + let program_id = match program_id_option { + Some(program_id) => program_id, + None => continue, + }; + if program_id.as_str() != crate::KB_RAYDIUM_AMM_V4_PROGRAM_ID { + continue; + } + if !has_initialize2_log { + continue; + } + let instruction_id_option = instruction.id; + let instruction_id = match instruction_id_option { + Some(instruction_id) => instruction_id, + None => continue, + }; + let accounts_result = kb_parse_accounts_json(instruction.accounts_json.as_str()); + let accounts = match accounts_result { + Ok(accounts) => accounts, + Err(error) => return Err(error), + }; + if accounts.len() < 10 { + continue; + } + let pool_account = kb_extract_account(&accounts, 4); + let lp_mint = kb_extract_account(&accounts, 7); + let token_a_mint = kb_extract_account(&accounts, 8); + let token_b_mint = kb_extract_account(&accounts, 9); + let market_account = kb_extract_account(&accounts, 16); + let payload_json = serde_json::json!({ + "decoder": "raydium_amm_v4", + "eventKind": "initialize2_pool", + "signature": transaction.signature, + "instructionId": instruction_id, + "instructionIndex": instruction.instruction_index, + "accounts": accounts, + "logMessages": log_messages, + "poolAccount": pool_account, + "lpMint": lp_mint, + "tokenAMint": token_a_mint, + "tokenBMint": token_b_mint, + "marketAccount": market_account + }); + decoded_events.push(crate::KbRaydiumAmmV4DecodedEvent::Initialize2Pool( + crate::KbRaydiumAmmV4Initialize2PoolDecoded { + transaction_id, + instruction_id, + signature: transaction.signature.clone(), + program_id: program_id.clone(), + pool_account, + lp_mint, + token_a_mint, + token_b_mint, + market_account, + payload_json, + }, + )); + } + Ok(decoded_events) + } +} + +fn kb_extract_log_messages( + transaction_json: &serde_json::Value, +) -> std::vec::Vec { + let mut messages = std::vec::Vec::new(); + let meta_option = transaction_json.get("meta"); + let meta = match meta_option { + Some(meta) => meta, + None => return messages, + }; + let logs_option = meta.get("logMessages"); + let logs = match logs_option { + Some(logs) => logs, + None => return messages, + }; + let logs_array_option = logs.as_array(); + let logs_array = match logs_array_option { + Some(logs_array) => logs_array, + None => return messages, + }; + for value in logs_array { + let text_option = value.as_str(); + if let Some(text) = text_option { + messages.push(text.to_string()); + } + } + messages +} + +fn kb_log_messages_contain_initialize2(log_messages: &[std::string::String]) -> bool { + for log_message in log_messages { + if log_message.contains("initialize2") { + return true; + } + } + false +} + +fn kb_parse_accounts_json( + accounts_json: &str, +) -> Result, crate::KbError> { + let values_result = serde_json::from_str::>(accounts_json); + let values = match values_result { + Ok(values) => values, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot parse instruction accounts_json '{}': {}", + accounts_json, error + ))); + } + }; + let mut accounts = std::vec::Vec::new(); + for value in values { + let text_option = value.as_str(); + if let Some(text) = text_option { + accounts.push(text.to_string()); + } + } + Ok(accounts) +} + +fn kb_extract_account( + accounts: &[std::string::String], + index: usize, +) -> std::option::Option { + if index >= accounts.len() { + return None; + } + Some(accounts[index].clone()) +} + +#[cfg(test)] +mod tests { + fn make_transaction() -> crate::KbChainTransactionDto { + let mut dto = crate::KbChainTransactionDto::new( + "sig-raydium-test-1".to_string(), + Some(888888), + Some(1778000000), + Some("helius_primary_http".to_string()), + Some("0".to_string()), + None, + None, + serde_json::json!({ + "slot": 888888, + "meta": { + "logMessages": [ + "Program log: initialize2" + ] + }, + "transaction": { + "message": { + "instructions": [] + } + } + }) + .to_string(), + ); + dto.id = Some(42); + dto + } + + fn make_instruction() -> crate::KbChainInstructionDto { + let mut dto = crate::KbChainInstructionDto::new( + 42, + None, + 0, + None, + Some(crate::KB_RAYDIUM_AMM_V4_PROGRAM_ID.to_string()), + Some("raydium-amm-v4".to_string()), + Some(1), + serde_json::json!([ + "Account0", + "Account1", + "Account2", + "Account3", + "Pool111", + "Account5", + "Account6", + "LpMint111", + "TokenA111", + "TokenB111", + "Account10", + "Account11", + "Account12", + "Account13", + "Account14", + "Account15", + "Market111" + ]) + .to_string(), + None, + None, + None, + ); + dto.id = Some(7); + dto + } + + #[test] + fn raydium_amm_v4_initialize2_logs_are_detected() { + let decoder = crate::KbRaydiumAmmV4Decoder::new(); + let transaction = make_transaction(); + let instructions = vec![make_instruction()]; + let decoded_result = decoder.decode_transaction(&transaction, &instructions); + let decoded = match decoded_result { + Ok(decoded) => decoded, + Err(error) => panic!("decode must succeed: {}", error), + }; + assert_eq!(decoded.len(), 1); + match &decoded[0] { + crate::KbRaydiumAmmV4DecodedEvent::Initialize2Pool(event) => { + assert_eq!(event.transaction_id, 42); + assert_eq!(event.instruction_id, 7); + assert_eq!(event.pool_account, Some("Pool111".to_string())); + assert_eq!(event.lp_mint, Some("LpMint111".to_string())); + assert_eq!(event.token_a_mint, Some("TokenA111".to_string())); + assert_eq!(event.token_b_mint, Some("TokenB111".to_string())); + assert_eq!(event.market_account, Some("Market111".to_string())); + } + } + } + + #[test] + fn raydium_amm_v4_initialize2_returns_none_without_expected_log() { + let decoder = crate::KbRaydiumAmmV4Decoder::new(); + let mut transaction = make_transaction(); + transaction.transaction_json = serde_json::json!({ + "slot": 888888, + "meta": { + "logMessages": [ + "Program log: swap" + ] + }, + "transaction": { + "message": { + "instructions": [] + } + } + }) + .to_string(); + let instructions = vec![make_instruction()]; + let decoded_result = decoder.decode_transaction(&transaction, &instructions); + let decoded = match decoded_result { + Ok(decoded) => decoded, + Err(error) => panic!("decode must succeed: {}", error), + }; + assert_eq!(decoded.len(), 0); + } +} diff --git a/kb_lib/src/dex_decode.rs b/kb_lib/src/dex_decode.rs new file mode 100644 index 0000000..f0fbfb0 --- /dev/null +++ b/kb_lib/src/dex_decode.rs @@ -0,0 +1,361 @@ +// file: kb_lib/src/dex_decode.rs + +//! Persistence-oriented DEX decoding service. + +/// DEX decode service. +#[derive(Debug, Clone)] +pub struct KbDexDecodeService { + database: std::sync::Arc, + persistence: crate::KbDetectionPersistenceService, + raydium_amm_v4_decoder: crate::KbRaydiumAmmV4Decoder, +} + +impl KbDexDecodeService { + /// Creates a new DEX decode service. + pub fn new(database: std::sync::Arc) -> Self { + let persistence = crate::KbDetectionPersistenceService::new(database.clone()); + Self { + database, + persistence, + raydium_amm_v4_decoder: crate::KbRaydiumAmmV4Decoder::new(), + } + } + + /// Decodes one projected transaction and persists the decoded events. + pub async fn decode_transaction_by_signature( + &self, + signature: &str, + ) -> Result, crate::KbError> { + let transaction_result = + crate::get_chain_transaction_by_signature(self.database.as_ref(), signature).await; + let transaction_option = match transaction_result { + Ok(transaction_option) => transaction_option, + Err(error) => return Err(error), + }; + let transaction = match transaction_option { + Some(transaction) => transaction, + None => { + return Err(crate::KbError::InvalidState(format!( + "cannot decode unknown chain transaction '{}'", + signature + ))); + } + }; + let transaction_id_option = transaction.id; + let transaction_id = match transaction_id_option { + Some(transaction_id) => transaction_id, + None => { + return Err(crate::KbError::InvalidState(format!( + "chain transaction '{}' has no internal id", + signature + ))); + } + }; + let instructions_result = crate::list_chain_instructions_by_transaction_id( + self.database.as_ref(), + transaction_id, + ) + .await; + let instructions = match instructions_result { + Ok(instructions) => instructions, + Err(error) => return Err(error), + }; + let decoded_result = self + .raydium_amm_v4_decoder + .decode_transaction(&transaction, &instructions); + let decoded = match decoded_result { + Ok(decoded) => decoded, + Err(error) => return Err(error), + }; + let mut persisted = std::vec::Vec::new(); + for decoded_event in &decoded { + let persist_result = self + .persist_raydium_event(&transaction, decoded_event) + .await; + let persisted_event = match persist_result { + Ok(persisted_event) => persisted_event, + Err(error) => return Err(error), + }; + persisted.push(persisted_event); + } + Ok(persisted) + } + + async fn persist_raydium_event( + &self, + transaction: &crate::KbChainTransactionDto, + decoded_event: &crate::KbRaydiumAmmV4DecodedEvent, + ) -> Result { + match decoded_event { + crate::KbRaydiumAmmV4DecodedEvent::Initialize2Pool(event) => { + let payload_json_result = serde_json::to_string(&event.payload_json); + let payload_json = match payload_json_result { + Ok(payload_json) => payload_json, + Err(error) => { + return Err(crate::KbError::Json(format!( + "cannot serialize decoded raydium payload: {}", + error + ))); + } + }; + let existing_result = crate::get_dex_decoded_event_by_key( + self.database.as_ref(), + event.transaction_id, + Some(event.instruction_id), + "raydium_amm_v4.initialize2_pool", + ) + .await; + let existing_option = match existing_result { + Ok(existing_option) => existing_option, + Err(error) => return Err(error), + }; + let already_present = existing_option.is_some(); + let dto = crate::KbDexDecodedEventDto::new( + event.transaction_id, + Some(event.instruction_id), + "raydium_amm_v4".to_string(), + event.program_id.clone(), + "raydium_amm_v4.initialize2_pool".to_string(), + event.pool_account.clone(), + event.lp_mint.clone(), + event.token_a_mint.clone(), + event.token_b_mint.clone(), + event.market_account.clone(), + payload_json, + ); + let upsert_result = + crate::upsert_dex_decoded_event(self.database.as_ref(), &dto).await; + if let Err(error) = upsert_result { + return Err(error); + } + let fetched_result = crate::get_dex_decoded_event_by_key( + self.database.as_ref(), + event.transaction_id, + Some(event.instruction_id), + "raydium_amm_v4.initialize2_pool", + ) + .await; + let fetched_option = match fetched_result { + Ok(fetched_option) => fetched_option, + Err(error) => return Err(error), + }; + let fetched = match fetched_option { + Some(fetched) => fetched, + None => { + return Err(crate::KbError::InvalidState( + "decoded event disappeared after upsert".to_string(), + )); + } + }; + if !already_present { + let payload_value = event.payload_json.clone(); + let observation_result = self + .persistence + .record_observation(&crate::KbDetectionObservationInput::new( + "dex.raydium_amm_v4.initialize2_pool".to_string(), + crate::KbObservationSourceKind::HttpRpc, + transaction.source_endpoint_name.clone(), + transaction.signature.clone(), + transaction.slot, + payload_value.clone(), + )) + .await; + let observation_id = match observation_result { + Ok(observation_id) => observation_id, + Err(error) => return Err(error), + }; + let signal_result = self + .persistence + .record_signal(&crate::KbDetectionSignalInput::new( + "signal.dex.raydium_amm_v4.initialize2_pool".to_string(), + crate::KbAnalysisSignalSeverity::Low, + transaction.signature.clone(), + Some(observation_id), + None, + payload_value, + )) + .await; + if let Err(error) = signal_result { + return Err(error); + } + } + Ok(fetched) + } + } + } +} + +#[cfg(test)] +mod tests { + async fn make_database() -> std::sync::Arc { + let tempdir_result = tempfile::tempdir(); + let tempdir = match tempdir_result { + Ok(tempdir) => tempdir, + Err(error) => panic!("tempdir must succeed: {}", error), + }; + let database_path = tempdir.path().join("dex_decode.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database_result = crate::KbDatabase::connect_and_initialize(&config).await; + let database = match database_result { + Ok(database) => database, + Err(error) => panic!("database init must succeed: {}", error), + }; + std::sync::Arc::new(database) + } + + async fn seed_projected_transaction( + database: std::sync::Arc, + signature: &str, + ) { + let service = crate::KbTransactionModelService::new(database); + let resolved_transaction = serde_json::json!({ + "slot": 999001, + "blockTime": 1779000001, + "version": 0, + "transaction": { + "message": { + "instructions": [ + { + "programId": crate::KB_RAYDIUM_AMM_V4_PROGRAM_ID, + "program": "raydium-amm-v4", + "stackHeight": 1, + "accounts": [ + "Account0", + "Account1", + "Account2", + "Account3", + "PoolXYZ", + "Account5", + "Account6", + "LpMintXYZ", + "TokenAXYZ", + "TokenBXYZ", + "Account10", + "Account11", + "Account12", + "Account13", + "Account14", + "Account15", + "MarketXYZ" + ], + "data": "opaque" + } + ] + } + }, + "meta": { + "err": null, + "logMessages": [ + "Program log: initialize2" + ] + } + }); + let persist_result = service + .persist_resolved_transaction( + signature, + Some("helius_primary_http".to_string()), + &resolved_transaction, + ) + .await; + if let Err(error) = persist_result { + panic!("projection must succeed: {}", error); + } + } + + #[tokio::test] + async fn decode_transaction_by_signature_persists_decoded_event() { + let database = make_database().await; + seed_projected_transaction(database.clone(), "sig-dex-decode-1").await; + let service = crate::KbDexDecodeService::new(database.clone()); + let decoded_result = service + .decode_transaction_by_signature("sig-dex-decode-1") + .await; + let decoded = match decoded_result { + Ok(decoded) => decoded, + Err(error) => panic!("decode must succeed: {}", error), + }; + assert_eq!(decoded.len(), 1); + assert_eq!(decoded[0].protocol_name, "raydium_amm_v4"); + assert_eq!(decoded[0].event_kind, "raydium_amm_v4.initialize2_pool"); + assert_eq!(decoded[0].pool_account, Some("PoolXYZ".to_string())); + let transaction_result = + crate::get_chain_transaction_by_signature(database.as_ref(), "sig-dex-decode-1").await; + let transaction_option = match transaction_result { + Ok(transaction_option) => transaction_option, + Err(error) => panic!("transaction fetch must succeed: {}", error), + }; + let transaction = match transaction_option { + Some(transaction) => transaction, + None => panic!("transaction must exist"), + }; + let transaction_id_option = transaction.id; + let transaction_id = match transaction_id_option { + Some(transaction_id) => transaction_id, + None => panic!("transaction id must exist"), + }; + let listed_result = + crate::list_dex_decoded_events_by_transaction_id(database.as_ref(), transaction_id) + .await; + let listed = match listed_result { + Ok(listed) => listed, + Err(error) => panic!("dex event list must succeed: {}", error), + }; + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].lp_mint, Some("LpMintXYZ".to_string())); + } + + #[tokio::test] + async fn decode_transaction_by_signature_is_idempotent_on_same_transaction() { + let database = make_database().await; + seed_projected_transaction(database.clone(), "sig-dex-decode-2").await; + let service = crate::KbDexDecodeService::new(database.clone()); + let first_result = service + .decode_transaction_by_signature("sig-dex-decode-2") + .await; + if let Err(error) = first_result { + panic!("first decode must succeed: {}", error); + } + let second_result = service + .decode_transaction_by_signature("sig-dex-decode-2") + .await; + let second = match second_result { + Ok(second) => second, + Err(error) => panic!("second decode must succeed: {}", error), + }; + assert_eq!(second.len(), 1); + let transaction_result = + crate::get_chain_transaction_by_signature(database.as_ref(), "sig-dex-decode-2").await; + let transaction_option = match transaction_result { + Ok(transaction_option) => transaction_option, + Err(error) => panic!("transaction fetch must succeed: {}", error), + }; + let transaction = match transaction_option { + Some(transaction) => transaction, + None => panic!("transaction must exist"), + }; + let transaction_id_option = transaction.id; + let transaction_id = match transaction_id_option { + Some(transaction_id) => transaction_id, + None => panic!("transaction id must exist"), + }; + let listed_result = + crate::list_dex_decoded_events_by_transaction_id(database.as_ref(), transaction_id) + .await; + let listed = match listed_result { + Ok(listed) => listed, + Err(error) => panic!("dex event list must succeed: {}", error), + }; + assert_eq!(listed.len(), 1); + } +} diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index 6e82ddc..eb243cf 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -23,6 +23,8 @@ mod db; mod detect; mod tx_resolution; mod tx_model; +mod dex; +mod dex_decode; pub use constants::*; pub use error::KbError; @@ -125,6 +127,8 @@ pub use db::KbChainTransactionDto; pub use db::KbChainInstructionEntity; pub use db::KbChainSlotEntity; pub use db::KbChainTransactionEntity; +pub use db::KbDexDecodedEventDto; +pub use db::KbDexDecodedEventEntity; pub use db::delete_chain_instructions_by_transaction_id; pub use db::get_chain_slot; pub use db::get_chain_transaction_by_signature; @@ -173,6 +177,9 @@ pub use db::list_pairs; pub use db::list_pool_listings; pub use db::list_pool_tokens_by_pool_id; pub use db::list_pools; +pub use db::get_dex_decoded_event_by_key; +pub use db::list_dex_decoded_events_by_transaction_id; +pub use db::upsert_dex_decoded_event; pub use detect::KbDetectionObservationInput; pub use detect::KbDetectionPersistenceService; pub use detect::KbDetectionSignalInput; @@ -192,3 +199,8 @@ pub use tx_resolution::KbWsTransactionResolutionEnvelope; pub use tx_resolution::KbWsTransactionResolutionRelay; pub use tx_resolution::KbWsTransactionResolutionRelayStats; pub use tx_model::KbTransactionModelService; +pub use dex::KbRaydiumAmmV4DecodedEvent; +pub use dex::KbRaydiumAmmV4Decoder; +pub use dex::KbRaydiumAmmV4Initialize2PoolDecoded; +pub use dex::KB_RAYDIUM_AMM_V4_PROGRAM_ID; +pub use dex_decode::KbDexDecodeService; diff --git a/kb_lib/src/tx_resolution.rs b/kb_lib/src/tx_resolution.rs index b918bc0..6cc88f9 100644 --- a/kb_lib/src/tx_resolution.rs +++ b/kb_lib/src/tx_resolution.rs @@ -99,6 +99,7 @@ pub struct KbTransactionResolutionService { http_pool: std::sync::Arc, persistence: crate::KbDetectionPersistenceService, transaction_model: crate::KbTransactionModelService, + dex_decode_service: crate::KbDexDecodeService, http_role: std::string::String, resolved_signatures: std::sync::Arc>>, @@ -112,11 +113,14 @@ impl KbTransactionResolutionService { http_role: std::string::String, ) -> Self { let persistence = crate::KbDetectionPersistenceService::new(database.clone()); - let transaction_model = crate::KbTransactionModelService::new(database); + let transaction_model = crate::KbTransactionModelService::new(database.clone()); + let dex_decode_service = crate::KbDexDecodeService::new(database); + Self { http_pool, persistence, transaction_model, + dex_decode_service, http_role, resolved_signatures: std::sync::Arc::new(tokio::sync::Mutex::new( std::collections::HashSet::new(), @@ -290,6 +294,15 @@ impl KbTransactionResolutionService { Ok(projected_transaction_id) => projected_transaction_id, Err(error) => return Err(error), }; + let decoded_events_result = self + .dex_decode_service + .decode_transaction_by_signature(request.signature.as_str()) + .await; + let decoded_events = match decoded_events_result { + Ok(decoded_events) => decoded_events, + Err(error) => return Err(error), + }; + let decoded_event_count = decoded_events.len(); let payload = serde_json::json!({ "status": "resolved", "signature": request.signature.clone(), @@ -297,6 +310,7 @@ impl KbTransactionResolutionService { "sourceEndpointName": request.source_endpoint_name.clone(), "slotHint": request.slot_hint, "projectedTransactionId": projected_transaction_id, + "decodedEventCount": decoded_event_count, "triggerPayload": request.trigger_payload.clone(), "transaction": transaction_value });