diff --git a/.gitignore b/.gitignore index 29202a6..3f251b9 100644 --- a/.gitignore +++ b/.gitignore @@ -36,7 +36,9 @@ var/ !.env.dev config.json + # sqlite +dbdata *.db *.db-shm *.db-wal diff --git a/Cargo.toml b/Cargo.toml index 767675c..50b0a7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ ] [workspace.package] -version = "0.3.1" +version = "0.3.2" edition = "2024" license = "MIT" repository = "https://git.sasedev.com/Sasedev/khadhroony-bobot" diff --git a/khbb_lib/src/listener.rs b/khbb_lib/src/listener.rs index 2df6c22..abc4ffa 100644 --- a/khbb_lib/src/listener.rs +++ b/khbb_lib/src/listener.rs @@ -26,11 +26,53 @@ pub async fn run_listener_runtime( let tick_duration = std::time::Duration::from_millis(config.listener_poll_interval_ms); let mut interval = tokio::time::interval(tick_duration); let mut tick_count: u64 = 0; + let http_client_config = crate::KhbbSolanaHttpRpcClientConfig { + url: config.solana_http_rpc_url.clone(), + }; + let http_client_result = + crate::KhbbSolanaHttpRpcClient::new(http_client_config); + let http_client = match http_client_result { + Ok(value) => value, + Err(error) => { + return Err(error); + } + }; loop { tokio::select! { _ = interval.tick() => { tick_count = tick_count.saturating_add(1); - + let slot_result = http_client.get_slot(tick_count as u64).await; + match slot_result { + Ok(call_output) => { + let status = if call_output.response.error.is_some() { + "error" + } else { + "ok" + }; + let insert_result = crate::storage::insert_raw_http_rpc_message( + &pool, + session.id, + call_output.request_id as i64, + &call_output.method, + &call_output.request_body, + &call_output.response_body, + status, + ) + .await; + if let Err(error) = insert_result { + tracing::error!( + error = %error, + "failed to insert raw http rpc message" + ); + } + } + Err(error) => { + tracing::error!( + error = %error, + "http rpc get_slot failed" + ); + } + } tracing::trace!( listener_session_id = session.id, tick_count = tick_count, diff --git a/khbb_lib/src/solana_rpc_http.rs b/khbb_lib/src/solana_rpc_http.rs index e8d2236..d065728 100644 --- a/khbb_lib/src/solana_rpc_http.rs +++ b/khbb_lib/src/solana_rpc_http.rs @@ -60,6 +60,10 @@ pub struct KhbbHttpRpcCallOutput where TResult: serde::Serialize + serde::de::DeserializeOwned, { + /// Request identifier. + pub request_id: u64, + /// Request method. + pub method: std::string::String, /// Serialized request body sent to the server. pub request_body: std::string::String, /// Raw response body received from the server. @@ -113,6 +117,13 @@ impl KhbbSolanaHttpRpcClient { TParams: serde::Serialize, { let request_result = build_json_rpc_request(rpc_request, params, id); + let method_name_result = rpc_request_method_name(rpc_request); + let method_name = match method_name_result { + Ok(value) => value, + Err(error) => { + return Err(error); + }, + }; let request = match request_result { Ok(value) => value, Err(error) => { @@ -169,6 +180,8 @@ impl KhbbSolanaHttpRpcClient { }); } Ok(KhbbHttpRpcCallOutput { + request_id: id, + method: std::string::String::from(method_name), request_body, response_body, response: parsed_response, diff --git a/khbb_lib/src/storage.rs b/khbb_lib/src/storage.rs index 52a13f8..c4a07da 100644 --- a/khbb_lib/src/storage.rs +++ b/khbb_lib/src/storage.rs @@ -107,9 +107,11 @@ CREATE TABLE IF NOT EXISTS listener_sessions ( CREATE TABLE IF NOT EXISTS raw_http_rpc_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, listener_session_id INTEGER NOT NULL, + request_id INTEGER NOT NULL, method TEXT NOT NULL, request_body TEXT NOT NULL, response_body TEXT NOT NULL, + status TEXT NOT NULL, created_at TEXT NOT NULL, FOREIGN KEY(listener_session_id) REFERENCES listener_sessions(id) ); @@ -279,8 +281,52 @@ WHERE id = ?2; } } +pub(crate) async fn insert_raw_http_rpc_message( + pool: &sqlx::SqlitePool, + session_id: i64, + request_id: i64, + method: &str, + request_body: &str, + response_body: &str, + status: &str, +) -> core::result::Result<(), crate::KhbbError> { + let now = chrono::Utc::now().to_rfc3339(); + let query_result = sqlx::query( + r#" +INSERT INTO raw_http_rpc_messages ( + listener_session_id, + request_id, + method, + request_body, + response_body, + status, + created_at +) +VALUES (?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(session_id) + .bind(request_id) + .bind(method) + .bind(request_body) + .bind(response_body) + .bind(status) + .bind(now) + .execute(pool) + .await; + match query_result { + Ok(_) => Ok(()), + Err(error) => Err(crate::KhbbError::Database { + context: "insert raw http rpc message", + message: error.to_string(), + }), + } +} + #[cfg(test)] mod tests { + use super::*; + fn build_test_config(database_url: std::string::String) -> crate::KhbbAppConfig { crate::KhbbAppConfig { database_url, @@ -404,4 +450,27 @@ WHERE id = ?1; let status = fetch_result.expect("fetch updated status"); assert_eq!(status, "stopped"); } + + #[tokio::test] + async fn insert_raw_http_rpc_message_inserts_row() { + let pool = create_sqlite_pool("sqlite::memory:").await.expect("pool"); + ensure_sqlite_schema(&pool).await.expect("schema"); + let session = insert_listener_session( + &pool, + &crate::KhbbAppConfig { + database_url: "sqlite::memory:".into(), + solana_http_rpc_url: "http://localhost".into(), + solana_ws_rpc_url: "ws://localhost".into(), + yellowstone_grpc_url: None, + log_filter: "info".into(), + bootstrap_database: false, + listener_poll_interval_ms: 1000, + }, + ) + .await + .expect("session"); + let result = + insert_raw_http_rpc_message(&pool, session.id, 1, "getSlot", "{}", "{}", "ok").await; + assert!(result.is_ok()); + } }