diff --git a/.changeset/real-phones-bake.md b/.changeset/real-phones-bake.md new file mode 100644 index 000000000..589c4e012 --- /dev/null +++ b/.changeset/real-phones-bake.md @@ -0,0 +1,7 @@ +--- +'@hyperlane-xyz/infra': minor +'@hyperlane-xyz/cli': minor +'@hyperlane-xyz/sdk': minor +--- + +Implements persistent relayer for use in CLI diff --git a/typescript/cli/scripts/run-e2e-test.sh b/typescript/cli/scripts/run-e2e-test.sh index e398253b2..bfa89a0b3 100755 --- a/typescript/cli/scripts/run-e2e-test.sh +++ b/typescript/cli/scripts/run-e2e-test.sh @@ -3,8 +3,7 @@ function cleanup() { set +e pkill -f anvil - rm -rf /tmp/anvil2 - rm -rf /tmp/anvil3 + rm -rf ./tmp rm -f ./test-configs/anvil/chains/anvil2/addresses.yaml rm -f ./test-configs/anvil/chains/anvil3/addresses.yaml set -e diff --git a/typescript/cli/src/commands/relayer.ts b/typescript/cli/src/commands/relayer.ts index de7ae18e1..0d7672e73 100644 --- a/typescript/cli/src/commands/relayer.ts +++ b/typescript/cli/src/commands/relayer.ts @@ -1,35 +1,94 @@ -import { HyperlaneCore, HyperlaneRelayer } from '@hyperlane-xyz/sdk'; +import { + ChainMap, + HyperlaneCore, + HyperlaneRelayer, + RelayerCacheSchema, +} from '@hyperlane-xyz/sdk'; +import { Address } from '@hyperlane-xyz/utils'; import { CommandModuleWithContext } from '../context/types.js'; import { log } from '../logger.js'; +import { tryReadJson, writeJson } from '../utils/files.js'; +import { getWarpCoreConfigOrExit } from '../utils/input.js'; -import { agentTargetsCommandOption } from './options.js'; +import { + agentTargetsCommandOption, + overrideRegistryUriCommandOption, + symbolCommandOption, + warpCoreConfigCommandOption, +} from './options.js'; import { MessageOptionsArgTypes } from './send.js'; +const DEFAULT_RELAYER_CACHE = `${overrideRegistryUriCommandOption.default}/relayer-cache.json`; + export const relayerCommand: CommandModuleWithContext< - MessageOptionsArgTypes & { chains?: string } + MessageOptionsArgTypes & { + chains?: string; + cache: string; + symbol?: string; + warp?: string; + } > = { command: 'relayer', - describe: 'Run a Hyperlane message self-relayer', + describe: 'Run a Hyperlane message relayer', builder: { chains: agentTargetsCommandOption, + cache: { + describe: 'Path to relayer cache file', + type: 'string', + default: DEFAULT_RELAYER_CACHE, + }, + symbol: symbolCommandOption, + warp: warpCoreConfigCommandOption, }, - handler: async ({ context, chains }) => { - const chainsArray = chains - ? chains.split(',').map((_) => _.trim()) - : undefined; + handler: async ({ context, cache, chains, symbol, warp }) => { const chainAddresses = await context.registry.getAddresses(); const core = HyperlaneCore.fromAddressesMap( chainAddresses, context.multiProvider, ); - const relayer = new HyperlaneRelayer({ core }); + const chainsArray = + chains?.split(',').map((_) => _.trim()) ?? Object.keys(chainAddresses); + + const whitelist: ChainMap = Object.fromEntries( + chainsArray.map((chain) => [chain, []]), + ); + + // add warp route addresses to whitelist + if (symbol || warp) { + const warpRoute = await getWarpCoreConfigOrExit({ + context, + symbol, + warp, + }); + warpRoute.tokens.forEach( + ({ chainName, addressOrDenom }) => + (whitelist[chainName] = [addressOrDenom!]), + ); + } + + const relayer = new HyperlaneRelayer({ core, whitelist }); + // TODO: fix merkle hook stubbing + + const jsonCache = tryReadJson(cache); + if (jsonCache) { + try { + const parsedCache = RelayerCacheSchema.parse(jsonCache); + relayer.hydrate(parsedCache); + } catch (error) { + log(`Error hydrating cache: ${error}`); + } + } + log('Starting relayer ...'); - relayer.start(chainsArray); + relayer.start(); + process.once('SIGINT', () => { - relayer.stop(chainsArray); log('Stopping relayer ...'); + relayer.stop(); + + writeJson(cache, relayer.cache); process.exit(0); }); }, diff --git a/typescript/cli/src/commands/signCommands.ts b/typescript/cli/src/commands/signCommands.ts index 79c243ca6..37d83096a 100644 --- a/typescript/cli/src/commands/signCommands.ts +++ b/typescript/cli/src/commands/signCommands.ts @@ -1,7 +1,7 @@ // Commands that send tx and require a key to sign. // It's useful to have this listed here so the context // middleware can request keys up front when required. -export const SIGN_COMMANDS = ['deploy', 'send', 'status', 'submit']; +export const SIGN_COMMANDS = ['deploy', 'send', 'status', 'submit', 'relayer']; export function isSignCommand(argv: any): boolean { return ( diff --git a/typescript/cli/src/tests/commands/helpers.ts b/typescript/cli/src/tests/commands/helpers.ts index 6dd76969f..8cb6be027 100644 --- a/typescript/cli/src/tests/commands/helpers.ts +++ b/typescript/cli/src/tests/commands/helpers.ts @@ -1,3 +1,5 @@ +import { $ } from 'zx'; + import { ERC20Test__factory, ERC4626Test__factory } from '@hyperlane-xyz/core'; import { ChainAddresses } from '@hyperlane-xyz/registry'; import { @@ -183,3 +185,26 @@ export async function sendWarpRouteMessageRoundTrip( await hyperlaneWarpSendRelay(chain1, chain2, warpCoreConfigPath); return hyperlaneWarpSendRelay(chain2, chain1, warpCoreConfigPath); } + +export async function hyperlaneSendMessage( + origin: string, + destination: string, +) { + return $`yarn workspace @hyperlane-xyz/cli run hyperlane send message \ + --registry ${REGISTRY_PATH} \ + --origin ${origin} \ + --destination ${destination} \ + --key ${ANVIL_KEY} \ + --verbosity debug \ + --yes`; +} + +export function hyperlaneRelayer(chains: string[], warp?: string) { + return $`yarn workspace @hyperlane-xyz/cli run hyperlane relayer \ + --registry ${REGISTRY_PATH} \ + --chains ${chains.join(',')} \ + --warp ${warp ?? ''} \ + --key ${ANVIL_KEY} \ + --verbosity debug \ + --yes`; +} diff --git a/typescript/cli/src/tests/commands/warp.ts b/typescript/cli/src/tests/commands/warp.ts index 3f3eec338..dc80b6ad2 100644 --- a/typescript/cli/src/tests/commands/warp.ts +++ b/typescript/cli/src/tests/commands/warp.ts @@ -60,9 +60,10 @@ export async function hyperlaneWarpSendRelay( origin: string, destination: string, warpCorePath: string, + relay = true, ) { return $`yarn workspace @hyperlane-xyz/cli run hyperlane warp send \ - --relay \ + ${relay ? '--relay' : ''} \ --registry ${REGISTRY_PATH} \ --overrides " " \ --origin ${origin} \ diff --git a/typescript/cli/src/tests/relay.e2e-test.ts b/typescript/cli/src/tests/relay.e2e-test.ts new file mode 100644 index 000000000..3f970eed7 --- /dev/null +++ b/typescript/cli/src/tests/relay.e2e-test.ts @@ -0,0 +1,82 @@ +import { TokenType } from '@hyperlane-xyz/sdk'; + +import { writeYamlOrJson } from '../utils/files.js'; + +import { hyperlaneCoreDeploy } from './commands/core.js'; +import { + REGISTRY_PATH, + hyperlaneRelayer, + hyperlaneSendMessage, +} from './commands/helpers.js'; +import { + hyperlaneWarpDeploy, + hyperlaneWarpSendRelay, +} from './commands/warp.js'; + +const CHAIN_NAME_1 = 'anvil2'; +const CHAIN_NAME_2 = 'anvil3'; + +const SYMBOL = 'ETH'; + +const WARP_DEPLOY_OUTPUT = `${REGISTRY_PATH}/deployments/warp_routes/${SYMBOL}/${CHAIN_NAME_1}-${CHAIN_NAME_2}-config.yaml`; + +const EXAMPLES_PATH = './examples'; +const CORE_CONFIG_PATH = `${EXAMPLES_PATH}/core-config.yaml`; + +const TEST_TIMEOUT = 100_000; // Long timeout since these tests can take a while +describe('hyperlane relayer e2e tests', async function () { + this.timeout(TEST_TIMEOUT); + + before(async () => { + await hyperlaneCoreDeploy(CHAIN_NAME_1, CORE_CONFIG_PATH); + await hyperlaneCoreDeploy(CHAIN_NAME_2, CORE_CONFIG_PATH); + + const warpConfig = { + anvil2: { + type: TokenType.native, + symbol: SYMBOL, + }, + anvil3: { + type: TokenType.synthetic, + symbol: SYMBOL, + }, + }; + + const warpConfigPath = './tmp/warp-route-config.yaml'; + writeYamlOrJson(warpConfigPath, warpConfig); + await hyperlaneWarpDeploy(warpConfigPath); + }); + + describe('relayer', () => { + it('should relay core messages', async () => { + const process = hyperlaneRelayer([CHAIN_NAME_1, CHAIN_NAME_2]); + + await hyperlaneSendMessage(CHAIN_NAME_1, CHAIN_NAME_2); + await hyperlaneSendMessage(CHAIN_NAME_2, CHAIN_NAME_1); + + await process.kill('SIGINT'); + }); + + it('should relay warp messages', async () => { + const process = hyperlaneRelayer( + [CHAIN_NAME_1, CHAIN_NAME_2], + WARP_DEPLOY_OUTPUT, + ); + + await hyperlaneWarpSendRelay( + CHAIN_NAME_1, + CHAIN_NAME_2, + WARP_DEPLOY_OUTPUT, + false, + ); + await hyperlaneWarpSendRelay( + CHAIN_NAME_2, + CHAIN_NAME_1, + WARP_DEPLOY_OUTPUT, + false, + ); + + await process.kill('SIGINT'); + }); + }); +}); diff --git a/typescript/cli/src/utils/relay.ts b/typescript/cli/src/utils/relay.ts index 51ea7bc15..24bdc96fc 100644 --- a/typescript/cli/src/utils/relay.ts +++ b/typescript/cli/src/utils/relay.ts @@ -19,5 +19,6 @@ export function stubMerkleTreeConfig( }, }, ism: {}, + backlog: [], }); } diff --git a/typescript/infra/scripts/relay.ts b/typescript/infra/scripts/relay.ts index ddfbad99d..e689146d8 100644 --- a/typescript/infra/scripts/relay.ts +++ b/typescript/infra/scripts/relay.ts @@ -10,11 +10,10 @@ const CACHE_PATH = process.env.RELAYER_CACHE ?? './relayer-cache.json'; async function main() { const { environment } = await getArgs().argv; const { core } = await getHyperlaneCore(environment); - const relayer = new HyperlaneRelayer({ core }); - // target subset of chains - // const chains = ['ethereum', 'polygon', 'bsc'] - const chains = undefined; + // target subset of chains and senders/recipients + const whitelist = undefined; + const relayer = new HyperlaneRelayer({ core, whitelist }); try { const contents = await readFile(CACHE_PATH, 'utf-8'); @@ -26,10 +25,10 @@ async function main() { console.error(`Failed to load cache from ${CACHE_PATH}`); } - relayer.start(chains); + relayer.start(); process.once('SIGINT', async () => { - relayer.stop(chains); + relayer.stop(); const cache = JSON.stringify(relayer.cache); await writeFile(CACHE_PATH, cache, 'utf-8'); diff --git a/typescript/sdk/src/core/HyperlaneCore.ts b/typescript/sdk/src/core/HyperlaneCore.ts index 8b1425d10..a3f82e36d 100644 --- a/typescript/sdk/src/core/HyperlaneCore.ts +++ b/typescript/sdk/src/core/HyperlaneCore.ts @@ -204,9 +204,16 @@ export class HyperlaneCore extends HyperlaneApp { mailbox.on( mailbox.filters.Dispatch(), (_sender, _destination, _recipient, message, event) => { - const parsed = HyperlaneCore.parseDispatchedMessage(message); - this.logger.info(`Observed message ${parsed.id} on ${originChain}`); - return handler(parsed, event); + const dispatched = HyperlaneCore.parseDispatchedMessage(message); + + // add human readable chain names + dispatched.parsed.originChain = this.getOrigin(dispatched); + dispatched.parsed.destinationChain = this.getDestination(dispatched); + + this.logger.info( + `Observed message ${dispatched.id} on ${originChain} to ${dispatched.parsed.destinationChain}`, + ); + return handler(dispatched, event); }, ); }); diff --git a/typescript/sdk/src/core/HyperlaneRelayer.ts b/typescript/sdk/src/core/HyperlaneRelayer.ts index cb0ba570e..0a1a451eb 100644 --- a/typescript/sdk/src/core/HyperlaneRelayer.ts +++ b/typescript/sdk/src/core/HyperlaneRelayer.ts @@ -2,13 +2,18 @@ import { ethers, providers } from 'ethers'; import { Logger } from 'pino'; import { z } from 'zod'; +import { ChainMap } from '@hyperlane-xyz/sdk'; import { Address, + ParsedMessage, assert, + bytes32ToAddress, + messageId, objMap, objMerge, - pollAsync, + parseMessage, promiseObjAll, + sleep, } from '@hyperlane-xyz/utils'; import { DerivedHookConfig, EvmHookReader } from '../hook/EvmHookReader.js'; @@ -31,19 +36,64 @@ const DerivedHookConfigWithAddressSchema = const DerivedIsmConfigWithAddressSchema = IsmConfigSchema.and(WithAddressSchema); +const BacklogMessageSchema = z.object({ + attempts: z.number(), + lastAttempt: z.number(), + message: z.string(), + dispatchTx: z.string(), +}); + +const MessageBacklogSchema = z.array(BacklogMessageSchema); + export const RelayerCacheSchema = z.object({ hook: z.record(z.record(DerivedHookConfigWithAddressSchema)), ism: z.record(z.record(DerivedIsmConfigWithAddressSchema)), + backlog: MessageBacklogSchema, }); type RelayerCache = z.infer; +type MessageWhitelist = ChainMap>; + +// message must have origin and destination chains in the whitelist +// if whitelist has non-empty address set for chain, message must have sender and recipient in the set +export function messageMatchesWhitelist( + whitelist: MessageWhitelist, + message: ParsedMessage, +): boolean { + const originAddresses = whitelist[message.originChain ?? message.origin]; + if (!originAddresses) { + return false; + } + + const sender = bytes32ToAddress(message.sender); + if (originAddresses.size !== 0 && !originAddresses.has(sender)) { + return false; + } + + const destinationAddresses = + whitelist[message.destinationChain ?? message.destination]; + if (!destinationAddresses) { + return false; + } + + const recipient = bytes32ToAddress(message.recipient); + if (destinationAddresses.size !== 0 && !destinationAddresses.has(recipient)) { + return false; + } + + return true; +} + export class HyperlaneRelayer { protected multiProvider: MultiProvider; protected metadataBuilder: BaseMetadataBuilder; protected readonly core: HyperlaneCore; protected readonly retryTimeout: number; + protected readonly whitelist: ChainMap> | undefined; + + public backlog: RelayerCache['backlog']; public cache: RelayerCache | undefined; protected stopRelayingHandler: ((chains?: ChainName[]) => void) | undefined; @@ -53,21 +103,32 @@ export class HyperlaneRelayer { constructor({ core, caching = true, - retryTimeout = 5 * 1000, + retryTimeout = 1000, + whitelist = undefined, }: { core: HyperlaneCore; caching?: boolean; retryTimeout?: number; + whitelist?: ChainMap; }) { this.core = core; this.retryTimeout = retryTimeout; this.logger = core.logger.child({ module: 'Relayer' }); this.metadataBuilder = new BaseMetadataBuilder(core); this.multiProvider = core.multiProvider; + if (whitelist) { + this.whitelist = objMap( + whitelist, + (_chain, addresses) => new Set(addresses), + ); + } + + this.backlog = []; if (caching) { this.cache = { hook: {}, ism: {}, + backlog: [], }; } } @@ -152,11 +213,24 @@ export class HyperlaneRelayer { messageIndex = 0, message = HyperlaneCore.getDispatchedMessages(dispatchTx)[messageIndex], ): Promise { + if (this.whitelist) { + // add human readable names for use in whitelist checks + message.parsed = { + originChain: this.core.getOrigin(message), + destinationChain: this.core.getDestination(message), + ...message.parsed, + }; + assert( + messageMatchesWhitelist(this.whitelist, message.parsed), + `Message ${message.id} does not match whitelist`, + ); + } + this.logger.info(`Preparing to relay message ${message.id}`); const isDelivered = await this.core.isDelivered(message); if (isDelivered) { - this.logger.debug(`Message ${message.id} already delivered`); + this.logger.info(`Message ${message.id} already delivered`); return this.core.getProcessedReceipt(message); } @@ -170,18 +244,14 @@ export class HyperlaneRelayer { ]); this.logger.debug({ ism, hook }, `Retrieved ISM and hook configs`); - const blockTime = this.multiProvider.getChainMetadata( - message.parsed.destination, - ).blocks?.estimateBlockTime; - const waitTime = blockTime ? blockTime * 2 : this.retryTimeout; - - const metadata = await pollAsync( - () => this.metadataBuilder.build({ message, ism, hook, dispatchTx }), - waitTime, - 12, // 12 attempts - ); + const metadata = await this.metadataBuilder.build({ + message, + ism, + hook, + dispatchTx, + }); - this.logger.info({ message, metadata }, `Relaying message ${message.id}`); + this.logger.info(`Relaying message ${message.id}`); return this.core.deliver(message, metadata); } @@ -206,44 +276,92 @@ export class HyperlaneRelayer { ); } - start(chains = this.core.chains()): void { - assert(!this.stopRelayingHandler, 'Relayer already started'); + protected async flushBacklog(): Promise { + while (this.stopRelayingHandler) { + const backlogMsg = this.backlog.shift(); - const { removeHandler } = this.core.onDispatch(async (message, event) => { - const destination = message.parsed.destination; - const chain = this.multiProvider.tryGetChainName(destination); - if (!chain) { - this.logger.warn(`Unknown destination ${destination}`); - return; + if (!backlogMsg) { + this.logger.trace('Backlog empty, waiting 1s'); + await sleep(1000); + continue; } - if (!chains.includes(chain)) { - this.logger.info(`Skipping message to chain ${chain}`); - return; + // linear backoff (attempts * retryTimeout) + if ( + Date.now() < + backlogMsg.lastAttempt + backlogMsg.attempts * this.retryTimeout + ) { + this.backlog.push(backlogMsg); + continue; } - const dispatchReceipt = await event.getTransactionReceipt(); - const processReceipt = await this.relayMessage( - dispatchReceipt, - undefined, - message, - ); + const { message, dispatchTx, attempts } = backlogMsg; + const id = messageId(message); + const parsed = parseMessage(message); + const dispatchMsg = { id, message, parsed }; + + try { + const dispatchReceipt = await this.multiProvider + .getProvider(parsed.origin) + .getTransactionReceipt(dispatchTx); + + // TODO: handle batching + await this.relayMessage(dispatchReceipt, undefined, dispatchMsg); + } catch (error) { + this.logger.error( + `Failed to relay message ${id} (attempt #${attempts + 1})`, + ); + this.backlog.push({ + ...backlogMsg, + attempts: attempts + 1, + lastAttempt: Date.now(), + }); + } + } + } - this.logger.info( - `Message ${message.id} was processed in ${ - this.multiProvider.tryGetExplorerTxUrl(destination, { - hash: processReceipt.transactionHash, - }) ?? 'tx ' + processReceipt.transactionHash - }`, - ); - }, chains); + protected whitelistChains() { + return this.whitelist ? Object.keys(this.whitelist) : undefined; + } + + start(): void { + assert(!this.stopRelayingHandler, 'Relayer already started'); + + this.backlog = this.cache?.backlog ?? []; + + const { removeHandler } = this.core.onDispatch(async (message, event) => { + if ( + this.whitelist && + !messageMatchesWhitelist(this.whitelist, message.parsed) + ) { + this.logger.debug( + { message, whitelist: this.whitelist }, + `Skipping message ${message.id} not matching whitelist`, + ); + return; + } + + this.backlog.push({ + attempts: 0, + lastAttempt: Date.now(), + message: message.message, + dispatchTx: event.transactionHash, + }); + }, this.whitelistChains()); this.stopRelayingHandler = removeHandler; + + // start flushing backlog + void this.flushBacklog(); } - stop(chains = this.core.chains()): void { + stop(): void { assert(this.stopRelayingHandler, 'Relayer not started'); - this.stopRelayingHandler(chains); + this.stopRelayingHandler(this.whitelistChains()); this.stopRelayingHandler = undefined; + + if (this.cache) { + this.cache.backlog = this.backlog; + } } }