// file: kb_lib/src/db/queries/wallet_holding.rs //! Queries for `kb_wallet_holdings`. /// Inserts or updates one wallet-holding row and returns its stable internal id. pub async fn upsert_wallet_holding( database: &crate::KbDatabase, dto: &crate::KbWalletHoldingDto, ) -> Result { match database.connection() { crate::KbDatabaseConnection::Sqlite(pool) => { let query_result = sqlx::query( r#" INSERT INTO kb_wallet_holdings ( wallet_id, token_id, first_transaction_id, last_transaction_id, last_decoded_event_id, last_pool_id, last_pair_id, last_role, balance_raw, last_slot_observed, source_kind, source_endpoint_name, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(wallet_id, token_id) DO UPDATE SET last_transaction_id = excluded.last_transaction_id, last_decoded_event_id = excluded.last_decoded_event_id, last_pool_id = excluded.last_pool_id, last_pair_id = excluded.last_pair_id, last_role = excluded.last_role, balance_raw = COALESCE(excluded.balance_raw, kb_wallet_holdings.balance_raw), last_slot_observed = excluded.last_slot_observed, source_kind = excluded.source_kind, source_endpoint_name = excluded.source_endpoint_name, updated_at = excluded.updated_at "#, ) .bind(dto.wallet_id) .bind(dto.token_id) .bind(dto.first_transaction_id) .bind(dto.last_transaction_id) .bind(dto.last_decoded_event_id) .bind(dto.last_pool_id) .bind(dto.last_pair_id) .bind(dto.last_role.clone()) .bind(dto.balance_raw.clone()) .bind(dto.last_slot_observed) .bind(dto.source_kind.to_i16()) .bind(dto.source_endpoint_name.clone()) .bind(dto.created_at.to_rfc3339()) .bind(dto.updated_at.to_rfc3339()) .execute(pool) .await; if let Err(error) = query_result { return Err(crate::KbError::Db(format!( "cannot upsert kb_wallet_holdings on sqlite: {}", error ))); } let id_result = sqlx::query_scalar::( r#" SELECT id FROM kb_wallet_holdings WHERE wallet_id = ? AND token_id = ? LIMIT 1 "#, ) .bind(dto.wallet_id) .bind(dto.token_id) .fetch_one(pool) .await; match id_result { Ok(id) => Ok(id), Err(error) => Err(crate::KbError::Db(format!( "cannot fetch kb_wallet_holdings id for wallet_id '{}' token_id '{}' on sqlite: {}", dto.wallet_id, dto.token_id, error ))), } } } } /// Returns one wallet-holding row identified by `(wallet_id, token_id)`, if it exists. pub async fn get_wallet_holding_by_wallet_and_token( database: &crate::KbDatabase, wallet_id: i64, token_id: i64, ) -> Result, crate::KbError> { match database.connection() { crate::KbDatabaseConnection::Sqlite(pool) => { let query_result = sqlx::query_as::( r#" SELECT id, wallet_id, token_id, first_transaction_id, last_transaction_id, last_decoded_event_id, last_pool_id, last_pair_id, last_role, balance_raw, last_slot_observed, source_kind, source_endpoint_name, created_at, updated_at FROM kb_wallet_holdings WHERE wallet_id = ? AND token_id = ? LIMIT 1 "#, ) .bind(wallet_id) .bind(token_id) .fetch_optional(pool) .await; let entity_option = match query_result { Ok(entity_option) => entity_option, Err(error) => { return Err(crate::KbError::Db(format!( "cannot read kb_wallet_holdings by wallet_id '{}' token_id '{}' on sqlite: {}", wallet_id, token_id, error ))); } }; match entity_option { Some(entity) => crate::KbWalletHoldingDto::try_from(entity).map(Some), None => Ok(None), } } } } /// Lists wallet-holding rows for one wallet id. pub async fn list_wallet_holdings_by_wallet_id( database: &crate::KbDatabase, wallet_id: i64, ) -> Result, crate::KbError> { match database.connection() { crate::KbDatabaseConnection::Sqlite(pool) => { let query_result = sqlx::query_as::( r#" SELECT id, wallet_id, token_id, first_transaction_id, last_transaction_id, last_decoded_event_id, last_pool_id, last_pair_id, last_role, balance_raw, last_slot_observed, source_kind, source_endpoint_name, created_at, updated_at FROM kb_wallet_holdings WHERE wallet_id = ? ORDER BY token_id ASC, id ASC "#, ) .bind(wallet_id) .fetch_all(pool) .await; let entities = match query_result { Ok(entities) => entities, Err(error) => { return Err(crate::KbError::Db(format!( "cannot list kb_wallet_holdings by wallet_id '{}' on sqlite: {}", wallet_id, error ))); } }; let mut dtos = std::vec::Vec::new(); for entity in entities { let dto_result = crate::KbWalletHoldingDto::try_from(entity); let dto = match dto_result { Ok(dto) => dto, Err(error) => return Err(error), }; dtos.push(dto); } Ok(dtos) } } }