fix: dedup eth logs (#3108)

### Description

Deduplicates ethereum logs by collecting them into a HashSet before
returning a Vec. I couldn't find a way to deduplicate using an ethers
config, but since `fetch_logs` is only called once in hyperlane-base,
I'm doing the deduplication there "globally". I also noted in doc
comments that duplicates may be returned.

### Related issues

- Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3070

### Backward compatibility

Yes

### Testing

Will rely on e2e passing but haven't added unit tests for this scenario
pull/3110/head
Daniel Savu 11 months ago committed by GitHub
parent 3f88aa6f6e
commit cdbaf9ef02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      rust/chains/hyperlane-ethereum/src/interchain_gas.rs
  2. 2
      rust/chains/hyperlane-ethereum/src/mailbox.rs
  3. 1
      rust/chains/hyperlane-ethereum/src/merkle_tree_hook.rs
  4. 6
      rust/hyperlane-base/src/contract_sync/mod.rs
  5. 2
      rust/hyperlane-core/src/types/log_metadata.rs
  6. 2
      rust/hyperlane-core/src/types/merkle_tree.rs
  7. 2
      rust/hyperlane-core/src/types/message.rs
  8. 2
      rust/hyperlane-core/src/types/mod.rs

@ -84,6 +84,7 @@ impl<M> Indexer<InterchainGasPayment> for EthereumInterchainGasPaymasterIndexer<
where where
M: Middleware + 'static, M: Middleware + 'static,
{ {
/// Note: This call may return duplicates depending on the provider used
#[instrument(err, skip(self))] #[instrument(err, skip(self))]
async fn fetch_logs( async fn fetch_logs(
&self, &self,

@ -125,6 +125,7 @@ where
self.get_finalized_block_number().await self.get_finalized_block_number().await
} }
/// Note: This call may return duplicates depending on the provider used
#[instrument(err, skip(self))] #[instrument(err, skip(self))]
async fn fetch_logs( async fn fetch_logs(
&self, &self,
@ -168,6 +169,7 @@ where
self.get_finalized_block_number().await self.get_finalized_block_number().await
} }
/// Note: This call may return duplicates depending on the provider used
#[instrument(err, skip(self))] #[instrument(err, skip(self))]
async fn fetch_logs(&self, range: RangeInclusive<u32>) -> ChainResult<Vec<(H256, LogMeta)>> { async fn fetch_logs(&self, range: RangeInclusive<u32>) -> ChainResult<Vec<(H256, LogMeta)>> {
Ok(self Ok(self

@ -105,6 +105,7 @@ impl<M> Indexer<MerkleTreeInsertion> for EthereumMerkleTreeHookIndexer<M>
where where
M: Middleware + 'static, M: Middleware + 'static,
{ {
/// Note: This call may return duplicates depending on the provider used
#[instrument(err, skip(self))] #[instrument(err, skip(self))]
async fn fetch_logs( async fn fetch_logs(
&self, &self,

@ -1,4 +1,4 @@
use std::{fmt::Debug, marker::PhantomData, sync::Arc}; use std::{collections::HashSet, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};
use cursor::*; use cursor::*;
use derive_new::new; use derive_new::new;
@ -31,7 +31,7 @@ pub struct ContractSync<T, D: HyperlaneLogStore<T>, I: Indexer<T>> {
impl<T, D, I> ContractSync<T, D, I> impl<T, D, I> ContractSync<T, D, I>
where where
T: Debug + Send + Sync + Clone + 'static, T: Debug + Send + Sync + Clone + Eq + Hash + 'static,
D: HyperlaneLogStore<T> + 'static, D: HyperlaneLogStore<T> + 'static,
I: Indexer<T> + Clone + 'static, I: Indexer<T> + Clone + 'static,
{ {
@ -67,6 +67,8 @@ where
debug!(?range, "Looking for for events in index range"); debug!(?range, "Looking for for events in index range");
let logs = self.indexer.fetch_logs(range.clone()).await?; let logs = self.indexer.fetch_logs(range.clone()).await?;
let deduped_logs = HashSet::<_>::from_iter(logs);
let logs = Vec::from_iter(deduped_logs);
info!( info!(
?range, ?range,

@ -10,7 +10,7 @@ use crate::{H256, H512, U256};
/// A close clone of the Ethereum `LogMeta`, this is designed to be a more /// A close clone of the Ethereum `LogMeta`, this is designed to be a more
/// generic metadata that we can use for other blockchains later. Some changes /// generic metadata that we can use for other blockchains later. Some changes
/// may be required in the future. /// may be required in the future.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)] #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default, Hash)]
pub struct LogMeta { pub struct LogMeta {
/// Address from which this log originated /// Address from which this log originated
pub address: H256, pub address: H256,

@ -4,7 +4,7 @@ use std::io::{Read, Write};
use crate::{Decode, Encode, HyperlaneProtocolError, Sequenced, H256}; use crate::{Decode, Encode, HyperlaneProtocolError, Sequenced, H256};
/// Merkle Tree Hook insertion event /// Merkle Tree Hook insertion event
#[derive(Debug, Copy, Clone, new, Eq, PartialEq)] #[derive(Debug, Copy, Clone, new, Eq, PartialEq, Hash)]
pub struct MerkleTreeInsertion { pub struct MerkleTreeInsertion {
leaf_index: u32, leaf_index: u32,
message_id: H256, message_id: H256,

@ -21,7 +21,7 @@ impl From<&HyperlaneMessage> for RawHyperlaneMessage {
} }
/// A full Hyperlane message between chains /// A full Hyperlane message between chains
#[derive(Clone, Eq, PartialEq)] #[derive(Clone, Eq, PartialEq, Hash)]
pub struct HyperlaneMessage { pub struct HyperlaneMessage {
/// 1 Hyperlane version number /// 1 Hyperlane version number
pub version: u8, pub version: u8,

@ -115,7 +115,7 @@ pub struct GasPaymentKey {
} }
/// A payment of a message's gas costs. /// A payment of a message's gas costs.
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq)] #[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Hash)]
pub struct InterchainGasPayment { pub struct InterchainGasPayment {
/// Id of the message /// Id of the message
pub message_id: H256, pub message_id: H256,

Loading…
Cancel
Save