feat: faster way of figuring out how many new scraper rows were inserted (#4073)

### Description

### Context

Full context / thread:
https://discord.com/channels/935678348330434570/1254871503825141923

Copied here:

So this is kinda interesting.

Basically, gas payment indexing in the scraper task is consistently
prioritizing hook-based indexing work over block range indexing work.

This only manifests in the scraper, and only for Arbitrum, because:
- there are a lot of Arbitrum origin messages
- on the scraper, each gas payment indexing tick takes a very long time
because of some very suboptimal queries - store_payments can take
10-15 seconds!

You can see batches of "Found log(s) in index range" logs
(https://cloudlogging.app.goo.gl/Bo9Q7YwyziSqjsEu7) that correspond with
the gas payment contract sync block height (green line) advancing:
https://abacusworks.grafana.net/goto/poW4KOQIg?orgId=1


![image](https://github.com/hyperlane-xyz/hyperlane-monorepo/assets/20362969/65da2e44-5728-4ecc-9c1f-52741805eb5b)

But we're still indexing gas payments because of hook indexing - there
are tons of "Found log(s) for tx id" logs:
https://cloudlogging.app.goo.gl/QvDaBuf67SjjKviw6

In the scraper's store_payments we bizarrely end up performing 2 very
expensive queries via the calls to payments_count - it gets the count of
gas payments for the provided domain so it can figure out how many new
gas payments were actually inserted:

https://github.com/hyperlane-xyz/hyperlane-monorepo/blob/main/rust/agents/scraper/src/db/payment.rs#L27-L64

These queries vary a lot in how expensive they are - sometimes each one
takes 1-2 seconds, but sometimes it's up to 10 seconds each:
https://cloudlogging.app.goo.gl/cDFNH3oxh6WWV3Go8


![image](https://github.com/hyperlane-xyz/hyperlane-monorepo/assets/20362969/612c7999-21b2-43f4-861f-8b42580e4cb7)

The easiest fix is probably to make store_payments less expensive - we
probably don't need to be making these calls to payments_count to figure
out which payments are new. I'll do that now.

There's maybe a decent argument for having hook indexing and range
indexing not contend with one another, but we should be able to kick
that can down the road.

Another annoying thing: this log
https://github.com/hyperlane-xyz/hyperlane-monorepo/blob/main/rust/hyperlane-base/src/contract_sync/mod.rs#L155
doesn't show up, probably because of some weird instrumentation setup or
something, which makes debugging harder.

### The fix

We want to keep supporting the possibility that we've double-indexed a
data type, and to return only the number of newly inserted rows.

We want to keep each indexing task as non-conflicting as possible. We
know that only one scraper is running at a time, but a task for each
chain and data type executes concurrently. It's possible for e.g. two
gas payment indexing tasks to try to insert into the same table
concurrently, but they'll be for different domains. Insert statement
concurrency isn't an issue because the only chance for conflict is the
auto-incrementing primary key id, and that is handled properly by
database internals.

Insert results only give the latest ID resulting from the insertion: no
matter how many rows were inserted, we get back the latest ID from that
batch, but no info about how many rows were actually in the batch.
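
For reference, a minimal sketch of what the batch insert hands back in
sea-orm (using the same `Insert` import and `models` batch as the
scraper code; not code added by this PR):

```rust
// `Insert::many(...).exec(...)` returns an `InsertResult`, whose only
// useful field is `last_insert_id` - there is no inserted-row count.
let result = Insert::many(models).exec(&self.0).await?;
let last_id = result.last_insert_id;
// `last_id` is the id of the last row in the batch; if some rows were
// skipped as duplicates, nothing here tells us how many were new.
```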

We could do something like get the latest id in the table across all
domains (this is cheap) before the insert, then do the insert, and then
get the latest id again, but this is susceptible to race conditions if
concurrent tasks also get an insertion in during that window. Shoving
all of this into a transaction doesn't seem to solve the problem either,
unless there's a lock on the table; however, sea-orm doesn't support
table-level locks, just select locks. We could also use a CTE to perform
the select & insertion atomically and return the value of the select,
but again sea-orm doesn't seem to provide a nice way of doing this from
what I could tell.

Instead, my suggested fix (sketched below) is:
1. Get the latest id in the table relating to the specific domain (this
is cheap, < 100ms)
2. Perform the insertion
3. Count how many new rows for the specific domain now exist whose id is
> the id from (1). This is also cheap, < 100ms. If a very long time has
passed since the last insertion for the domain, this could take longer,
but in practice that doesn't seem to be the case.
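
Condensed, the new pattern looks roughly like this for gas payments - a
sketch of the `payment.rs` change in the diff below, with error handling
trimmed; `domain`, `models`, and `db` come from the surrounding scraper
code, and the relevant sea-orm traits (`QuerySelect`, `PaginatorTrait`)
are assumed to be in scope:

```rust
// (1) Cheap: the max id already stored for this domain (0 if the domain is new).
let latest_id_before: i64 = gas_payment::Entity::find()
    .select_only()
    .column_as(gas_payment::Column::Id.max(), "max_id")
    .filter(gas_payment::Column::Domain.eq(domain))
    .into_tuple::<Option<i64>>()
    .one(db)
    .await?
    .flatten()
    .unwrap_or(0);

// (2) The batch insert, unchanged from before.
Insert::many(models).exec(db).await?;

// (3) Cheap: count only this domain's rows with an id greater than the
// snapshot from (1) - i.e. the rows that were actually inserted.
let new_payments_count = gas_payment::Entity::find()
    .filter(gas_payment::Column::Domain.eq(domain))
    .filter(gas_payment::Column::Id.gt(latest_id_before))
    .count(db)
    .await?;
```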

If this couple-of-orders-of-magnitude improvement still poses issues in
the future, we can consider other alternatives, probably involving
locking or fancier CTE queries.

Some example queries and how long they take:

Before (2x of these would occur!):
<img width="477" alt="Screen Shot 2024-06-28 at 12 36 41 PM"
src="https://github.com/hyperlane-xyz/hyperlane-monorepo/assets/20362969/5cc8ac35-74fb-4847-9c6f-448deb253a0c">

Now:
1. Max id for a given domain
<img width="494" alt="Screen Shot 2024-06-28 at 12 37 00 PM"
src="https://github.com/hyperlane-xyz/hyperlane-monorepo/assets/20362969/56dbb3ff-b540-458c-809c-56ebfbfd3b0f">

2. The subsequent count, but with an ID filter - note I actually changed
this query to look for IDs > 3600000, which matches 55300 rows, and it's
still just 102ms!
<img width="632" alt="Screen Shot 2024-06-28 at 12 37 49 PM"
src="https://github.com/hyperlane-xyz/hyperlane-monorepo/assets/20362969/11593180-682f-4867-9867-97fd93512802">

### Drive-by changes

- Updated the neutron image along with a deploy there

### Related issues



### Backward compatibility



### Testing

Ran some ad-hoc unit tests of the new queries by running them against
the prod DB with a tokio::task. Tested both a new domain and an existing
one.
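
Roughly along these lines - a hypothetical sketch rather than the actual
test code; the connection string, domain ids, and the way `ScraperDb` is
constructed here are placeholders:

```rust
// Hypothetical ad-hoc test; it would live in the same module so it can
// call the private query helpers. Run manually against a real database.
#[tokio::test]
#[ignore]
async fn payments_count_since_id_smoke_test() -> eyre::Result<()> {
    // Placeholder connection string - point this at the scraper DB.
    let conn = sea_orm::Database::connect("postgres://user:pass@host/scraper").await?;
    let db = ScraperDb(conn); // assumes the tuple struct can be built here

    // Existing domain (e.g. Arbitrum): expect a non-zero latest id.
    assert!(db.latest_payment_id(42161).await? > 0);

    // A domain with no rows yet should default to a latest id of 0,
    // and counting rows "since id 0" should also return 0.
    assert_eq!(db.latest_payment_id(999_999_999).await?, 0);
    assert_eq!(db.payments_count_since_id(999_999_999, 0).await?, 0);
    Ok(())
}
```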
Files changed:

1. rust/agents/scraper/src/db/message.rs
2. rust/agents/scraper/src/db/payment.rs
3. typescript/infra/config/environments/mainnet3/agent.ts

rust/agents/scraper/src/db/message.rs:

@@ -118,10 +118,34 @@ impl ScraperDb {
         Ok(tx_id)
     }

-    async fn deliveries_count(&self, domain: u32, destination_mailbox: Vec<u8>) -> Result<u64> {
+    async fn latest_deliveries_id(&self, domain: u32, destination_mailbox: Vec<u8>) -> Result<i64> {
+        let result = delivered_message::Entity::find()
+            .select_only()
+            .column_as(delivered_message::Column::Id.max(), "max_id")
+            .filter(delivered_message::Column::Domain.eq(domain))
+            .filter(delivered_message::Column::DestinationMailbox.eq(destination_mailbox))
+            .into_tuple::<Option<i64>>()
+            .one(&self.0)
+            .await?;
+        Ok(result
+            // Top level Option indicates some kind of error
+            .ok_or_else(|| eyre::eyre!("Error getting latest delivery id"))?
+            // Inner Option indicates whether there was any data in the filter -
+            // just default to 0 if there was no data
+            .unwrap_or(0))
+    }
+
+    async fn deliveries_count_since_id(
+        &self,
+        domain: u32,
+        destination_mailbox: Vec<u8>,
+        prev_id: i64,
+    ) -> Result<u64> {
         Ok(delivered_message::Entity::find()
             .filter(delivered_message::Column::Domain.eq(domain))
-            .filter(delivered_message::Column::DestinationMailbox.eq(destination_mailbox.clone()))
+            .filter(delivered_message::Column::DestinationMailbox.eq(destination_mailbox))
+            .filter(delivered_message::Column::Id.gt(prev_id))
             .count(&self.0)
             .await?)
     }
@@ -136,8 +160,8 @@ impl ScraperDb {
         deliveries: impl Iterator<Item = StorableDelivery<'_>>,
     ) -> Result<u64> {
         let destination_mailbox = address_to_bytes(&destination_mailbox);
-        let deliveries_count_before = self
-            .deliveries_count(domain, destination_mailbox.clone())
+        let latest_id_before = self
+            .latest_deliveries_id(domain, destination_mailbox.clone())
             .await?;
         // we have a race condition where a message may not have been scraped yet even
         // though we have received news of delivery on this chain, so the
@@ -167,21 +191,48 @@ impl ScraperDb {
             )
             .exec(&self.0)
             .await?;
-        let deliveries_count_after = self.deliveries_count(domain, destination_mailbox).await?;
-        let difference = deliveries_count_after.saturating_sub(deliveries_count_before);
-        if difference > 0 {
+        let new_deliveries_count = self
+            .deliveries_count_since_id(domain, destination_mailbox, latest_id_before)
+            .await?;
+        if new_deliveries_count > 0 {
             debug!(
-                messages = difference,
+                messages = new_deliveries_count,
                 "Wrote new delivered messages to database"
             );
         }
-        Ok(difference)
+        Ok(new_deliveries_count)
     }

+    async fn latest_dispatched_id(&self, domain: u32, origin_mailbox: Vec<u8>) -> Result<i64> {
+        let result = message::Entity::find()
+            .select_only()
+            .column_as(message::Column::Id.max(), "max_id")
+            .filter(message::Column::Origin.eq(domain))
+            .filter(message::Column::OriginMailbox.eq(origin_mailbox))
+            .into_tuple::<Option<i64>>()
+            .one(&self.0)
+            .await?;
+        Ok(result
+            // Top level Option indicates some kind of error
+            .ok_or_else(|| eyre::eyre!("Error getting latest dispatched id"))?
+            // Inner Option indicates whether there was any data in the filter -
+            // just default to 0 if there was no data
+            .unwrap_or(0))
+    }
+
-    async fn dispatched_messages_count(&self, domain: u32, origin_mailbox: Vec<u8>) -> Result<u64> {
+    async fn dispatch_count_since_id(
+        &self,
+        domain: u32,
+        origin_mailbox: Vec<u8>,
+        prev_id: i64,
+    ) -> Result<u64> {
         Ok(message::Entity::find()
             .filter(message::Column::Origin.eq(domain))
             .filter(message::Column::OriginMailbox.eq(origin_mailbox))
+            .filter(message::Column::Id.gt(prev_id))
             .count(&self.0)
             .await?)
     }
@@ -196,8 +247,8 @@ impl ScraperDb {
         messages: impl Iterator<Item = StorableMessage<'_>>,
     ) -> Result<u64> {
         let origin_mailbox = address_to_bytes(origin_mailbox);
-        let messages_count_before = self
-            .dispatched_messages_count(domain, origin_mailbox.clone())
+        let latest_id_before = self
+            .latest_dispatched_id(domain, origin_mailbox.clone())
             .await?;
         // we have a race condition where a message may not have been scraped yet even
         let models = messages
@@ -242,13 +293,17 @@ impl ScraperDb {
             )
             .exec(&self.0)
             .await?;
-        let messages_count_after = self
-            .dispatched_messages_count(domain, origin_mailbox)
+        let new_dispatch_count = self
+            .dispatch_count_since_id(domain, origin_mailbox, latest_id_before)
             .await?;
-        let difference = messages_count_after.saturating_sub(messages_count_before);
-        if difference > 0 {
-            debug!(messages = difference, "Wrote new messages to database");
+        if new_dispatch_count > 0 {
+            debug!(
+                messages = new_dispatch_count,
+                "Wrote new messages to database"
+            );
         }
-        Ok(difference)
+        Ok(new_dispatch_count)
     }
 }

rust/agents/scraper/src/db/payment.rs:

@@ -1,6 +1,6 @@
-use eyre::Result;
+use eyre::{eyre, Result};
 use itertools::Itertools;
-use sea_orm::{prelude::*, ActiveValue::*, Insert};
+use sea_orm::{prelude::*, ActiveValue::*, Insert, QuerySelect};
 use tracing::{debug, instrument, trace};

 use hyperlane_core::{InterchainGasPayment, LogMeta};
@@ -26,7 +26,8 @@ impl ScraperDb {
         domain: u32,
         payments: impl Iterator<Item = StorablePayment<'_>>,
     ) -> Result<u64> {
-        let payment_count_before = self.payments_count(domain).await?;
+        let latest_id_before = self.latest_payment_id(domain).await?;
         // we have a race condition where a message may not have been scraped yet even
         let models = payments
             .map(|storable| gas_payment::ActiveModel {
@@ -61,17 +62,41 @@ impl ScraperDb {
             )
             .exec(&self.0)
             .await?;
-        let payment_count_after = self.payments_count(domain).await?;
-        let difference = payment_count_after.saturating_sub(payment_count_before);
-        if difference > 0 {
-            debug!(payments = difference, "Wrote new gas payments to database");
+        let new_payments_count = self
+            .payments_count_since_id(domain, latest_id_before)
+            .await?;
+        if new_payments_count > 0 {
+            debug!(
+                payments = new_payments_count,
+                "Wrote new gas payments to database"
+            );
         }
-        Ok(difference)
+        Ok(new_payments_count)
     }

+    async fn latest_payment_id(&self, domain: u32) -> Result<i64> {
+        let result = gas_payment::Entity::find()
+            .select_only()
+            .column_as(gas_payment::Column::Id.max(), "max_id")
+            .filter(gas_payment::Column::Domain.eq(domain))
+            .into_tuple::<Option<i64>>()
+            .one(&self.0)
+            .await?;
+        Ok(result
+            // Top level Option indicates some kind of error
+            .ok_or_else(|| eyre!("Error getting latest payment id"))?
+            // Inner Option indicates whether there was any data in the filter -
+            // just default to 0 if there was no data
+            .unwrap_or(0))
+    }
+
-    async fn payments_count(&self, domain: u32) -> Result<u64> {
+    async fn payments_count_since_id(&self, domain: u32, prev_id: i64) -> Result<u64> {
         Ok(gas_payment::Entity::find()
             .filter(gas_payment::Column::Domain.eq(domain))
+            .filter(gas_payment::Column::Id.gt(prev_id))
             .count(&self.0)
             .await?)
     }

typescript/infra/config/environments/mainnet3/agent.ts:

@@ -318,7 +318,7 @@ const neutron: RootAgentConfig = {
     rpcConsensusType: RpcConsensusType.Fallback,
     docker: {
       repo,
-      tag: '0d12ff3-20240620-173353',
+      tag: '9535087-20240623-174819',
     },
     gasPaymentEnforcement: [
       {
