This commit is contained in:
2026-06-09 10:13:03 +02:00
parent f2ea1a392f
commit bfdb2e69ae
41 changed files with 4485 additions and 1124 deletions

View File

@@ -482,38 +482,24 @@ impl NonTradeEventMaterializationService {
Some(decoded_event_id) => decoded_event_id,
None => return Ok(()),
};
match self.database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let delete_result = sqlx::query(
r#"
DELETE FROM k_sol_pool_admin_events
WHERE decoded_event_id = ?
"#,
)
.bind(decoded_event_id)
.execute(pool)
.await;
let delete_result = match delete_result {
Ok(delete_result) => delete_result,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot delete stale k_sol_pool_admin_events for lifecycle decoded_event_id '{}' on sqlite: {}",
decoded_event_id, error
)));
},
};
let deleted_count = delete_result.rows_affected();
if deleted_count > 0 {
tracing::debug!(
decoded_event_id = decoded_event_id,
event_kind = %decoded_event.event_kind,
deleted_count = deleted_count,
"removed stale pool admin materialization for lifecycle event"
);
}
return Ok(());
},
let delete_result = crate::query_pool_admin_events_delete_by_decoded_event_id(
self.database.as_ref(),
decoded_event_id,
)
.await;
let deleted_count = match delete_result {
Ok(deleted_count) => deleted_count,
Err(error) => return Err(error),
};
if deleted_count > 0 {
tracing::debug!(
decoded_event_id = decoded_event_id,
event_kind = %decoded_event.event_kind,
deleted_count = deleted_count,
"removed stale pool admin materialization for lifecycle event"
);
}
return Ok(());
}
async fn materialize_pool_admin_event(
@@ -712,132 +698,28 @@ WHERE decoded_event_id = ?
},
None => None,
};
match self.database.connection() {
crate::DatabaseConnection::Sqlite(pool) => {
let existing_result = sqlx::query_scalar::<sqlx::Sqlite, i64>(
r#"
SELECT id
FROM k_sol_launch_events
WHERE decoded_event_id = ?
LIMIT 1
"#,
)
.bind(decoded_event_id)
.fetch_optional(pool)
.await;
let existing_id = match existing_result {
Ok(existing_id) => existing_id,
Err(error) => {
return Err(crate::Error::Db(format!(
"cannot fetch k_sol_launch_events id for decoded_event_id '{}' on sqlite: {}",
decoded_event_id, error
)));
},
};
if let Some(existing_id) = existing_id {
let update_result = sqlx::query(
r#"
UPDATE k_sol_launch_events
SET
transaction_id = ?,
dex_id = ?,
pool_id = ?,
pair_id = ?,
signature = ?,
slot = ?,
protocol_name = ?,
program_id = ?,
event_kind = ?,
pool_account = ?,
actor_wallet = ?,
event_role = ?,
related_account = ?,
related_mint = ?,
payload_json = ?,
executed_at = ?
WHERE id = ?
"#,
)
.bind(transaction_id)
.bind(context.dex_id)
.bind(context.pool_id)
.bind(context.pair_id)
.bind(transaction.signature.clone())
.bind(slot_i64)
.bind(decoded_event.protocol_name.clone())
.bind(decoded_event.program_id.clone())
.bind(decoded_event.event_kind.clone())
.bind(decoded_event.pool_account.clone())
.bind(actor_wallet.clone())
.bind(event_role.clone())
.bind(related_account.clone())
.bind(related_mint.clone())
.bind(decoded_event.payload_json.clone())
.bind(chrono::Utc::now().to_rfc3339())
.bind(existing_id)
.execute(pool)
.await;
if let Err(error) = update_result {
return Err(crate::Error::Db(format!(
"cannot update k_sol_launch_events id '{}' on sqlite: {}",
existing_id, error
)));
}
return Ok(true);
}
let insert_result = sqlx::query(
r#"
INSERT INTO k_sol_launch_events (
transaction_id,
decoded_event_id,
dex_id,
pool_id,
pair_id,
signature,
slot,
protocol_name,
program_id,
event_kind,
pool_account,
actor_wallet,
event_role,
related_account,
related_mint,
payload_json,
executed_at,
created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(transaction_id)
.bind(decoded_event_id)
.bind(context.dex_id)
.bind(context.pool_id)
.bind(context.pair_id)
.bind(transaction.signature.clone())
.bind(slot_i64)
.bind(decoded_event.protocol_name.clone())
.bind(decoded_event.program_id.clone())
.bind(decoded_event.event_kind.clone())
.bind(decoded_event.pool_account.clone())
.bind(actor_wallet)
.bind(event_role)
.bind(related_account)
.bind(related_mint)
.bind(decoded_event.payload_json.clone())
.bind(chrono::Utc::now().to_rfc3339())
.bind(chrono::Utc::now().to_rfc3339())
.execute(pool)
.await;
if let Err(error) = insert_result {
return Err(crate::Error::Db(format!(
"cannot insert k_sol_launch_events on sqlite: {}",
error
)));
}
return Ok(true);
},
let input = crate::LaunchEventUpsertInput {
transaction_id,
decoded_event_id,
dex_id: context.dex_id,
pool_id: context.pool_id,
pair_id: context.pair_id,
signature: transaction.signature.clone(),
slot: slot_i64,
protocol_name: decoded_event.protocol_name.clone(),
program_id: decoded_event.program_id.clone(),
event_kind: decoded_event.event_kind.clone(),
pool_account: decoded_event.pool_account.clone(),
actor_wallet,
event_role,
related_account,
related_mint,
payload_json: payload.clone(),
};
let upsert_result = crate::query_launch_events_upsert(self.database.as_ref(), &input).await;
match upsert_result {
Ok(_) => return Ok(true),
Err(error) => return Err(error),
}
}
@@ -861,15 +743,51 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
};
let dex_id = match context.dex_id {
Some(dex_id) => dex_id,
None => return Ok(false),
None => {
let annotate_result = self
.annotate_decoded_event_payload(
decoded_event,
"skipLiquidityReason",
"missing_dex_catalog",
)
.await;
if let Err(error) = annotate_result {
return Err(error);
}
return Ok(false);
},
};
let pool_id = match context.pool_id {
Some(pool_id) => pool_id,
None => return Ok(false),
None => {
let annotate_result = self
.annotate_decoded_event_payload(
decoded_event,
"skipLiquidityReason",
"missing_pool_catalog",
)
.await;
if let Err(error) = annotate_result {
return Err(error);
}
return Ok(false);
},
};
let pair = match context.pair {
Some(pair) => pair,
None => return Ok(false),
None => {
let annotate_result = self
.annotate_decoded_event_payload(
decoded_event,
"skipLiquidityReason",
"missing_pair_catalog",
)
.await;
if let Err(error) = annotate_result {
return Err(error);
}
return Ok(false);
},
};
let pair_id = match pair.id {
Some(pair_id) => Some(pair_id),
@@ -1087,6 +1005,57 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
}
}
async fn annotate_decoded_event_payload(
&self,
decoded_event: &crate::DexDecodedEventDto,
reason_key: &str,
reason_value: &str,
) -> Result<(), crate::Error> {
let decoded_event_id = match decoded_event.id {
Some(decoded_event_id) => decoded_event_id,
None => return Ok(()),
};
let payload_result = serde_json::from_str::<serde_json::Value>(
decoded_event.payload_json.as_str(),
);
let mut object = match payload_result {
Ok(serde_json::Value::Object(object)) => object,
Ok(other) => {
let mut object = serde_json::Map::new();
object.insert("rawPayload".to_string(), other);
object
},
Err(_) => serde_json::Map::new(),
};
let existing_reason = match object.get(reason_key).and_then(serde_json::Value::as_str) {
Some(existing_reason) => existing_reason.trim().to_string(),
None => std::string::String::new(),
};
if existing_reason.is_empty() {
object.insert(
reason_key.to_string(),
serde_json::Value::String(reason_value.to_string()),
);
}
if reason_key == "skipLiquidityReason" {
object.insert(
"skipCatalogReason".to_string(),
serde_json::Value::String(reason_value.to_string()),
);
}
let payload_json = serde_json::Value::Object(object).to_string();
let update_result = crate::query_dex_decoded_events_update_payload_json_by_id(
self.database.as_ref(),
decoded_event_id,
payload_json.as_str(),
)
.await;
match update_result {
Ok(_) => return Ok(()),
Err(error) => return Err(error),
}
}
async fn resolve_liquidity_context(
&self,
transaction: &crate::ChainTransactionDto,