From 90191d43d0d142d83b407e8b4c3b721ee53ed855 Mon Sep 17 00:00:00 2001 From: Asa Oines Date: Fri, 26 May 2023 08:58:29 -0400 Subject: [PATCH] Fix bug in MessageSyncCursor rewind logic (#2297) ### Description This PR fixes a bug in the MessageSyncCursor rewind logic that causes the cursor to get stuck. The cursor would look for the message with nonce `n`, find it, and move its `next_block` to the block `b_n` at which `n` was dispatched. It would then index starting at `b_n`, find `n` again, interpret that as a "discontinuity", and rewind back to `b_n`, getting itself stuck permanently. We *should* be moving to `b_n` and not e.g. `b_n + 1` because we have no guarantee that `n+1` was not also present in `b_n` but just dropped by a flaky `eth_getLogs` implementation. The fix is to only consider messages with nonce > `n` when looking for discontinuities when our target nonce is `n + 1`. ### Drive-by changes - Modifies the E2E tests to use a chunk size of 1, allowing us to reproduce this failure mode without the filtering fix. - Debug log on rewind - Comments to clarify fast forward logic --- .../src/contract_sync/cursor.rs | 22 ++++++++++++++++--- rust/utils/run-locally/src/main.rs | 3 +++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/rust/hyperlane-base/src/contract_sync/cursor.rs b/rust/hyperlane-base/src/contract_sync/cursor.rs index 8354d9ce2..7b640624d 100644 --- a/rust/hyperlane-base/src/contract_sync/cursor.rs +++ b/rust/hyperlane-base/src/contract_sync/cursor.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use derive_new::new; use eyre::Result; use tokio::time::sleep; -use tracing::warn; +use tracing::{debug, warn}; use hyperlane_core::{ ChainResult, ContractSyncCursor, HyperlaneMessage, HyperlaneMessageStore, @@ -61,10 +61,12 @@ impl MessageSyncCursor { // If we found messages, but did *not* find the message we were looking for, // we need to rewind to the block at which we found the last message. if !logs.is_empty() && !logs.iter().any(|m| m.0.nonce == self.next_nonce) { + debug!(next_nonce=?self.next_nonce, "Target nonce not found, rewinding"); // If the previous nonce has been synced, rewind to the block number // at which it was dispatched. Otherwise, rewind all the way back to the start block. if let Some(block_number) = self.retrieve_dispatched_block_number(prev_nonce).await { self.next_block = block_number; + debug!(block_number, "Rewound to previous known message"); } else { self.next_block = self.start_block; } @@ -94,6 +96,7 @@ impl ForwardMessageSyncCursor { .retrieve_dispatched_block_number(self.0.next_nonce) .await { + // It's possible that eth_getLogs dropped logs from this block, therefore we cannot do block_number + 1. self.0.next_block = block_number; } self.0.next_nonce += 1; @@ -142,7 +145,13 @@ impl ContractSyncCursor for ForwardMessageSyncCursor { /// Otherwise, rewind all the way back to the start block. async fn update(&mut self, logs: Vec<(HyperlaneMessage, LogMeta)>) -> eyre::Result<()> { let prev_nonce = self.0.next_nonce.saturating_sub(1); - self.0.update(logs, prev_nonce).await + // We may wind up having re-indexed messages that are previous to the nonce that we are looking for. + // We should not consider these messages when checking for continuity errors. + let filtered_logs = logs + .into_iter() + .filter(|m| m.0.nonce >= self.0.next_nonce) + .collect(); + self.0.update(filtered_logs, prev_nonce).await } } @@ -177,6 +186,7 @@ impl BackwardMessageSyncCursor { .retrieve_dispatched_block_number(self.cursor.next_nonce) .await { + // It's possible that eth_getLogs dropped logs from this block, therefore we cannot do block_number - 1. self.cursor.next_block = block_number; } @@ -199,7 +209,13 @@ impl BackwardMessageSyncCursor { /// Otherwise, rewind all the way back to the start block. async fn update(&mut self, logs: Vec<(HyperlaneMessage, LogMeta)>) -> eyre::Result<()> { let prev_nonce = self.cursor.next_nonce.saturating_add(1); - self.cursor.update(logs, prev_nonce).await + // We may wind up having re-indexed messages that are previous to the nonce that we are looking for. + // We should not consider these messages when checking for continuity errors. + let filtered_logs = logs + .into_iter() + .filter(|m| m.0.nonce <= self.cursor.next_nonce) + .collect(); + self.cursor.update(filtered_logs, prev_nonce).await } } diff --git a/rust/utils/run-locally/src/main.rs b/rust/utils/run-locally/src/main.rs index 47404d2a9..5cff2282a 100644 --- a/rust/utils/run-locally/src/main.rs +++ b/rust/utils/run-locally/src/main.rs @@ -163,6 +163,9 @@ fn main() -> ExitCode { "RUST_BACKTRACE" => "full", "HYP_BASE_TRACING_FMT" => "pretty", "HYP_BASE_TRACING_LEVEL" => "debug", + "HYP_BASE_CHAINS_TEST1_INDEX_CHUNK" => "1", + "HYP_BASE_CHAINS_TEST2_INDEX_CHUNK" => "1", + "HYP_BASE_CHAINS_TEST3_INDEX_CHUNK" => "1", }; let relayer_env = hashmap! {