This commit is contained in:
2026-04-24 05:47:31 +02:00
parent 6d00c0ddf4
commit a7030d7d0f
18 changed files with 842 additions and 21 deletions

View File

@@ -0,0 +1,154 @@
// file: kb_lib/src/db/queries/analysis_signal.rs
//! Queries for `kb_analysis_signals`.
/// Inserts one analysis signal row and returns its numeric id.
pub async fn insert_analysis_signal(
database: &crate::KbDatabase,
dto: &crate::KbAnalysisSignalDto,
) -> Result<i64, crate::KbError> {
let payload_json_result = serde_json::to_string(&dto.payload);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot serialize analysis signal payload: {}",
error
)));
}
};
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query(
r#"
INSERT INTO kb_analysis_signals (
signal_kind,
severity,
object_key,
related_observation_id,
score,
payload_json,
created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(dto.signal_kind.clone())
.bind(dto.severity.to_i16())
.bind(dto.object_key.clone())
.bind(dto.related_observation_id)
.bind(dto.score)
.bind(payload_json)
.bind(dto.created_at.to_rfc3339())
.execute(pool)
.await;
let query_result = match query_result {
Ok(query_result) => query_result,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot insert kb_analysis_signals on sqlite: {}",
error
)));
}
};
Ok(query_result.last_insert_rowid())
}
}
}
/// Lists recent analysis signals ordered from newest to oldest.
pub async fn list_recent_analysis_signals(
database: &crate::KbDatabase,
limit: u32,
) -> Result<std::vec::Vec<crate::KbAnalysisSignalDto>, crate::KbError> {
if limit == 0 {
return Ok(std::vec::Vec::new());
}
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbAnalysisSignalEntity>(
r#"
SELECT
id,
signal_kind,
severity,
object_key,
related_observation_id,
score,
payload_json,
created_at
FROM kb_analysis_signals
ORDER BY id DESC
LIMIT ?
"#,
)
.bind(i64::from(limit))
.fetch_all(pool)
.await;
let entities = match query_result {
Ok(entities) => entities,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list analysis signals on sqlite: {}",
error
)));
}
};
let mut dtos = std::vec::Vec::new();
for entity in entities {
let dto_result = crate::KbAnalysisSignalDto::try_from(entity);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
dtos.push(dto);
}
Ok(dtos)
}
}
}
#[cfg(test)]
mod tests {
#[tokio::test]
async fn analysis_signal_roundtrip_works() {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("analysis_signal.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
let database = crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed");
let dto = crate::KbAnalysisSignalDto::new(
"candidate_token".to_string(),
crate::KbAnalysisSignalSeverity::Medium,
"So11111111111111111111111111111111111111112".to_string(),
None,
Some(0.72),
serde_json::json!({
"reason": "fresh_token_with_activity"
}),
);
let inserted_id = crate::insert_analysis_signal(&database, &dto)
.await
.expect("insert must succeed");
assert!(inserted_id > 0);
let listed = crate::list_recent_analysis_signals(&database, 10)
.await
.expect("list must succeed");
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].signal_kind, "candidate_token");
assert_eq!(listed[0].severity, crate::KbAnalysisSignalSeverity::Medium);
assert_eq!(listed[0].score, Some(0.72));
}
}

View File

@@ -0,0 +1,170 @@
// file: kb_lib/src/db/queries/onchain_observation.rs
//! Queries for `kb_onchain_observations`.
/// Inserts one on-chain observation row and returns its numeric id.
pub async fn insert_onchain_observation(
database: &crate::KbDatabase,
dto: &crate::KbOnchainObservationDto,
) -> Result<i64, crate::KbError> {
let payload_json_result = serde_json::to_string(&dto.payload);
let payload_json = match payload_json_result {
Ok(payload_json) => payload_json,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot serialize on-chain observation payload: {}",
error
)));
}
};
let slot_i64 = match dto.slot {
Some(slot) => {
let slot_result = i64::try_from(slot);
match slot_result {
Ok(slot) => Some(slot),
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot convert on-chain observation slot '{}' to i64: {}",
slot, error
)));
}
}
}
None => None,
};
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query(
r#"
INSERT INTO kb_onchain_observations (
observation_kind,
source_kind,
endpoint_name,
object_key,
slot,
payload_json,
observed_at
)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(dto.observation_kind.clone())
.bind(dto.source_kind.to_i16())
.bind(dto.endpoint_name.clone())
.bind(dto.object_key.clone())
.bind(slot_i64)
.bind(payload_json)
.bind(dto.observed_at.to_rfc3339())
.execute(pool)
.await;
let query_result = match query_result {
Ok(query_result) => query_result,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot insert kb_onchain_observations on sqlite: {}",
error
)));
}
};
Ok(query_result.last_insert_rowid())
}
}
}
/// Lists recent on-chain observations ordered from newest to oldest.
pub async fn list_recent_onchain_observations(
database: &crate::KbDatabase,
limit: u32,
) -> Result<std::vec::Vec<crate::KbOnchainObservationDto>, crate::KbError> {
if limit == 0 {
return Ok(std::vec::Vec::new());
}
match database.connection() {
crate::KbDatabaseConnection::Sqlite(pool) => {
let query_result = sqlx::query_as::<sqlx::Sqlite, crate::KbOnchainObservationEntity>(
r#"
SELECT
id,
observation_kind,
source_kind,
endpoint_name,
object_key,
slot,
payload_json,
observed_at
FROM kb_onchain_observations
ORDER BY id DESC
LIMIT ?
"#,
)
.bind(i64::from(limit))
.fetch_all(pool)
.await;
let entities = match query_result {
Ok(entities) => entities,
Err(error) => {
return Err(crate::KbError::Db(format!(
"cannot list on-chain observations on sqlite: {}",
error
)));
}
};
let mut dtos = std::vec::Vec::new();
for entity in entities {
let dto_result = crate::KbOnchainObservationDto::try_from(entity);
let dto = match dto_result {
Ok(dto) => dto,
Err(error) => return Err(error),
};
dtos.push(dto);
}
Ok(dtos)
}
}
}
#[cfg(test)]
mod tests {
#[tokio::test]
async fn onchain_observation_roundtrip_works() {
let tempdir = tempfile::tempdir().expect("tempdir must succeed");
let database_path = tempdir.path().join("onchain_observation.sqlite3");
let config = crate::KbDatabaseConfig {
enabled: true,
backend: crate::KbDatabaseBackend::Sqlite,
sqlite: crate::KbSqliteDatabaseConfig {
path: database_path.to_string_lossy().to_string(),
create_if_missing: true,
busy_timeout_ms: 5000,
max_connections: 1,
auto_initialize_schema: true,
use_wal: true,
},
};
let database = crate::KbDatabase::connect_and_initialize(&config)
.await
.expect("database init must succeed");
let dto = crate::KbOnchainObservationDto::new(
"token_discovered".to_string(),
crate::KbObservationSourceKind::WsRpc,
Some("mainnet_public_ws_slots".to_string()),
"So11111111111111111111111111111111111111112".to_string(),
Some(123456u64),
serde_json::json!({
"mint": "So11111111111111111111111111111111111111112",
"source": "ws"
}),
);
let inserted_id = crate::insert_onchain_observation(&database, &dto)
.await
.expect("insert must succeed");
assert!(inserted_id > 0);
let listed = crate::list_recent_onchain_observations(&database, 10)
.await
.expect("list must succeed");
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].observation_kind, "token_discovered");
assert_eq!(listed[0].source_kind, crate::KbObservationSourceKind::WsRpc);
assert_eq!(listed[0].slot, Some(123456u64));
}
}