diff --git a/CHANGELOG.md b/CHANGELOG.md index 4be20e1..cc61aa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,3 +19,4 @@ 0.5.0 - Début du socle SQLite : configuration database, ouverture/validation de la base et premières briques de persistance 0.5.1 - Ajout des premières tables métier SQLite pour les endpoints connus HTTP/WS et les événements runtime, avec séparation entities/dtos/queries/types 0.5.2 - Ajout de la table des tokens observés, de leur statut local et des premières requêtes de persistance associées +0.5.3 - Préparation du stockage local des événements techniques et des signaux utiles à l’analyse, avec distinction runtime / on-chain / métier diff --git a/Cargo.toml b/Cargo.toml index 0eac541..3bccac7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.5.2" +version = "0.5.3" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobobot" diff --git a/ROADMAP.md b/ROADMAP.md index aebee78..1ab3bad 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -348,22 +348,46 @@ Réalisé : - conservation d’unicité locale par mint sans duplication par endpoint. ### 6.21. Version `0.5.3` — Événements et signaux locaux +Réalisé : + +- conservation des événements runtime techniques via `kb_db_runtime_events`, +- ajout des observations on-chain brutes via `kb_onchain_observations`, +- ajout des signaux d’analyse via `kb_analysis_signals`, +- distinction explicite entre événements runtime, observations on-chain et événements métier, +- préparation de la traçabilité de provenance par type de source et endpoint, sans remettre en cause l’unicité locale d’un token par mint. + +### 6.22. Version `0.5.4` — Modèle métier normalisé initial +Objectif : poser la couche relationnelle métier entre les observations brutes et la future détection technique. 
+ À faire : -- stocker les événements techniques importants remontés par les connecteurs, -- préparer la conservation locale des signaux utiles à l’analyse, -- distinguer événements runtime, observations on-chain et événements métier, -- préparer la traçabilité de provenance si plusieurs sources détectent un même objet, sans remettre en cause l’unicité locale d’un token par mint. +- ajouter les tables de référence métier pour les DEX, tokens, pools et paires, +- distinguer clairement objets de référence et événements d’activité, +- préparer les relations entre tokens, pools, paires et listings, +- éviter que la détection technique `0.6.x` écrive directement dans des tables trop brutes ou ambiguës. + +### 6.23. Version `0.5.5` — Activité métier normalisée +Objectif : poser les premiers événements métier exploitables. + +À faire : + +- ajouter les tables de swaps, +- ajouter les événements de liquidité, +- ajouter les événements de mint et burn utiles au suivi des tokens, +- préparer l’historique métier nécessaire avant l’arrivée des connecteurs DEX complets. + +### 6.24. Version `0.5.6` — Consolidation de la couche stockage +Objectif : stabiliser le schéma avant la détection technique réelle. -### 6.22. Version `0.5.x` — Consolidation de la couche stockage À faire : - conserver l’abstraction du backend dès le départ, - limiter la dépendance directe au SQL concret aux modules `queries`, - garder les conversions explicites entre entités DB et DTOs applicatifs, +- durcir les relations, contraintes et index utiles, - préparer une future compatibilité PostgreSQL sans casser l’organisation générale. -### 6.23. Version `0.6.x` — Détection technique on-chain / RPC +### 6.25. Version `0.6.x` — Détection technique on-chain / RPC Objectif : commencer la détection utile pour l’application. À faire : @@ -373,7 +397,7 @@ Objectif : commencer la détection utile pour l’application. - débuts de normalisation d’événements, - premiers connecteurs DEX. -### 6.24. 
Version `0.7.x` — DEX connectors v1 +### 6.26. Version `0.7.x` — DEX connectors v1 Objectif : structurer les connecteurs par protocole. Cibles initiales possibles : @@ -392,7 +416,7 @@ Cibles initiales possibles : - création de types métiers propres, - enrichissement des métadonnées token/pool/pair. -### 6.25. Version `0.8.x` — Analyse et filtrage +### 6.27. Version `0.8.x` — Analyse et filtrage Objectif : transformer les événements bruts en signaux exploitables. À faire : @@ -403,7 +427,7 @@ Objectif : transformer les événements bruts en signaux exploitables. - statistiques de comportement, - premiers patterns. -### 6.26. Version `1.x.y` — Wallets et swap préparatoire +### 6.28. Version `1.x.y` — Wallets et swap préparatoire Objectif : préparer la couche d’action. À faire : @@ -414,7 +438,7 @@ Objectif : préparer la couche d’action. - préparation d’ordres et de swaps, - simulation et garde-fous. -### 6.27. Version `2.x.y` — Trading semi-automatisé +### 6.29. Version `2.x.y` — Trading semi-automatisé Objectif : brancher l’analyse à l’action tout en gardant des garde-fous explicites. À faire : @@ -425,7 +449,7 @@ Objectif : brancher l’analyse à l’action tout en gardant des garde-fous exp - confirmations explicites ou semi-automatiques, - journaux d’exécution. -### 6.28. Version `3.x.y` — Yellowstone gRPC +### 6.30. Version `3.x.y` — Yellowstone gRPC Objectif : ajouter le connecteur gRPC dédié. À faire : @@ -510,9 +534,9 @@ Le projet doit maintenir au minimum : ## 12. Priorité immédiate La priorité immédiate est désormais la suivante : -1. démarrer la version `0.5.3` avec les événements et signaux locaux, -2. stocker les événements techniques importants remontés par les connecteurs, -3. distinguer clairement événements runtime, observations on-chain et événements métier, -4. préparer la conservation locale des signaux utiles à l’analyse, +1. démarrer la version `0.5.4` avec le modèle métier normalisé initial, +2. 
poser les tables de référence pour les DEX, tokens, pools, paires et listings, +3. séparer clairement les objets métier des observations techniques brutes, +4. préparer les relations nécessaires avant la détection technique `0.6.x`, 5. conserver l’abstraction du backend dès cette phase SQLite, -6. préparer ensuite l’exploitation de ces signaux pour la future détection technique on-chain / RPC. +6. reporter la couche analytique agrégée après la fin de `0.6.x`. diff --git a/kb_lib/src/db.rs b/kb_lib/src/db.rs index fbaf3bb..31f0275 100644 --- a/kb_lib/src/db.rs +++ b/kb_lib/src/db.rs @@ -15,30 +15,40 @@ mod types; pub use crate::db::connection::KbDatabase; pub use crate::db::connection::KbDatabaseConnection; +pub use crate::db::dtos::KbAnalysisSignalDto; pub use crate::db::dtos::KbDbMetadataDto; pub use crate::db::dtos::KbDbRuntimeEventDto; pub use crate::db::dtos::KbKnownHttpEndpointDto; pub use crate::db::dtos::KbKnownWsEndpointDto; pub use crate::db::dtos::KbObservedTokenDto; +pub use crate::db::dtos::KbOnchainObservationDto; +pub use crate::db::entities::KbAnalysisSignalEntity; pub use crate::db::entities::KbDbMetadataEntity; pub use crate::db::entities::KbDbRuntimeEventEntity; pub use crate::db::entities::KbKnownHttpEndpointEntity; pub use crate::db::entities::KbKnownWsEndpointEntity; pub use crate::db::entities::KbObservedTokenEntity; +pub use crate::db::entities::KbOnchainObservationEntity; pub use crate::db::queries::get_db_metadata; pub use crate::db::queries::get_known_http_endpoint; pub use crate::db::queries::get_known_ws_endpoint; pub use crate::db::queries::get_observed_token_by_mint; +pub use crate::db::queries::insert_analysis_signal; pub use crate::db::queries::insert_db_runtime_event; +pub use crate::db::queries::insert_onchain_observation; pub use crate::db::queries::list_db_metadata; pub use crate::db::queries::list_known_http_endpoints; pub use crate::db::queries::list_known_ws_endpoints; pub use crate::db::queries::list_observed_tokens; +pub use 
crate::db::queries::list_recent_analysis_signals; pub use crate::db::queries::list_recent_db_runtime_events; +pub use crate::db::queries::list_recent_onchain_observations; pub use crate::db::queries::upsert_db_metadata; pub use crate::db::queries::upsert_known_http_endpoint; pub use crate::db::queries::upsert_known_ws_endpoint; pub use crate::db::queries::upsert_observed_token; +pub use crate::db::types::KbAnalysisSignalSeverity; pub use crate::db::types::KbDatabaseBackend; pub use crate::db::types::KbDbRuntimeEventLevel; +pub use crate::db::types::KbObservationSourceKind; pub use crate::db::types::KbObservedTokenStatus; diff --git a/kb_lib/src/db/dtos.rs b/kb_lib/src/db/dtos.rs index 6a2e2e9..b36f2bd 100644 --- a/kb_lib/src/db/dtos.rs +++ b/kb_lib/src/db/dtos.rs @@ -2,14 +2,18 @@ //! Database data transfer objects. +mod analysis_signal; mod db_metadata; mod db_runtime_event; mod known_http_endpoint; mod known_ws_endpoint; mod observed_token; +mod onchain_observation; +pub use crate::db::dtos::analysis_signal::KbAnalysisSignalDto; pub use crate::db::dtos::db_metadata::KbDbMetadataDto; pub use crate::db::dtos::db_runtime_event::KbDbRuntimeEventDto; pub use crate::db::dtos::known_http_endpoint::KbKnownHttpEndpointDto; pub use crate::db::dtos::known_ws_endpoint::KbKnownWsEndpointDto; pub use crate::db::dtos::observed_token::KbObservedTokenDto; +pub use crate::db::dtos::onchain_observation::KbOnchainObservationDto; diff --git a/kb_lib/src/db/dtos/analysis_signal.rs b/kb_lib/src/db/dtos/analysis_signal.rs new file mode 100644 index 0000000..bb0f8e1 --- /dev/null +++ b/kb_lib/src/db/dtos/analysis_signal.rs @@ -0,0 +1,89 @@ +// file: kb_lib/src/db/dtos/analysis_signal.rs + +//! Analysis signal DTO. + +/// Application-facing analysis signal DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbAnalysisSignalDto { + /// Optional numeric primary key. + pub id: std::option::Option, + /// Signal kind. 
+ pub signal_kind: std::string::String, + /// Signal severity. + pub severity: crate::KbAnalysisSignalSeverity, + /// Logical object key, for example a mint, signature or pool address. + pub object_key: std::string::String, + /// Optional related on-chain observation id. + pub related_observation_id: std::option::Option, + /// Optional numeric score. + pub score: std::option::Option, + /// Signal payload. + pub payload: serde_json::Value, + /// Creation timestamp. + pub created_at: chrono::DateTime, +} + +impl KbAnalysisSignalDto { + /// Creates a new analysis signal DTO with the current timestamp. + pub fn new( + signal_kind: std::string::String, + severity: crate::KbAnalysisSignalSeverity, + object_key: std::string::String, + related_observation_id: std::option::Option, + score: std::option::Option, + payload: serde_json::Value, + ) -> Self { + Self { + id: None, + signal_kind, + severity, + object_key, + related_observation_id, + score, + payload, + created_at: chrono::Utc::now(), + } + } +} + +impl TryFrom for KbAnalysisSignalDto { + type Error = crate::KbError; + + fn try_from(entity: crate::KbAnalysisSignalEntity) -> Result { + let severity_result = crate::KbAnalysisSignalSeverity::from_i16(entity.severity); + let severity = match severity_result { + Ok(severity) => severity, + Err(error) => return Err(error), + }; + let created_at_result = chrono::DateTime::parse_from_rfc3339(&entity.created_at); + let created_at = match created_at_result { + Ok(created_at) => created_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse analysis signal created_at '{}': {}", + entity.created_at, error + ))); + } + }; + let payload_result = serde_json::from_str::(&entity.payload_json); + let payload = match payload_result { + Ok(payload) => payload, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse analysis signal payload_json '{}': {}", + entity.payload_json, error + ))); + } + }; + Ok(Self { + id: 
Some(entity.id), + signal_kind: entity.signal_kind, + severity, + object_key: entity.object_key, + related_observation_id: entity.related_observation_id, + score: entity.score, + payload, + created_at, + }) + } +} diff --git a/kb_lib/src/db/dtos/onchain_observation.rs b/kb_lib/src/db/dtos/onchain_observation.rs new file mode 100644 index 0000000..b9a6eeb --- /dev/null +++ b/kb_lib/src/db/dtos/onchain_observation.rs @@ -0,0 +1,104 @@ +// file: kb_lib/src/db/dtos/onchain_observation.rs + +//! On-chain observation DTO. + +/// Application-facing on-chain observation DTO. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct KbOnchainObservationDto { + /// Optional numeric primary key. + pub id: std::option::Option, + /// Observation kind. + pub observation_kind: std::string::String, + /// Observation source family. + pub source_kind: crate::KbObservationSourceKind, + /// Optional source endpoint logical name. + pub endpoint_name: std::option::Option, + /// Logical object key, for example a mint, signature or pool address. + pub object_key: std::string::String, + /// Optional slot number. + pub slot: std::option::Option, + /// Raw JSON payload. + pub payload: serde_json::Value, + /// Observation timestamp. + pub observed_at: chrono::DateTime, +} + +impl KbOnchainObservationDto { + /// Creates a new on-chain observation DTO with the current timestamp. 
+ pub fn new( + observation_kind: std::string::String, + source_kind: crate::KbObservationSourceKind, + endpoint_name: std::option::Option, + object_key: std::string::String, + slot: std::option::Option, + payload: serde_json::Value, + ) -> Self { + Self { + id: None, + observation_kind, + source_kind, + endpoint_name, + object_key, + slot, + payload, + observed_at: chrono::Utc::now(), + } + } +} + +impl TryFrom for KbOnchainObservationDto { + type Error = crate::KbError; + + fn try_from(entity: crate::KbOnchainObservationEntity) -> Result { + let source_kind_result = crate::KbObservationSourceKind::from_i16(entity.source_kind); + let source_kind = match source_kind_result { + Ok(source_kind) => source_kind, + Err(error) => return Err(error), + }; + let observed_at_result = chrono::DateTime::parse_from_rfc3339(&entity.observed_at); + let observed_at = match observed_at_result { + Ok(observed_at) => observed_at.with_timezone(&chrono::Utc), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse on-chain observation observed_at '{}': {}", + entity.observed_at, error + ))); + } + }; + let payload_result = serde_json::from_str::(&entity.payload_json); + let payload = match payload_result { + Ok(payload) => payload, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot parse on-chain observation payload_json '{}': {}", + entity.payload_json, error + ))); + } + }; + let slot = match entity.slot { + Some(slot) => { + let slot_result = u64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert on-chain observation slot '{}' to u64: {}", + slot, error + ))); + } + } + } + None => None, + }; + Ok(Self { + id: Some(entity.id), + observation_kind: entity.observation_kind, + source_kind, + endpoint_name: entity.endpoint_name, + object_key: entity.object_key, + slot, + payload, + observed_at, + }) + } +} diff --git a/kb_lib/src/db/entities.rs 
b/kb_lib/src/db/entities.rs index fdaa036..924b19c 100644 --- a/kb_lib/src/db/entities.rs +++ b/kb_lib/src/db/entities.rs @@ -4,14 +4,18 @@ //! //! These types are close to persisted rows and SQL query results. +mod analysis_signal; mod db_metadata; mod db_runtime_event; mod known_http_endpoint; mod known_ws_endpoint; mod observed_token; +mod onchain_observation; +pub use crate::db::entities::analysis_signal::KbAnalysisSignalEntity; pub use crate::db::entities::db_metadata::KbDbMetadataEntity; pub use crate::db::entities::db_runtime_event::KbDbRuntimeEventEntity; pub use crate::db::entities::known_http_endpoint::KbKnownHttpEndpointEntity; pub use crate::db::entities::known_ws_endpoint::KbKnownWsEndpointEntity; pub use crate::db::entities::observed_token::KbObservedTokenEntity; +pub use crate::db::entities::onchain_observation::KbOnchainObservationEntity; diff --git a/kb_lib/src/db/entities/analysis_signal.rs b/kb_lib/src/db/entities/analysis_signal.rs new file mode 100644 index 0000000..dabfdf6 --- /dev/null +++ b/kb_lib/src/db/entities/analysis_signal.rs @@ -0,0 +1,24 @@ +// file: kb_lib/src/db/entities/analysis_signal.rs + +//! Analysis signal entity. + +/// Persisted analysis signal row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbAnalysisSignalEntity { + /// Numeric primary key. + pub id: i64, + /// Signal kind. + pub signal_kind: std::string::String, + /// Signal severity stored as stable integer. + pub severity: i16, + /// Logical object key, for example a mint, signature or pool address. + pub object_key: std::string::String, + /// Optional related on-chain observation id. + pub related_observation_id: std::option::Option<i64>, + /// Optional numeric score. + pub score: std::option::Option<f64>, + /// JSON-encoded payload. + pub payload_json: std::string::String, + /// Creation timestamp encoded as RFC3339 UTC text.
+ pub created_at: std::string::String, +} diff --git a/kb_lib/src/db/entities/onchain_observation.rs b/kb_lib/src/db/entities/onchain_observation.rs new file mode 100644 index 0000000..3453c31 --- /dev/null +++ b/kb_lib/src/db/entities/onchain_observation.rs @@ -0,0 +1,24 @@ +// file: kb_lib/src/db/entities/onchain_observation.rs + +//! On-chain observation entity. + +/// Persisted on-chain observation row. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, sqlx::FromRow)] +pub struct KbOnchainObservationEntity { + /// Numeric primary key. + pub id: i64, + /// Observation kind. + pub observation_kind: std::string::String, + /// Observation source family stored as stable integer. + pub source_kind: i16, + /// Optional source endpoint logical name. + pub endpoint_name: std::option::Option<std::string::String>, + /// Logical object key, for example a mint, signature or pool address. + pub object_key: std::string::String, + /// Optional slot number. + pub slot: std::option::Option<i64>, + /// JSON-encoded raw payload. + pub payload_json: std::string::String, + /// Observation timestamp encoded as RFC3339 UTC text. + pub observed_at: std::string::String, +} diff --git a/kb_lib/src/db/queries.rs b/kb_lib/src/db/queries.rs index 898189e..f8271af 100644 --- a/kb_lib/src/db/queries.rs +++ b/kb_lib/src/db/queries.rs @@ -2,12 +2,16 @@ //! Database queries.
+mod analysis_signal; mod db_metadata; mod db_runtime_event; mod known_http_endpoint; mod known_ws_endpoint; mod observed_token; +mod onchain_observation; +pub use crate::db::queries::analysis_signal::insert_analysis_signal; +pub use crate::db::queries::analysis_signal::list_recent_analysis_signals; pub use crate::db::queries::db_metadata::get_db_metadata; pub use crate::db::queries::db_metadata::list_db_metadata; pub use crate::db::queries::db_metadata::upsert_db_metadata; @@ -22,3 +26,5 @@ pub use crate::db::queries::known_ws_endpoint::upsert_known_ws_endpoint; pub use crate::db::queries::observed_token::get_observed_token_by_mint; pub use crate::db::queries::observed_token::list_observed_tokens; pub use crate::db::queries::observed_token::upsert_observed_token; +pub use crate::db::queries::onchain_observation::insert_onchain_observation; +pub use crate::db::queries::onchain_observation::list_recent_onchain_observations; diff --git a/kb_lib/src/db/queries/analysis_signal.rs b/kb_lib/src/db/queries/analysis_signal.rs new file mode 100644 index 0000000..6f413b8 --- /dev/null +++ b/kb_lib/src/db/queries/analysis_signal.rs @@ -0,0 +1,154 @@ +// file: kb_lib/src/db/queries/analysis_signal.rs + +//! Queries for `kb_analysis_signals`. + +/// Inserts one analysis signal row and returns its numeric id. 
+pub async fn insert_analysis_signal( + database: &crate::KbDatabase, + dto: &crate::KbAnalysisSignalDto, +) -> Result { + let payload_json_result = serde_json::to_string(&dto.payload); + let payload_json = match payload_json_result { + Ok(payload_json) => payload_json, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot serialize analysis signal payload: {}", + error + ))); + } + }; + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_analysis_signals ( + signal_kind, + severity, + object_key, + related_observation_id, + score, + payload_json, + created_at +) +VALUES (?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(dto.signal_kind.clone()) + .bind(dto.severity.to_i16()) + .bind(dto.object_key.clone()) + .bind(dto.related_observation_id) + .bind(dto.score) + .bind(payload_json) + .bind(dto.created_at.to_rfc3339()) + .execute(pool) + .await; + let query_result = match query_result { + Ok(query_result) => query_result, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot insert kb_analysis_signals on sqlite: {}", + error + ))); + } + }; + Ok(query_result.last_insert_rowid()) + } + } +} + +/// Lists recent analysis signals ordered from newest to oldest. +pub async fn list_recent_analysis_signals( + database: &crate::KbDatabase, + limit: u32, +) -> Result, crate::KbError> { + if limit == 0 { + return Ok(std::vec::Vec::new()); + } + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + signal_kind, + severity, + object_key, + related_observation_id, + score, + payload_json, + created_at +FROM kb_analysis_signals +ORDER BY id DESC +LIMIT ? 
+ "#, + ) + .bind(i64::from(limit)) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list analysis signals on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbAnalysisSignalDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn analysis_signal_roundtrip_works() { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("analysis_signal.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database = crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed"); + let dto = crate::KbAnalysisSignalDto::new( + "candidate_token".to_string(), + crate::KbAnalysisSignalSeverity::Medium, + "So11111111111111111111111111111111111111112".to_string(), + None, + Some(0.72), + serde_json::json!({ + "reason": "fresh_token_with_activity" + }), + ); + let inserted_id = crate::insert_analysis_signal(&database, &dto) + .await + .expect("insert must succeed"); + assert!(inserted_id > 0); + let listed = crate::list_recent_analysis_signals(&database, 10) + .await + .expect("list must succeed"); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].signal_kind, "candidate_token"); + assert_eq!(listed[0].severity, crate::KbAnalysisSignalSeverity::Medium); + assert_eq!(listed[0].score, Some(0.72)); + } +} diff --git a/kb_lib/src/db/queries/onchain_observation.rs 
b/kb_lib/src/db/queries/onchain_observation.rs new file mode 100644 index 0000000..83023af --- /dev/null +++ b/kb_lib/src/db/queries/onchain_observation.rs @@ -0,0 +1,170 @@ +// file: kb_lib/src/db/queries/onchain_observation.rs + +//! Queries for `kb_onchain_observations`. + +/// Inserts one on-chain observation row and returns its numeric id. +pub async fn insert_onchain_observation( + database: &crate::KbDatabase, + dto: &crate::KbOnchainObservationDto, +) -> Result { + let payload_json_result = serde_json::to_string(&dto.payload); + let payload_json = match payload_json_result { + Ok(payload_json) => payload_json, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot serialize on-chain observation payload: {}", + error + ))); + } + }; + let slot_i64 = match dto.slot { + Some(slot) => { + let slot_result = i64::try_from(slot); + match slot_result { + Ok(slot) => Some(slot), + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot convert on-chain observation slot '{}' to i64: {}", + slot, error + ))); + } + } + } + None => None, + }; + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query( + r#" +INSERT INTO kb_onchain_observations ( + observation_kind, + source_kind, + endpoint_name, + object_key, + slot, + payload_json, + observed_at +) +VALUES (?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(dto.observation_kind.clone()) + .bind(dto.source_kind.to_i16()) + .bind(dto.endpoint_name.clone()) + .bind(dto.object_key.clone()) + .bind(slot_i64) + .bind(payload_json) + .bind(dto.observed_at.to_rfc3339()) + .execute(pool) + .await; + let query_result = match query_result { + Ok(query_result) => query_result, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot insert kb_onchain_observations on sqlite: {}", + error + ))); + } + }; + Ok(query_result.last_insert_rowid()) + } + } +} + +/// Lists recent on-chain observations ordered from newest to oldest. 
+pub async fn list_recent_onchain_observations( + database: &crate::KbDatabase, + limit: u32, +) -> Result, crate::KbError> { + if limit == 0 { + return Ok(std::vec::Vec::new()); + } + match database.connection() { + crate::KbDatabaseConnection::Sqlite(pool) => { + let query_result = sqlx::query_as::( + r#" +SELECT + id, + observation_kind, + source_kind, + endpoint_name, + object_key, + slot, + payload_json, + observed_at +FROM kb_onchain_observations +ORDER BY id DESC +LIMIT ? + "#, + ) + .bind(i64::from(limit)) + .fetch_all(pool) + .await; + let entities = match query_result { + Ok(entities) => entities, + Err(error) => { + return Err(crate::KbError::Db(format!( + "cannot list on-chain observations on sqlite: {}", + error + ))); + } + }; + let mut dtos = std::vec::Vec::new(); + for entity in entities { + let dto_result = crate::KbOnchainObservationDto::try_from(entity); + let dto = match dto_result { + Ok(dto) => dto, + Err(error) => return Err(error), + }; + dtos.push(dto); + } + Ok(dtos) + } + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn onchain_observation_roundtrip_works() { + let tempdir = tempfile::tempdir().expect("tempdir must succeed"); + let database_path = tempdir.path().join("onchain_observation.sqlite3"); + let config = crate::KbDatabaseConfig { + enabled: true, + backend: crate::KbDatabaseBackend::Sqlite, + sqlite: crate::KbSqliteDatabaseConfig { + path: database_path.to_string_lossy().to_string(), + create_if_missing: true, + busy_timeout_ms: 5000, + max_connections: 1, + auto_initialize_schema: true, + use_wal: true, + }, + }; + let database = crate::KbDatabase::connect_and_initialize(&config) + .await + .expect("database init must succeed"); + let dto = crate::KbOnchainObservationDto::new( + "token_discovered".to_string(), + crate::KbObservationSourceKind::WsRpc, + Some("mainnet_public_ws_slots".to_string()), + "So11111111111111111111111111111111111111112".to_string(), + Some(123456u64), + serde_json::json!({ + "mint": 
"So11111111111111111111111111111111111111112", + "source": "ws" + }), + ); + let inserted_id = crate::insert_onchain_observation(&database, &dto) + .await + .expect("insert must succeed"); + assert!(inserted_id > 0); + let listed = crate::list_recent_onchain_observations(&database, 10) + .await + .expect("list must succeed"); + assert_eq!(listed.len(), 1); + assert_eq!(listed[0].observation_kind, "token_discovered"); + assert_eq!(listed[0].source_kind, crate::KbObservationSourceKind::WsRpc); + assert_eq!(listed[0].slot, Some(123456u64)); + } +} diff --git a/kb_lib/src/db/schema.rs b/kb_lib/src/db/schema.rs index 7e387f5..569b95e 100644 --- a/kb_lib/src/db/schema.rs +++ b/kb_lib/src/db/schema.rs @@ -3,9 +3,7 @@ //! Database schema initialization. /// Ensures that the database schema exists. -pub(crate) async fn ensure_schema( - database: &crate::KbDatabase, -) -> Result<(), crate::KbError> { +pub(crate) async fn ensure_schema(database: &crate::KbDatabase) -> Result<(), crate::KbError> { match database.connection() { crate::KbDatabaseConnection::Sqlite(pool) => { let metadata_table_result = sqlx::query( @@ -153,6 +151,107 @@ ON kb_observed_tokens (status) error ))); } + let onchain_observations_result = sqlx::query( + r#" +CREATE TABLE IF NOT EXISTS kb_onchain_observations ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + observation_kind TEXT NOT NULL, + source_kind INTEGER NOT NULL, + endpoint_name TEXT NULL, + object_key TEXT NOT NULL, + slot INTEGER NULL, + payload_json TEXT NOT NULL, + observed_at TEXT NOT NULL +) + "#, + ) + .execute(pool) + .await; + if let Err(error) = onchain_observations_result { + return Err(crate::KbError::Db(format!( + "cannot create table kb_onchain_observations on sqlite: {}", + error + ))); + } + let onchain_observations_object_key_index_result = sqlx::query( + r#" +CREATE INDEX IF NOT EXISTS kb_idx_onchain_observations_object_key +ON kb_onchain_observations (object_key) + "#, + ) + .execute(pool) + .await; + if let Err(error) = 
onchain_observations_object_key_index_result { + return Err(crate::KbError::Db(format!( + "cannot create index kb_idx_onchain_observations_object_key on sqlite: {}", + error + ))); + } + let onchain_observations_observed_at_index_result = sqlx::query( + r#" +CREATE INDEX IF NOT EXISTS kb_idx_onchain_observations_observed_at +ON kb_onchain_observations (observed_at) + "#, + ) + .execute(pool) + .await; + if let Err(error) = onchain_observations_observed_at_index_result { + return Err(crate::KbError::Db(format!( + "cannot create index kb_idx_onchain_observations_observed_at on sqlite: {}", + error + ))); + } + let analysis_signals_result = sqlx::query( + r#" +CREATE TABLE IF NOT EXISTS kb_analysis_signals ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + signal_kind TEXT NOT NULL, + severity INTEGER NOT NULL, + object_key TEXT NOT NULL, + related_observation_id INTEGER NULL, + score REAL NULL, + payload_json TEXT NOT NULL, + created_at TEXT NOT NULL, + FOREIGN KEY(related_observation_id) REFERENCES kb_onchain_observations(id) +) + "#, + ) + .execute(pool) + .await; + if let Err(error) = analysis_signals_result { + return Err(crate::KbError::Db(format!( + "cannot create table kb_analysis_signals on sqlite: {}", + error + ))); + } + let analysis_signals_object_key_index_result = sqlx::query( + r#" +CREATE INDEX IF NOT EXISTS kb_idx_analysis_signals_object_key +ON kb_analysis_signals (object_key) + "#, + ) + .execute(pool) + .await; + if let Err(error) = analysis_signals_object_key_index_result { + return Err(crate::KbError::Db(format!( + "cannot create index kb_idx_analysis_signals_object_key on sqlite: {}", + error + ))); + } + let analysis_signals_created_at_index_result = sqlx::query( + r#" +CREATE INDEX IF NOT EXISTS kb_idx_analysis_signals_created_at +ON kb_analysis_signals (created_at) + "#, + ) + .execute(pool) + .await; + if let Err(error) = analysis_signals_created_at_index_result { + return Err(crate::KbError::Db(format!( + "cannot create index 
kb_idx_analysis_signals_created_at on sqlite: {}", + error + ))); + } let schema_version = crate::KbDbMetadataDto::new( "schema_version".to_string(), env!("CARGO_PKG_VERSION").to_string(), @@ -162,6 +261,6 @@ ON kb_observed_tokens (status) return Err(error); } Ok(()) - }, + } } } diff --git a/kb_lib/src/db/types.rs b/kb_lib/src/db/types.rs index c2cb5a5..4d53a99 100644 --- a/kb_lib/src/db/types.rs +++ b/kb_lib/src/db/types.rs @@ -2,10 +2,14 @@ //! Database shared types. +mod analysis_signal_severity; mod database_backend; +mod observation_source_kind; mod observed_token_status; mod runtime_event_level; +pub use crate::db::types::analysis_signal_severity::KbAnalysisSignalSeverity; pub use crate::db::types::database_backend::KbDatabaseBackend; +pub use crate::db::types::observation_source_kind::KbObservationSourceKind; pub use crate::db::types::observed_token_status::KbObservedTokenStatus; pub use crate::db::types::runtime_event_level::KbDbRuntimeEventLevel; diff --git a/kb_lib/src/db/types/analysis_signal_severity.rs b/kb_lib/src/db/types/analysis_signal_severity.rs new file mode 100644 index 0000000..fb5422b --- /dev/null +++ b/kb_lib/src/db/types/analysis_signal_severity.rs @@ -0,0 +1,48 @@ +// file: kb_lib/src/db/types/analysis_signal_severity.rs + +//! Analysis signal severity. + +/// Severity for one analysis signal. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum KbAnalysisSignalSeverity { + /// Informational signal. + Info, + /// Low-importance signal. + Low, + /// Medium-importance signal. + Medium, + /// High-importance signal. + High, + /// Critical signal. + Critical, +} + +impl KbAnalysisSignalSeverity { + /// Converts the severity to its stable integer representation. + pub fn to_i16(self) -> i16 { + match self { + Self::Info => 0, + Self::Low => 1, + Self::Medium => 2, + Self::High => 3, + Self::Critical => 4, + } + } + + /// Restores a severity from its stable integer representation. 
+ pub fn from_i16( + value: i16, + ) -> Result<Self, crate::KbError> { + match value { + 0 => Ok(Self::Info), + 1 => Ok(Self::Low), + 2 => Ok(Self::Medium), + 3 => Ok(Self::High), + 4 => Ok(Self::Critical), + _ => Err(crate::KbError::Db(format!( + "invalid KbAnalysisSignalSeverity value: {}", + value + ))), + } + } +} diff --git a/kb_lib/src/db/types/observation_source_kind.rs b/kb_lib/src/db/types/observation_source_kind.rs new file mode 100644 index 0000000..e14b974 --- /dev/null +++ b/kb_lib/src/db/types/observation_source_kind.rs @@ -0,0 +1,46 @@ +// file: kb_lib/src/db/types/observation_source_kind.rs + +//! Observation source kind. + +/// Source family for one on-chain observation. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum KbObservationSourceKind { + /// HTTP RPC source. + HttpRpc, + /// WebSocket RPC source. + WsRpc, + /// Yellowstone gRPC source. + Grpc, + /// DEX connector source. + Dex, + /// Other source kind. + Other, +} + +impl KbObservationSourceKind { + /// Converts the source kind to its stable integer representation. + pub fn to_i16(self) -> i16 { + match self { + Self::HttpRpc => 0, + Self::WsRpc => 1, + Self::Grpc => 2, + Self::Dex => 3, + Self::Other => 4, + } + } + + /// Restores a source kind from its stable integer representation.
+ pub fn from_i16(value: i16) -> Result<Self, crate::KbError> { + match value { + 0 => Ok(Self::HttpRpc), + 1 => Ok(Self::WsRpc), + 2 => Ok(Self::Grpc), + 3 => Ok(Self::Dex), + 4 => Ok(Self::Other), + _ => Err(crate::KbError::Db(format!( + "invalid KbObservationSourceKind value: {}", + value + ))), + } + } +} diff --git a/kb_lib/src/lib.rs b/kb_lib/src/lib.rs index c078202..17a34d5 100644 --- a/kb_lib/src/lib.rs +++ b/kb_lib/src/lib.rs @@ -81,6 +81,16 @@ pub use crate::db::KbKnownWsEndpointEntity; pub use crate::db::KbObservedTokenDto; pub use crate::db::KbObservedTokenEntity; pub use crate::db::KbObservedTokenStatus; +pub use crate::db::KbAnalysisSignalDto; +pub use crate::db::KbAnalysisSignalEntity; +pub use crate::db::KbAnalysisSignalSeverity; +pub use crate::db::KbObservationSourceKind; +pub use crate::db::KbOnchainObservationDto; +pub use crate::db::KbOnchainObservationEntity; +pub use crate::db::insert_analysis_signal; +pub use crate::db::insert_onchain_observation; +pub use crate::db::list_recent_analysis_signals; +pub use crate::db::list_recent_onchain_observations; pub use crate::db::get_observed_token_by_mint; pub use crate::db::list_observed_tokens; pub use crate::db::upsert_observed_token;