Add more tracing information (#362)

* Add more tracing information

* PR review
pull/372/head
Nam Chu Hoai 3 years ago committed by GitHub
parent 6593e47f1a
commit cee9a2d8ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      rust/abacus-base/src/types/checkpoint_syncer.rs
  2. 12
      rust/abacus-base/src/types/multisig.rs
  3. 4
      rust/agents/relayer/src/checkpoint_relayer.rs
  4. 6
      rust/agents/relayer/src/merkle_tree_builder.rs
  5. 6
      rust/agents/relayer/src/message_processor.rs
  6. 2
      rust/chains/abacus-ethereum/src/inbox.rs

@ -1,6 +1,7 @@
use core::str::FromStr;
use ethers::types::Address;
use std::collections::HashMap;
use tracing::instrument;
use abacus_core::SignedCheckpoint;
use async_trait::async_trait;
@ -77,6 +78,7 @@ pub enum CheckpointSyncers {
#[async_trait]
impl CheckpointSyncer for CheckpointSyncers {
#[instrument(err, skip(self), level = "debug")]
/// Read the highest index of this Syncer
async fn latest_index(&self) -> Result<Option<u32>> {
match self {
@ -84,6 +86,8 @@ impl CheckpointSyncer for CheckpointSyncers {
CheckpointSyncers::S3(syncer) => syncer.latest_index().await,
}
}
#[instrument(err, skip(self))]
/// Attempt to fetch the signed checkpoint at this index
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpoint>> {
match self {
@ -91,6 +95,8 @@ impl CheckpointSyncer for CheckpointSyncers {
CheckpointSyncers::S3(syncer) => syncer.fetch_checkpoint(index).await,
}
}
#[instrument(err, skip(self))]
/// Write the signed checkpoint to this syncer
async fn write_checkpoint(&self, signed_checkpoint: SignedCheckpoint) -> Result<()> {
match self {

@ -5,6 +5,7 @@ use ethers::prelude::Address;
use ethers::types::H256;
use color_eyre::Result;
use tracing::{debug, instrument};
use crate::{CheckpointSyncer, CheckpointSyncers};
@ -28,6 +29,7 @@ impl MultisigCheckpointSyncer {
/// Fetches a MultisigSignedCheckpoint if there is a quorum.
/// Returns Ok(None) if there is no quorum.
#[instrument(err, skip(self))]
pub async fn fetch_checkpoint(&self, index: u32) -> Result<Option<MultisigSignedCheckpoint>> {
// Keeps track of signed validator checkpoints for a particular root.
// In practice, it's likely that validators will all sign the same root for a
@ -73,9 +75,9 @@ impl MultisigCheckpointSyncer {
// If we've hit a quorum, create a MultisigSignedCheckpoint
if signature_count >= self.threshold {
if let Some(signed_checkpoints) = signed_checkpoints_per_root.get(&root) {
return Ok(Some(MultisigSignedCheckpoint::try_from(
signed_checkpoints,
)?));
let checkpoint = MultisigSignedCheckpoint::try_from(signed_checkpoints)?;
debug!(checkpoint=?checkpoint, "Fetched multisig checkpoint");
return Ok(Some(checkpoint));
}
}
}
@ -92,6 +94,7 @@ impl MultisigCheckpointSyncer {
/// Note it's possible for both strategies for finding the latest index to not find a quorum.
/// A more robust implementation should be considered in the future that indexes historical
/// checkpoint indices.
#[instrument(err, skip(self))]
pub async fn latest_index(&self) -> Result<Option<u32>> {
// Get the latest_index from each validator's checkpoint syncer.
let mut latest_indices = Vec::with_capacity(self.checkpoint_syncers.len());
@ -101,9 +104,12 @@ impl MultisigCheckpointSyncer {
latest_indices.push(index);
}
}
debug!(latest_indices=?latest_indices, "Fetched latest indices from checkpoint syncers");
if latest_indices.is_empty() {
return Ok(None);
}
// Sort in descending order to iterate through higher indices first.
latest_indices.sort_by(|a, b| b.cmp(a));

@ -5,7 +5,7 @@ use abacus_core::{db::AbacusDB, AbacusCommon, CommittedMessage, Inbox, InboxVali
use color_eyre::Result;
use prometheus::{IntGauge, IntGaugeVec};
use tokio::{task::JoinHandle, time::sleep};
use tracing::{debug, error, info, info_span, instrument::Instrumented, Instrument};
use tracing::{debug, error, info, info_span, instrument, instrument::Instrumented, Instrument};
use crate::merkle_tree_builder::{MerkleTreeBuilder, MessageBatch};
@ -66,6 +66,7 @@ impl CheckpointRelayer {
/// Only gets the messages desinated for the Relayers inbox
/// Inclusive the to_checkpoint_index
#[instrument(ret, err, skip(self), level = "debug")]
async fn get_messages_between(
&self,
from_leaf_index: u32,
@ -97,6 +98,7 @@ impl CheckpointRelayer {
}
// Returns the newest "current" checkpoint index
#[instrument(ret, err, skip(self, messages), fields(messages=messages.len()))]
async fn submit_checkpoint_and_messages(
&mut self,
onchain_checkpoint_index: u32,

@ -8,8 +8,9 @@ use color_eyre::eyre::Result;
use ethers::core::types::H256;
use std::fmt::Display;
use tracing::{debug, error};
use tracing::{debug, error, instrument};
#[derive(Debug)]
/// Struct to update prover
pub struct MessageBatch {
/// Messages
@ -111,6 +112,7 @@ impl MerkleTreeBuilder {
}
}
#[instrument(err, skip(self), level = "debug")]
pub fn get_proof(&self, leaf_index: u32) -> Result<Proof, MerkleTreeBuilderError> {
self.prover.prove(leaf_index as usize).map_err(Into::into)
}
@ -136,6 +138,7 @@ impl MerkleTreeBuilder {
self.prover.count() as u32
}
#[instrument(err, skip(self), level = "debug")]
pub async fn update_to_checkpoint(
&mut self,
checkpoint: &Checkpoint,
@ -163,6 +166,7 @@ impl MerkleTreeBuilder {
Ok(())
}
#[instrument(err, skip(self), level = "debug")]
/// Update the prover with a message batch
pub fn update_from_batch(
&mut self,

@ -10,7 +10,9 @@ use abacus_core::{db::AbacusDB, AbacusCommon, CommittedMessage, Inbox, MessageSt
use color_eyre::{eyre::bail, Result};
use prometheus::{IntGauge, IntGaugeVec};
use tokio::{task::JoinHandle, time::sleep};
use tracing::{debug, error, info, info_span, instrument::Instrumented, warn, Instrument};
use tracing::{
debug, error, info, info_span, instrument, instrument::Instrumented, warn, Instrument,
};
use crate::merkle_tree_builder::MerkleTreeBuilder;
@ -34,6 +36,7 @@ struct MessageToRetry {
retries: u32,
}
#[derive(Debug)]
enum MessageProcessingStatus {
NotDestinedForInbox,
NotYetCheckpointed,
@ -73,6 +76,7 @@ impl MessageProcessor {
}
}
#[instrument(ret, err, skip(self), level = "debug")]
async fn try_processing_message(
&mut self,
message_leaf_index: u32,

@ -233,7 +233,7 @@ where
Ok(self.contract.remote_domain().call().await?)
}
#[tracing::instrument(err)]
#[tracing::instrument(err, skip(proof))]
async fn process(
&self,
message: &AbacusMessage,

Loading…
Cancel
Save