// file: kb_app/src/demo_pipeline.rs //! Tauri commands for the pipeline inspection demo window. use tauri::Manager; /// Request payload for one pipeline inspection by signature. #[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)] #[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineInspectRequest.ts")] #[serde(rename_all = "camelCase")] pub(crate) struct KbDemoPipelineInspectRequest { /// Transaction signature to inspect. pub signature: std::string::String, /// Optional custom timeframe in seconds for on-demand candle rebuild. pub custom_timeframe_seconds: std::option::Option, } /// Response payload for one pipeline inspection. #[derive(Clone, Debug, serde::Serialize, ts_rs::TS)] #[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineInspectPayload.ts")] #[serde(rename_all = "camelCase")] pub(crate) struct KbDemoPipelineInspectPayload { /// Inspected signature. pub signature: std::string::String, /// Summary JSON block. pub summary_json: std::string::String, /// Resolved transaction JSON block. pub transaction_json: std::string::String, /// Decoded events JSON block. pub decoded_events_json: std::string::String, /// Pools JSON block. pub pools_json: std::string::String, /// Pairs JSON block. pub pairs_json: std::string::String, /// Launch attributions JSON block. pub launch_attributions_json: std::string::String, /// Pool origins JSON block. pub pool_origins_json: std::string::String, /// Wallet inspection JSON block. pub wallets_json: std::string::String, /// Trade events JSON block. pub trade_events_json: std::string::String, /// Pair metrics JSON block. pub pair_metrics_json: std::string::String, /// Pair candles JSON block. pub pair_candles_json: std::string::String, /// Pair analytic signals JSON block. pub pair_analytic_signals_json: std::string::String, } /// Request payload for one pipeline inspection by token mint. #[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)] #[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineInspectTokenRequest.ts")] #[serde(rename_all = "camelCase")] pub(crate) struct KbDemoPipelineInspectTokenRequest { /// Token mint to inspect. pub token_mint: std::string::String, /// Optional custom timeframe in seconds for on-demand candle rebuild. pub custom_timeframe_seconds: std::option::Option, } /// Request payload for one pipeline inspection by pair id. #[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)] #[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineInspectPairRequest.ts")] #[serde(rename_all = "camelCase")] pub(crate) struct KbDemoPipelineInspectPairRequest { /// Pair id to inspect. pub pair_id: i64, /// Optional custom timeframe in seconds for on-demand candle rebuild. pub custom_timeframe_seconds: std::option::Option, } /// Request payload for one pipeline inspection by pool address. #[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)] #[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineInspectPoolRequest.ts")] #[serde(rename_all = "camelCase")] pub(crate) struct KbDemoPipelineInspectPoolRequest { /// Pool address to inspect. pub pool_address: std::string::String, /// Optional custom timeframe in seconds for on-demand candle rebuild. pub custom_timeframe_seconds: std::option::Option, } /// Request payload for one token backfill launched from `kb_app`. #[derive(Clone, Debug, serde::Deserialize, ts_rs::TS)] #[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineBackfillTokenRequest.ts")] #[serde(rename_all = "camelCase")] pub(crate) struct KbDemoPipelineBackfillTokenRequest { /// Token mint to backfill. pub token_mint: std::string::String, /// HTTP role used to select one endpoint in the pool. pub http_role: std::option::Option, /// Maximum number of signatures fetched directly from the mint. pub mint_signature_limit: usize, /// Maximum number of signatures fetched from each discovered pool. pub pool_signature_limit: usize, } /// Response payload for one token backfill launched from `kb_app`. #[derive(Clone, Debug, serde::Serialize, ts_rs::TS)] #[ts(export, export_to = "../frontend/ts/bindings/KbDemoPipelineBackfillTokenPayload.ts")] #[serde(rename_all = "camelCase")] pub(crate) struct KbDemoPipelineBackfillTokenPayload { /// Backfilled token mint. pub token_mint: std::string::String, /// HTTP role used during backfill. pub http_role: std::string::String, /// Pretty JSON summary returned by `KbTokenBackfillService`. pub backfill_json: std::string::String, /// Whether the token exists in persisted token objects after backfill. pub token_persisted_after_backfill: bool, } /// Launches one token backfill through the persisted `kb_lib` services. #[tauri::command] pub(crate) async fn demo_pipeline_backfill_token_mint( state: tauri::State<'_, crate::KbAppState>, request: KbDemoPipelineBackfillTokenRequest, ) -> Result { let token_mint = request.token_mint.trim().to_string(); if token_mint.is_empty() { return Err("demo pipeline backfill token mint must not be empty".to_string()); } let http_role = match request.http_role.clone() { Some(http_role) => { let trimmed = http_role.trim().to_string(); if trimmed.is_empty() { "history_backfill".to_string() } else { trimmed } } None => "history_backfill".to_string(), }; if request.mint_signature_limit == 0 { return Err("demo pipeline mintSignatureLimit must be > 0".to_string()); } if request.pool_signature_limit == 0 { return Err("demo pipeline poolSignatureLimit must be > 0".to_string()); } let database = state.database.clone(); let http_pool = std::sync::Arc::new(state.http_pool.clone()); let service = kb_lib::KbTokenBackfillService::new(http_pool, database.clone(), http_role.clone()); let backfill_result = service .backfill_token_by_mint( token_mint.as_str(), request.mint_signature_limit, request.pool_signature_limit, ) .await; let backfill = match backfill_result { Ok(backfill) => backfill, Err(error) => { return Err(format!( "cannot backfill token mint '{}' with role '{}': {}", token_mint, http_role, error )); } }; let backfill_json_result = serde_json::to_string_pretty(&backfill); let backfill_json = match backfill_json_result { Ok(backfill_json) => backfill_json, Err(error) => { return Err(format!( "cannot serialize token backfill result for '{}': {}", token_mint, error )); } }; let token_result = kb_lib::get_token_by_mint(database.as_ref(), token_mint.as_str()).await; let token_option = match token_result { Ok(token_option) => token_option, Err(error) => { return Err(format!( "cannot verify persisted token mint '{}' after backfill with role '{}': {}", token_mint, http_role, error )); } }; let token_persisted_after_backfill = token_option.is_some(); Ok(KbDemoPipelineBackfillTokenPayload { token_mint, http_role, backfill_json, token_persisted_after_backfill, }) } /// Inspects one pair id through the persisted `kb_lib` pipeline state. #[tauri::command] pub(crate) async fn demo_pipeline_inspect_pair_id( state: tauri::State<'_, crate::KbAppState>, request: KbDemoPipelineInspectPairRequest, ) -> Result { if request.pair_id <= 0 { return Err("demo pipeline pair id must be > 0".to_string()); } let database = state.database.clone(); let pairs_result = kb_lib::list_pairs(database.as_ref()).await; let all_pairs = match pairs_result { Ok(all_pairs) => all_pairs, Err(error) => { return Err(format!("cannot list pairs from database: {}", error)); } }; let mut inspected_pair_option = std::option::Option::::None; for pair in all_pairs { let pair_id_option = pair.id; let pair_id = match pair_id_option { Some(pair_id) => pair_id, None => continue, }; if pair_id == request.pair_id { inspected_pair_option = Some(pair); break; } } let inspected_pair = match inspected_pair_option { Some(inspected_pair) => inspected_pair, None => { return Err(format!( "unknown pair id '{}' in local pipeline database '{}'", request.pair_id, state.database.database_url() )); } }; kb_demo_pipeline_build_pair_payload( state.database.clone(), state.database.database_url().to_string(), inspected_pair, request.custom_timeframe_seconds, format!("pair:{}", request.pair_id), ) .await } /// Inspects one pool address through the persisted `kb_lib` pipeline state. #[tauri::command] pub(crate) async fn demo_pipeline_inspect_pool_address( state: tauri::State<'_, crate::KbAppState>, request: KbDemoPipelineInspectPoolRequest, ) -> Result { let pool_address = request.pool_address.trim().to_string(); if pool_address.is_empty() { return Err("demo pipeline pool address must not be empty".to_string()); } let database = state.database.clone(); let pool_result = kb_lib::get_pool_by_address(database.as_ref(), pool_address.as_str()).await; let pool_option = match pool_result { Ok(pool_option) => pool_option, Err(error) => { return Err(format!( "cannot read pool address '{}' from database: {}", pool_address, error )); } }; let pool = match pool_option { Some(pool) => pool, None => { return Err(format!( "unknown pool address '{}' in local pipeline database '{}'", pool_address, state.database.database_url() )); } }; let pool_id = match pool.id { Some(pool_id) => pool_id, None => { return Err(format!("pool '{}' has no internal id", pool.address)); } }; let pair_result = kb_lib::get_pair_by_pool_id(database.as_ref(), pool_id).await; let pair_option = match pair_result { Ok(pair_option) => pair_option, Err(error) => { return Err(format!( "cannot fetch pair by pool_id '{}' for pool '{}': {}", pool_id, pool.address, error )); } }; let pair = match pair_option { Some(pair) => pair, None => { return Err(format!( "pool '{}' has no associated pair in local pipeline database", pool.address )); } }; kb_demo_pipeline_build_pair_payload( state.database.clone(), state.database.database_url().to_string(), pair, request.custom_timeframe_seconds, format!("pool:{}", pool_address), ) .await } /// Inspects one token mint through the persisted `kb_lib` pipeline state. #[tauri::command] pub(crate) async fn demo_pipeline_inspect_token_mint( state: tauri::State<'_, crate::KbAppState>, request: KbDemoPipelineInspectTokenRequest, ) -> Result { let token_mint = request.token_mint.trim().to_string(); if token_mint.is_empty() { return Err("demo pipeline token mint must not be empty".to_string()); } let database = state.database.clone(); let token_result = kb_lib::get_token_by_mint(database.as_ref(), token_mint.as_str()).await; let token_option = match token_result { Ok(token_option) => token_option, Err(error) => { return Err(format!( "cannot read token mint '{}' from database: {}", token_mint, error )); } }; let token = match token_option { Some(token) => token, None => { return Err(format!( "unknown token mint '{}' in local pipeline database '{}'", token_mint, state.database.database_url() )); } }; let token_id = match token.id { Some(token_id) => token_id, None => { return Err(format!("token mint '{}' has no internal id", token.mint)); } }; let pools_result = kb_lib::list_pools(database.as_ref()).await; let all_pools = match pools_result { Ok(all_pools) => all_pools, Err(error) => { return Err(format!("cannot list pools from database: {}", error)); } }; let pairs_result = kb_lib::list_pairs(database.as_ref()).await; let all_pairs = match pairs_result { Ok(all_pairs) => all_pairs, Err(error) => { return Err(format!("cannot list pairs from database: {}", error)); } }; let pool_listings_result = kb_lib::list_pool_listings(database.as_ref()).await; let all_pool_listings = match pool_listings_result { Ok(all_pool_listings) => all_pool_listings, Err(error) => { return Err(format!( "cannot list pool listings from database: {}", error )); } }; let mut pools = std::vec::Vec::::new(); let mut pool_ids = std::collections::BTreeSet::::new(); for pool in all_pools { let pool_id = match pool.id { Some(pool_id) => pool_id, None => continue, }; let pool_tokens_result = kb_lib::list_pool_tokens_by_pool_id(database.as_ref(), pool_id).await; let pool_tokens = match pool_tokens_result { Ok(pool_tokens) => pool_tokens, Err(error) => { return Err(format!( "cannot list pool tokens for pool_id '{}': {}", pool_id, error )); } }; let mut contains_token = false; for pool_token in pool_tokens { if pool_token.token_id == token_id { contains_token = true; break; } } if contains_token { pool_ids.insert(pool_id); pools.push(pool); } } let mut pairs = std::vec::Vec::::new(); let mut pair_ids = std::collections::BTreeSet::::new(); for pair in all_pairs { let pair_id = match pair.id { Some(pair_id) => pair_id, None => continue, }; if pair.base_token_id == token_id || pair.quote_token_id == token_id { pair_ids.insert(pair_id); pairs.push(pair); } } let mut pool_listings = std::vec::Vec::::new(); for listing in all_pool_listings { if pool_ids.contains(&listing.pool_id) { pool_listings.push(listing); } } let mut launch_attributions = std::vec::Vec::::new(); for pool_id in &pool_ids { let attributions_result = kb_lib::list_launch_attributions_by_pool_id(database.as_ref(), *pool_id).await; let attributions = match attributions_result { Ok(attributions) => attributions, Err(error) => { return Err(format!( "cannot list launch attributions for pool_id '{}': {}", pool_id, error )); } }; for attribution in attributions { launch_attributions.push(attribution); } } let mut pool_origins = std::vec::Vec::::new(); for pool_id in &pool_ids { let pool_origin_result = kb_lib::get_pool_origin_by_pool_id(database.as_ref(), *pool_id).await; let pool_origin_option = match pool_origin_result { Ok(pool_origin_option) => pool_origin_option, Err(error) => { return Err(format!( "cannot fetch pool origin for pool_id '{}': {}", pool_id, error )); } }; if let Some(pool_origin) = pool_origin_option { pool_origins.push(pool_origin); } } let mut wallet_holding_groups = std::vec::Vec::::new(); let wallets_result = kb_lib::list_wallets(database.as_ref()).await; let wallets = match wallets_result { Ok(wallets) => wallets, Err(error) => { return Err(format!("cannot list wallets from database: {}", error)); } }; for wallet in wallets { let wallet_id = match wallet.id { Some(wallet_id) => wallet_id, None => continue, }; let holdings_result = kb_lib::list_wallet_holdings_by_wallet_id(database.as_ref(), wallet_id).await; let holdings = match holdings_result { Ok(holdings) => holdings, Err(error) => { return Err(format!( "cannot list wallet holdings for wallet_id '{}': {}", wallet_id, error )); } }; let mut filtered_holdings = std::vec::Vec::new(); for holding in holdings { if holding.token_id == token_id { filtered_holdings.push(holding); } } if !filtered_holdings.is_empty() { let wallet_value_result = serde_json::to_value(&wallet); let wallet_value = match wallet_value_result { Ok(wallet_value) => wallet_value, Err(error) => { return Err(format!( "cannot serialize wallet '{}' to JSON value: {}", wallet.address, error )); } }; let holdings_value_result = serde_json::to_value(&filtered_holdings); let holdings_value = match holdings_value_result { Ok(holdings_value) => holdings_value, Err(error) => { return Err(format!( "cannot serialize holdings for wallet '{}' to JSON value: {}", wallet.address, error )); } }; wallet_holding_groups.push(serde_json::json!({ "walletAddress": wallet.address, "wallet": wallet_value, "holdings": holdings_value })); } } let mut pair_metrics = std::vec::Vec::::new(); let mut pair_candle_groups = std::vec::Vec::::new(); let mut pair_analytic_signal_groups = std::vec::Vec::::new(); let query_service = kb_lib::KbPairCandleQueryService::new(database.clone()); let mut timeframes = vec![60_i64, 300_i64, 900_i64, 3600_i64]; if let Some(custom_timeframe_seconds) = request.custom_timeframe_seconds { if custom_timeframe_seconds > 0 && !timeframes.contains(&custom_timeframe_seconds) { timeframes.push(custom_timeframe_seconds); } } for pair_id in &pair_ids { let pair_metric_result = kb_lib::get_pair_metric_by_pair_id(database.as_ref(), *pair_id).await; let pair_metric_option = match pair_metric_result { Ok(pair_metric_option) => pair_metric_option, Err(error) => { return Err(format!( "cannot fetch pair metric for pair_id '{}': {}", pair_id, error )); } }; if let Some(pair_metric) = pair_metric_option { pair_metrics.push(pair_metric); } let pair_signals_result = kb_lib::list_pair_analytic_signals_by_pair_id(database.as_ref(), *pair_id).await; let pair_signals = match pair_signals_result { Ok(pair_signals) => pair_signals, Err(error) => { return Err(format!( "cannot list pair analytic signals for pair_id '{}': {}", pair_id, error )); } }; let pair_signals_value_result = serde_json::to_value(&pair_signals); let pair_signals_value = match pair_signals_value_result { Ok(pair_signals_value) => pair_signals_value, Err(error) => { return Err(format!( "cannot serialize pair analytic signals for pair_id '{}': {}", pair_id, error )); } }; pair_analytic_signal_groups.push(serde_json::json!({ "pairId": pair_id, "signals": pair_signals_value })); for timeframe_seconds in &timeframes { let candles_result = if *timeframe_seconds == 60 || *timeframe_seconds == 300 || *timeframe_seconds == 900 || *timeframe_seconds == 3600 { kb_lib::list_pair_candles_by_pair_and_timeframe( database.as_ref(), *pair_id, *timeframe_seconds, ) .await } else { query_service .list_pair_candles(*pair_id, *timeframe_seconds, None, None, false) .await }; let candles = match candles_result { Ok(candles) => candles, Err(error) => { return Err(format!( "cannot list/rebuild pair candles for pair_id '{}' timeframe '{}': {}", pair_id, timeframe_seconds, error )); } }; let candles_value_result = serde_json::to_value(&candles); let candles_value = match candles_value_result { Ok(candles_value) => candles_value, Err(error) => { return Err(format!( "cannot serialize pair candles for pair_id '{}' timeframe '{}': {}", pair_id, timeframe_seconds, error )); } }; pair_candle_groups.push(serde_json::json!({ "pairId": pair_id, "timeframeSeconds": timeframe_seconds, "candles": candles_value })); } } let summary_value = serde_json::json!({ "mode": "tokenMint", "databaseUrl": state.database.database_url(), "tokenMint": token.mint, "tokenId": token_id, "customTimeframeSeconds": request.custom_timeframe_seconds, "poolCount": pools.len(), "pairCount": pairs.len(), "poolListingCount": pool_listings.len(), "launchAttributionCount": launch_attributions.len(), "poolOriginCount": pool_origins.len(), "walletHoldingGroupCount": wallet_holding_groups.len(), "pairMetricCount": pair_metrics.len(), "pairCandleGroupCount": pair_candle_groups.len(), "pairAnalyticSignalGroupCount": pair_analytic_signal_groups.len() }); let summary_json = kb_to_pretty_json(&summary_value, "summary")?; let transaction_json = kb_to_pretty_json(&token, "token")?; let decoded_events_json = kb_to_pretty_json(&pool_listings, "pool listings")?; let pools_json = kb_to_pretty_json(&pools, "pools")?; let pairs_json = kb_to_pretty_json(&pairs, "pairs")?; let launch_attributions_json = kb_to_pretty_json(&launch_attributions, "launch attributions")?; let pool_origins_json = kb_to_pretty_json(&pool_origins, "pool origins")?; let wallets_json = kb_to_pretty_json(&wallet_holding_groups, "wallet holdings")?; let trade_events_json = kb_to_pretty_json(&std::vec::Vec::::new(), "trade events")?; let pair_metrics_json = kb_to_pretty_json(&pair_metrics, "pair metrics")?; let pair_candles_json = kb_to_pretty_json(&pair_candle_groups, "pair candles")?; let pair_analytic_signals_json = kb_to_pretty_json(&pair_analytic_signal_groups, "pair analytic signals")?; Ok(KbDemoPipelineInspectPayload { signature: token_mint, summary_json, transaction_json, decoded_events_json, pools_json, pairs_json, launch_attributions_json, pool_origins_json, wallets_json, trade_events_json, pair_metrics_json, pair_candles_json, pair_analytic_signals_json, }) } /// Opens the dedicated pipeline inspection window. #[tauri::command] pub(crate) fn open_demo_pipeline_window( app_handle: tauri::AppHandle, ) -> Result<(), std::string::String> { let existing_window_option = app_handle.get_webview_window("demo_pipeline"); let demo_window = match existing_window_option { Some(demo_window) => demo_window, None => { let builder = tauri::WebviewWindowBuilder::new( &app_handle, "demo_pipeline", tauri::WebviewUrl::App("demo_pipeline.html".into()), ) .title("Demo Pipeline") .inner_size(1480.0, 920.0) .min_inner_size(1000.0, 700.0) .center() .visible(true) .transparent(false) .decorations(true); let build_result = builder.build(); match build_result { Ok(window) => window, Err(error) => { return Err(format!("cannot create demo_pipeline window: {error:?}")); } } } }; let show_result = demo_window.show(); if let Err(error) = show_result { return Err(format!("cannot show demo_pipeline window: {error:?}")); } let focus_result = demo_window.set_focus(); if let Err(error) = focus_result { return Err(format!("cannot focus demo_pipeline window: {error:?}")); } Ok(()) } /// Inspects one transaction signature through the persisted `kb_lib` pipeline state. #[tauri::command] pub(crate) async fn demo_pipeline_inspect_signature( state: tauri::State<'_, crate::KbAppState>, request: KbDemoPipelineInspectRequest, ) -> Result { let signature = request.signature.trim().to_string(); if signature.is_empty() { return Err("demo pipeline signature must not be empty".to_string()); } let database = state.database.clone(); let transaction_result = kb_lib::get_chain_transaction_by_signature(database.as_ref(), signature.as_str()).await; let transaction_option = match transaction_result { Ok(transaction_option) => transaction_option, Err(error) => { return Err(format!( "cannot read chain transaction '{}' from database: {}", signature, error )); } }; let transaction = match transaction_option { Some(transaction) => transaction, None => { return Err(format!( "unknown transaction signature '{}' in local pipeline database", signature )); } }; let transaction_id = match transaction.id { Some(transaction_id) => transaction_id, None => { return Err(format!("transaction '{}' has no internal id", signature)); } }; let decoded_events_result = kb_lib::list_dex_decoded_events_by_transaction_id(database.as_ref(), transaction_id).await; let decoded_events = match decoded_events_result { Ok(decoded_events) => decoded_events, Err(error) => { return Err(format!( "cannot list decoded events for transaction '{}': {}", signature, error )); } }; let mut decoded_event_ids = std::collections::BTreeSet::::new(); let mut pool_ids = std::collections::BTreeSet::::new(); let mut pair_ids = std::collections::BTreeSet::::new(); let mut wallet_addresses = std::collections::BTreeSet::::new(); let mut pools = std::vec::Vec::::new(); let mut pairs = std::vec::Vec::::new(); let mut launch_attributions = std::vec::Vec::::new(); let mut pool_origins = std::vec::Vec::::new(); for decoded_event in &decoded_events { if let Some(decoded_event_id) = decoded_event.id { decoded_event_ids.insert(decoded_event_id); let launch_attribution_result = kb_lib::get_launch_attribution_by_decoded_event_id( database.as_ref(), decoded_event_id, ) .await; let launch_attribution_option = match launch_attribution_result { Ok(launch_attribution_option) => launch_attribution_option, Err(error) => { return Err(format!( "cannot fetch launch attribution for decoded_event_id '{}': {}", decoded_event_id, error )); } }; if let Some(launch_attribution) = launch_attribution_option { launch_attributions.push(launch_attribution); } } let payload_parse_result = serde_json::from_str::(decoded_event.payload_json.as_str()); let payload_value = match payload_parse_result { Ok(payload_value) => payload_value, Err(error) => { return Err(format!( "cannot parse decoded_event payload_json for '{}': {}", signature, error )); } }; let extracted_wallets = kb_extract_wallet_addresses_from_value(&payload_value); for wallet_address in extracted_wallets { wallet_addresses.insert(wallet_address); } if let Some(token_a_mint) = decoded_event.token_a_mint.clone() { let _ = token_a_mint; } if let Some(token_b_mint) = decoded_event.token_b_mint.clone() { let _ = token_b_mint; } if let Some(pool_address) = decoded_event.pool_account.clone() { let pool_result = kb_lib::get_pool_by_address(database.as_ref(), pool_address.as_str()).await; let pool_option = match pool_result { Ok(pool_option) => pool_option, Err(error) => { return Err(format!( "cannot fetch pool by address '{}' for '{}': {}", pool_address, signature, error )); } }; if let Some(pool) = pool_option { let pool_id = match pool.id { Some(pool_id) => pool_id, None => { return Err(format!("pool '{}' has no internal id", pool.address)); } }; if !pool_ids.contains(&pool_id) { pool_ids.insert(pool_id); pools.push(pool.clone()); let pool_origin_result = kb_lib::get_pool_origin_by_pool_id(database.as_ref(), pool_id).await; let pool_origin_option = match pool_origin_result { Ok(pool_origin_option) => pool_origin_option, Err(error) => { return Err(format!( "cannot fetch pool origin for pool_id '{}': {}", pool_id, error )); } }; if let Some(pool_origin) = pool_origin_option { pool_origins.push(pool_origin); } let pair_result = kb_lib::get_pair_by_pool_id(database.as_ref(), pool_id).await; let pair_option = match pair_result { Ok(pair_option) => pair_option, Err(error) => { return Err(format!( "cannot fetch pair by pool_id '{}': {}", pool_id, error )); } }; if let Some(pair) = pair_option { let pair_id = match pair.id { Some(pair_id) => pair_id, None => { return Err(format!( "pair for pool '{}' has no internal id", pool_id )); } }; if !pair_ids.contains(&pair_id) { pair_ids.insert(pair_id); pairs.push(pair); } } } } } } let mut wallet_contexts = std::vec::Vec::::new(); let mut wallet_participation_count = 0_usize; let mut wallet_holding_count = 0_usize; for wallet_address in wallet_addresses { let wallet_result = kb_lib::get_wallet_by_address(database.as_ref(), wallet_address.as_str()).await; let wallet_option = match wallet_result { Ok(wallet_option) => wallet_option, Err(error) => { return Err(format!( "cannot fetch wallet by address '{}': {}", wallet_address, error )); } }; let wallet = match wallet_option { Some(wallet) => wallet, None => continue, }; let wallet_id = match wallet.id { Some(wallet_id) => wallet_id, None => { return Err(format!("wallet '{}' has no internal id", wallet.address)); } }; let participations_result = kb_lib::list_wallet_participations_by_wallet_id(database.as_ref(), wallet_id).await; let participations = match participations_result { Ok(participations) => participations, Err(error) => { return Err(format!( "cannot list wallet participations for wallet_id '{}': {}", wallet_id, error )); } }; let mut filtered_participations = std::vec::Vec::new(); for participation in participations { let mut include_participation = false; if participation.transaction_id == transaction_id { include_participation = true; } if !include_participation { if let Some(decoded_event_id) = participation.decoded_event_id { if decoded_event_ids.contains(&decoded_event_id) { include_participation = true; } } } if include_participation { filtered_participations.push(participation); } } let holdings_result = kb_lib::list_wallet_holdings_by_wallet_id(database.as_ref(), wallet_id).await; let holdings = match holdings_result { Ok(holdings) => holdings, Err(error) => { return Err(format!( "cannot list wallet holdings for wallet_id '{}': {}", wallet_id, error )); } }; let mut filtered_holdings = std::vec::Vec::new(); for holding in holdings { if holding.last_transaction_id == transaction_id { filtered_holdings.push(holding); } } wallet_participation_count += filtered_participations.len(); wallet_holding_count += filtered_holdings.len(); let wallet_value_result = serde_json::to_value(&wallet); let wallet_value = match wallet_value_result { Ok(wallet_value) => wallet_value, Err(error) => { return Err(format!( "cannot serialize wallet '{}' to JSON value: {}", wallet.address, error )); } }; let participations_value_result = serde_json::to_value(&filtered_participations); let participations_value = match participations_value_result { Ok(participations_value) => participations_value, Err(error) => { return Err(format!( "cannot serialize wallet participations for '{}' to JSON value: {}", wallet.address, error )); } }; let holdings_value_result = serde_json::to_value(&filtered_holdings); let holdings_value = match holdings_value_result { Ok(holdings_value) => holdings_value, Err(error) => { return Err(format!( "cannot serialize wallet holdings for '{}' to JSON value: {}", wallet.address, error )); } }; wallet_contexts.push(serde_json::json!({ "walletAddress": wallet.address, "wallet": wallet_value, "participations": participations_value, "holdings": holdings_value })); } let trade_events_result = kb_lib::list_trade_events_by_transaction_id(database.as_ref(), transaction_id).await; let transaction_trade_events = match trade_events_result { Ok(transaction_trade_events) => transaction_trade_events, Err(error) => { return Err(format!( "cannot list trade events for transaction_id '{}': {}", transaction_id, error )); } }; let mut pair_metrics = std::vec::Vec::::new(); let mut pair_candle_groups = std::vec::Vec::::new(); let mut pair_candle_count = 0_usize; let mut pair_analytic_signal_groups = std::vec::Vec::::new(); let mut pair_analytic_signal_count = 0_usize; let query_service = kb_lib::KbPairCandleQueryService::new(database.clone()); let mut timeframes = vec![60_i64, 300_i64, 900_i64, 3600_i64]; if let Some(custom_timeframe_seconds) = request.custom_timeframe_seconds { if custom_timeframe_seconds > 0 && !timeframes.contains(&custom_timeframe_seconds) { timeframes.push(custom_timeframe_seconds); } } for pair_id in &pair_ids { let pair_metric_result = kb_lib::get_pair_metric_by_pair_id(database.as_ref(), *pair_id).await; let pair_metric_option = match pair_metric_result { Ok(pair_metric_option) => pair_metric_option, Err(error) => { return Err(format!( "cannot fetch pair metric for pair_id '{}': {}", pair_id, error )); } }; if let Some(pair_metric) = pair_metric_option { pair_metrics.push(pair_metric); } let pair_signals_result = kb_lib::list_pair_analytic_signals_by_pair_id(database.as_ref(), *pair_id).await; let pair_signals = match pair_signals_result { Ok(pair_signals) => pair_signals, Err(error) => { return Err(format!( "cannot list pair analytic signals for pair_id '{}': {}", pair_id, error )); } }; pair_analytic_signal_count += pair_signals.len(); let pair_signals_value_result = serde_json::to_value(&pair_signals); let pair_signals_value = match pair_signals_value_result { Ok(pair_signals_value) => pair_signals_value, Err(error) => { return Err(format!( "cannot serialize pair analytic signals for pair_id '{}': {}", pair_id, error )); } }; pair_analytic_signal_groups.push(serde_json::json!({ "pairId": pair_id, "signals": pair_signals_value })); for timeframe_seconds in &timeframes { let candles_result = if *timeframe_seconds == 60 || *timeframe_seconds == 300 || *timeframe_seconds == 900 || *timeframe_seconds == 3600 { kb_lib::list_pair_candles_by_pair_and_timeframe( database.as_ref(), *pair_id, *timeframe_seconds, ) .await } else { query_service .list_pair_candles(*pair_id, *timeframe_seconds, None, None, false) .await }; let candles = match candles_result { Ok(candles) => candles, Err(error) => { return Err(format!( "cannot list/rebuild pair candles for pair_id '{}' timeframe '{}': {}", pair_id, timeframe_seconds, error )); } }; pair_candle_count += candles.len(); let candles_value_result = serde_json::to_value(&candles); let candles_value = match candles_value_result { Ok(candles_value) => candles_value, Err(error) => { return Err(format!( "cannot serialize pair candles for pair_id '{}' timeframe '{}': {}", pair_id, timeframe_seconds, error )); } }; pair_candle_groups.push(serde_json::json!({ "pairId": pair_id, "timeframeSeconds": timeframe_seconds, "candles": candles_value })); } } let summary_value = serde_json::json!({ "signature": signature, "customTimeframeSeconds": request.custom_timeframe_seconds, "decodedEventCount": decoded_events.len(), "poolCount": pools.len(), "pairCount": pairs.len(), "launchAttributionCount": launch_attributions.len(), "poolOriginCount": pool_origins.len(), "walletCount": wallet_contexts.len(), "walletParticipationCount": wallet_participation_count, "walletHoldingCount": wallet_holding_count, "transactionTradeEventCount": transaction_trade_events.len(), "pairMetricCount": pair_metrics.len(), "pairCandleGroupCount": pair_candle_groups.len(), "pairCandleCount": pair_candle_count, "pairAnalyticSignalGroupCount": pair_analytic_signal_groups.len(), "pairAnalyticSignalCount": pair_analytic_signal_count }); let summary_json_result = kb_to_pretty_json(&summary_value, "summary"); let summary_json = match summary_json_result { Ok(summary_json) => summary_json, Err(error) => return Err(error), }; let transaction_json_result = kb_to_pretty_json(&transaction, "transaction"); let transaction_json = match transaction_json_result { Ok(transaction_json) => transaction_json, Err(error) => return Err(error), }; let decoded_events_json_result = kb_to_pretty_json(&decoded_events, "decoded events"); let decoded_events_json = match decoded_events_json_result { Ok(decoded_events_json) => decoded_events_json, Err(error) => return Err(error), }; let pools_json_result = kb_to_pretty_json(&pools, "pools"); let pools_json = match pools_json_result { Ok(pools_json) => pools_json, Err(error) => return Err(error), }; let pairs_json_result = kb_to_pretty_json(&pairs, "pairs"); let pairs_json = match pairs_json_result { Ok(pairs_json) => pairs_json, Err(error) => return Err(error), }; let launch_attributions_json_result = kb_to_pretty_json(&launch_attributions, "launch attributions"); let launch_attributions_json = match launch_attributions_json_result { Ok(launch_attributions_json) => launch_attributions_json, Err(error) => return Err(error), }; let pool_origins_json_result = kb_to_pretty_json(&pool_origins, "pool origins"); let pool_origins_json = match pool_origins_json_result { Ok(pool_origins_json) => pool_origins_json, Err(error) => return Err(error), }; let wallets_json_result = kb_to_pretty_json(&wallet_contexts, "wallet contexts"); let wallets_json = match wallets_json_result { Ok(wallets_json) => wallets_json, Err(error) => return Err(error), }; let trade_events_json_result = kb_to_pretty_json(&transaction_trade_events, "trade events"); let trade_events_json = match trade_events_json_result { Ok(trade_events_json) => trade_events_json, Err(error) => return Err(error), }; let pair_metrics_json_result = kb_to_pretty_json(&pair_metrics, "pair metrics"); let pair_metrics_json = match pair_metrics_json_result { Ok(pair_metrics_json) => pair_metrics_json, Err(error) => return Err(error), }; let pair_candles_json_result = kb_to_pretty_json(&pair_candle_groups, "pair candles"); let pair_candles_json = match pair_candles_json_result { Ok(pair_candles_json) => pair_candles_json, Err(error) => return Err(error), }; let pair_analytic_signals_json_result = kb_to_pretty_json(&pair_analytic_signal_groups, "pair analytic signals"); let pair_analytic_signals_json = match pair_analytic_signals_json_result { Ok(pair_analytic_signals_json) => pair_analytic_signals_json, Err(error) => return Err(error), }; Ok(KbDemoPipelineInspectPayload { signature, summary_json, transaction_json, decoded_events_json, pools_json, pairs_json, launch_attributions_json, pool_origins_json, wallets_json, trade_events_json, pair_metrics_json, pair_candles_json, pair_analytic_signals_json, }) } async fn kb_demo_pipeline_build_pair_payload( database: std::sync::Arc, database_url: std::string::String, inspected_pair: kb_lib::KbPairDto, custom_timeframe_seconds: std::option::Option, object_key: std::string::String, ) -> Result { let pair_id = match inspected_pair.id { Some(pair_id) => pair_id, None => { return Err("inspected pair has no internal id".to_string()); } }; let pools_result = kb_lib::list_pools(database.as_ref()).await; let all_pools = match pools_result { Ok(all_pools) => all_pools, Err(error) => { return Err(format!("cannot list pools from database: {}", error)); } }; let mut inspected_pool_option = std::option::Option::::None; for pool in all_pools { let pool_id_option = pool.id; let pool_id = match pool_id_option { Some(pool_id) => pool_id, None => continue, }; if pool_id == inspected_pair.pool_id { inspected_pool_option = Some(pool); break; } } let inspected_pool = match inspected_pool_option { Some(inspected_pool) => inspected_pool, None => { return Err(format!( "pair '{}' references unknown pool_id '{}'", pair_id, inspected_pair.pool_id )); } }; let pool_listings_result = kb_lib::list_pool_listings(database.as_ref()).await; let all_pool_listings = match pool_listings_result { Ok(all_pool_listings) => all_pool_listings, Err(error) => { return Err(format!( "cannot list pool listings from database: {}", error )); } }; let mut pool_listings = std::vec::Vec::::new(); for listing in all_pool_listings { if listing.pool_id == inspected_pair.pool_id { pool_listings.push(listing); } } let launch_attributions_result = kb_lib::list_launch_attributions_by_pool_id(database.as_ref(), inspected_pair.pool_id) .await; let launch_attributions = match launch_attributions_result { Ok(launch_attributions) => launch_attributions, Err(error) => { return Err(format!( "cannot list launch attributions for pool_id '{}': {}", inspected_pair.pool_id, error )); } }; let pool_origin_result = kb_lib::get_pool_origin_by_pool_id(database.as_ref(), inspected_pair.pool_id).await; let pool_origin_option = match pool_origin_result { Ok(pool_origin_option) => pool_origin_option, Err(error) => { return Err(format!( "cannot fetch pool origin for pool_id '{}': {}", inspected_pair.pool_id, error )); } }; let trade_events_result = kb_lib::list_trade_events_by_pair_id(database.as_ref(), pair_id).await; let trade_events = match trade_events_result { Ok(trade_events) => trade_events, Err(error) => { return Err(format!( "cannot list trade events for pair_id '{}': {}", pair_id, error )); } }; let pair_metric_result = kb_lib::get_pair_metric_by_pair_id(database.as_ref(), pair_id).await; let pair_metric_option = match pair_metric_result { Ok(pair_metric_option) => pair_metric_option, Err(error) => { return Err(format!( "cannot fetch pair metric for pair_id '{}': {}", pair_id, error )); } }; let pair_signals_result = kb_lib::list_pair_analytic_signals_by_pair_id(database.as_ref(), pair_id).await; let pair_signals = match pair_signals_result { Ok(pair_signals) => pair_signals, Err(error) => { return Err(format!( "cannot list pair analytic signals for pair_id '{}': {}", pair_id, error )); } }; let query_service = kb_lib::KbPairCandleQueryService::new(database.clone()); let mut timeframes = vec![60_i64, 300_i64, 900_i64, 3600_i64]; if let Some(custom_timeframe_seconds_value) = custom_timeframe_seconds { if custom_timeframe_seconds_value > 0 && !timeframes.contains(&custom_timeframe_seconds_value) { timeframes.push(custom_timeframe_seconds_value); } } let mut pair_candle_groups = std::vec::Vec::::new(); for timeframe_seconds in &timeframes { let candles_result = if *timeframe_seconds == 60 || *timeframe_seconds == 300 || *timeframe_seconds == 900 || *timeframe_seconds == 3600 { kb_lib::list_pair_candles_by_pair_and_timeframe( database.as_ref(), pair_id, *timeframe_seconds, ) .await } else { query_service .list_pair_candles(pair_id, *timeframe_seconds, None, None, false) .await }; let candles = match candles_result { Ok(candles) => candles, Err(error) => { return Err(format!( "cannot list/rebuild pair candles for pair_id '{}' timeframe '{}': {}", pair_id, timeframe_seconds, error )); } }; let candles_value_result = serde_json::to_value(&candles); let candles_value = match candles_value_result { Ok(candles_value) => candles_value, Err(error) => { return Err(format!( "cannot serialize pair candles for pair_id '{}' timeframe '{}': {}", pair_id, timeframe_seconds, error )); } }; pair_candle_groups.push(serde_json::json!({ "pairId": pair_id, "timeframeSeconds": timeframe_seconds, "candles": candles_value })); } let wallets_result = kb_lib::list_wallets(database.as_ref()).await; let wallets = match wallets_result { Ok(wallets) => wallets, Err(error) => { return Err(format!("cannot list wallets from database: {}", error)); } }; let mut wallet_contexts = std::vec::Vec::::new(); for wallet in wallets { let wallet_id = match wallet.id { Some(wallet_id) => wallet_id, None => continue, }; let participations_result = kb_lib::list_wallet_participations_by_wallet_id(database.as_ref(), wallet_id).await; let participations = match participations_result { Ok(participations) => participations, Err(error) => { return Err(format!( "cannot list wallet participations for wallet_id '{}': {}", wallet_id, error )); } }; let mut filtered_participations = std::vec::Vec::new(); for participation in participations { let mut include_participation = false; if let Some(participation_pair_id) = participation.pair_id { if participation_pair_id == pair_id { include_participation = true; } } if !include_participation { if let Some(participation_pool_id) = participation.pool_id { if participation_pool_id == inspected_pair.pool_id { include_participation = true; } } } if include_participation { filtered_participations.push(participation); } } let holdings_result = kb_lib::list_wallet_holdings_by_wallet_id(database.as_ref(), wallet_id).await; let holdings = match holdings_result { Ok(holdings) => holdings, Err(error) => { return Err(format!( "cannot list wallet holdings for wallet_id '{}': {}", wallet_id, error )); } }; let mut filtered_holdings = std::vec::Vec::new(); for holding in holdings { let mut include_holding = false; if let Some(last_pair_id) = holding.last_pair_id { if last_pair_id == pair_id { include_holding = true; } } if !include_holding { if let Some(last_pool_id) = holding.last_pool_id { if last_pool_id == inspected_pair.pool_id { include_holding = true; } } } if include_holding { filtered_holdings.push(holding); } } if !filtered_participations.is_empty() || !filtered_holdings.is_empty() { let wallet_value_result = serde_json::to_value(&wallet); let wallet_value = match wallet_value_result { Ok(wallet_value) => wallet_value, Err(error) => { return Err(format!( "cannot serialize wallet '{}' to JSON value: {}", wallet.address, error )); } }; let participations_value_result = serde_json::to_value(&filtered_participations); let participations_value = match participations_value_result { Ok(participations_value) => participations_value, Err(error) => { return Err(format!( "cannot serialize wallet participations for '{}' to JSON value: {}", wallet.address, error )); } }; let holdings_value_result = serde_json::to_value(&filtered_holdings); let holdings_value = match holdings_value_result { Ok(holdings_value) => holdings_value, Err(error) => { return Err(format!( "cannot serialize wallet holdings for '{}' to JSON value: {}", wallet.address, error )); } }; wallet_contexts.push(serde_json::json!({ "walletAddress": wallet.address, "wallet": wallet_value, "participations": participations_value, "holdings": holdings_value })); } } let mut pair_metrics = std::vec::Vec::::new(); if let Some(pair_metric) = pair_metric_option { pair_metrics.push(pair_metric); } let pair_signals_value_result = serde_json::to_value(&pair_signals); let pair_signals_value = match pair_signals_value_result { Ok(pair_signals_value) => pair_signals_value, Err(error) => { return Err(format!( "cannot serialize pair analytic signals for pair_id '{}': {}", pair_id, error )); } }; let pair_signal_groups = vec![serde_json::json!({ "pairId": pair_id, "signals": pair_signals_value })]; let summary_value = serde_json::json!({ "mode": "pairOrPool", "databaseUrl": database_url, "objectKey": object_key, "pairId": pair_id, "poolId": inspected_pair.pool_id, "poolAddress": inspected_pool.address, "customTimeframeSeconds": custom_timeframe_seconds, "poolListingCount": pool_listings.len(), "launchAttributionCount": launch_attributions.len(), "hasPoolOrigin": pool_origin_option.is_some(), "walletContextCount": wallet_contexts.len(), "tradeEventCount": trade_events.len(), "pairMetricCount": pair_metrics.len(), "pairCandleGroupCount": pair_candle_groups.len(), "pairAnalyticSignalCount": pair_signals.len() }); let summary_json_result = kb_to_pretty_json(&summary_value, "summary"); let summary_json = match summary_json_result { Ok(summary_json) => summary_json, Err(error) => return Err(error), }; let transaction_json_result = kb_to_pretty_json(&inspected_pair, "pair"); let transaction_json = match transaction_json_result { Ok(transaction_json) => transaction_json, Err(error) => return Err(error), }; let decoded_events_json_result = kb_to_pretty_json(&pool_listings, "pool listings"); let decoded_events_json = match decoded_events_json_result { Ok(decoded_events_json) => decoded_events_json, Err(error) => return Err(error), }; let pools_json_result = kb_to_pretty_json(&vec![inspected_pool], "pool"); let pools_json = match pools_json_result { Ok(pools_json) => pools_json, Err(error) => return Err(error), }; let pairs_json_result = kb_to_pretty_json(&vec![inspected_pair], "pair"); let pairs_json = match pairs_json_result { Ok(pairs_json) => pairs_json, Err(error) => return Err(error), }; let launch_attributions_json_result = kb_to_pretty_json(&launch_attributions, "launch attributions"); let launch_attributions_json = match launch_attributions_json_result { Ok(launch_attributions_json) => launch_attributions_json, Err(error) => return Err(error), }; let pool_origins_json_result = kb_to_pretty_json(&pool_origin_option, "pool origin"); let pool_origins_json = match pool_origins_json_result { Ok(pool_origins_json) => pool_origins_json, Err(error) => return Err(error), }; let wallets_json_result = kb_to_pretty_json(&wallet_contexts, "wallet contexts"); let wallets_json = match wallets_json_result { Ok(wallets_json) => wallets_json, Err(error) => return Err(error), }; let trade_events_json_result = kb_to_pretty_json(&trade_events, "trade events"); let trade_events_json = match trade_events_json_result { Ok(trade_events_json) => trade_events_json, Err(error) => return Err(error), }; let pair_metrics_json_result = kb_to_pretty_json(&pair_metrics, "pair metrics"); let pair_metrics_json = match pair_metrics_json_result { Ok(pair_metrics_json) => pair_metrics_json, Err(error) => return Err(error), }; let pair_candles_json_result = kb_to_pretty_json(&pair_candle_groups, "pair candles"); let pair_candles_json = match pair_candles_json_result { Ok(pair_candles_json) => pair_candles_json, Err(error) => return Err(error), }; let pair_analytic_signals_json_result = kb_to_pretty_json(&pair_signal_groups, "pair analytic signals"); let pair_analytic_signals_json = match pair_analytic_signals_json_result { Ok(pair_analytic_signals_json) => pair_analytic_signals_json, Err(error) => return Err(error), }; Ok(KbDemoPipelineInspectPayload { signature: object_key, summary_json, transaction_json, decoded_events_json, pools_json, pairs_json, launch_attributions_json, pool_origins_json, wallets_json, trade_events_json, pair_metrics_json, pair_candles_json, pair_analytic_signals_json, }) } fn kb_to_pretty_json( value: &T, label: &str, ) -> Result { let json_result = serde_json::to_string_pretty(value); match json_result { Ok(json) => Ok(json), Err(error) => Err(format!("cannot serialize {} as JSON: {}", label, error)), } } fn kb_extract_wallet_addresses_from_value( value: &serde_json::Value, ) -> std::collections::BTreeSet { let mut addresses = std::collections::BTreeSet::::new(); let candidate_keys = ["creator", "poolCreator", "payer", "funder", "owner", "user"]; kb_extract_wallet_addresses_from_value_inner(value, &candidate_keys, &mut addresses); addresses } fn kb_extract_wallet_addresses_from_value_inner( value: &serde_json::Value, candidate_keys: &[&str], addresses: &mut std::collections::BTreeSet, ) { if let Some(object) = value.as_object() { for candidate_key in candidate_keys { let direct_option = object.get(*candidate_key); if let Some(direct) = direct_option { let text_option = direct.as_str(); if let Some(text) = text_option { if !text.is_empty() { addresses.insert(text.to_string()); } } } } for nested_value in object.values() { kb_extract_wallet_addresses_from_value_inner(nested_value, candidate_keys, addresses); } return; } if let Some(array) = value.as_array() { for nested_value in array { kb_extract_wallet_addresses_from_value_inner(nested_value, candidate_keys, addresses); } } }