feat: persistent ts relayer (#4831)

### Description

- Adjust long running TS relayer with retry queue instead of crashing.
- Adds a "whitelist" for relaying to specific message
senders/recipients.
- Adds a "symbol" flag borrowed from warp commands for filtering on a
specific warp route.

### Drive-by changes

None

### Related issues

- Enables warp route deployer to run the CLI relayer in the background
and test/share the warp UI.

### Backward compatibility

Yes

### Testing

CLI e2e tests
pull/4890/head
Yorke Rhodes 2 weeks ago committed by GitHub
parent f76984baa5
commit 5db46bd315
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 7
      .changeset/real-phones-bake.md
  2. 3
      typescript/cli/scripts/run-e2e-test.sh
  3. 81
      typescript/cli/src/commands/relayer.ts
  4. 2
      typescript/cli/src/commands/signCommands.ts
  5. 25
      typescript/cli/src/tests/commands/helpers.ts
  6. 3
      typescript/cli/src/tests/commands/warp.ts
  7. 82
      typescript/cli/src/tests/relay.e2e-test.ts
  8. 1
      typescript/cli/src/utils/relay.ts
  9. 11
      typescript/infra/scripts/relay.ts
  10. 13
      typescript/sdk/src/core/HyperlaneCore.ts
  11. 196
      typescript/sdk/src/core/HyperlaneRelayer.ts

@ -0,0 +1,7 @@
---
'@hyperlane-xyz/infra': minor
'@hyperlane-xyz/cli': minor
'@hyperlane-xyz/sdk': minor
---
Implements persistent relayer for use in CLI

@ -3,8 +3,7 @@
function cleanup() {
set +e
pkill -f anvil
rm -rf /tmp/anvil2
rm -rf /tmp/anvil3
rm -rf ./tmp
rm -f ./test-configs/anvil/chains/anvil2/addresses.yaml
rm -f ./test-configs/anvil/chains/anvil3/addresses.yaml
set -e

@ -1,35 +1,94 @@
import { HyperlaneCore, HyperlaneRelayer } from '@hyperlane-xyz/sdk';
import {
ChainMap,
HyperlaneCore,
HyperlaneRelayer,
RelayerCacheSchema,
} from '@hyperlane-xyz/sdk';
import { Address } from '@hyperlane-xyz/utils';
import { CommandModuleWithContext } from '../context/types.js';
import { log } from '../logger.js';
import { tryReadJson, writeJson } from '../utils/files.js';
import { getWarpCoreConfigOrExit } from '../utils/input.js';
import { agentTargetsCommandOption } from './options.js';
import {
agentTargetsCommandOption,
overrideRegistryUriCommandOption,
symbolCommandOption,
warpCoreConfigCommandOption,
} from './options.js';
import { MessageOptionsArgTypes } from './send.js';
const DEFAULT_RELAYER_CACHE = `${overrideRegistryUriCommandOption.default}/relayer-cache.json`;
export const relayerCommand: CommandModuleWithContext<
MessageOptionsArgTypes & { chains?: string }
MessageOptionsArgTypes & {
chains?: string;
cache: string;
symbol?: string;
warp?: string;
}
> = {
command: 'relayer',
describe: 'Run a Hyperlane message self-relayer',
describe: 'Run a Hyperlane message relayer',
builder: {
chains: agentTargetsCommandOption,
cache: {
describe: 'Path to relayer cache file',
type: 'string',
default: DEFAULT_RELAYER_CACHE,
},
symbol: symbolCommandOption,
warp: warpCoreConfigCommandOption,
},
handler: async ({ context, chains }) => {
const chainsArray = chains
? chains.split(',').map((_) => _.trim())
: undefined;
handler: async ({ context, cache, chains, symbol, warp }) => {
const chainAddresses = await context.registry.getAddresses();
const core = HyperlaneCore.fromAddressesMap(
chainAddresses,
context.multiProvider,
);
const relayer = new HyperlaneRelayer({ core });
const chainsArray =
chains?.split(',').map((_) => _.trim()) ?? Object.keys(chainAddresses);
const whitelist: ChainMap<Address[]> = Object.fromEntries(
chainsArray.map((chain) => [chain, []]),
);
// add warp route addresses to whitelist
if (symbol || warp) {
const warpRoute = await getWarpCoreConfigOrExit({
context,
symbol,
warp,
});
warpRoute.tokens.forEach(
({ chainName, addressOrDenom }) =>
(whitelist[chainName] = [addressOrDenom!]),
);
}
const relayer = new HyperlaneRelayer({ core, whitelist });
// TODO: fix merkle hook stubbing
const jsonCache = tryReadJson(cache);
if (jsonCache) {
try {
const parsedCache = RelayerCacheSchema.parse(jsonCache);
relayer.hydrate(parsedCache);
} catch (error) {
log(`Error hydrating cache: ${error}`);
}
}
log('Starting relayer ...');
relayer.start(chainsArray);
relayer.start();
process.once('SIGINT', () => {
relayer.stop(chainsArray);
log('Stopping relayer ...');
relayer.stop();
writeJson(cache, relayer.cache);
process.exit(0);
});
},

@ -1,7 +1,7 @@
// Commands that send tx and require a key to sign.
// It's useful to have this listed here so the context
// middleware can request keys up front when required.
export const SIGN_COMMANDS = ['deploy', 'send', 'status', 'submit'];
export const SIGN_COMMANDS = ['deploy', 'send', 'status', 'submit', 'relayer'];
export function isSignCommand(argv: any): boolean {
return (

@ -1,3 +1,5 @@
import { $ } from 'zx';
import { ERC20Test__factory, ERC4626Test__factory } from '@hyperlane-xyz/core';
import { ChainAddresses } from '@hyperlane-xyz/registry';
import {
@ -183,3 +185,26 @@ export async function sendWarpRouteMessageRoundTrip(
await hyperlaneWarpSendRelay(chain1, chain2, warpCoreConfigPath);
return hyperlaneWarpSendRelay(chain2, chain1, warpCoreConfigPath);
}
export async function hyperlaneSendMessage(
origin: string,
destination: string,
) {
return $`yarn workspace @hyperlane-xyz/cli run hyperlane send message \
--registry ${REGISTRY_PATH} \
--origin ${origin} \
--destination ${destination} \
--key ${ANVIL_KEY} \
--verbosity debug \
--yes`;
}
export function hyperlaneRelayer(chains: string[], warp?: string) {
return $`yarn workspace @hyperlane-xyz/cli run hyperlane relayer \
--registry ${REGISTRY_PATH} \
--chains ${chains.join(',')} \
--warp ${warp ?? ''} \
--key ${ANVIL_KEY} \
--verbosity debug \
--yes`;
}

@ -60,9 +60,10 @@ export async function hyperlaneWarpSendRelay(
origin: string,
destination: string,
warpCorePath: string,
relay = true,
) {
return $`yarn workspace @hyperlane-xyz/cli run hyperlane warp send \
--relay \
${relay ? '--relay' : ''} \
--registry ${REGISTRY_PATH} \
--overrides " " \
--origin ${origin} \

@ -0,0 +1,82 @@
import { TokenType } from '@hyperlane-xyz/sdk';
import { writeYamlOrJson } from '../utils/files.js';
import { hyperlaneCoreDeploy } from './commands/core.js';
import {
REGISTRY_PATH,
hyperlaneRelayer,
hyperlaneSendMessage,
} from './commands/helpers.js';
import {
hyperlaneWarpDeploy,
hyperlaneWarpSendRelay,
} from './commands/warp.js';
const CHAIN_NAME_1 = 'anvil2';
const CHAIN_NAME_2 = 'anvil3';
const SYMBOL = 'ETH';
const WARP_DEPLOY_OUTPUT = `${REGISTRY_PATH}/deployments/warp_routes/${SYMBOL}/${CHAIN_NAME_1}-${CHAIN_NAME_2}-config.yaml`;
const EXAMPLES_PATH = './examples';
const CORE_CONFIG_PATH = `${EXAMPLES_PATH}/core-config.yaml`;
const TEST_TIMEOUT = 100_000; // Long timeout since these tests can take a while
describe('hyperlane relayer e2e tests', async function () {
this.timeout(TEST_TIMEOUT);
before(async () => {
await hyperlaneCoreDeploy(CHAIN_NAME_1, CORE_CONFIG_PATH);
await hyperlaneCoreDeploy(CHAIN_NAME_2, CORE_CONFIG_PATH);
const warpConfig = {
anvil2: {
type: TokenType.native,
symbol: SYMBOL,
},
anvil3: {
type: TokenType.synthetic,
symbol: SYMBOL,
},
};
const warpConfigPath = './tmp/warp-route-config.yaml';
writeYamlOrJson(warpConfigPath, warpConfig);
await hyperlaneWarpDeploy(warpConfigPath);
});
describe('relayer', () => {
it('should relay core messages', async () => {
const process = hyperlaneRelayer([CHAIN_NAME_1, CHAIN_NAME_2]);
await hyperlaneSendMessage(CHAIN_NAME_1, CHAIN_NAME_2);
await hyperlaneSendMessage(CHAIN_NAME_2, CHAIN_NAME_1);
await process.kill('SIGINT');
});
it('should relay warp messages', async () => {
const process = hyperlaneRelayer(
[CHAIN_NAME_1, CHAIN_NAME_2],
WARP_DEPLOY_OUTPUT,
);
await hyperlaneWarpSendRelay(
CHAIN_NAME_1,
CHAIN_NAME_2,
WARP_DEPLOY_OUTPUT,
false,
);
await hyperlaneWarpSendRelay(
CHAIN_NAME_2,
CHAIN_NAME_1,
WARP_DEPLOY_OUTPUT,
false,
);
await process.kill('SIGINT');
});
});
});

@ -19,5 +19,6 @@ export function stubMerkleTreeConfig(
},
},
ism: {},
backlog: [],
});
}

@ -10,11 +10,10 @@ const CACHE_PATH = process.env.RELAYER_CACHE ?? './relayer-cache.json';
async function main() {
const { environment } = await getArgs().argv;
const { core } = await getHyperlaneCore(environment);
const relayer = new HyperlaneRelayer({ core });
// target subset of chains
// const chains = ['ethereum', 'polygon', 'bsc']
const chains = undefined;
// target subset of chains and senders/recipients
const whitelist = undefined;
const relayer = new HyperlaneRelayer({ core, whitelist });
try {
const contents = await readFile(CACHE_PATH, 'utf-8');
@ -26,10 +25,10 @@ async function main() {
console.error(`Failed to load cache from ${CACHE_PATH}`);
}
relayer.start(chains);
relayer.start();
process.once('SIGINT', async () => {
relayer.stop(chains);
relayer.stop();
const cache = JSON.stringify(relayer.cache);
await writeFile(CACHE_PATH, cache, 'utf-8');

@ -204,9 +204,16 @@ export class HyperlaneCore extends HyperlaneApp<CoreFactories> {
mailbox.on<DispatchEvent>(
mailbox.filters.Dispatch(),
(_sender, _destination, _recipient, message, event) => {
const parsed = HyperlaneCore.parseDispatchedMessage(message);
this.logger.info(`Observed message ${parsed.id} on ${originChain}`);
return handler(parsed, event);
const dispatched = HyperlaneCore.parseDispatchedMessage(message);
// add human readable chain names
dispatched.parsed.originChain = this.getOrigin(dispatched);
dispatched.parsed.destinationChain = this.getDestination(dispatched);
this.logger.info(
`Observed message ${dispatched.id} on ${originChain} to ${dispatched.parsed.destinationChain}`,
);
return handler(dispatched, event);
},
);
});

@ -2,13 +2,18 @@ import { ethers, providers } from 'ethers';
import { Logger } from 'pino';
import { z } from 'zod';
import { ChainMap } from '@hyperlane-xyz/sdk';
import {
Address,
ParsedMessage,
assert,
bytes32ToAddress,
messageId,
objMap,
objMerge,
pollAsync,
parseMessage,
promiseObjAll,
sleep,
} from '@hyperlane-xyz/utils';
import { DerivedHookConfig, EvmHookReader } from '../hook/EvmHookReader.js';
@ -31,19 +36,64 @@ const DerivedHookConfigWithAddressSchema =
const DerivedIsmConfigWithAddressSchema =
IsmConfigSchema.and(WithAddressSchema);
const BacklogMessageSchema = z.object({
attempts: z.number(),
lastAttempt: z.number(),
message: z.string(),
dispatchTx: z.string(),
});
const MessageBacklogSchema = z.array(BacklogMessageSchema);
export const RelayerCacheSchema = z.object({
hook: z.record(z.record(DerivedHookConfigWithAddressSchema)),
ism: z.record(z.record(DerivedIsmConfigWithAddressSchema)),
backlog: MessageBacklogSchema,
});
type RelayerCache = z.infer<typeof RelayerCacheSchema>;
type MessageWhitelist = ChainMap<Set<Address>>;
// message must have origin and destination chains in the whitelist
// if whitelist has non-empty address set for chain, message must have sender and recipient in the set
export function messageMatchesWhitelist(
whitelist: MessageWhitelist,
message: ParsedMessage,
): boolean {
const originAddresses = whitelist[message.originChain ?? message.origin];
if (!originAddresses) {
return false;
}
const sender = bytes32ToAddress(message.sender);
if (originAddresses.size !== 0 && !originAddresses.has(sender)) {
return false;
}
const destinationAddresses =
whitelist[message.destinationChain ?? message.destination];
if (!destinationAddresses) {
return false;
}
const recipient = bytes32ToAddress(message.recipient);
if (destinationAddresses.size !== 0 && !destinationAddresses.has(recipient)) {
return false;
}
return true;
}
export class HyperlaneRelayer {
protected multiProvider: MultiProvider;
protected metadataBuilder: BaseMetadataBuilder;
protected readonly core: HyperlaneCore;
protected readonly retryTimeout: number;
protected readonly whitelist: ChainMap<Set<Address>> | undefined;
public backlog: RelayerCache['backlog'];
public cache: RelayerCache | undefined;
protected stopRelayingHandler: ((chains?: ChainName[]) => void) | undefined;
@ -53,21 +103,32 @@ export class HyperlaneRelayer {
constructor({
core,
caching = true,
retryTimeout = 5 * 1000,
retryTimeout = 1000,
whitelist = undefined,
}: {
core: HyperlaneCore;
caching?: boolean;
retryTimeout?: number;
whitelist?: ChainMap<Address[]>;
}) {
this.core = core;
this.retryTimeout = retryTimeout;
this.logger = core.logger.child({ module: 'Relayer' });
this.metadataBuilder = new BaseMetadataBuilder(core);
this.multiProvider = core.multiProvider;
if (whitelist) {
this.whitelist = objMap(
whitelist,
(_chain, addresses) => new Set(addresses),
);
}
this.backlog = [];
if (caching) {
this.cache = {
hook: {},
ism: {},
backlog: [],
};
}
}
@ -152,11 +213,24 @@ export class HyperlaneRelayer {
messageIndex = 0,
message = HyperlaneCore.getDispatchedMessages(dispatchTx)[messageIndex],
): Promise<ethers.ContractReceipt> {
if (this.whitelist) {
// add human readable names for use in whitelist checks
message.parsed = {
originChain: this.core.getOrigin(message),
destinationChain: this.core.getDestination(message),
...message.parsed,
};
assert(
messageMatchesWhitelist(this.whitelist, message.parsed),
`Message ${message.id} does not match whitelist`,
);
}
this.logger.info(`Preparing to relay message ${message.id}`);
const isDelivered = await this.core.isDelivered(message);
if (isDelivered) {
this.logger.debug(`Message ${message.id} already delivered`);
this.logger.info(`Message ${message.id} already delivered`);
return this.core.getProcessedReceipt(message);
}
@ -170,18 +244,14 @@ export class HyperlaneRelayer {
]);
this.logger.debug({ ism, hook }, `Retrieved ISM and hook configs`);
const blockTime = this.multiProvider.getChainMetadata(
message.parsed.destination,
).blocks?.estimateBlockTime;
const waitTime = blockTime ? blockTime * 2 : this.retryTimeout;
const metadata = await pollAsync(
() => this.metadataBuilder.build({ message, ism, hook, dispatchTx }),
waitTime,
12, // 12 attempts
);
const metadata = await this.metadataBuilder.build({
message,
ism,
hook,
dispatchTx,
});
this.logger.info({ message, metadata }, `Relaying message ${message.id}`);
this.logger.info(`Relaying message ${message.id}`);
return this.core.deliver(message, metadata);
}
@ -206,44 +276,92 @@ export class HyperlaneRelayer {
);
}
start(chains = this.core.chains()): void {
assert(!this.stopRelayingHandler, 'Relayer already started');
protected async flushBacklog(): Promise<void> {
while (this.stopRelayingHandler) {
const backlogMsg = this.backlog.shift();
const { removeHandler } = this.core.onDispatch(async (message, event) => {
const destination = message.parsed.destination;
const chain = this.multiProvider.tryGetChainName(destination);
if (!chain) {
this.logger.warn(`Unknown destination ${destination}`);
return;
if (!backlogMsg) {
this.logger.trace('Backlog empty, waiting 1s');
await sleep(1000);
continue;
}
if (!chains.includes(chain)) {
this.logger.info(`Skipping message to chain ${chain}`);
return;
// linear backoff (attempts * retryTimeout)
if (
Date.now() <
backlogMsg.lastAttempt + backlogMsg.attempts * this.retryTimeout
) {
this.backlog.push(backlogMsg);
continue;
}
const dispatchReceipt = await event.getTransactionReceipt();
const processReceipt = await this.relayMessage(
dispatchReceipt,
undefined,
message,
const { message, dispatchTx, attempts } = backlogMsg;
const id = messageId(message);
const parsed = parseMessage(message);
const dispatchMsg = { id, message, parsed };
try {
const dispatchReceipt = await this.multiProvider
.getProvider(parsed.origin)
.getTransactionReceipt(dispatchTx);
// TODO: handle batching
await this.relayMessage(dispatchReceipt, undefined, dispatchMsg);
} catch (error) {
this.logger.error(
`Failed to relay message ${id} (attempt #${attempts + 1})`,
);
this.backlog.push({
...backlogMsg,
attempts: attempts + 1,
lastAttempt: Date.now(),
});
}
}
}
this.logger.info(
`Message ${message.id} was processed in ${
this.multiProvider.tryGetExplorerTxUrl(destination, {
hash: processReceipt.transactionHash,
}) ?? 'tx ' + processReceipt.transactionHash
}`,
protected whitelistChains() {
return this.whitelist ? Object.keys(this.whitelist) : undefined;
}
start(): void {
assert(!this.stopRelayingHandler, 'Relayer already started');
this.backlog = this.cache?.backlog ?? [];
const { removeHandler } = this.core.onDispatch(async (message, event) => {
if (
this.whitelist &&
!messageMatchesWhitelist(this.whitelist, message.parsed)
) {
this.logger.debug(
{ message, whitelist: this.whitelist },
`Skipping message ${message.id} not matching whitelist`,
);
}, chains);
return;
}
this.backlog.push({
attempts: 0,
lastAttempt: Date.now(),
message: message.message,
dispatchTx: event.transactionHash,
});
}, this.whitelistChains());
this.stopRelayingHandler = removeHandler;
// start flushing backlog
void this.flushBacklog();
}
stop(chains = this.core.chains()): void {
stop(): void {
assert(this.stopRelayingHandler, 'Relayer not started');
this.stopRelayingHandler(chains);
this.stopRelayingHandler(this.whitelistChains());
this.stopRelayingHandler = undefined;
if (this.cache) {
this.cache.backlog = this.backlog;
}
}
}

Loading…
Cancel
Save