Message explorer cleanup (#1251)
* Better hex processing
* Cleanup clippy errors
* Fix parsing of 160 bit values
* Support multiple contracts with cursors
* InboxIndexer for getting message processed events
* Work on inbox indexing and delivered message linking
* Use message hashes to link deliveries and also add hash indexes for common lookups
* Split up settings struct
* Support scraper config for main fn
* Multiple config parsing for scraper
* Better docs and more resilient config loading
* Refactoring building of contract items
* Cut down on duplication
* Support inbox indexer building
* fmt
* Scraper settings loading re-work
* Fix errors, warnings, and cleanup
* Fix clippy warnings
* Fix hash index not supporting uniqueness
* Fix after entity update
* Remove special case
* Better error messages for missing config values
* Env prefix filtering
* Improve error context for url without base error
* Fix inbox domain bug
* Actually I was wrong about domains, misinterpreted config
* Fix loading settings method
* Fix upsert error
* Debugged and fixed incorrect txn linking with messages and deliveries
* Cleanup
* Update config for scraper
* Update config for scraper
* Delivered message linking; again.
* Reduce call frequency of message linking query
* Fixed typo
* Fix init mistake
* Changed to agent_prefix
* Domain settings
* Use a loop
* Add more docs
* Remove semicolon
* Increase linking frequency
* Add moonbeam domain
* Fmt
* Renamed settings struct
* Add docs
* Add test for FROM_HEX_CHARS array
* WIP
* WIP
* Work on provider building
* WIP attempt to create different make provider with connection
* Create AbacusProviders
* Update tables
* Update tables and revert seaorm version
* Pull data from providers
* Better span tracing
* Fixes after merge
* Apparently this file was excluded
* Fix addr len error
* Better contexts for error deciphering during config parsing
* Add optimismgoerli
* Add other rollups from #1176
* Correct gas used field
* Fix build
* Cleanup
* Update transactions to have additional EIP-1559 data and also default times for easier manual ops
* Fix warnings
* Some splits
* Flatten dir structure a bit
* WIP refactoring
* #1229 add index on message timestamp
* Refactor out db txn fns
* Refactor out db block fns
* Remove the message linker
* Split off db logic into sub mods
* Update rust version and sea-orm and satisfy new clippy requirements
* New rust version feature + cleanup
* Split up syncing logic
* Remove use of cell since it was making things more complicated
* Fix tracing
* Doc cleanup
* Record data even if no messages were returned
* Reduce tuple use
* Keep sea-orm 0.9.3 for now
* Update comment
* Remove old comment
* Store instead of record
* Fix message leaf index metric
* Move dead code allowance to fns
* Remove redundant type def
* Docs
* fmt
* Move delivery impl
* More docs
* Fmt
* Fixes after testing
* Reduce logging verbosity from span
* Use IN instead of OR for finding lists of hashes
* Remove unused code
parent d92ea5124f
commit 0b09754922
@ -1,3 +1,3 @@
[toolchain]
channel = "1.63"
channel = "1.65"
profile = "default"
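Note: the channel bump to 1.65 lines up with the let-else syntax used later in the scraper's sync loop (`let Ok(tip) = ... else { continue; }`), which stabilized in Rust 1.65. A tiny, self-contained illustration of the construct (not code from this repo):

fn first_even(nums: &[i64]) -> Option<i64> {
    // let-else: bind the pattern, or take the divergent else branch.
    let Some(first) = nums.iter().copied().find(|n| n % 2 == 0) else {
        return None;
    };
    Some(first)
}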
@ -0,0 +1,201 @@ |
||||
use std::collections::HashMap; |
||||
use std::sync::Arc; |
||||
|
||||
use async_trait::async_trait; |
||||
use eyre::WrapErr; |
||||
use tokio::task::JoinHandle; |
||||
use tracing::instrument::Instrumented; |
||||
use tracing::{info_span, trace, Instrument}; |
||||
|
||||
use abacus_base::{ |
||||
run_all, BaseAgent, ChainSetup, ContractSyncMetrics, CoreMetrics, DomainSettings, |
||||
InboxAddresses, IndexSettings, |
||||
}; |
||||
use abacus_core::{AbacusChain, Inbox}; |
||||
|
||||
use crate::chain_scraper::{Local, Remote, SqlChainScraper}; |
||||
use crate::db::ScraperDb; |
||||
use crate::settings::ScraperSettings; |
||||
|
||||
/// A message explorer scraper agent
|
||||
#[derive(Debug)] |
||||
#[allow(unused)] |
||||
pub struct Scraper { |
||||
db: ScraperDb, |
||||
metrics: Arc<CoreMetrics>, |
||||
/// A map of scrapers by domain.
|
||||
scrapers: HashMap<u32, SqlChainScraper>, |
||||
} |
||||
|
||||
#[async_trait] |
||||
impl BaseAgent for Scraper { |
||||
const AGENT_NAME: &'static str = "scraper"; |
||||
type Settings = ScraperSettings; |
||||
|
||||
async fn from_settings( |
||||
settings: Self::Settings, |
||||
metrics: Arc<CoreMetrics>, |
||||
) -> eyre::Result<Self> |
||||
where |
||||
Self: Sized, |
||||
{ |
||||
let db = ScraperDb::connect(&settings.app.db).await?; |
||||
|
||||
// so the challenge here is that the config files were written in a way that
|
||||
// makes a lot of sense for relayers but not a lot of sense for scraping
|
||||
// all data from a given chain at a time...
|
||||
//
|
||||
// Basically the format provided is Outbox + all destination Inboxes that
|
||||
// messages from the outbox will get written to.
|
||||
//
|
||||
// Instead, we want the Outbox + all Inboxes that are on the same local chain.
|
||||
|
||||
// outboxes by their local_domain
|
||||
let mut locals: HashMap<u32, Local> = HashMap::new(); |
||||
// index settings for each domain
|
||||
let mut index_settings: HashMap<u32, IndexSettings> = HashMap::new(); |
||||
// inboxes by their local_domain, remote_domain
|
||||
let mut remotes: HashMap<u32, HashMap<u32, Remote>> = HashMap::new(); |
||||
|
||||
for (outbox_domain, chain_config) in settings.chains.into_iter() { |
||||
let ctx = || format!("Loading chain {}", chain_config.outbox.name); |
||||
if let Some(local) = Self::load_local(&chain_config, &metrics) |
||||
.await |
||||
.with_context(ctx)? |
||||
{ |
||||
trace!(domain = outbox_domain, "Created outbox and outbox indexer"); |
||||
assert_eq!(local.outbox.local_domain(), outbox_domain); |
||||
locals.insert(outbox_domain, local); |
||||
} |
||||
|
||||
for (_, inbox_config) in chain_config.inboxes.iter() { |
||||
if let Some(remote) = Self::load_remote(&chain_config, inbox_config, &metrics) |
||||
.await |
||||
.with_context(ctx)? |
||||
{ |
||||
let inbox_remote_domain = remote.inbox.remote_domain(); |
||||
let inbox_local_domain = remote.inbox.local_domain(); |
||||
assert_eq!(inbox_remote_domain, outbox_domain); |
||||
assert_ne!( |
||||
inbox_local_domain, outbox_domain, |
||||
"Attempting to load inbox for the chain we are on" |
||||
); |
||||
|
||||
trace!( |
||||
local_domain = inbox_local_domain, |
||||
remote_domain = inbox_remote_domain, |
||||
"Created inbox and inbox indexer" |
||||
); |
||||
remotes |
||||
.entry(inbox_local_domain) |
||||
.or_default() |
||||
.insert(inbox_remote_domain, remote); |
||||
} |
||||
} |
||||
|
||||
index_settings.insert(outbox_domain, chain_config.index); |
||||
} |
||||
|
||||
let contract_sync_metrics = ContractSyncMetrics::new(metrics.clone()); |
||||
let mut scrapers: HashMap<u32, SqlChainScraper> = HashMap::new(); |
||||
for (local_domain, local) in locals.into_iter() { |
||||
let remotes = remotes.remove(&local_domain).unwrap_or_default(); |
||||
let index_settings = index_settings |
||||
.remove(&local_domain) |
||||
.expect("Missing index settings for domain"); |
||||
|
||||
let scraper = SqlChainScraper::new( |
||||
db.clone(), |
||||
local, |
||||
remotes, |
||||
&index_settings, |
||||
contract_sync_metrics.clone(), |
||||
) |
||||
.await?; |
||||
scrapers.insert(local_domain, scraper); |
||||
} |
||||
|
||||
trace!(domain_count = scrapers.len(), "Creating scraper"); |
||||
|
||||
Ok(Self { |
||||
db, |
||||
metrics, |
||||
scrapers, |
||||
}) |
||||
} |
||||
|
||||
#[allow(clippy::async_yields_async)] |
||||
async fn run(&self) -> Instrumented<JoinHandle<eyre::Result<()>>> { |
||||
let tasks = self |
||||
.scrapers |
||||
.iter() |
||||
.map(|(name, scraper)| { |
||||
let span = info_span!("ChainContractSync", %name, chain = scraper.chain_name()); |
||||
tokio::spawn(scraper.clone().sync()).instrument(span) |
||||
}) |
||||
.collect(); |
||||
|
||||
run_all(tasks) |
||||
} |
||||
} |
||||
|
||||
impl Scraper { |
||||
async fn load_local( |
||||
config: &DomainSettings, |
||||
metrics: &Arc<CoreMetrics>, |
||||
) -> eyre::Result<Option<Local>> { |
||||
Ok( |
||||
if config |
||||
.outbox |
||||
.disabled |
||||
.as_ref() |
||||
.and_then(|d| d.parse::<bool>().ok()) |
||||
.unwrap_or(false) |
||||
{ |
||||
None |
||||
} else { |
||||
let ctx = || format!("Loading local {}", config.outbox.name); |
||||
Some(Local { |
||||
provider: config.try_provider(metrics).await.with_context(ctx)?.into(), |
||||
outbox: config.try_outbox(metrics).await.with_context(ctx)?.into(), |
||||
indexer: config |
||||
.try_outbox_indexer(metrics) |
||||
.await |
||||
.with_context(ctx)? |
||||
.into(), |
||||
}) |
||||
}, |
||||
) |
||||
} |
||||
|
||||
async fn load_remote( |
||||
config: &DomainSettings, |
||||
inbox_config: &ChainSetup<InboxAddresses>, |
||||
metrics: &Arc<CoreMetrics>, |
||||
) -> eyre::Result<Option<Remote>> { |
||||
Ok( |
||||
if inbox_config |
||||
.disabled |
||||
.as_ref() |
||||
.and_then(|d| d.parse::<bool>().ok()) |
||||
.unwrap_or(false) |
||||
{ |
||||
None |
||||
} else { |
||||
let ctx = || format!("Loading remote {}", inbox_config.name); |
||||
Some(Remote { |
||||
inbox: config |
||||
.try_inbox(inbox_config, metrics) |
||||
.await |
||||
.with_context(ctx)? |
||||
.into(), |
||||
indexer: config |
||||
.try_inbox_indexer(inbox_config, metrics) |
||||
.await |
||||
.with_context(ctx)? |
||||
.into(), |
||||
}) |
||||
}, |
||||
) |
||||
} |
||||
} |
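Note on the config regrouping in `from_settings` above: the settings pair an Outbox with the destination Inboxes its messages are delivered to, while the scraper wants everything deployed on one local chain. A minimal sketch of that regrouping, using hypothetical stand-in types (bare `u32` domain ids) instead of the real `Local`/`Remote` contract handles:

use std::collections::HashMap;

/// Hypothetical, simplified view of one chain's config: the outbox domain plus
/// the local domains of every inbox listed under it.
struct ChainConfig {
    outbox_domain: u32,
    inbox_local_domains: Vec<u32>,
}

/// Regroup "outbox + destination inboxes" configs into "per local chain" maps,
/// mirroring the nested loop in `Scraper::from_settings`.
fn regroup(chains: Vec<ChainConfig>) -> HashMap<u32, Vec<u32>> {
    // local_domain -> remote domains whose inboxes are deployed on that chain
    let mut remotes_by_local: HashMap<u32, Vec<u32>> = HashMap::new();
    for chain in chains {
        for inbox_local in chain.inbox_local_domains {
            // The inbox for `chain.outbox_domain` lives on `inbox_local`.
            remotes_by_local
                .entry(inbox_local)
                .or_default()
                .push(chain.outbox_domain);
        }
    }
    remotes_by_local
}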
@ -0,0 +1,407 @@ |
||||
//! This module (and its children) is responsible for scraping blockchain data
//! and keeping things updated.
||||
|
||||
use std::collections::HashMap; |
||||
use std::future::Future; |
||||
use std::sync::Arc; |
||||
|
||||
use ethers::types::H256; |
||||
use eyre::{eyre, Result}; |
||||
use futures::TryFutureExt; |
||||
use sea_orm::prelude::TimeDateTime; |
||||
use tracing::trace; |
||||
|
||||
use abacus_base::{ContractSyncMetrics, IndexSettings}; |
||||
use abacus_core::{ |
||||
AbacusContract, AbacusProvider, BlockInfo, CommittedMessage, Inbox, InboxIndexer, LogMeta, |
||||
Outbox, OutboxIndexer, RawCommittedMessage, |
||||
}; |
||||
|
||||
use crate::chain_scraper::sync::Syncer; |
||||
use crate::date_time; |
||||
use crate::db::{ |
||||
BasicBlock, BlockCursor, ScraperDb, StorableDelivery, StorableMessage, StorableTxn, |
||||
}; |
||||
|
||||
mod sync; |
||||
|
||||
/// Remote chain components which are on the current chain.
|
||||
/// (e.g. inbox for a remote chain deployed on the current chain).
|
||||
#[derive(Debug, Clone)] |
||||
pub struct Remote { |
||||
pub inbox: Arc<dyn Inbox>, |
||||
pub indexer: Arc<dyn InboxIndexer>, |
||||
} |
||||
|
||||
/// Local chain components like the outbox.
|
||||
#[derive(Debug, Clone)] |
||||
pub struct Local { |
||||
pub outbox: Arc<dyn Outbox>, |
||||
pub indexer: Arc<dyn OutboxIndexer>, |
||||
pub provider: Arc<dyn AbacusProvider>, |
||||
} |
||||
|
||||
/// A chain scraper is comprised of all the information and contract/provider
|
||||
/// connections needed to scrape the contracts on a single blockchain.
|
||||
#[derive(Clone, Debug)] |
||||
pub struct SqlChainScraper { |
||||
db: ScraperDb, |
||||
local: Local, |
||||
remotes: HashMap<u32, Remote>, |
||||
chunk_size: u32, |
||||
metrics: ContractSyncMetrics, |
||||
cursor: Arc<BlockCursor>, |
||||
} |
||||
|
||||
#[allow(unused)] |
||||
impl SqlChainScraper { |
||||
pub async fn new( |
||||
db: ScraperDb, |
||||
local: Local, |
||||
remotes: HashMap<u32, Remote>, |
||||
index_settings: &IndexSettings, |
||||
metrics: ContractSyncMetrics, |
||||
) -> Result<Self> { |
||||
let cursor = Arc::new( |
||||
db.block_cursor(local.outbox.local_domain(), index_settings.from() as u64) |
||||
.await?, |
||||
); |
||||
Ok(Self { |
||||
db, |
||||
local, |
||||
remotes, |
||||
chunk_size: index_settings.chunk_size(), |
||||
metrics, |
||||
cursor, |
||||
}) |
||||
} |
||||
|
||||
pub fn chain_name(&self) -> &str { |
||||
self.local.outbox.chain_name() |
||||
} |
||||
|
||||
pub fn local_domain(&self) -> u32 { |
||||
self.local.outbox.local_domain() |
||||
} |
||||
|
||||
pub fn remote_domains(&self) -> impl Iterator<Item = u32> + '_ { |
||||
self.remotes.keys().copied() |
||||
} |
||||
|
||||
pub async fn get_finalized_block_number(&self) -> Result<u32> { |
||||
self.local.indexer.get_finalized_block_number().await |
||||
} |
||||
|
||||
/// Sync contract data and other blockchain data with the current chain state.
/// This will create a long-running task that should be spawned.
||||
pub fn sync(self) -> impl Future<Output = Result<()>> + Send + 'static { |
||||
Syncer::new(self).and_then(|syncer| syncer.run()) |
||||
} |
||||
|
||||
/// Fetch the highest message leaf index we have seen for the local domain.
|
||||
async fn last_message_leaf_index(&self) -> Result<Option<u32>> { |
||||
self.db |
||||
.last_message_leaf_index(self.local_domain(), &self.local.outbox.address()) |
||||
.await |
||||
} |
||||
|
||||
/// Store messages from the outbox into the database.
|
||||
///
|
||||
/// Returns the highest message leaf index which was provided to this
|
||||
/// function.
|
||||
async fn store_messages( |
||||
&self, |
||||
messages: &[RawMsgWithMeta], |
||||
txns: &HashMap<H256, TxnWithIdAndTime>, |
||||
) -> Result<u32> { |
||||
debug_assert!(!messages.is_empty()); |
||||
|
||||
let max_leaf_id = messages |
||||
.iter() |
||||
.map(|m| m.raw.leaf_index) |
||||
.max() |
||||
.ok_or_else(|| eyre!("Received empty list"))?; |
||||
let parsed: Vec<(CommittedMessage, &LogMeta)> = messages |
||||
.iter() |
||||
.map(|RawMsgWithMeta { raw, meta }| { |
||||
let msg = CommittedMessage::try_from(raw)?; |
||||
debug_assert_eq!(self.local_domain(), msg.message.origin); |
||||
Ok((msg, meta)) |
||||
}) |
||||
.collect::<Result<_>>()?; |
||||
self.db |
||||
.store_messages( |
||||
&self.local.outbox.address(), |
||||
parsed.into_iter().map(|(msg, meta)| { |
||||
let txn = txns.get(&meta.transaction_hash).unwrap(); |
||||
StorableMessage { |
||||
msg, |
||||
meta, |
||||
txn_id: txn.id, |
||||
timestamp: txn.timestamp, |
||||
} |
||||
}), |
||||
) |
||||
.await?; |
||||
|
||||
Ok(max_leaf_id) |
||||
} |
||||
|
||||
/// Record that a message was delivered.
|
||||
async fn store_deliveries( |
||||
&self, |
||||
deliveries: &[Delivery], |
||||
txns: &HashMap<H256, TxnWithIdAndTime>, |
||||
) -> Result<()> { |
||||
if deliveries.is_empty() { |
||||
return Ok(()); |
||||
} |
||||
|
||||
let storable = deliveries.iter().map(|delivery| { |
||||
let txn_id = txns.get(&delivery.meta.transaction_hash).unwrap().id; |
||||
delivery.as_storable(txn_id) |
||||
}); |
||||
|
||||
self.db |
||||
.store_deliveries(self.local_domain(), storable) |
||||
.await |
||||
} |
||||
|
||||
/// Takes a list of txn and block hashes and ensures they are all in the
/// database. If any are not, it will fetch the data and insert them.
///
/// Returns the relevant transaction info.
||||
async fn ensure_blocks_and_txns( |
||||
&self, |
||||
message_metadata: impl Iterator<Item = &LogMeta>, |
||||
) -> Result<impl Iterator<Item = TxnWithIdAndTime>> { |
||||
let block_hash_by_txn_hash: HashMap<H256, H256> = message_metadata |
||||
.map(|meta| (meta.transaction_hash, meta.block_hash)) |
||||
.collect(); |
||||
|
||||
// all blocks we care about
|
||||
// hash of block maps to the block id and timestamp
|
||||
let blocks: HashMap<_, _> = self |
||||
.ensure_blocks(block_hash_by_txn_hash.values().copied()) |
||||
.await? |
||||
.into_iter() |
||||
.map(|block| (block.hash, block)) |
||||
.collect(); |
||||
trace!(?blocks, "Ensured blocks"); |
||||
|
||||
// not sure why rust can't detect the lifetimes here are valid, but just
|
||||
// wrapping with the Arc/mutex for now.
|
||||
let block_timestamps_by_txn: Arc<std::sync::Mutex<HashMap<H256, TimeDateTime>>> = |
||||
Default::default(); |
||||
|
||||
let block_timestamps_by_txn_clone = block_timestamps_by_txn.clone(); |
||||
// all txns we care about
|
||||
let txns_with_ids = |
||||
self.ensure_txns(block_hash_by_txn_hash.into_iter().map( |
||||
move |(txn_hash, block_hash)| { |
||||
let mut block_timestamps_by_txn = block_timestamps_by_txn_clone.lock().unwrap(); |
||||
let block_info = *blocks.get(&block_hash).as_ref().unwrap(); |
||||
block_timestamps_by_txn.insert(txn_hash, block_info.timestamp); |
||||
TxnWithBlockId { |
||||
txn_hash, |
||||
block_id: block_info.id, |
||||
} |
||||
}, |
||||
)) |
||||
.await?; |
||||
|
||||
Ok( |
||||
txns_with_ids.map(move |TxnWithId { hash, id: txn_id }| TxnWithIdAndTime { |
||||
hash, |
||||
id: txn_id, |
||||
timestamp: *block_timestamps_by_txn.lock().unwrap().get(&hash).unwrap(), |
||||
}), |
||||
) |
||||
} |
||||
|
||||
/// Takes a list of transaction hashes and the block id each transaction is in.
/// If a transaction is in the database already, this fetches its associated
/// database id. If it is not, this looks up its data with ethers and then
/// returns the database id after inserting it into the database.
||||
async fn ensure_txns( |
||||
&self, |
||||
txns: impl Iterator<Item = TxnWithBlockId>, |
||||
) -> Result<impl Iterator<Item = TxnWithId>> { |
||||
// mapping of txn hash to (txn_id, block_id).
|
||||
let mut txns: HashMap<H256, (Option<i64>, i64)> = txns |
||||
.map(|TxnWithBlockId { txn_hash, block_id }| (txn_hash, (None, block_id))) |
||||
.collect(); |
||||
|
||||
let db_txns = if !txns.is_empty() { |
||||
self.db.get_txn_ids(txns.keys()).await? |
||||
} else { |
||||
HashMap::new() |
||||
}; |
||||
for (hash, id) in db_txns { |
||||
// insert the txn id now that we have it to the Option value in txns
|
||||
let _ = txns |
||||
.get_mut(&hash) |
||||
.expect("We found a txn that we did not request") |
||||
.0 |
||||
.insert(id); |
||||
} |
||||
|
||||
// insert any txns that were not known and get their IDs
|
||||
// use this vec as temporary list of mut refs so we can update once we get back
|
||||
// the ids.
|
||||
let mut txns_to_insert: Vec<(&H256, &mut (Option<i64>, i64))> = |
||||
txns.iter_mut().filter(|(_, id)| id.0.is_none()).collect(); |
||||
|
||||
let mut storable: Vec<StorableTxn> = Vec::with_capacity(txns_to_insert.len()); |
||||
let as_f64 = ethers::types::U256::to_f64_lossy; |
||||
for (hash, (_, block_id)) in txns_to_insert.iter() { |
||||
storable.push(StorableTxn { |
||||
info: self.local.provider.get_txn_by_hash(hash).await?, |
||||
block_id: *block_id, |
||||
}); |
||||
} |
||||
|
||||
if !storable.is_empty() { |
||||
let mut cur_id = self.db.store_txns(storable.into_iter()).await?; |
||||
for (_hash, (txn_id, _block_id)) in txns_to_insert.iter_mut() { |
||||
debug_assert!(cur_id > 0); |
||||
let _ = txn_id.insert(cur_id); |
||||
cur_id += 1; |
||||
} |
||||
} |
||||
drop(txns_to_insert); |
||||
|
||||
Ok(txns |
||||
.into_iter() |
||||
.map(|(hash, (txn_id, _block_id))| TxnWithId { |
||||
hash, |
||||
id: txn_id.unwrap(), |
||||
})) |
||||
} |
||||
|
||||
/// Takes a list of block hashes. For each block, if it is in the database
/// already, this fetches its associated database id. If it is not, this looks
/// up its data with ethers and then returns the database id after inserting it
/// into the database.
||||
async fn ensure_blocks( |
||||
&self, |
||||
block_hashes: impl Iterator<Item = H256>, |
||||
) -> Result<impl Iterator<Item = BasicBlock>> { |
||||
// mapping of block hash to the database id and block timestamp. Optionals are
|
||||
// in place because we will find the timestamp first if the block was not
|
||||
// already in the db.
|
||||
let mut blocks: HashMap<H256, Option<BasicBlock>> = |
||||
block_hashes.map(|b| (b, None)).collect(); |
||||
|
||||
let db_blocks: Vec<BasicBlock> = if !blocks.is_empty() { |
||||
// check database to see which blocks we already know and fetch their IDs
|
||||
self.db |
||||
.get_block_basic(blocks.iter().map(|(hash, _)| hash)) |
||||
.await? |
||||
} else { |
||||
vec![] |
||||
}; |
||||
|
||||
for block in db_blocks { |
||||
let _ = blocks |
||||
.get_mut(&block.hash) |
||||
.expect("We found a block that we did not request") |
||||
.insert(block); |
||||
} |
||||
|
||||
// insert any blocks that were not known and get their IDs
|
||||
// use this vec as temporary list of mut refs so we can update their ids once we
|
||||
// have inserted them into the database.
|
||||
// Block info is an option so we can move it, must always be Some before
|
||||
// inserted into db.
|
||||
let mut blocks_to_insert: Vec<(&mut BasicBlock, Option<BlockInfo>)> = vec![]; |
||||
let blocks_to_fetch = blocks |
||||
.iter_mut() |
||||
.filter(|(_, block_info)| block_info.is_none()); |
||||
for (hash, block_info) in blocks_to_fetch { |
||||
let info = self.local.provider.get_block_by_hash(hash).await?; |
||||
let basic_info_ref = block_info.insert(BasicBlock { |
||||
id: -1, |
||||
hash: *hash, |
||||
timestamp: date_time::from_unix_timestamp_s(info.timestamp), |
||||
}); |
||||
blocks_to_insert.push((basic_info_ref, Some(info))); |
||||
} |
||||
|
||||
if !blocks_to_insert.is_empty() { |
||||
let mut cur_id = self |
||||
.db |
||||
.store_blocks( |
||||
self.local_domain(), |
||||
blocks_to_insert |
||||
.iter_mut() |
||||
.map(|(_, info)| info.take().unwrap()), |
||||
) |
||||
.await?; |
||||
for (block_ref, _) in blocks_to_insert.into_iter() { |
||||
block_ref.id = cur_id; |
||||
cur_id += 1; |
||||
} |
||||
} |
||||
|
||||
// ensure we have updated all the block ids and that we have info for all of
|
||||
// them.
|
||||
#[cfg(debug_assertions)] |
||||
for (hash, block) in blocks.iter() { |
||||
let block = block.as_ref().unwrap(); |
||||
assert_eq!(hash, &block.hash); |
||||
assert!(block.id > 0); |
||||
} |
||||
|
||||
Ok(blocks |
||||
.into_iter() |
||||
.map(|(hash, block_info)| block_info.unwrap())) |
||||
} |
||||
} |
||||
|
||||
#[derive(Debug, Clone)] |
||||
struct Delivery { |
||||
inbox: H256, |
||||
message_hash: H256, |
||||
meta: LogMeta, |
||||
} |
||||
|
||||
impl Delivery { |
||||
fn as_storable(&self, txn_id: i64) -> StorableDelivery { |
||||
StorableDelivery { |
||||
inbox: self.inbox, |
||||
message_hash: self.message_hash, |
||||
meta: &self.meta, |
||||
txn_id, |
||||
} |
||||
} |
||||
} |
||||
|
||||
#[derive(Debug, Clone)] |
||||
struct TxnWithIdAndTime { |
||||
hash: H256, |
||||
id: i64, |
||||
timestamp: TimeDateTime, |
||||
} |
||||
|
||||
#[derive(Debug, Clone)] |
||||
struct TxnWithId { |
||||
hash: H256, |
||||
id: i64, |
||||
} |
||||
|
||||
#[derive(Debug, Clone)] |
||||
struct TxnWithBlockId { |
||||
txn_hash: H256, |
||||
block_id: i64, |
||||
} |
||||
|
||||
#[derive(Debug, Clone)] |
||||
struct RawMsgWithMeta { |
||||
raw: RawCommittedMessage, |
||||
meta: LogMeta, |
||||
} |
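Note on the "ensure" helpers above: both `ensure_txns` and `ensure_blocks` look up the rows already known to the database, insert the missing ones in one batch, and then assign the new ids sequentially starting from the id the batch insert returns. A minimal sketch of just that id-assignment idea, with plain strings standing in for hashes (names here are illustrative, not from the repo):

use std::collections::HashMap;

/// `first_inserted_id` stands in for the value `store_txns`/`store_blocks`
/// return (see the note on `last_insert_id` in the txn db module).
fn assign_ids(
    keys: &[&str],
    known: &HashMap<String, i64>,
    first_inserted_id: i64,
) -> HashMap<String, i64> {
    let mut ids = HashMap::new();
    let mut next_id = first_inserted_id;
    for key in keys {
        let id = match known.get(*key) {
            Some(id) => *id,
            None => {
                // Missing rows are inserted in order, so they receive
                // consecutive ids starting at `first_inserted_id`.
                let id = next_id;
                next_id += 1;
                id
            }
        };
        ids.insert((*key).to_string(), id);
    }
    ids
}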
@ -0,0 +1,272 @@ |
||||
use std::cmp::min; |
||||
use std::collections::HashMap; |
||||
use std::ops::Deref; |
||||
use std::time::Duration; |
||||
|
||||
use ethers::prelude::H256; |
||||
use eyre::Result; |
||||
use prometheus::{IntCounter, IntGauge, IntGaugeVec}; |
||||
use tokio::time::sleep; |
||||
use tracing::{debug, info, instrument, warn}; |
||||
|
||||
use abacus_base::last_message::validate_message_continuity; |
||||
use abacus_core::{name_from_domain_id, CommittedMessage, ListValidity}; |
||||
|
||||
use crate::chain_scraper::{Delivery, RawMsgWithMeta, SqlChainScraper, TxnWithIdAndTime}; |
||||
|
||||
/// Workhorse of synchronization. This consumes a `SqlChainScraper` which has
|
||||
/// the needed connections and information to work and then adds additional
|
||||
/// running state that can be modified. This is a fn-like struct which allows us
|
||||
/// to pass a bunch of state around without having a lot of arguments to
|
||||
/// functions.
|
||||
///
|
||||
/// Conceptually this is *just* sync loop code with initial vars being
|
||||
/// configured but as a struct + multiple functions.
|
||||
pub(super) struct Syncer { |
||||
scraper: SqlChainScraper, |
||||
indexed_message_height: IntGauge, |
||||
indexed_deliveries_height: IntGauge, |
||||
stored_messages: IntCounter, |
||||
stored_deliveries: IntCounter, |
||||
missed_messages: IntCounter, |
||||
message_leaf_index: IntGaugeVec, |
||||
chunk_size: u32, |
||||
|
||||
from: u32, |
||||
last_valid_range_start_block: u32, |
||||
last_leaf_index: u32, |
||||
} |
||||
|
||||
impl Deref for Syncer { |
||||
type Target = SqlChainScraper; |
||||
|
||||
fn deref(&self) -> &Self::Target { |
||||
&self.scraper |
||||
} |
||||
} |
||||
|
||||
impl Syncer { |
||||
/// Create a new syncer from the `SqlChainScraper` which holds the needed
|
||||
/// information and connections to create the running state.
|
||||
///
|
||||
/// **Note:** Run must be called for syncing to commence.
|
||||
#[instrument(skip_all)] |
||||
pub async fn new(scraper: SqlChainScraper) -> Result<Self> { |
||||
let chain_name = scraper.chain_name(); |
||||
let message_labels = ["messages", chain_name]; |
||||
let deliveries_labels = ["deliveries", chain_name]; |
||||
|
||||
let indexed_message_height = scraper |
||||
.metrics |
||||
.indexed_height |
||||
.with_label_values(&message_labels); |
||||
let indexed_deliveries_height = scraper |
||||
.metrics |
||||
.indexed_height |
||||
.with_label_values(&deliveries_labels); |
||||
let stored_messages = scraper |
||||
.metrics |
||||
.stored_events |
||||
.with_label_values(&message_labels); |
||||
let stored_deliveries = scraper |
||||
.metrics |
||||
.stored_events |
||||
.with_label_values(&deliveries_labels); |
||||
let missed_messages = scraper |
||||
.metrics |
||||
.missed_events |
||||
.with_label_values(&message_labels); |
||||
let message_leaf_index = scraper.metrics.message_leaf_index.clone(); |
||||
|
||||
let chunk_size = scraper.chunk_size; |
||||
let from = scraper.cursor.height().await as u32; |
||||
let last_valid_range_start_block = from; |
||||
let last_leaf_index = scraper.last_message_leaf_index().await?.unwrap_or(0); |
||||
|
||||
Ok(Self { |
||||
scraper, |
||||
indexed_message_height, |
||||
indexed_deliveries_height, |
||||
stored_messages, |
||||
stored_deliveries, |
||||
missed_messages, |
||||
message_leaf_index, |
||||
chunk_size, |
||||
from, |
||||
last_valid_range_start_block, |
||||
last_leaf_index, |
||||
}) |
||||
} |
||||
|
||||
/// Sync contract and other blockchain data with the current chain state.
|
||||
#[instrument(skip(self), fields(chain_name = self.chain_name(), chunk_size = self.chunk_size))]
||||
pub async fn run(mut self) -> Result<()> { |
||||
info!(from = self.from, "Resuming chain sync"); |
||||
|
||||
loop { |
||||
self.indexed_message_height.set(self.from as i64); |
||||
self.indexed_deliveries_height.set(self.from as i64); |
||||
sleep(Duration::from_secs(5)).await; |
||||
|
||||
let Ok(tip) = self.get_finalized_block_number().await else { |
||||
continue; |
||||
}; |
||||
if tip <= self.from { |
||||
sleep(Duration::from_secs(10)).await; |
||||
continue; |
||||
} |
||||
|
||||
let to = min(tip, self.from + self.chunk_size); |
||||
let full_chunk_from = to.checked_sub(self.chunk_size).unwrap_or_default(); |
||||
debug_assert_eq!(self.local.outbox.local_domain(), self.local_domain()); |
||||
let (sorted_messages, deliveries) = self.scrape_range(full_chunk_from, to).await?; |
||||
|
||||
let validation = validate_message_continuity( |
||||
Some(self.last_leaf_index), |
||||
&sorted_messages.iter().map(|r| &r.raw).collect::<Vec<_>>(), |
||||
); |
||||
match validation { |
||||
ListValidity::Valid => { |
||||
let max_leaf_index_of_batch = |
||||
self.record_data(sorted_messages, deliveries).await?; |
||||
|
||||
self.cursor.update(full_chunk_from as u64).await; |
||||
if let Some(idx) = max_leaf_index_of_batch { |
||||
self.last_leaf_index = idx; |
||||
} |
||||
self.last_valid_range_start_block = full_chunk_from; |
||||
self.from = to + 1; |
||||
} |
||||
ListValidity::Empty => { |
||||
let _ = self.record_data(sorted_messages, deliveries).await?; |
||||
self.from = to + 1; |
||||
} |
||||
ListValidity::InvalidContinuation => { |
||||
self.missed_messages.inc(); |
||||
warn!( |
||||
last_leaf_index = self.last_leaf_index, |
||||
start_block = self.from, |
||||
end_block = to, |
||||
last_valid_range_start_block = self.last_valid_range_start_block, |
||||
"Found invalid continuation in range. Re-indexing from the start block of the last successful range." |
||||
); |
||||
self.from = self.last_valid_range_start_block; |
||||
} |
||||
ListValidity::ContainsGaps => { |
||||
self.missed_messages.inc(); |
||||
warn!( |
||||
last_leaf_index = self.last_leaf_index, |
||||
start_block = self.from, |
||||
end_block = to, |
||||
last_valid_range_start_block = self.last_valid_range_start_block, |
||||
"Found gaps in the message in range, re-indexing the same range." |
||||
); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
/// Fetch contract data from a given block range.
|
||||
#[instrument(skip(self))] |
||||
async fn scrape_range( |
||||
&self, |
||||
from: u32, |
||||
to: u32, |
||||
) -> Result<(Vec<RawMsgWithMeta>, Vec<Delivery>)> { |
||||
let sorted_messages = self.local.indexer.fetch_sorted_messages(from, to).await?; |
||||
|
||||
let deliveries = self.deliveries(from, to).await?; |
||||
|
||||
info!( |
||||
from, |
||||
to, |
||||
message_count = sorted_messages.len(), |
||||
deliveries_count = deliveries.len(), |
||||
"Indexed block range for chain" |
||||
); |
||||
|
||||
let sorted_messages = sorted_messages |
||||
.into_iter() |
||||
.map(|(raw, meta)| RawMsgWithMeta { raw, meta }) |
||||
.filter(|m| m.raw.leaf_index > self.last_leaf_index) |
||||
.collect::<Vec<_>>(); |
||||
|
||||
debug!( |
||||
from, |
||||
to, |
||||
message_count = sorted_messages.len(), |
||||
"Filtered any messages already indexed for outbox." |
||||
); |
||||
|
||||
Ok((sorted_messages, deliveries)) |
||||
} |
||||
|
||||
/// Get the deliveries for a given range from the inboxes.
|
||||
#[instrument(skip(self))] |
||||
async fn deliveries(&self, from: u32, to: u32) -> Result<Vec<Delivery>> { |
||||
let mut delivered = vec![]; |
||||
for (_, remote) in self.remotes.iter() { |
||||
debug_assert_eq!(remote.inbox.local_domain(), self.local_domain()); |
||||
delivered.extend( |
||||
remote |
||||
.indexer |
||||
.fetch_processed_messages(from, to) |
||||
.await? |
||||
.into_iter() |
||||
.map(|(message_hash, meta)| Delivery { |
||||
inbox: remote.inbox.address(), |
||||
message_hash, |
||||
meta, |
||||
}), |
||||
) |
||||
} |
||||
Ok(delivered) |
||||
} |
||||
|
||||
/// Record messages and deliveries; this will fetch any extra data needed to do
/// so. Returns the max leaf index, or None if no messages were provided.
||||
#[instrument(
|
||||
skip_all, |
||||
fields(sorted_messages = sorted_messages.len(), deliveries = deliveries.len()) |
||||
)] |
||||
async fn record_data( |
||||
&self, |
||||
sorted_messages: Vec<RawMsgWithMeta>, |
||||
deliveries: Vec<Delivery>, |
||||
) -> Result<Option<u32>> { |
||||
let txns: HashMap<H256, TxnWithIdAndTime> = self |
||||
.ensure_blocks_and_txns( |
||||
sorted_messages |
||||
.iter() |
||||
.map(|r| &r.meta) |
||||
.chain(deliveries.iter().map(|d| &d.meta)), |
||||
) |
||||
.await? |
||||
.map(|t| (t.hash, t)) |
||||
.collect(); |
||||
|
||||
if !deliveries.is_empty() { |
||||
self.store_deliveries(&deliveries, &txns).await?; |
||||
self.stored_deliveries.inc_by(deliveries.len() as u64); |
||||
} |
||||
|
||||
if !sorted_messages.is_empty() { |
||||
let max_leaf_index_of_batch = self.store_messages(&sorted_messages, &txns).await?; |
||||
self.stored_messages.inc_by(sorted_messages.len() as u64); |
||||
|
||||
for m in sorted_messages.iter() { |
||||
let parsed = CommittedMessage::try_from(&m.raw).ok(); |
||||
let idx = m.raw.leaf_index; |
||||
let dst = parsed |
||||
.and_then(|msg| name_from_domain_id(msg.message.destination)) |
||||
.unwrap_or_else(|| "unknown".into()); |
||||
self.message_leaf_index |
||||
.with_label_values(&["dispatch", self.chain_name(), &dst]) |
||||
.set(idx as i64); |
||||
} |
||||
Ok(Some(max_leaf_index_of_batch)) |
||||
} else { |
||||
Ok(None) |
||||
} |
||||
} |
||||
} |
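A note on the block-range arithmetic in `Syncer::run` above: each tick indexes `[full_chunk_from, to]`, where `to` is capped at the finalized tip and `full_chunk_from` backs up so the window always covers a full chunk. A standalone sketch of just that arithmetic (numbers in the comment are made up for illustration):

use std::cmp::min;

/// Compute the next block window the syncer scrapes, mirroring the
/// `to`/`full_chunk_from` lines in `Syncer::run`.
fn next_window(from: u32, tip: u32, chunk_size: u32) -> Option<(u32, u32)> {
    if tip <= from {
        // Caught up to the finalized tip; the loop just sleeps and retries.
        return None;
    }
    let to = min(tip, from + chunk_size);
    // Re-cover a full chunk ending at `to`, even if that reaches behind `from`.
    let full_chunk_from = to.checked_sub(chunk_size).unwrap_or_default();
    Some((full_chunk_from, to))
}

// e.g. from = 9_990, tip = 10_000, chunk_size = 100:
//   to = 10_000 and full_chunk_from = 9_900, so the window backs up past
//   `from`; already-stored messages are filtered out via `last_leaf_index`.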
@ -0,0 +1,24 @@
use ethers::prelude::H256;

/// Convert a hex string (without 0x prefix) to a H256. This handles the case where it is actually
/// a H160 and will correctly return a H256 with the most significant bits as zero.
pub fn parse_h256<T: AsRef<[u8]>>(data: T) -> eyre::Result<H256> {
    if data.as_ref().len() == 40 {
        Ok(H256(hex::parse_h256_raw::<40>(
            data.as_ref().try_into().unwrap(),
        )?))
    } else {
        Ok(H256(hex::parse_h256_raw::<64>(data.as_ref().try_into()?)?))
    }
}

/// Formats a H256 as a lowercase hex string without a 0x prefix. This will correctly determine if
/// the data fits within a H160 (enough of the most significant bits are zero) and will write it as
/// such. This will pad with zeros to fit either a H256 or a H160, as appropriate.
pub fn format_h256(data: &H256) -> String {
    if hex::is_h160(data.as_fixed_bytes()) {
        hex::format_h160_raw(data.as_fixed_bytes()[12..32].try_into().unwrap())
    } else {
        hex::format_h256_raw(data.as_fixed_bytes())
    }
}
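The two helpers above round-trip between H256 values and the unprefixed hex strings stored in the database (40 hex chars for 160-bit values, 64 otherwise). A small usage sketch, written as if it lived next to the helpers and assuming the `hex` helpers behave as the doc comments describe:

use ethers::prelude::H256;

fn roundtrip_examples() -> eyre::Result<()> {
    // A value whose top 12 bytes are zero fits in 160 bits and should be
    // written back as 40 hex chars...
    let mut raw = [0u8; 32];
    raw[12..].fill(0x11);
    let addr = H256::from(raw);
    let s = format_h256(&addr);
    assert_eq!(s.len(), 40);
    // ...and parse_h256 zero-extends it back to the same H256.
    assert_eq!(parse_h256(&s)?, addr);

    // A full 256-bit value stays 64 hex chars.
    let hash = H256::repeat_byte(0x22);
    let s = format_h256(&hash);
    assert_eq!(s.len(), 64);
    assert_eq!(parse_h256(&s)?, hash);
    Ok(())
}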
@ -0,0 +1,90 @@ |
||||
//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3
|
||||
|
||||
use sea_orm::entity::prelude::*; |
||||
|
||||
#[derive(Copy, Clone, Default, Debug, DeriveEntity)] |
||||
pub struct Entity; |
||||
|
||||
impl EntityName for Entity { |
||||
fn table_name(&self) -> &str { |
||||
"block" |
||||
} |
||||
} |
||||
|
||||
#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel)] |
||||
pub struct Model { |
||||
pub id: i64, |
||||
pub time_created: TimeDateTime, |
||||
pub domain: i32, |
||||
pub hash: String, |
||||
pub height: i64, |
||||
pub timestamp: TimeDateTime, |
||||
} |
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] |
||||
pub enum Column { |
||||
Id, |
||||
TimeCreated, |
||||
Domain, |
||||
Hash, |
||||
Height, |
||||
Timestamp, |
||||
} |
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] |
||||
pub enum PrimaryKey { |
||||
Id, |
||||
} |
||||
|
||||
impl PrimaryKeyTrait for PrimaryKey { |
||||
type ValueType = i64; |
||||
fn auto_increment() -> bool { |
||||
true |
||||
} |
||||
} |
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter)] |
||||
pub enum Relation { |
||||
Domain, |
||||
Transaction, |
||||
} |
||||
|
||||
impl ColumnTrait for Column { |
||||
type EntityName = Entity; |
||||
fn def(&self) -> ColumnDef { |
||||
match self { |
||||
Self::Id => ColumnType::BigInteger.def(), |
||||
Self::TimeCreated => ColumnType::DateTime.def(), |
||||
Self::Domain => ColumnType::Integer.def(), |
||||
Self::Hash => ColumnType::String(Some(64u32)).def().unique(), |
||||
Self::Height => ColumnType::BigInteger.def(), |
||||
Self::Timestamp => ColumnType::DateTime.def(), |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl RelationTrait for Relation { |
||||
fn def(&self) -> RelationDef { |
||||
match self { |
||||
Self::Domain => Entity::belongs_to(super::domain::Entity) |
||||
.from(Column::Domain) |
||||
.to(super::domain::Column::Id) |
||||
.into(), |
||||
Self::Transaction => Entity::has_many(super::transaction::Entity).into(), |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl Related<super::domain::Entity> for Entity { |
||||
fn to() -> RelationDef { |
||||
Relation::Domain.def() |
||||
} |
||||
} |
||||
|
||||
impl Related<super::transaction::Entity> for Entity { |
||||
fn to() -> RelationDef { |
||||
Relation::Transaction.def() |
||||
} |
||||
} |
||||
|
||||
impl ActiveModelBehavior for ActiveModel {} |
@ -0,0 +1,119 @@ |
||||
//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3
|
||||
|
||||
use sea_orm::entity::prelude::*; |
||||
|
||||
#[derive(Copy, Clone, Default, Debug, DeriveEntity)] |
||||
pub struct Entity; |
||||
|
||||
impl EntityName for Entity { |
||||
fn table_name(&self) -> &str { |
||||
"message" |
||||
} |
||||
} |
||||
|
||||
#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel)] |
||||
pub struct Model { |
||||
pub id: i64, |
||||
pub time_created: TimeDateTime, |
||||
pub hash: String, |
||||
pub origin: i32, |
||||
pub destination: i32, |
||||
pub leaf_index: i32, |
||||
pub sender: String, |
||||
pub recipient: String, |
||||
pub msg_body: Option<Vec<u8>>, |
||||
pub outbox_address: String, |
||||
pub timestamp: TimeDateTime, |
||||
pub origin_tx_id: i64, |
||||
} |
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] |
||||
pub enum Column { |
||||
Id, |
||||
TimeCreated, |
||||
Hash, |
||||
Origin, |
||||
Destination, |
||||
LeafIndex, |
||||
Sender, |
||||
Recipient, |
||||
MsgBody, |
||||
OutboxAddress, |
||||
Timestamp, |
||||
OriginTxId, |
||||
} |
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] |
||||
pub enum PrimaryKey { |
||||
Id, |
||||
} |
||||
|
||||
impl PrimaryKeyTrait for PrimaryKey { |
||||
type ValueType = i64; |
||||
fn auto_increment() -> bool { |
||||
true |
||||
} |
||||
} |
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter)] |
||||
pub enum Relation { |
||||
Domain, |
||||
Transaction, |
||||
MessageState, |
||||
} |
||||
|
||||
impl ColumnTrait for Column { |
||||
type EntityName = Entity; |
||||
fn def(&self) -> ColumnDef { |
||||
match self { |
||||
Self::Id => ColumnType::BigInteger.def(), |
||||
Self::TimeCreated => ColumnType::DateTime.def(), |
||||
Self::Hash => ColumnType::String(Some(64u32)).def().unique(), |
||||
Self::Origin => ColumnType::Integer.def(), |
||||
Self::Destination => ColumnType::Integer.def(), |
||||
Self::LeafIndex => ColumnType::Integer.def(), |
||||
Self::Sender => ColumnType::String(Some(64u32)).def(), |
||||
Self::Recipient => ColumnType::String(Some(64u32)).def(), |
||||
Self::MsgBody => ColumnType::Binary.def().null(), |
||||
Self::OutboxAddress => ColumnType::String(Some(64u32)).def(), |
||||
Self::Timestamp => ColumnType::DateTime.def(), |
||||
Self::OriginTxId => ColumnType::BigInteger.def(), |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl RelationTrait for Relation { |
||||
fn def(&self) -> RelationDef { |
||||
match self { |
||||
Self::Domain => Entity::belongs_to(super::domain::Entity) |
||||
.from(Column::Origin) |
||||
.to(super::domain::Column::Id) |
||||
.into(), |
||||
Self::Transaction => Entity::belongs_to(super::transaction::Entity) |
||||
.from(Column::OriginTxId) |
||||
.to(super::transaction::Column::Id) |
||||
.into(), |
||||
Self::MessageState => Entity::has_many(super::message_state::Entity).into(), |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl Related<super::domain::Entity> for Entity { |
||||
fn to() -> RelationDef { |
||||
Relation::Domain.def() |
||||
} |
||||
} |
||||
|
||||
impl Related<super::transaction::Entity> for Entity { |
||||
fn to() -> RelationDef { |
||||
Relation::Transaction.def() |
||||
} |
||||
} |
||||
|
||||
impl Related<super::message_state::Entity> for Entity { |
||||
fn to() -> RelationDef { |
||||
Relation::MessageState.def() |
||||
} |
||||
} |
||||
|
||||
impl ActiveModelBehavior for ActiveModel {} |
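The message entity above is what the scraper's `last_message_leaf_index` query runs against (filter by origin domain and outbox address, ordered by leaf index; the original query appears in the removed scraper.rs below). A condensed sketch of that query against this entity, assuming the module is in scope as `message` and a `DbConn` is at hand:

use sea_orm::{
    ColumnTrait, DbConn, DeriveColumn, EntityTrait, EnumIter, QueryFilter, QueryOrder, QuerySelect,
};

/// Highest leaf index stored for one origin domain and outbox address
/// (the address is the hex string stored in the `message` table).
async fn highest_leaf_index(
    db: &DbConn,
    origin: u32,
    outbox_address: &str,
) -> eyre::Result<Option<u32>> {
    #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
    enum QueryAs {
        LeafIndex,
    }

    Ok(message::Entity::find()
        .filter(message::Column::Origin.eq(origin))
        .filter(message::Column::OutboxAddress.eq(outbox_address))
        .order_by_desc(message::Column::LeafIndex)
        .select_only()
        .column_as(message::Column::LeafIndex, QueryAs::LeafIndex)
        .into_values::<i32, QueryAs>()
        .one(db)
        .await?
        .map(|idx| idx as u32))
}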
@ -0,0 +1,15 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3

pub mod prelude;

pub mod block;
pub mod checkpoint;
pub mod checkpoint_update;
pub mod cursor;
pub mod delivered_message;
pub mod domain;
pub mod gas_payment;
pub mod message;
pub mod message_state;
pub mod sea_orm_active_enums;
pub mod transaction;
@ -1,15 +1,41 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3

pub mod prelude;

pub mod block;
pub mod checkpoint;
pub mod checkpoint_update;
pub mod cursor;
pub mod delivered_message;
pub mod domain;
pub mod gas_payment;
pub mod message;
pub mod message_state;
pub mod sea_orm_active_enums;
pub mod transaction;
use std::ops::Deref;

use eyre::Result;
use sea_orm::{Database, DbConn};
use tracing::instrument;

use abacus_core::TxnInfo;
pub use block::*;
pub use block_cursor::BlockCursor;
pub use message::*;
pub use txn::*;

#[allow(clippy::all)]
mod generated;

// These modules implement additional functionality for the ScraperDb
mod block;
mod block_cursor;
mod message;
mod txn;

impl Deref for StorableTxn {
    type Target = TxnInfo;

    fn deref(&self) -> &Self::Target {
        &self.info
    }
}

/// Database interface to the message explorer database for the scraper. This is
/// focused on writing data to the database.
#[derive(Clone, Debug)]
pub struct ScraperDb(DbConn);

impl ScraperDb {
    #[instrument]
    pub async fn connect(url: &str) -> Result<Self> {
        let db = Database::connect(url).await?;
        Ok(Self(db))
    }
}
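For reference, the `ScraperDb` wrapper above is a thin layer over `sea_orm::Database::connect`; a minimal usage sketch (the connection URL is a placeholder, not taken from this repo):

use crate::db::ScraperDb;

/// Hypothetical helper showing how a caller obtains a `ScraperDb` handle;
/// `Scraper::from_settings` does the same with `&settings.app.db`.
async fn connect_example() -> eyre::Result<ScraperDb> {
    // Placeholder URL; in the agent this comes from the app settings.
    let url = "postgresql://user:password@localhost:5432/explorer";
    let db = ScraperDb::connect(url).await?;
    Ok(db)
}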
@ -0,0 +1,86 @@
use std::collections::HashMap;

use ethers::prelude::H256;
use eyre::{eyre, Context, Result};
use sea_orm::{prelude::*, ActiveValue::*, DeriveColumn, EnumIter, Insert, NotSet, QuerySelect};
use tracing::{instrument, trace};

use abacus_core::TxnInfo;

use crate::conversions::{format_h256, parse_h256};
use crate::date_time;
use crate::db::ScraperDb;

use super::generated::transaction;

#[derive(Debug, Clone)]
pub struct StorableTxn {
    pub info: TxnInfo,
    pub block_id: i64,
}

impl ScraperDb {
    /// Lookup transactions and find their ids. Any transactions which are not
    /// found will be excluded from the hashmap.
    pub async fn get_txn_ids(
        &self,
        hashes: impl Iterator<Item = &H256>,
    ) -> Result<HashMap<H256, i64>> {
        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryAs {
            Id,
            Hash,
        }

        // check database to see which txns we already know and fetch their IDs
        transaction::Entity::find()
            .filter(transaction::Column::Hash.is_in(hashes.map(format_h256)))
            .select_only()
            .column_as(transaction::Column::Id, QueryAs::Id)
            .column_as(transaction::Column::Hash, QueryAs::Hash)
            .into_values::<(i64, String), QueryAs>()
            .all(&self.0)
            .await
            .context("When fetching transactions")?
            .into_iter()
            .map(|(id, hash)| Ok((parse_h256(&hash)?, id)))
            .collect::<Result<_>>()
    }

    /// Store new transactions into the database (or update existing ones).
    #[instrument(skip_all)]
    pub async fn store_txns(&self, txns: impl Iterator<Item = StorableTxn>) -> Result<i64> {
        let as_f64 = ethers::types::U256::to_f64_lossy;
        let models = txns
            .map(|txn| {
                let receipt = txn
                    .receipt
                    .as_ref()
                    .ok_or_else(|| eyre!("Transaction is not yet included"))?;

                Ok(transaction::ActiveModel {
                    id: NotSet,
                    block_id: Unchanged(txn.block_id),
                    gas_limit: Set(as_f64(txn.gas_limit)),
                    max_priority_fee_per_gas: Set(txn.max_priority_fee_per_gas.map(as_f64)),
                    hash: Unchanged(format_h256(&txn.hash)),
                    time_created: Set(date_time::now()),
                    gas_used: Set(as_f64(receipt.gas_used)),
                    gas_price: Set(txn.gas_price.map(as_f64)),
                    effective_gas_price: Set(receipt.effective_gas_price.map(as_f64)),
                    nonce: Set(txn.nonce as i64),
                    sender: Set(format_h256(&txn.sender)),
                    recipient: Set(txn.recipient.as_ref().map(format_h256)),
                    max_fee_per_gas: Set(txn.max_fee_per_gas.map(as_f64)),
                    cumulative_gas_used: Set(as_f64(receipt.cumulative_gas_used)),
                })
            })
            .collect::<Result<Vec<_>>>()?;

        debug_assert!(!models.is_empty());
        trace!(?models, "Writing txns to database");
        // On Postgres this is actually the ID of the first row that was inserted.
        let first_id = Insert::many(models).exec(&self.0).await?.last_insert_id;
        Ok(first_id)
    }
}
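The return value of `store_txns` matters to its caller: `last_insert_id` here is the id of the first row in the batch, and `ensure_txns` in the chain scraper assigns ids to the remaining rows by incrementing from it. A short sketch of that caller-side contract, with hypothetical values for illustration:

/// Given the id returned by `store_txns` and how many rows were inserted,
/// the ids of the inserted rows are assumed to be consecutive.
fn inserted_ids(first_id: i64, inserted_count: usize) -> Vec<i64> {
    // e.g. first_id = 42, inserted_count = 3 -> [42, 43, 44]
    (0..inserted_count as i64).map(|offset| first_id + offset).collect()
}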
@ -1,928 +0,0 @@ |
||||
use std::cmp::min; |
||||
use std::collections::HashMap; |
||||
use std::sync::Arc; |
||||
use std::time::Duration; |
||||
|
||||
use async_trait::async_trait; |
||||
use ethers::types::H256; |
||||
use eyre::{eyre, Context, Result}; |
||||
use sea_orm::prelude::TimeDateTime; |
||||
use sea_orm::{Database, DbConn}; |
||||
use tokio::task::JoinHandle; |
||||
use tokio::time::sleep; |
||||
use tracing::instrument::Instrumented; |
||||
use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; |
||||
|
||||
use abacus_base::last_message::validate_message_continuity; |
||||
use abacus_base::{ |
||||
run_all, BaseAgent, ChainSetup, ContractSyncMetrics, CoreMetrics, DomainSettings, |
||||
InboxAddresses, IndexSettings, |
||||
}; |
||||
use abacus_core::{ |
||||
name_from_domain_id, AbacusContract, AbacusProvider, BlockInfo, CommittedMessage, Inbox, |
||||
InboxIndexer, ListValidity, LogMeta, Outbox, OutboxIndexer, RawCommittedMessage, |
||||
}; |
||||
|
||||
use crate::scraper::block_cursor::BlockCursor; |
||||
use crate::settings::ScraperSettings; |
||||
use crate::{date_time, format_h256, parse_h256}; |
||||
|
||||
mod block_cursor; |
||||
|
||||
/// A message explorer scraper agent
|
||||
#[derive(Debug)] |
||||
#[allow(unused)] |
||||
pub struct Scraper { |
||||
db: DbConn, |
||||
metrics: Arc<CoreMetrics>, |
||||
/// A map of scrapers by domain.
|
||||
scrapers: HashMap<u32, SqlChainScraper>, |
||||
} |
||||
|
||||
#[async_trait] |
||||
impl BaseAgent for Scraper { |
||||
const AGENT_NAME: &'static str = "scraper"; |
||||
type Settings = ScraperSettings; |
||||
|
||||
async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self> |
||||
where |
||||
Self: Sized, |
||||
{ |
||||
let db = Database::connect(&settings.app.db).await?; |
||||
|
||||
// so the challenge here is that the config files were written in a way that
|
||||
// makes a lot of sense for relayers but not a lot of sense for scraping
|
||||
// all data from a given chain at a time...
|
||||
//
|
||||
// Basically the format provided is Outbox + all destination Inboxes that
|
||||
// messages from the outbox will get written to.
|
||||
//
|
||||
// Instead, we want the Outbox + all Inboxes that are on the same local chain.
|
||||
|
||||
// outboxes by their local_domain
|
||||
let mut locals: HashMap<u32, Local> = HashMap::new(); |
||||
// index settings for each domain
|
||||
let mut index_settings: HashMap<u32, IndexSettings> = HashMap::new(); |
||||
// inboxes by their local_domain, remote_domain
|
||||
let mut remotes: HashMap<u32, HashMap<u32, Remote>> = HashMap::new(); |
||||
|
||||
for (outbox_domain, chain_config) in settings.chains.into_iter() { |
||||
let ctx = || format!("Loading chain {}", chain_config.outbox.name); |
||||
if let Some(local) = Self::load_local(&chain_config, &metrics) |
||||
.await |
||||
.with_context(ctx)? |
||||
{ |
||||
trace!(domain = outbox_domain, "Created outbox and outbox indexer"); |
||||
assert_eq!(local.outbox.local_domain(), outbox_domain); |
||||
locals.insert(outbox_domain, local); |
||||
} |
||||
|
||||
for (_, inbox_config) in chain_config.inboxes.iter() { |
||||
if let Some(remote) = Self::load_remote(&chain_config, inbox_config, &metrics) |
||||
.await |
||||
.with_context(ctx)? |
||||
{ |
||||
let inbox_remote_domain = remote.inbox.remote_domain(); |
||||
let inbox_local_domain = remote.inbox.local_domain(); |
||||
assert_eq!(inbox_remote_domain, outbox_domain); |
||||
assert_ne!( |
||||
inbox_local_domain, outbox_domain, |
||||
"Attempting to load inbox for the chain we are on" |
||||
); |
||||
|
||||
trace!( |
||||
local_domain = inbox_local_domain, |
||||
remote_domain = inbox_remote_domain, |
||||
"Created inbox and inbox indexer" |
||||
); |
||||
remotes |
||||
.entry(inbox_local_domain) |
||||
.or_default() |
||||
.insert(inbox_remote_domain, remote); |
||||
} |
||||
} |
||||
|
||||
index_settings.insert(outbox_domain, chain_config.index); |
||||
} |
||||
|
||||
let contract_sync_metrics = ContractSyncMetrics::new(metrics.clone()); |
||||
let mut scrapers: HashMap<u32, SqlChainScraper> = HashMap::new(); |
||||
for (local_domain, local) in locals.into_iter() { |
||||
let remotes = remotes.remove(&local_domain).unwrap_or_default(); |
||||
let index_settings = index_settings |
||||
.remove(&local_domain) |
||||
.expect("Missing index settings for domain"); |
||||
|
||||
let scraper = SqlChainScraper::new( |
||||
db.clone(), |
||||
local, |
||||
remotes, |
||||
&index_settings, |
||||
contract_sync_metrics.clone(), |
||||
) |
||||
.await?; |
||||
scrapers.insert(local_domain, scraper); |
||||
} |
||||
|
||||
trace!(domain_count = scrapers.len(), "Creating scraper"); |
||||
|
||||
Ok(Self { |
||||
db, |
||||
metrics, |
||||
scrapers, |
||||
}) |
||||
} |
||||
|
||||
#[allow(clippy::async_yields_async)] |
||||
async fn run(&self) -> Instrumented<JoinHandle<Result<()>>> { |
||||
let tasks = self |
||||
.scrapers |
||||
.iter() |
||||
.map(|(name, scraper)| { |
||||
let span = info_span!("ChainContractSync", %name, chain = scraper.local.outbox.chain_name()); |
||||
let syncer = scraper.clone().sync(); |
||||
tokio::spawn(syncer).instrument(span) |
||||
}) |
||||
.chain( |
||||
// TODO: remove this during refactoring if we no longer need it
|
||||
[tokio::spawn(delivered_message_linker(self.db.clone())) |
||||
.instrument(info_span!("DeliveredMessageLinker"))] |
||||
.into_iter(), |
||||
) |
||||
.collect(); |
||||
|
||||
run_all(tasks) |
||||
} |
||||
} |
||||
|
||||
impl Scraper { |
||||
async fn load_local( |
||||
config: &DomainSettings, |
||||
metrics: &Arc<CoreMetrics>, |
||||
) -> Result<Option<Local>> { |
||||
Ok( |
||||
if config |
||||
.outbox |
||||
.disabled |
||||
.as_ref() |
||||
.and_then(|d| d.parse::<bool>().ok()) |
||||
.unwrap_or(false) |
||||
{ |
||||
None |
||||
} else { |
||||
let ctx = || format!("Loading local {}", config.outbox.name); |
||||
Some(Local { |
||||
provider: config.try_provider(metrics).await.with_context(ctx)?.into(), |
||||
outbox: config.try_outbox(metrics).await.with_context(ctx)?.into(), |
||||
indexer: config |
||||
.try_outbox_indexer(metrics) |
||||
.await |
||||
.with_context(ctx)? |
||||
.into(), |
||||
}) |
||||
}, |
||||
) |
||||
} |
||||
|
||||
async fn load_remote( |
||||
config: &DomainSettings, |
||||
inbox_config: &ChainSetup<InboxAddresses>, |
||||
metrics: &Arc<CoreMetrics>, |
||||
) -> Result<Option<Remote>> { |
||||
Ok( |
||||
if inbox_config |
||||
.disabled |
||||
.as_ref() |
||||
.and_then(|d| d.parse::<bool>().ok()) |
||||
.unwrap_or(false) |
||||
{ |
||||
None |
||||
} else { |
||||
let ctx = || format!("Loading remote {}", inbox_config.name); |
||||
Some(Remote { |
||||
inbox: config |
||||
.try_inbox(inbox_config, metrics) |
||||
.await |
||||
.with_context(ctx)? |
||||
.into(), |
||||
indexer: config |
||||
.try_inbox_indexer(inbox_config, metrics) |
||||
.await |
||||
.with_context(ctx)? |
||||
.into(), |
||||
}) |
||||
}, |
||||
) |
||||
} |
||||
} |
||||
|
||||
#[derive(Debug, Clone)] |
||||
struct Remote { |
||||
pub inbox: Arc<dyn Inbox>, |
||||
pub indexer: Arc<dyn InboxIndexer>, |
||||
} |
||||
|
||||
#[derive(Debug, Clone)] |
||||
struct Local { |
||||
pub outbox: Arc<dyn Outbox>, |
||||
pub indexer: Arc<dyn OutboxIndexer>, |
||||
pub provider: Arc<dyn AbacusProvider>, |
||||
} |
||||
|
||||
#[derive(Debug, Clone)] |
||||
struct Delivery { |
||||
pub inbox: H256, |
||||
pub message_hash: H256, |
||||
pub meta: LogMeta, |
||||
} |
||||
|
||||
#[derive(Debug, Clone)] |
||||
struct SqlChainScraper { |
||||
db: DbConn, |
||||
/// Contracts on this chain representing this chain (e.g. outbox)
|
||||
local: Local, |
||||
/// Contracts on this chain representing remote chains (e.g. inboxes) by
|
||||
/// domain of the remote.
|
||||
remotes: HashMap<u32, Remote>, |
||||
chunk_size: u32, |
||||
metrics: ContractSyncMetrics, |
||||
cursor: Arc<BlockCursor>, |
||||
} |
||||
|
||||
#[allow(unused)] |
||||
impl SqlChainScraper { |
||||
pub async fn new( |
||||
db: DbConn, |
||||
local: Local, |
||||
remotes: HashMap<u32, Remote>, |
||||
index_settings: &IndexSettings, |
||||
metrics: ContractSyncMetrics, |
||||
) -> Result<Self> { |
||||
let cursor = Arc::new( |
||||
BlockCursor::new( |
||||
db.clone(), |
||||
local.outbox.local_domain(), |
||||
index_settings.from() as u64, |
||||
) |
||||
.await?, |
||||
); |
||||
Ok(Self { |
||||
db, |
||||
local, |
||||
remotes, |
||||
chunk_size: index_settings.chunk_size(), |
||||
metrics, |
||||
cursor, |
||||
}) |
||||
} |
||||
|
||||
fn chain_name(&self) -> &str { |
||||
self.local.outbox.chain_name() |
||||
} |
||||
|
||||
fn local_domain(&self) -> u32 { |
||||
self.local.outbox.local_domain() |
||||
} |
||||
|
||||
fn remote_domains(&self) -> impl Iterator<Item = u32> + '_ { |
||||
self.remotes.keys().copied() |
||||
} |
||||
|
||||
async fn get_finalized_block_number(&self) -> Result<u32> { |
||||
self.local.indexer.get_finalized_block_number().await |
||||
} |
||||
|
||||
/// Sync outbox messages.
|
||||
///
|
||||
/// This code is very similar to the outbox contract sync code in
|
||||
/// abacus-base.
|
||||
///
|
||||
/// TODO: merge duplicate logic?
|
||||
/// TODO: better handling for errors to auto-restart without bringing down
|
||||
/// the whole service?
|
||||
#[instrument(skip(self))] |
||||
pub async fn sync(self) -> Result<()> { |
||||
// TODO: pull this into a fn-like struct for ticks?
|
||||
let chain_name = self.chain_name(); |
||||
let message_labels = ["messages", chain_name]; |
||||
let deliveries_labels = ["deliveries", chain_name]; |
||||
|
||||
let indexed_message_height = self |
||||
.metrics |
||||
.indexed_height |
||||
.with_label_values(&message_labels); |
||||
let indexed_deliveries_height = self |
||||
.metrics |
||||
.indexed_height |
||||
.with_label_values(&deliveries_labels); |
||||
let stored_messages = self |
||||
.metrics |
||||
.stored_events |
||||
.with_label_values(&message_labels); |
||||
let stored_deliveries = self |
||||
.metrics |
||||
.stored_events |
||||
.with_label_values(&deliveries_labels); |
||||
let missed_messages = self |
||||
.metrics |
||||
.missed_events |
||||
.with_label_values(&message_labels); |
||||
let message_leaf_index = self.metrics.message_leaf_index.clone(); |
||||
|
||||
let chunk_size = self.chunk_size; |
||||
let mut from = self.cursor.height().await as u32; |
||||
let mut last_valid_range_start_block = from; |
||||
let mut last_leaf_index = self.last_message_leaf_index().await?.unwrap_or(0); |
||||
|
||||
info!(from, chunk_size, chain_name, "Resuming chain sync"); |
||||
|
||||
loop { |
||||
indexed_message_height.set(from as i64); |
||||
indexed_deliveries_height.set(from as i64); |
||||
sleep(Duration::from_secs(5)).await; |
||||
|
||||
let tip = if let Ok(num) = self.get_finalized_block_number().await { |
||||
num |
||||
} else { |
||||
continue; |
||||
}; |
||||
if tip <= from { |
||||
// Sleep if caught up to tip
|
||||
sleep(Duration::from_secs(10)).await; |
||||
continue; |
||||
} |
||||
|
||||
let to = min(tip, from + chunk_size); |
||||
let full_chunk_from = to.checked_sub(chunk_size).unwrap_or_default(); |
||||
debug_assert_eq!(self.local.outbox.local_domain(), self.local_domain()); |
||||
let mut sorted_messages = self |
||||
.local |
||||
.indexer |
||||
.fetch_sorted_messages(full_chunk_from, to) |
||||
.await?; |
||||
|
||||
let deliveries: Vec<Delivery> = { |
||||
let mut delivered = vec![]; |
||||
for (_, remote) in self.remotes.iter() { |
||||
debug_assert_eq!(remote.inbox.local_domain(), self.local_domain()); |
||||
delivered.extend( |
||||
remote |
||||
.indexer |
||||
.fetch_processed_messages(full_chunk_from, to) |
||||
.await? |
||||
.into_iter() |
||||
.map(|(message_hash, meta)| Delivery { |
||||
inbox: remote.inbox.address(), |
||||
message_hash, |
||||
meta, |
||||
}), |
||||
) |
||||
} |
||||
delivered |
||||
}; |
||||
|
||||
info!( |
||||
from = full_chunk_from, |
||||
to, |
||||
message_count = sorted_messages.len(), |
||||
deliveries_count = deliveries.len(), |
||||
chain_name, |
||||
"Indexed block range for chain" |
||||
); |
||||
|
||||
sorted_messages = sorted_messages |
||||
.into_iter() |
||||
.filter(|m| m.0.leaf_index > last_leaf_index) |
||||
.collect(); |
||||
|
||||
debug!( |
||||
from = full_chunk_from, |
||||
to, |
||||
message_count = sorted_messages.len(), |
||||
chain_name, |
||||
"Filtered any messages already indexed for outbox." |
||||
); |
||||
|
            match validate_message_continuity(
                Some(last_leaf_index),
                &sorted_messages
                    .iter()
                    .map(|(msg, _)| msg)
                    .collect::<Vec<_>>(),
            ) {
                ListValidity::Valid => {
                    // Map of transaction hash to (database id, timestamp).
                    let txns: HashMap<H256, (i64, TimeDateTime)> = self
                        .ensure_blocks_and_txns(
                            sorted_messages
                                .iter()
                                .map(|(_, meta)| meta)
                                .chain(deliveries.iter().map(|d| &d.meta)),
                        )
                        .await?
                        .collect();

                    let max_leaf_index_of_batch =
                        self.store_messages(&sorted_messages, &txns).await?;
                    stored_messages.inc_by(sorted_messages.len() as u64);
                    self.record_deliveries(&deliveries, &txns).await?;
                    stored_deliveries.inc_by(deliveries.len() as u64);

                    for (raw_msg, _) in sorted_messages.iter() {
                        let dst = CommittedMessage::try_from(raw_msg)
                            .ok()
                            .and_then(|msg| name_from_domain_id(msg.message.destination))
                            .unwrap_or_else(|| "unknown".into());
                        message_leaf_index
                            .with_label_values(&["dispatch", chain_name, &dst])
                            .set(max_leaf_index_of_batch as i64);
                    }

                    self.cursor.update(full_chunk_from as u64).await;
                    last_leaf_index = max_leaf_index_of_batch;
                    last_valid_range_start_block = full_chunk_from;
                    from = to + 1;
                }
                ListValidity::InvalidContinuation => {
                    missed_messages.inc();
                    warn!(
                        ?last_leaf_index,
                        start_block = from,
                        end_block = to,
                        last_valid_range_start_block,
                        chain_name,
                        "Found invalid continuation in range. Re-indexing from the start block of the last successful range."
                    );
                    from = last_valid_range_start_block;
                }
                ListValidity::ContainsGaps => {
                    missed_messages.inc();
                    warn!(
                        ?last_leaf_index,
                        start_block = from,
                        end_block = to,
                        last_valid_range_start_block,
                        chain_name,
                        "Found gaps in the messages in range; re-indexing the same range."
                    );
                }
                ListValidity::Empty => from = to + 1,
            }
        }
    }

    // TODO: move these database functions to a database wrapper type?

    /// Get the highest message leaf index that is stored in the database.
    #[instrument(skip(self))]
    async fn last_message_leaf_index(&self) -> Result<Option<u32>> {
        use crate::db::message;
        use sea_orm::{prelude::*, QueryOrder, QuerySelect};

        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
        enum QueryAs {
            LeafIndex,
        }

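        // Scope the query to this origin domain and outbox address so rows for
        // other outboxes/domains in the same table are ignored.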
        Ok(message::Entity::find()
            .filter(message::Column::Origin.eq(self.local_domain()))
            .filter(message::Column::OutboxAddress.eq(format_h256(&self.local.outbox.address())))
            .order_by_desc(message::Column::LeafIndex)
            .select_only()
            .column_as(message::Column::LeafIndex, QueryAs::LeafIndex)
            .into_values::<i32, QueryAs>()
            .one(&self.db)
            .await?
            .map(|idx| idx as u32))
    }

    /// Store messages from the outbox into the database.
    ///
    /// Returns the highest message leaf index which was provided to this
    /// function.
    #[instrument(
        level = "debug",
        skip_all,
        fields(messages = ?messages.iter().map(|(_, meta)| meta).collect::<Vec<_>>())
    )]
    async fn store_messages(
        &self,
        messages: &[(RawCommittedMessage, LogMeta)],
        txns: &HashMap<H256, (i64, TimeDateTime)>,
    ) -> Result<u32> {
        use crate::db::message;
        use sea_orm::{sea_query::OnConflict, ActiveValue::*, Insert};

        debug_assert!(!messages.is_empty());

        let max_leaf_id = messages
            .iter()
            .map(|m| m.0.leaf_index)
            .max()
            .ok_or_else(|| eyre!("Received empty list"));
        let models = messages
            .iter()
            .map(|(raw, meta)| {
                let msg = CommittedMessage::try_from(raw)?;

                debug_assert_eq!(self.local_domain(), msg.message.origin);
                let (txn_id, txn_timestamp) = txns.get(&meta.transaction_hash).unwrap();
                Ok(message::ActiveModel {
                    id: NotSet,
                    time_created: Set(crate::date_time::now()),
                    hash: Unchanged(format_h256(&msg.to_leaf())),
                    origin: Unchanged(msg.message.origin as i32),
                    destination: Set(msg.message.destination as i32),
                    leaf_index: Unchanged(msg.leaf_index as i32),
                    sender: Set(format_h256(&msg.message.sender)),
                    recipient: Set(format_h256(&msg.message.recipient)),
                    msg_body: Set(if msg.message.body.is_empty() {
                        None
                    } else {
                        Some(msg.message.body)
                    }),
                    outbox_address: Unchanged(format_h256(&self.local.outbox.address())),
                    timestamp: Set(*txn_timestamp),
                    origin_tx_id: Set(*txn_id),
                })
            })
            .collect::<Result<Vec<message::ActiveModel>>>()?;

        debug_assert!(!models.is_empty());
        trace!(?models, "Writing messages to database");

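        // Upsert keyed on (outbox_address, origin, leaf_index): re-indexing a
        // block range updates the existing rows rather than inserting duplicates.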
        Insert::many(models)
            .on_conflict(
                OnConflict::columns([
                    message::Column::OutboxAddress,
                    message::Column::Origin,
                    message::Column::LeafIndex,
                ])
                .update_columns([
                    message::Column::TimeCreated,
                    message::Column::Destination,
                    message::Column::Sender,
                    message::Column::Recipient,
                    message::Column::MsgBody,
                    message::Column::Timestamp,
                    message::Column::OriginTxId,
                ])
                .to_owned(),
            )
            .exec(&self.db)
            .await?;

        max_leaf_id
    }

    /// Record that a message was delivered.
    async fn record_deliveries(
        &self,
        deliveries: &[Delivery],
        txns: &HashMap<H256, (i64, TimeDateTime)>,
    ) -> Result<()> {
        use crate::db::delivered_message;
        use sea_orm::{sea_query::OnConflict, ActiveValue::*, Insert};

        if deliveries.is_empty() {
            return Ok(());
        }

        // We have a race condition where a message may not have been scraped yet
        // even though we have received news of delivery on this chain, so the
        // message IDs are looked up by a separate task (see
        // `delivered_message_linker` below).

        let models = deliveries
            .iter()
            .map(|delivery| delivered_message::ActiveModel {
                id: NotSet,
                time_created: Set(crate::date_time::now()),
                msg_id: NotSet,
                hash: Unchanged(format_h256(&delivery.message_hash)),
                domain: Unchanged(self.local_domain() as i32),
                inbox_address: Unchanged(format_h256(&delivery.inbox)),
                tx_id: Set(txns.get(&delivery.meta.transaction_hash).unwrap().0),
            })
            .collect::<Vec<_>>();

        debug_assert!(!models.is_empty());
        trace!(?models, "Writing delivered messages to database");

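        // Upsert keyed on the message hash; `msg_id` stays NULL here and is
        // backfilled once the corresponding message row exists.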
        Insert::many(models)
            .on_conflict(
                OnConflict::columns([delivered_message::Column::Hash])
                    .update_columns([
                        delivered_message::Column::TimeCreated,
                        delivered_message::Column::TxId,
                    ])
                    .to_owned(),
            )
            .exec(&self.db)
            .await?;
        Ok(())
    }

    /// Takes a list of txn and block hashes and ensures they are all in the
    /// database. If any are not, it will fetch the data and insert them.
    ///
    /// Returns a list of transaction hashes mapped to their database ids and
    /// the timestamps of the blocks that contain them.
    async fn ensure_blocks_and_txns(
        &self,
        message_metadata: impl Iterator<Item = &LogMeta>,
    ) -> Result<impl Iterator<Item = (H256, (i64, TimeDateTime))>> {
        let block_hash_by_txn_hash: HashMap<H256, H256> = message_metadata
            .map(|meta| (meta.transaction_hash, meta.block_hash))
            .collect();

        // All blocks we care about: block hash maps to the block's database id
        // and timestamp.
        let blocks: HashMap<_, _> = self
            .ensure_blocks(block_hash_by_txn_hash.values().copied())
            .await?
            .collect();
        trace!(?blocks, "Ensured blocks");

        // Not sure why Rust can't see that the lifetimes here are valid, so just
        // wrap with an Arc/Mutex for now.
        let block_timestamps_by_txn: Arc<std::sync::Mutex<HashMap<H256, TimeDateTime>>> =
            Default::default();

        let block_timestamps_by_txn_clone = block_timestamps_by_txn.clone();
        // All txns we care about.
        let ids =
            self.ensure_txns(block_hash_by_txn_hash.into_iter().map(
                move |(txn_hash, block_hash)| {
                    let mut block_timestamps_by_txn = block_timestamps_by_txn_clone.lock().unwrap();
                    let block_info = *blocks.get(&block_hash).unwrap();
                    block_timestamps_by_txn.insert(txn_hash, block_info.1);
                    (txn_hash, block_info.0)
                },
            ))
            .await?;

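        // Join the txn ids returned above with the block timestamps captured in
        // the map while building the request.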
        Ok(ids.map(move |(txn, id)| {
            (
                txn,
                (
                    id,
                    *block_timestamps_by_txn.lock().unwrap().get(&txn).unwrap(),
                ),
            )
        }))
    }

    /// Takes a list of `(transaction_hash, block_id)` and, for each transaction:
    /// if it is in the database already, fetches its associated database id;
    /// if it is not in the database already, looks up its data with ethers and
    /// then returns the database id after inserting it into the database.
    async fn ensure_txns(
        &self,
        txns: impl Iterator<Item = (H256, i64)>,
    ) -> Result<impl Iterator<Item = (H256, i64)>> {
        use crate::db::transaction;
        use sea_orm::{prelude::*, ActiveValue::*, Insert, QuerySelect};

        // Mapping of txn hash to (txn_id, block_id).
        let mut txns: HashMap<H256, (Option<i64>, i64)> = txns
            .map(|(txn_hash, block_id)| (txn_hash, (None, block_id)))
            .collect();

        let db_txns: Vec<(i64, String)> = if !txns.is_empty() {
            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
            enum QueryAs {
                Id,
                Hash,
            }

            // Check the database to see which txns we already know and fetch their IDs.
            transaction::Entity::find()
                .filter(
                    txns.iter()
                        .map(|(txn, _)| transaction::Column::Hash.eq(format_h256(txn)))
                        .reduce(|acc, i| acc.or(i))
                        .unwrap(),
                )
                .select_only()
                .column_as(transaction::Column::Id, QueryAs::Id)
                .column_as(transaction::Column::Hash, QueryAs::Hash)
                .into_values::<_, QueryAs>()
                .all(&self.db)
                .await?
        } else {
            vec![]
        };
        for txn in db_txns {
            let hash = parse_h256(&txn.1)?;
            // Now that we have the txn id, insert it into the Option value in `txns`.
            let _ = txns
                .get_mut(&hash)
                .expect("We found a txn that we did not request")
                .0
                .insert(txn.0);
        }

        // Insert any txns that were not known and get their IDs. Use this vec as a
        // temporary list of mut refs so we can update them once the ids come back.
        let mut txns_to_insert: Vec<(&H256, &mut (Option<i64>, i64))> =
            txns.iter_mut().filter(|(_, id)| id.0.is_none()).collect();

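        // Fetch each still-unknown transaction from the chain. Gas and fee
        // fields are stored as lossy f64 values.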
        let mut models: Vec<transaction::ActiveModel> = Vec::with_capacity(txns_to_insert.len());
        let as_f64 = ethers::types::U256::to_f64_lossy;
        for (hash, (_, block_id)) in txns_to_insert.iter() {
            let txn = self.local.provider.get_txn_by_hash(hash).await?;
            let receipt = txn
                .receipt
                .as_ref()
                .ok_or_else(|| eyre!("Transaction is not yet included"))?;

            models.push(transaction::ActiveModel {
                id: NotSet,
                block_id: Unchanged(*block_id),
                gas_limit: Set(as_f64(txn.gas_limit)),
                max_priority_fee_per_gas: Set(txn.max_priority_fee_per_gas.map(as_f64)),
                hash: Unchanged(format_h256(hash)),
                time_created: Set(date_time::now()),
                gas_used: Set(as_f64(receipt.gas_used)),
                gas_price: Set(txn.gas_price.map(as_f64)),
                effective_gas_price: Set(receipt.effective_gas_price.map(as_f64)),
                nonce: Set(txn.nonce as i64),
                sender: Set(format_h256(&txn.sender)),
                recipient: Set(txn.recipient.as_ref().map(format_h256)),
                max_fee_per_gas: Set(txn.max_fee_per_gas.map(as_f64)),
                cumulative_gas_used: Set(as_f64(receipt.cumulative_gas_used)),
            });
        }

        if !models.is_empty() {
            trace!(?models, "Writing txns to database");
            // For Postgres, `last_insert_id` is actually the ID of the *first* row
            // inserted; the rest of the batch is assumed to follow contiguously.
            let mut cur_id = Insert::many(models).exec(&self.db).await?.last_insert_id;
            for (_hash, (txn_id, _block_id)) in txns_to_insert.iter_mut() {
                debug_assert!(cur_id > 0);
                let _ = txn_id.insert(cur_id);
                cur_id += 1;
            }
        }
        drop(txns_to_insert);

        Ok(txns
            .into_iter()
            .map(|(hash, (txn_id, _block_id))| (hash, txn_id.unwrap())))
    }

    /// Takes a list of block hashes and, for each block:
    /// if it is in the database already, fetches its associated database id;
    /// if it is not in the database already, looks up its data with ethers and
    /// then returns the database id after inserting it into the database.
    async fn ensure_blocks(
        &self,
        block_hashes: impl Iterator<Item = H256>,
    ) -> Result<impl Iterator<Item = (H256, (i64, TimeDateTime))>> {
        use crate::db::block;
        use sea_orm::{prelude::*, ActiveValue::*, FromQueryResult, Insert, QuerySelect};

        type OptionalBlockInfo = Option<(Option<i64>, BlockInfo)>;
        // Mapping of block hash to the database id and block timestamp. Optionals
        // are in place because we will find the timestamp first if the block was
        // not already in the db.
        let mut blocks: HashMap<H256, OptionalBlockInfo> =
            block_hashes.map(|b| (b, None)).collect();

        #[derive(FromQueryResult)]
        struct Block {
            id: i64,
            hash: String,
            timestamp: TimeDateTime,
        }

        let db_blocks: Vec<Block> = if !blocks.is_empty() {
            // Check the database to see which blocks we already know and fetch their IDs.
            block::Entity::find()
                .filter(
                    blocks
                        .iter()
                        .map(|(block, _)| block::Column::Hash.eq(format_h256(block)))
                        .reduce(|acc, i| acc.or(i))
                        .unwrap(),
                )
                .select_only()
                .column_as(block::Column::Id, "id")
                .column_as(block::Column::Hash, "hash")
                .column_as(block::Column::Timestamp, "timestamp")
                .into_model::<Block>()
                .all(&self.db)
                .await?
        } else {
            vec![]
        };

        for block in db_blocks {
            let hash = parse_h256(&block.hash)?;
            let _ = blocks
                .get_mut(&hash)
                .expect("We found a block that we did not request")
                .insert((
                    Some(block.id),
                    BlockInfo {
                        hash,
                        timestamp: date_time::to_unix_timestamp_s(&block.timestamp),
                        // TODO: we don't actually use these below, we should make
                        // sure to clean this out
                        number: 0,
                    },
                ));
        }

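        // Any block still missing its info was not in the db; fetch it from the
        // chain provider.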
        let blocks_to_fetch = blocks
            .iter_mut()
            .inspect(|(_, info)| {
                // Info being defined implies the id has been set (at this point).
                debug_assert!(info.is_none() || info.as_ref().unwrap().0.is_some())
            })
            .filter(|(_, block_info)| block_info.is_none());
        for (hash, block_info) in blocks_to_fetch {
            let info = self.local.provider.get_block_by_hash(hash).await?;
            let _ = block_info.insert((None, info));
        }

        // Insert any blocks that were not known and get their IDs. Use this vec as
        // a temporary list of mut refs so we can update them once the ids come back.
        let mut blocks_to_insert: Vec<(&H256, &mut OptionalBlockInfo)> = blocks
            .iter_mut()
            .filter(|(_, info)| info.as_ref().unwrap().0.is_none())
            .collect();

        let mut models: Vec<block::ActiveModel> = blocks_to_insert
            .iter()
            .map(|(hash, block_info)| {
                let block_info = block_info.as_ref().unwrap();
                block::ActiveModel {
                    id: NotSet,
                    hash: Set(format_h256(hash)),
                    time_created: Set(date_time::now()),
                    domain: Unchanged(self.local_domain() as i32),
                    height: Unchanged(block_info.1.number as i64),
                    timestamp: Set(date_time::from_unix_timestamp_s(block_info.1.timestamp)),
                }
            })
            .collect();

        if !models.is_empty() {
            trace!(?models, "Writing blocks to database");
            // For Postgres, `last_insert_id` is actually the ID of the *first* row
            // inserted; the rest of the batch is assumed to follow contiguously.
            let mut cur_id = Insert::many(models).exec(&self.db).await?.last_insert_id;
            for (_hash, block_info) in blocks_to_insert.iter_mut() {
                debug_assert!(cur_id > 0);
                let _ = block_info.as_mut().unwrap().0.insert(cur_id);
                cur_id += 1;
            }
        }

        Ok(blocks.into_iter().map(|(hash, block_info)| {
            let block_info = block_info.unwrap();
            (
                hash,
                (
                    block_info.0.unwrap(),
                    date_time::from_unix_timestamp_s(block_info.1.timestamp),
                ),
            )
        }))
    }
}

/// Background task that links delivered messages to the correct messages.
#[instrument(skip_all)]
async fn delivered_message_linker(db: DbConn) -> Result<()> {
    use sea_orm::{ConnectionTrait, DbBackend, Statement};

    const QUERY: &str = r#"
    UPDATE
        "delivered_message" AS "delivered"
    SET
        "msg_id" = "message"."id"
    FROM
        "message"
    WHERE
        "delivered"."msg_id" IS NULL
        AND "message"."hash" = "delivered"."hash"
    "#;

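    // Run the backfill periodically: any delivery whose message has since been
    // scraped gets its `msg_id` populated; rows whose message is still missing
    // are picked up on a later pass.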
    loop {
        let linked = db
            .execute(Statement::from_string(
                DbBackend::Postgres,
                QUERY.to_owned(),
            ))
            .await?
            .rows_affected();
        info!(linked, "Linked message deliveries");
        sleep(Duration::from_secs(10)).await;
    }
}