|
|
|
@ -2,6 +2,7 @@ use std::collections::VecDeque; |
|
|
|
|
use std::sync::Arc; |
|
|
|
|
use std::time::{Duration, Instant}; |
|
|
|
|
|
|
|
|
|
use derive_new::new; |
|
|
|
|
use eyre::{bail, Result}; |
|
|
|
|
use prometheus::{IntCounter, IntGauge}; |
|
|
|
|
use tokio::sync::mpsc::{self, error::TryRecvError}; |
|
|
|
@ -12,8 +13,9 @@ use tracing::{debug, error, info, info_span, instrument, instrument::Instrumente |
|
|
|
|
use hyperlane_base::{CachingMailbox, CoreMetrics}; |
|
|
|
|
use hyperlane_core::{db::HyperlaneDB, HyperlaneChain, HyperlaneDomain, Mailbox, U256}; |
|
|
|
|
|
|
|
|
|
use super::metadata_builder::MetadataBuilder; |
|
|
|
|
use super::{gas_payment::GasPaymentEnforcer, SubmitMessageArgs}; |
|
|
|
|
use super::{ |
|
|
|
|
gas_payment::GasPaymentEnforcer, metadata_builder::MetadataBuilder, SubmitMessageArgs, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
/// SerialSubmitter accepts undelivered messages over a channel from a MessageProcessor. It is
|
|
|
|
|
/// responsible for executing the right strategy to deliver those messages to the destination
|
|
|
|
@ -102,21 +104,23 @@ use super::{gas_payment::GasPaymentEnforcer, SubmitMessageArgs}; |
|
|
|
|
// TODO(webbhorn): Do we also want to await finality_blocks on source chain before attempting
|
|
|
|
|
// submission? Does this already happen?
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)] |
|
|
|
|
#[derive(Debug, new)] |
|
|
|
|
pub(crate) struct SerialSubmitter { |
|
|
|
|
/// Used to construct the ISM metadata needed to verify a message.
|
|
|
|
|
metadata_builder: MetadataBuilder, |
|
|
|
|
/// Receiver for new messages to submit.
|
|
|
|
|
rx: mpsc::UnboundedReceiver<SubmitMessageArgs>, |
|
|
|
|
/// Messages we are aware of that we want to eventually submit, but haven't yet, for
|
|
|
|
|
/// whatever reason. They are not in any priority order, so are held in a vector.
|
|
|
|
|
#[new(default)] |
|
|
|
|
wait_queue: Vec<SubmitMessageArgs>, |
|
|
|
|
/// Messages that are in theory deliverable, but which are waiting in a queue for their turn
|
|
|
|
|
/// to be dispatched. The SerialSubmitter can only dispatch one message at a time, so this
|
|
|
|
|
/// queue could grow.
|
|
|
|
|
#[new(default)] |
|
|
|
|
run_queue: VecDeque<SubmitMessageArgs>, |
|
|
|
|
/// Mailbox on the destination chain.
|
|
|
|
|
mailbox: CachingMailbox, |
|
|
|
|
/// Used to construct the ISM metadata needed to verify a message.
|
|
|
|
|
metadata_builder: MetadataBuilder, |
|
|
|
|
/// Interface to agent rocks DB for e.g. writing delivery status upon completion.
|
|
|
|
|
db: HyperlaneDB, |
|
|
|
|
/// Metrics for serial submitter.
|
|
|
|
@ -128,28 +132,6 @@ pub(crate) struct SerialSubmitter { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl SerialSubmitter { |
|
|
|
|
pub(crate) fn new( |
|
|
|
|
rx: mpsc::UnboundedReceiver<SubmitMessageArgs>, |
|
|
|
|
mailbox: CachingMailbox, |
|
|
|
|
metadata_builder: MetadataBuilder, |
|
|
|
|
db: HyperlaneDB, |
|
|
|
|
metrics: SerialSubmitterMetrics, |
|
|
|
|
gas_payment_enforcer: Arc<GasPaymentEnforcer>, |
|
|
|
|
transaction_gas_limit: Option<U256>, |
|
|
|
|
) -> Self { |
|
|
|
|
Self { |
|
|
|
|
rx, |
|
|
|
|
wait_queue: Vec::new(), |
|
|
|
|
run_queue: VecDeque::new(), |
|
|
|
|
mailbox, |
|
|
|
|
metadata_builder, |
|
|
|
|
db, |
|
|
|
|
metrics, |
|
|
|
|
gas_payment_enforcer, |
|
|
|
|
transaction_gas_limit, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn spawn(mut self) -> Instrumented<JoinHandle<Result<()>>> { |
|
|
|
|
tokio::spawn(async move { self.work_loop().await }) |
|
|
|
|
.instrument(info_span!("serial submitter work loop")) |
|
|
|
|