This commit is contained in:
2026-04-18 11:03:41 +02:00
parent 0dc797298e
commit b5cde7d512
5 changed files with 128 additions and 2 deletions

View File

@@ -26,11 +26,53 @@ pub async fn run_listener_runtime(
let tick_duration = std::time::Duration::from_millis(config.listener_poll_interval_ms);
let mut interval = tokio::time::interval(tick_duration);
let mut tick_count: u64 = 0;
let http_client_config = crate::KhbbSolanaHttpRpcClientConfig {
url: config.solana_http_rpc_url.clone(),
};
let http_client_result =
crate::KhbbSolanaHttpRpcClient::new(http_client_config);
let http_client = match http_client_result {
Ok(value) => value,
Err(error) => {
return Err(error);
}
};
loop {
tokio::select! {
_ = interval.tick() => {
tick_count = tick_count.saturating_add(1);
let slot_result = http_client.get_slot(tick_count as u64).await;
match slot_result {
Ok(call_output) => {
let status = if call_output.response.error.is_some() {
"error"
} else {
"ok"
};
let insert_result = crate::storage::insert_raw_http_rpc_message(
&pool,
session.id,
call_output.request_id as i64,
&call_output.method,
&call_output.request_body,
&call_output.response_body,
status,
)
.await;
if let Err(error) = insert_result {
tracing::error!(
error = %error,
"failed to insert raw http rpc message"
);
}
}
Err(error) => {
tracing::error!(
error = %error,
"http rpc get_slot failed"
);
}
}
tracing::trace!(
listener_session_id = session.id,
tick_count = tick_count,

View File

@@ -60,6 +60,10 @@ pub struct KhbbHttpRpcCallOutput<TResult>
where
TResult: serde::Serialize + serde::de::DeserializeOwned,
{
/// Request identifier.
pub request_id: u64,
/// Request method.
pub method: std::string::String,
/// Serialized request body sent to the server.
pub request_body: std::string::String,
/// Raw response body received from the server.
@@ -113,6 +117,13 @@ impl KhbbSolanaHttpRpcClient {
TParams: serde::Serialize,
{
let request_result = build_json_rpc_request(rpc_request, params, id);
let method_name_result = rpc_request_method_name(rpc_request);
let method_name = match method_name_result {
Ok(value) => value,
Err(error) => {
return Err(error);
},
};
let request = match request_result {
Ok(value) => value,
Err(error) => {
@@ -169,6 +180,8 @@ impl KhbbSolanaHttpRpcClient {
});
}
Ok(KhbbHttpRpcCallOutput {
request_id: id,
method: std::string::String::from(method_name),
request_body,
response_body,
response: parsed_response,

View File

@@ -107,9 +107,11 @@ CREATE TABLE IF NOT EXISTS listener_sessions (
CREATE TABLE IF NOT EXISTS raw_http_rpc_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
listener_session_id INTEGER NOT NULL,
request_id INTEGER NOT NULL,
method TEXT NOT NULL,
request_body TEXT NOT NULL,
response_body TEXT NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id)
);
@@ -279,8 +281,52 @@ WHERE id = ?2;
}
}
pub(crate) async fn insert_raw_http_rpc_message(
pool: &sqlx::SqlitePool,
session_id: i64,
request_id: i64,
method: &str,
request_body: &str,
response_body: &str,
status: &str,
) -> core::result::Result<(), crate::KhbbError> {
let now = chrono::Utc::now().to_rfc3339();
let query_result = sqlx::query(
r#"
INSERT INTO raw_http_rpc_messages (
listener_session_id,
request_id,
method,
request_body,
response_body,
status,
created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(session_id)
.bind(request_id)
.bind(method)
.bind(request_body)
.bind(response_body)
.bind(status)
.bind(now)
.execute(pool)
.await;
match query_result {
Ok(_) => Ok(()),
Err(error) => Err(crate::KhbbError::Database {
context: "insert raw http rpc message",
message: error.to_string(),
}),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn build_test_config(database_url: std::string::String) -> crate::KhbbAppConfig {
crate::KhbbAppConfig {
database_url,
@@ -404,4 +450,27 @@ WHERE id = ?1;
let status = fetch_result.expect("fetch updated status");
assert_eq!(status, "stopped");
}
#[tokio::test]
async fn insert_raw_http_rpc_message_inserts_row() {
let pool = create_sqlite_pool("sqlite::memory:").await.expect("pool");
ensure_sqlite_schema(&pool).await.expect("schema");
let session = insert_listener_session(
&pool,
&crate::KhbbAppConfig {
database_url: "sqlite::memory:".into(),
solana_http_rpc_url: "http://localhost".into(),
solana_ws_rpc_url: "ws://localhost".into(),
yellowstone_grpc_url: None,
log_filter: "info".into(),
bootstrap_database: false,
listener_poll_interval_ms: 1000,
},
)
.await
.expect("session");
let result =
insert_raw_http_rpc_message(&pool, session.id, 1, "getSlot", "{}", "{}", "ok").await;
assert!(result.is_ok());
}
}