This commit is contained in:
2026-04-24 08:53:40 +02:00
parent a7030d7d0f
commit 54778373b8
31 changed files with 1706 additions and 164 deletions

View File

@@ -6,7 +6,7 @@
pub(crate) async fn ensure_schema(database: &crate::KbDatabase) -> Result<(), crate::KbError> {
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let metadata_table_result = sqlx::query(
let statements = vec![
r#"
CREATE TABLE IF NOT EXISTS kb_db_metadata (
key TEXT NOT NULL PRIMARY KEY,
@@ -14,16 +14,6 @@ CREATE TABLE IF NOT EXISTS kb_db_metadata (
updated_at TEXT NOT NULL
)
"#,
)
.execute(pool)
.await;
if let Err(error) = metadata_table_result {
return Err(crate::KbError::Db(format!(
"cannot create table kb_db_metadata on sqlite: {}",
error
)));
}
let known_http_endpoints_result = sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS kb_known_http_endpoints (
name TEXT NOT NULL PRIMARY KEY,
@@ -35,16 +25,6 @@ CREATE TABLE IF NOT EXISTS kb_known_http_endpoints (
updated_at TEXT NOT NULL
)
"#,
)
.execute(pool)
.await;
if let Err(error) = known_http_endpoints_result {
return Err(crate::KbError::Db(format!(
"cannot create table kb_known_http_endpoints on sqlite: {}",
error
)));
}
let known_ws_endpoints_result = sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS kb_known_ws_endpoints (
name TEXT NOT NULL PRIMARY KEY,
@@ -56,16 +36,6 @@ CREATE TABLE IF NOT EXISTS kb_known_ws_endpoints (
updated_at TEXT NOT NULL
)
"#,
)
.execute(pool)
.await;
if let Err(error) = known_ws_endpoints_result {
return Err(crate::KbError::Db(format!(
"cannot create table kb_known_ws_endpoints on sqlite: {}",
error
)));
}
let runtime_events_result = sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS kb_db_runtime_events (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
@@ -76,30 +46,10 @@ CREATE TABLE IF NOT EXISTS kb_db_runtime_events (
created_at TEXT NOT NULL
)
"#,
)
.execute(pool)
.await;
if let Err(error) = runtime_events_result {
return Err(crate::KbError::Db(format!(
"cannot create table kb_db_runtime_events on sqlite: {}",
error
)));
}
let runtime_events_index_result = sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS kb_idx_db_runtime_events_created_at
ON kb_db_runtime_events (created_at)
"#,
)
.execute(pool)
.await;
if let Err(error) = runtime_events_index_result {
return Err(crate::KbError::Db(format!(
"cannot create index kb_idx_db_runtime_events_created_at on sqlite: {}",
error
)));
}
let observed_tokens_result = sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS kb_observed_tokens (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
@@ -114,44 +64,14 @@ CREATE TABLE IF NOT EXISTS kb_observed_tokens (
updated_at TEXT NOT NULL
)
"#,
)
.execute(pool)
.await;
if let Err(error) = observed_tokens_result {
return Err(crate::KbError::Db(format!(
"cannot create table kb_observed_tokens on sqlite: {}",
error
)));
}
let observed_tokens_mint_index_result = sqlx::query(
r#"
CREATE UNIQUE INDEX IF NOT EXISTS kb_idx_observed_tokens_mint
ON kb_observed_tokens (mint)
"#,
)
.execute(pool)
.await;
if let Err(error) = observed_tokens_mint_index_result {
return Err(crate::KbError::Db(format!(
"cannot create index kb_idx_observed_tokens_mint on sqlite: {}",
error
)));
}
let observed_tokens_status_index_result = sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS kb_idx_observed_tokens_status
ON kb_observed_tokens (status)
"#,
)
.execute(pool)
.await;
if let Err(error) = observed_tokens_status_index_result {
return Err(crate::KbError::Db(format!(
"cannot create index kb_idx_observed_tokens_status on sqlite: {}",
error
)));
}
let onchain_observations_result = sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS kb_onchain_observations (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
@@ -164,44 +84,14 @@ CREATE TABLE IF NOT EXISTS kb_onchain_observations (
observed_at TEXT NOT NULL
)
"#,
)
.execute(pool)
.await;
if let Err(error) = onchain_observations_result {
return Err(crate::KbError::Db(format!(
"cannot create table kb_onchain_observations on sqlite: {}",
error
)));
}
let onchain_observations_object_key_index_result = sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS kb_idx_onchain_observations_object_key
ON kb_onchain_observations (object_key)
"#,
)
.execute(pool)
.await;
if let Err(error) = onchain_observations_object_key_index_result {
return Err(crate::KbError::Db(format!(
"cannot create index kb_idx_onchain_observations_object_key on sqlite: {}",
error
)));
}
let onchain_observations_observed_at_index_result = sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS kb_idx_onchain_observations_observed_at
ON kb_onchain_observations (observed_at)
"#,
)
.execute(pool)
.await;
if let Err(error) = onchain_observations_observed_at_index_result {
return Err(crate::KbError::Db(format!(
"cannot create index kb_idx_onchain_observations_observed_at on sqlite: {}",
error
)));
}
let analysis_signals_result = sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS kb_analysis_signals (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
@@ -215,42 +105,117 @@ CREATE TABLE IF NOT EXISTS kb_analysis_signals (
FOREIGN KEY(related_observation_id) REFERENCES kb_onchain_observations(id)
)
"#,
)
.execute(pool)
.await;
if let Err(error) = analysis_signals_result {
return Err(crate::KbError::Db(format!(
"cannot create table kb_analysis_signals on sqlite: {}",
error
)));
}
let analysis_signals_object_key_index_result = sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS kb_idx_analysis_signals_object_key
ON kb_analysis_signals (object_key)
"#,
)
.execute(pool)
.await;
if let Err(error) = analysis_signals_object_key_index_result {
return Err(crate::KbError::Db(format!(
"cannot create index kb_idx_analysis_signals_object_key on sqlite: {}",
error
)));
}
let analysis_signals_created_at_index_result = sqlx::query(
r#"
CREATE INDEX IF NOT EXISTS kb_idx_analysis_signals_created_at
ON kb_analysis_signals (created_at)
"#,
)
.execute(pool)
.await;
if let Err(error) = analysis_signals_created_at_index_result {
return Err(crate::KbError::Db(format!(
"cannot create index kb_idx_analysis_signals_created_at on sqlite: {}",
error
)));
r#"
CREATE TABLE IF NOT EXISTS kb_dexes (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
program_id TEXT NULL,
router_program_id TEXT NULL,
is_enabled INTEGER NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"#,
r#"
CREATE TABLE IF NOT EXISTS kb_tokens (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
mint TEXT NOT NULL UNIQUE,
symbol TEXT NULL,
name TEXT NULL,
decimals INTEGER NULL,
token_program TEXT NOT NULL,
is_quote_token INTEGER NOT NULL,
first_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"#,
r#"
CREATE TABLE IF NOT EXISTS kb_pools (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
dex_id INTEGER NOT NULL,
address TEXT NOT NULL UNIQUE,
pool_kind INTEGER NOT NULL,
status INTEGER NOT NULL,
first_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(dex_id) REFERENCES kb_dexes(id)
)
"#,
r#"
CREATE INDEX IF NOT EXISTS kb_idx_pools_dex_id
ON kb_pools (dex_id)
"#,
r#"
CREATE TABLE IF NOT EXISTS kb_pairs (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
dex_id INTEGER NOT NULL,
pool_id INTEGER NOT NULL UNIQUE,
base_token_id INTEGER NOT NULL,
quote_token_id INTEGER NOT NULL,
symbol TEXT NULL,
first_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(dex_id) REFERENCES kb_dexes(id),
FOREIGN KEY(pool_id) REFERENCES kb_pools(id),
FOREIGN KEY(base_token_id) REFERENCES kb_tokens(id),
FOREIGN KEY(quote_token_id) REFERENCES kb_tokens(id)
)
"#,
r#"
CREATE TABLE IF NOT EXISTS kb_pool_tokens (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
pool_id INTEGER NOT NULL,
token_id INTEGER NOT NULL,
role INTEGER NOT NULL,
vault_address TEXT NULL,
token_order INTEGER NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(pool_id) REFERENCES kb_pools(id),
FOREIGN KEY(token_id) REFERENCES kb_tokens(id),
UNIQUE(pool_id, token_id, role)
)
"#,
r#"
CREATE TABLE IF NOT EXISTS kb_pool_listings (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
dex_id INTEGER NOT NULL,
pool_id INTEGER NOT NULL UNIQUE,
pair_id INTEGER NULL,
source_kind INTEGER NOT NULL,
source_endpoint_name TEXT NULL,
detected_at TEXT NOT NULL,
initial_base_reserve REAL NULL,
initial_quote_reserve REAL NULL,
initial_price_quote REAL NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(dex_id) REFERENCES kb_dexes(id),
FOREIGN KEY(pool_id) REFERENCES kb_pools(id),
FOREIGN KEY(pair_id) REFERENCES kb_pairs(id)
)
"#,
r#"
CREATE INDEX IF NOT EXISTS kb_idx_pool_listings_detected_at
ON kb_pool_listings (detected_at)
"#,
];
for statement in statements {
let execute_result = sqlx::query(statement).execute(pool).await;
if let Err(error) = execute_result {
return Err(crate::KbError::Db(format!(
"cannot initialize sqlite schema statement '{}': {}",
statement, error
)));
}
}
let schema_version = crate::KbDbMetadataDto::new(
"schema_version".to_string(),