|
|
|
@ -12,15 +12,11 @@ use tracing::{trace, warn}; |
|
|
|
|
|
|
|
|
|
use hyperlane_base::settings::IndexSettings; |
|
|
|
|
use hyperlane_core::{ |
|
|
|
|
unwrap_or_none_result, BlockId, BlockInfo, Delivery, HyperlaneDomain, HyperlaneLogStore, |
|
|
|
|
HyperlaneMessage, HyperlaneProvider, HyperlaneSequenceAwareIndexerStoreReader, |
|
|
|
|
HyperlaneWatermarkedLogStore, Indexed, InterchainGasPayment, LogMeta, H256, H512, |
|
|
|
|
BlockId, BlockInfo, HyperlaneDomain, HyperlaneLogStore, HyperlaneProvider, |
|
|
|
|
HyperlaneWatermarkedLogStore, LogMeta, H256, H512, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
use crate::db::{ |
|
|
|
|
BasicBlock, BlockCursor, ScraperDb, StorableDelivery, StorableMessage, StorablePayment, |
|
|
|
|
StorableTxn, |
|
|
|
|
}; |
|
|
|
|
use crate::db::{BasicBlock, BlockCursor, ScraperDb, StorableTxn}; |
|
|
|
|
|
|
|
|
|
/// Maximum number of records to query at a time. This came about because when a
|
|
|
|
|
/// lot of messages are sent in a short period of time we were ending up with a
|
|
|
|
@ -31,16 +27,16 @@ const CHUNK_SIZE: usize = 50; |
|
|
|
|
/// A chain scraper is comprised of all the information and contract/provider
|
|
|
|
|
/// connections needed to scrape the contracts on a single blockchain.
|
|
|
|
|
#[derive(Clone, Debug)] |
|
|
|
|
pub struct HyperlaneSqlDb { |
|
|
|
|
mailbox_address: H256, |
|
|
|
|
domain: HyperlaneDomain, |
|
|
|
|
db: ScraperDb, |
|
|
|
|
pub struct HyperlaneDbStore { |
|
|
|
|
pub(crate) mailbox_address: H256, |
|
|
|
|
pub(crate) domain: HyperlaneDomain, |
|
|
|
|
pub(crate) db: ScraperDb, |
|
|
|
|
provider: Arc<dyn HyperlaneProvider>, |
|
|
|
|
cursor: Arc<BlockCursor>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[allow(unused)] |
|
|
|
|
impl HyperlaneSqlDb { |
|
|
|
|
impl HyperlaneDbStore { |
|
|
|
|
pub async fn new( |
|
|
|
|
db: ScraperDb, |
|
|
|
|
mailbox_address: H256, |
|
|
|
@ -61,21 +57,11 @@ impl HyperlaneSqlDb { |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn domain(&self) -> &HyperlaneDomain { |
|
|
|
|
&self.domain |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub async fn last_message_nonce(&self) -> Result<Option<u32>> { |
|
|
|
|
self.db |
|
|
|
|
.last_message_nonce(self.domain.id(), &self.mailbox_address) |
|
|
|
|
.await |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Takes a list of txn and block hashes and ensure they are all in the
|
|
|
|
|
/// database. If any are not it will fetch the data and insert them.
|
|
|
|
|
///
|
|
|
|
|
/// Returns the relevant transaction info.
|
|
|
|
|
async fn ensure_blocks_and_txns( |
|
|
|
|
pub(crate) async fn ensure_blocks_and_txns( |
|
|
|
|
&self, |
|
|
|
|
log_meta: impl Iterator<Item = &LogMeta>, |
|
|
|
|
) -> Result<impl Iterator<Item = TxnWithId>> { |
|
|
|
@ -262,7 +248,7 @@ impl HyperlaneSqlDb { |
|
|
|
|
|
|
|
|
|
self.db |
|
|
|
|
.store_blocks( |
|
|
|
|
self.domain().id(), |
|
|
|
|
self.domain.id(), |
|
|
|
|
blocks_to_insert |
|
|
|
|
.iter_mut() |
|
|
|
|
.map(|(_, info)| info.take().unwrap()), |
|
|
|
@ -293,129 +279,9 @@ impl HyperlaneSqlDb { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[async_trait] |
|
|
|
|
impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneSqlDb { |
|
|
|
|
/// Store dispatched messages from the origin mailbox into the database.
|
|
|
|
|
/// We store only messages from blocks and transaction which we could successfully insert
|
|
|
|
|
/// into database.
|
|
|
|
|
async fn store_logs(&self, messages: &[(Indexed<HyperlaneMessage>, LogMeta)]) -> Result<u32> { |
|
|
|
|
if messages.is_empty() { |
|
|
|
|
return Ok(0); |
|
|
|
|
} |
|
|
|
|
let txns: HashMap<H512, TxnWithId> = self |
|
|
|
|
.ensure_blocks_and_txns(messages.iter().map(|r| &r.1)) |
|
|
|
|
.await? |
|
|
|
|
.map(|t| (t.hash, t)) |
|
|
|
|
.collect(); |
|
|
|
|
let storable = messages |
|
|
|
|
.iter() |
|
|
|
|
.filter_map(|(message, meta)| { |
|
|
|
|
txns.get(&meta.transaction_id) |
|
|
|
|
.map(|t| (message.inner().clone(), meta, t.id)) |
|
|
|
|
}) |
|
|
|
|
.map(|(msg, meta, txn_id)| StorableMessage { msg, meta, txn_id }); |
|
|
|
|
let stored = self |
|
|
|
|
.db |
|
|
|
|
.store_dispatched_messages(self.domain().id(), &self.mailbox_address, storable) |
|
|
|
|
.await?; |
|
|
|
|
Ok(stored as u32) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[async_trait] |
|
|
|
|
impl HyperlaneLogStore<Delivery> for HyperlaneSqlDb { |
|
|
|
|
/// Store delivered message ids from the destination mailbox into the database.
|
|
|
|
|
/// We store only delivered messages ids from blocks and transaction which we could successfully
|
|
|
|
|
/// insert into database.
|
|
|
|
|
async fn store_logs(&self, deliveries: &[(Indexed<Delivery>, LogMeta)]) -> Result<u32> { |
|
|
|
|
if deliveries.is_empty() { |
|
|
|
|
return Ok(0); |
|
|
|
|
} |
|
|
|
|
let txns: HashMap<H512, TxnWithId> = self |
|
|
|
|
.ensure_blocks_and_txns(deliveries.iter().map(|r| &r.1)) |
|
|
|
|
.await? |
|
|
|
|
.map(|t| (t.hash, t)) |
|
|
|
|
.collect(); |
|
|
|
|
let storable = deliveries |
|
|
|
|
.iter() |
|
|
|
|
.filter_map(|(message_id, meta)| { |
|
|
|
|
txns.get(&meta.transaction_id) |
|
|
|
|
.map(|txn| (*message_id.inner(), meta, txn.id)) |
|
|
|
|
}) |
|
|
|
|
.map(|(message_id, meta, txn_id)| StorableDelivery { |
|
|
|
|
message_id, |
|
|
|
|
meta, |
|
|
|
|
txn_id, |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
let stored = self |
|
|
|
|
.db |
|
|
|
|
.store_deliveries(self.domain().id(), self.mailbox_address, storable) |
|
|
|
|
.await?; |
|
|
|
|
Ok(stored as u32) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[async_trait] |
|
|
|
|
impl HyperlaneLogStore<InterchainGasPayment> for HyperlaneSqlDb { |
|
|
|
|
/// Store interchain gas payments into the database.
|
|
|
|
|
/// We store only interchain gas payments from blocks and transaction which we could
|
|
|
|
|
/// successfully insert into database.
|
|
|
|
|
async fn store_logs( |
|
|
|
|
&self, |
|
|
|
|
payments: &[(Indexed<InterchainGasPayment>, LogMeta)], |
|
|
|
|
) -> Result<u32> { |
|
|
|
|
if payments.is_empty() { |
|
|
|
|
return Ok(0); |
|
|
|
|
} |
|
|
|
|
let txns: HashMap<H512, TxnWithId> = self |
|
|
|
|
.ensure_blocks_and_txns(payments.iter().map(|r| &r.1)) |
|
|
|
|
.await? |
|
|
|
|
.map(|t| (t.hash, t)) |
|
|
|
|
.collect(); |
|
|
|
|
let storable = payments |
|
|
|
|
.iter() |
|
|
|
|
.filter_map(|(payment, meta)| { |
|
|
|
|
txns.get(&meta.transaction_id) |
|
|
|
|
.map(|txn| (payment.inner(), meta, txn.id)) |
|
|
|
|
}) |
|
|
|
|
.map(|(payment, meta, txn_id)| StorablePayment { |
|
|
|
|
payment, |
|
|
|
|
meta, |
|
|
|
|
txn_id, |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
let stored = self.db.store_payments(self.domain().id(), storable).await?; |
|
|
|
|
Ok(stored as u32) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[async_trait] |
|
|
|
|
impl HyperlaneSequenceAwareIndexerStoreReader<HyperlaneMessage> for HyperlaneSqlDb { |
|
|
|
|
/// Gets a message by its nonce.
|
|
|
|
|
async fn retrieve_by_sequence(&self, sequence: u32) -> Result<Option<HyperlaneMessage>> { |
|
|
|
|
let message = self |
|
|
|
|
.db |
|
|
|
|
.retrieve_message_by_nonce(self.domain().id(), &self.mailbox_address, sequence) |
|
|
|
|
.await?; |
|
|
|
|
Ok(message) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Gets the block number at which the log occurred.
|
|
|
|
|
async fn retrieve_log_block_number_by_sequence(&self, sequence: u32) -> Result<Option<u64>> { |
|
|
|
|
let tx_id = unwrap_or_none_result!( |
|
|
|
|
self.db |
|
|
|
|
.retrieve_dispatched_tx_id(self.domain().id(), &self.mailbox_address, sequence) |
|
|
|
|
.await? |
|
|
|
|
); |
|
|
|
|
let block_id = unwrap_or_none_result!(self.db.retrieve_block_id(tx_id).await?); |
|
|
|
|
Ok(self.db.retrieve_block_number(block_id).await?) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[async_trait] |
|
|
|
|
impl<T> HyperlaneWatermarkedLogStore<T> for HyperlaneSqlDb |
|
|
|
|
impl<T> HyperlaneWatermarkedLogStore<T> for HyperlaneDbStore |
|
|
|
|
where |
|
|
|
|
HyperlaneSqlDb: HyperlaneLogStore<T>, |
|
|
|
|
HyperlaneDbStore: HyperlaneLogStore<T>, |
|
|
|
|
{ |
|
|
|
|
/// Gets the block number high watermark
|
|
|
|
|
async fn retrieve_high_watermark(&self) -> Result<Option<u32>> { |
|
|
|
@ -429,9 +295,9 @@ where |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)] |
|
|
|
|
struct TxnWithId { |
|
|
|
|
hash: H512, |
|
|
|
|
id: i64, |
|
|
|
|
pub(crate) struct TxnWithId { |
|
|
|
|
pub hash: H512, |
|
|
|
|
pub id: i64, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)] |