feat: Store as much as possible from every range (#4660)

### Description

Currently, if the Scraper cannot fetch and parse a block or a transaction,
it fails the whole chunk of logs. This means that a transaction which
can be fetched and parsed, but happens to be in the same chunk as an
unparseable transaction, won't be inserted into the database.

This PR fixes that: the Scraper will now do its best to insert as many
blocks and transactions from the chunk into the database as possible.

### Related issues

https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4632

### Backward compatibility

Yes

### Testing

Local run of E2E tests.
Local run of the Scraper.

---------

Co-authored-by: Danil Nemirovsky <4614623+ameten@users.noreply.github.com>
pull/4664/head
Danil Nemirovsky 3 weeks ago committed by GitHub
parent e89f9e35d4
commit bb82b1cc01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 157
      rust/main/agents/scraper/src/chain_scraper/mod.rs
  2. 33
      rust/main/agents/scraper/src/db/message.rs
  3. 16
      rust/main/agents/scraper/src/db/payment.rs

@ -14,7 +14,7 @@ use hyperlane_core::{
HyperlaneWatermarkedLogStore, Indexed, InterchainGasPayment, LogMeta, H256, HyperlaneWatermarkedLogStore, Indexed, InterchainGasPayment, LogMeta, H256,
}; };
use itertools::Itertools; use itertools::Itertools;
use tracing::trace; use tracing::{trace, warn};
use crate::db::{ use crate::db::{
BasicBlock, BlockCursor, ScraperDb, StorableDelivery, StorableMessage, StorablePayment, BasicBlock, BlockCursor, ScraperDb, StorableDelivery, StorableMessage, StorablePayment,
@ -98,18 +98,12 @@ impl HyperlaneSqlDb {
.collect(); .collect();
trace!(?blocks, "Ensured blocks"); trace!(?blocks, "Ensured blocks");
// all txns we care about // We ensure transactions only from blocks which are inserted into database
let txns_with_ids = let txn_hash_with_block_ids = block_hash_by_txn_hash
self.ensure_txns(block_hash_by_txn_hash.into_iter().map( .into_iter()
move |(txn_hash, block_hash)| { .filter_map(move |(txn, block)| blocks.get(&block).map(|b| (txn, b.id)))
let block_info = *blocks.get(&block_hash).as_ref().unwrap(); .map(|(txn_hash, block_id)| TxnWithBlockId { txn_hash, block_id });
TxnWithBlockId { let txns_with_ids = self.ensure_txns(txn_hash_with_block_ids).await?;
txn_hash,
block_id: block_info.id,
}
},
))
.await?;
Ok(txns_with_ids.map(move |TxnWithId { hash, id: txn_id }| TxnWithId { hash, id: txn_id })) Ok(txns_with_ids.map(move |TxnWithId { hash, id: txn_id }| TxnWithId { hash, id: txn_id }))
} }
@ -118,8 +112,10 @@ impl HyperlaneSqlDb {
/// in. if it is in the database already: /// in. if it is in the database already:
/// Fetches its associated database id /// Fetches its associated database id
/// if it is not in the database already: /// if it is not in the database already:
/// Looks up its data with ethers and then returns the database id after /// Looks up its data with the chain and then returns the database id after
/// inserting it into the database. /// inserting it into the database.
/// if it cannot fetch and parse transaction, the transaction will be skipped and not returned
/// from this method.
async fn ensure_txns( async fn ensure_txns(
&self, &self,
txns: impl Iterator<Item = TxnWithBlockId>, txns: impl Iterator<Item = TxnWithBlockId>,
@ -153,7 +149,13 @@ impl HyperlaneSqlDb {
for mut chunk in as_chunks::<(&H256, &mut (Option<i64>, i64))>(txns_to_fetch, CHUNK_SIZE) { for mut chunk in as_chunks::<(&H256, &mut (Option<i64>, i64))>(txns_to_fetch, CHUNK_SIZE) {
for (hash, (_, block_id)) in chunk.iter() { for (hash, (_, block_id)) in chunk.iter() {
let info = self.provider.get_txn_by_hash(hash).await?; let info = match self.provider.get_txn_by_hash(hash).await {
Ok(info) => info,
Err(e) => {
warn!(?hash, ?e, "error fetching and parsing transaction");
continue;
}
};
hashes_to_insert.push(*hash); hashes_to_insert.push(*hash);
txns_to_insert.push(StorableTxn { txns_to_insert.push(StorableTxn {
info, info,
@ -161,35 +163,41 @@ impl HyperlaneSqlDb {
}); });
} }
// If we have no transactions to insert, we don't need to store them and update
// database transaction ids.
if txns_to_insert.is_empty() {
continue;
}
self.db.store_txns(txns_to_insert.drain(..)).await?; self.db.store_txns(txns_to_insert.drain(..)).await?;
let ids = self.db.get_txn_ids(hashes_to_insert.drain(..)).await?; let ids = self.db.get_txn_ids(hashes_to_insert.drain(..)).await?;
for (hash, (txn_id, _block_id)) in chunk.iter_mut() { for (hash, (txn_id, _block_id)) in chunk.iter_mut() {
let _ = txn_id.insert(ids[hash]); *txn_id = ids.get(hash).copied();
} }
} }
Ok(txns let ensured_txns = txns
.into_iter() .into_iter()
.map(|(hash, (txn_id, _block_id))| TxnWithId { .filter_map(|(hash, (txn_id, _))| txn_id.map(|id| (hash, id)))
hash, .map(|(hash, id)| TxnWithId { hash, id });
id: txn_id.unwrap(),
})) Ok(ensured_txns)
} }
/// Takes a list of block hashes for each block /// Takes a list of block hashes for each block
/// if it is in the database already: /// if it is in the database already:
/// Fetches its associated database id /// Fetches its associated database id
/// if it is not in the database already: /// if it is not in the database already:
/// Looks up its data with ethers and then returns the database id after /// Looks up its data with the chain and then returns the database id after
/// inserting it into the database. /// inserting it into the database.
/// if it cannot fetch and parse block, the block will be skipped and not returned from
/// this method.
async fn ensure_blocks( async fn ensure_blocks(
&self, &self,
block_hashes: impl Iterator<Item = H256>, block_hashes: impl Iterator<Item = H256>,
) -> Result<impl Iterator<Item = BasicBlock>> { ) -> Result<impl Iterator<Item = BasicBlock>> {
// mapping of block hash to the database id and block timestamp. Optionals are // Mapping of block hash to `BasicBlock` which contains database block id and block hash.
// in place because we will find the timestamp first if the block was not
// already in the db.
let mut blocks: HashMap<H256, Option<BasicBlock>> = let mut blocks: HashMap<H256, Option<BasicBlock>> =
block_hashes.map(|b| (b, None)).collect(); block_hashes.map(|b| (b, None)).collect();
@ -222,7 +230,13 @@ impl HyperlaneSqlDb {
for chunk in as_chunks(blocks_to_fetch, CHUNK_SIZE) { for chunk in as_chunks(blocks_to_fetch, CHUNK_SIZE) {
debug_assert!(!chunk.is_empty()); debug_assert!(!chunk.is_empty());
for (hash, block_info) in chunk { for (hash, block_info) in chunk {
let info = self.provider.get_block_by_hash(hash).await?; let info = match self.provider.get_block_by_hash(hash).await {
Ok(info) => info,
Err(e) => {
warn!(?hash, ?e, "error fetching and parsing block");
continue;
}
};
let basic_info_ref = block_info.insert(BasicBlock { let basic_info_ref = block_info.insert(BasicBlock {
id: -1, id: -1,
hash: *hash, hash: *hash,
@ -231,6 +245,12 @@ impl HyperlaneSqlDb {
hashes_to_insert.push(hash); hashes_to_insert.push(hash);
} }
// If we have no blocks to insert, we don't store them and we don't update
// database block ids.
if blocks_to_insert.is_empty() {
continue;
}
self.db self.db
.store_blocks( .store_blocks(
self.domain().id(), self.domain().id(),
@ -249,28 +269,25 @@ impl HyperlaneSqlDb {
.collect::<HashMap<_, _>>(); .collect::<HashMap<_, _>>();
for (block_ref, _) in blocks_to_insert.drain(..) { for (block_ref, _) in blocks_to_insert.drain(..) {
block_ref.id = hashes[&block_ref.hash]; if let Some(id) = hashes.get(&block_ref.hash) {
block_ref.id = *id;
}
} }
} }
// ensure we have updated all the block ids and that we have info for all of let ensured_blocks = blocks
// them.
#[cfg(debug_assertions)]
for (hash, block) in blocks.iter() {
let block = block.as_ref().unwrap();
assert_eq!(hash, &block.hash);
assert!(block.id > 0);
}
Ok(blocks
.into_iter() .into_iter()
.map(|(hash, block_info)| block_info.unwrap())) .filter_map(|(hash, block_info)| block_info.filter(|b| b.id != -1));
Ok(ensured_blocks)
} }
} }
#[async_trait] #[async_trait]
impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneSqlDb { impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneSqlDb {
/// Store messages from the origin mailbox into the database. /// Store dispatched messages from the origin mailbox into the database.
/// We store only messages from blocks and transaction which we could successfully insert
/// into database.
async fn store_logs(&self, messages: &[(Indexed<HyperlaneMessage>, LogMeta)]) -> Result<u32> { async fn store_logs(&self, messages: &[(Indexed<HyperlaneMessage>, LogMeta)]) -> Result<u32> {
if messages.is_empty() { if messages.is_empty() {
return Ok(0); return Ok(0);
@ -280,20 +297,18 @@ impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneSqlDb {
.await? .await?
.map(|t| (t.hash, t)) .map(|t| (t.hash, t))
.collect(); .collect();
let storable = messages.iter().map(|m| { let storable = messages
let txn = txns .iter()
.get( .filter_map(|(message, meta)| {
&m.1.transaction_id txns.get(
&meta
.transaction_id
.try_into() .try_into()
.expect("256-bit transaction ids are the maximum supported at this time"), .expect("256-bit transaction ids are the maximum supported at this time"),
) )
.unwrap(); .map(|t| (message.inner().clone(), meta, t.id))
StorableMessage { })
msg: m.0.inner().clone(), .map(|(msg, meta, txn_id)| StorableMessage { msg, meta, txn_id });
meta: &m.1,
txn_id: txn.id,
}
});
let stored = self let stored = self
.db .db
.store_dispatched_messages(self.domain().id(), &self.mailbox_address, storable) .store_dispatched_messages(self.domain().id(), &self.mailbox_address, storable)
@ -304,6 +319,9 @@ impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneSqlDb {
#[async_trait] #[async_trait]
impl HyperlaneLogStore<Delivery> for HyperlaneSqlDb { impl HyperlaneLogStore<Delivery> for HyperlaneSqlDb {
/// Store delivered message ids from the destination mailbox into the database.
/// We store only delivered messages ids from blocks and transaction which we could successfully
/// insert into database.
async fn store_logs(&self, deliveries: &[(Indexed<Delivery>, LogMeta)]) -> Result<u32> { async fn store_logs(&self, deliveries: &[(Indexed<Delivery>, LogMeta)]) -> Result<u32> {
if deliveries.is_empty() { if deliveries.is_empty() {
return Ok(0); return Ok(0);
@ -313,22 +331,22 @@ impl HyperlaneLogStore<Delivery> for HyperlaneSqlDb {
.await? .await?
.map(|t| (t.hash, t)) .map(|t| (t.hash, t))
.collect(); .collect();
let storable = deliveries.iter().map(|(message_id, meta)| { let storable = deliveries
let txn_id = txns .iter()
.get( .filter_map(|(message_id, meta)| {
txns.get(
&meta &meta
.transaction_id .transaction_id
.try_into() .try_into()
.expect("256-bit transaction ids are the maximum supported at this time"), .expect("256-bit transaction ids are the maximum supported at this time"),
) )
.unwrap() .map(|txn| (*message_id.inner(), meta, txn.id))
.id; })
StorableDelivery { .map(|(message_id, meta, txn_id)| StorableDelivery {
message_id: *message_id.inner(), message_id,
meta, meta,
txn_id, txn_id,
} });
});
let stored = self let stored = self
.db .db
@ -340,6 +358,9 @@ impl HyperlaneLogStore<Delivery> for HyperlaneSqlDb {
#[async_trait] #[async_trait]
impl HyperlaneLogStore<InterchainGasPayment> for HyperlaneSqlDb { impl HyperlaneLogStore<InterchainGasPayment> for HyperlaneSqlDb {
/// Store interchain gas payments into the database.
/// We store only interchain gas payments from blocks and transaction which we could
/// successfully insert into database.
async fn store_logs( async fn store_logs(
&self, &self,
payments: &[(Indexed<InterchainGasPayment>, LogMeta)], payments: &[(Indexed<InterchainGasPayment>, LogMeta)],
@ -352,22 +373,22 @@ impl HyperlaneLogStore<InterchainGasPayment> for HyperlaneSqlDb {
.await? .await?
.map(|t| (t.hash, t)) .map(|t| (t.hash, t))
.collect(); .collect();
let storable = payments.iter().map(|(payment, meta)| { let storable = payments
let txn_id = txns .iter()
.get( .filter_map(|(payment, meta)| {
txns.get(
&meta &meta
.transaction_id .transaction_id
.try_into() .try_into()
.expect("256-bit transaction ids are the maximum supported at this time"), .expect("256-bit transaction ids are the maximum supported at this time"),
) )
.unwrap() .map(|txn| (payment.inner(), meta, txn.id))
.id; })
StorablePayment { .map(|(payment, meta, txn_id)| StorablePayment {
payment: payment.inner(), payment,
meta, meta,
txn_id, txn_id,
} });
});
let stored = self.db.store_payments(self.domain().id(), storable).await?; let stored = self.db.store_payments(self.domain().id(), storable).await?;
Ok(stored as u32) Ok(stored as u32)

@ -13,6 +13,7 @@ use crate::date_time;
use crate::db::ScraperDb; use crate::db::ScraperDb;
use super::generated::{delivered_message, message}; use super::generated::{delivered_message, message};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct StorableDelivery<'a> { pub struct StorableDelivery<'a> {
pub message_id: H256, pub message_id: H256,
@ -178,9 +179,13 @@ impl ScraperDb {
}) })
.collect_vec(); .collect_vec();
debug_assert!(!models.is_empty());
trace!(?models, "Writing delivered messages to database"); trace!(?models, "Writing delivered messages to database");
if models.is_empty() {
debug!("Wrote zero new delivered messages to database");
return Ok(0);
}
Insert::many(models) Insert::many(models)
.on_conflict( .on_conflict(
OnConflict::columns([delivered_message::Column::MsgId]) OnConflict::columns([delivered_message::Column::MsgId])
@ -197,12 +202,10 @@ impl ScraperDb {
.deliveries_count_since_id(domain, destination_mailbox, latest_id_before) .deliveries_count_since_id(domain, destination_mailbox, latest_id_before)
.await?; .await?;
if new_deliveries_count > 0 { debug!(
debug!( messages = new_deliveries_count,
messages = new_deliveries_count, "Wrote new delivered messages to database"
"Wrote new delivered messages to database" );
);
}
Ok(new_deliveries_count) Ok(new_deliveries_count)
} }
@ -272,9 +275,13 @@ impl ScraperDb {
}) })
.collect_vec(); .collect_vec();
debug_assert!(!models.is_empty());
trace!(?models, "Writing messages to database"); trace!(?models, "Writing messages to database");
if models.is_empty() {
debug!("Wrote zero new messages to database");
return Ok(0);
}
Insert::many(models) Insert::many(models)
.on_conflict( .on_conflict(
OnConflict::columns([ OnConflict::columns([
@ -299,12 +306,10 @@ impl ScraperDb {
.dispatch_count_since_id(domain, origin_mailbox, latest_id_before) .dispatch_count_since_id(domain, origin_mailbox, latest_id_before)
.await?; .await?;
if new_dispatch_count > 0 { debug!(
debug!( messages = new_dispatch_count,
messages = new_dispatch_count, "Wrote new messages to database"
"Wrote new messages to database" );
);
}
Ok(new_dispatch_count) Ok(new_dispatch_count)
} }
} }

@ -42,9 +42,13 @@ impl ScraperDb {
}) })
.collect_vec(); .collect_vec();
debug_assert!(!models.is_empty());
trace!(?models, "Writing gas payments to database"); trace!(?models, "Writing gas payments to database");
if models.is_empty() {
debug!("Wrote zero new gas payments to database");
return Ok(0);
}
Insert::many(models) Insert::many(models)
.on_conflict( .on_conflict(
OnConflict::columns([ OnConflict::columns([
@ -67,12 +71,10 @@ impl ScraperDb {
.payments_count_since_id(domain, latest_id_before) .payments_count_since_id(domain, latest_id_before)
.await?; .await?;
if new_payments_count > 0 { debug!(
debug!( payments = new_payments_count,
payments = new_payments_count, "Wrote new gas payments to database"
"Wrote new gas payments to database" );
);
}
Ok(new_payments_count) Ok(new_payments_count)
} }

Loading…
Cancel
Save