// file: kb_lib/src/db/queries/pool_origin.rs //! Queries for `k_sol_pool_origins`. /// Inserts or updates one pool-origin row and returns its stable internal id. pub async fn query_pool_origins_upsert( database: &crate::Database, dto: &crate::PoolOriginDto, ) -> Result { match database.connection() { crate::DatabaseConnection::Sqlite(pool) => { let query_result = sqlx::query( r#" INSERT INTO k_sol_pool_origins ( dex_id, pool_id, pair_id, pool_listing_id, founding_transaction_id, founding_decoded_event_id, founding_signature, founding_protocol_name, founding_event_kind, source_kind, source_endpoint_name, launch_attribution_id, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(pool_id) DO UPDATE SET pair_id = COALESCE(k_sol_pool_origins.pair_id, excluded.pair_id), pool_listing_id = COALESCE(k_sol_pool_origins.pool_listing_id, excluded.pool_listing_id), launch_attribution_id = COALESCE(k_sol_pool_origins.launch_attribution_id, excluded.launch_attribution_id), updated_at = excluded.updated_at "#, ) .bind(dto.dex_id) .bind(dto.pool_id) .bind(dto.pair_id) .bind(dto.pool_listing_id) .bind(dto.founding_transaction_id) .bind(dto.founding_decoded_event_id) .bind(dto.founding_signature.clone()) .bind(dto.founding_protocol_name.clone()) .bind(dto.founding_event_kind.clone()) .bind(dto.source_kind.to_i16()) .bind(dto.source_endpoint_name.clone()) .bind(dto.launch_attribution_id) .bind(dto.created_at.to_rfc3339()) .bind(dto.updated_at.to_rfc3339()) .execute(pool) .await; if let Err(error) = query_result { return Err(crate::Error::Db(format!( "cannot upsert k_sol_pool_origins on sqlite: {}", error ))); } let id_result = sqlx::query_scalar::( r#" SELECT id FROM k_sol_pool_origins WHERE pool_id = ? LIMIT 1 "#, ) .bind(dto.pool_id) .fetch_one(pool) .await; match id_result { Ok(id) => return Ok(id), Err(error) => { return Err(crate::Error::Db(format!( "cannot fetch k_sol_pool_origins id for pool_id '{}' on sqlite: {}", dto.pool_id, error ))); }, } }, } } /// Returns one pool-origin row identified by its pool id, if it exists. pub async fn query_pool_origins_get_by_pool_id( database: &crate::Database, pool_id: i64, ) -> Result, crate::Error> { match database.connection() { crate::DatabaseConnection::Sqlite(pool) => { let query_result = sqlx::query_as::( r#" SELECT id, dex_id, pool_id, pair_id, pool_listing_id, founding_transaction_id, founding_decoded_event_id, founding_signature, founding_protocol_name, founding_event_kind, source_kind, source_endpoint_name, launch_attribution_id, created_at, updated_at FROM k_sol_pool_origins WHERE pool_id = ? LIMIT 1 "#, ) .bind(pool_id) .fetch_optional(pool) .await; let entity_option = match query_result { Ok(entity_option) => entity_option, Err(error) => { return Err(crate::Error::Db(format!( "cannot read k_sol_pool_origins by pool_id '{}' on sqlite: {}", pool_id, error ))); }, }; match entity_option { Some(entity) => return crate::PoolOriginDto::try_from(entity).map(Some), None => return Ok(None), } }, } } /// Lists all pool-origin rows ordered by creation time then id. pub async fn query_pool_origins_list( database: &crate::Database, ) -> Result, crate::Error> { match database.connection() { crate::DatabaseConnection::Sqlite(pool) => { let query_result = sqlx::query_as::( r#" SELECT id, dex_id, pool_id, pair_id, pool_listing_id, founding_transaction_id, founding_decoded_event_id, founding_signature, founding_protocol_name, founding_event_kind, source_kind, source_endpoint_name, launch_attribution_id, created_at, updated_at FROM k_sol_pool_origins ORDER BY created_at ASC, id ASC "#, ) .fetch_all(pool) .await; let entities = match query_result { Ok(entities) => entities, Err(error) => { return Err(crate::Error::Db(format!( "cannot list k_sol_pool_origins on sqlite: {}", error ))); }, }; let mut dtos = std::vec::Vec::new(); for entity in entities { let dto_result = crate::PoolOriginDto::try_from(entity); let dto = match dto_result { Ok(dto) => dto, Err(error) => return Err(error), }; dtos.push(dto); } return Ok(dtos); }, } }