Reduce Kathy Polling in V2 (#1357)

pull/1364/head
Nam Chu Hoai 2 years ago committed by GitHub
parent 3dc8545b76
commit 49b8c61a41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      typescript/helloworld/src/app/app.ts
  2. 2
      typescript/infra/scripts/helloworld/kathy.ts
  3. 23
      typescript/sdk/src/core/HyperlaneCore.ts
  4. 20
      typescript/utils/src/utils.ts

@ -67,6 +67,12 @@ export class HelloWorldApp<
return this.core.waitForMessageProcessing(receipt); return this.core.waitForMessageProcessing(receipt);
} }
async waitForMessageProcessed(
receipt: ethers.ContractReceipt,
): Promise<void> {
return this.core.waitForMessageProcessed(receipt);
}
async channelStats<From extends Chain>( async channelStats<From extends Chain>(
from: From, from: From,
to: Remotes<Chain, From>, to: Remotes<Chain, From>,

@ -413,7 +413,7 @@ async function sendMessage(
try { try {
await utils.timeout( await utils.timeout(
app.waitForMessageReceipt(receipt), app.waitForMessageProcessed(receipt),
messageReceiptTimeout, messageReceiptTimeout,
'Timeout waiting for message to be received', 'Timeout waiting for message to be received',
); );

@ -133,6 +133,20 @@ export class HyperlaneCore<
}); });
} }
protected async waitForMessageWasProcessed(
message: DispatchedMessage,
): Promise<void> {
const id = utils.messageId(message.message);
const { mailbox } = this.getDestination(message);
await utils.pollAsync(async () => {
const delivered = await mailbox.delivered(id);
if (!delivered) {
throw new Error(`Message ${id} not yet processed`);
}
});
return;
}
getDispatchedMessages(sourceTx: ethers.ContractReceipt): DispatchedMessage[] { getDispatchedMessages(sourceTx: ethers.ContractReceipt): DispatchedMessage[] {
const mailbox = Mailbox__factory.createInterface(); const mailbox = Mailbox__factory.createInterface();
const dispatchLogs = sourceTx.logs const dispatchLogs = sourceTx.logs
@ -161,4 +175,13 @@ export class HyperlaneCore<
const messages = this.getDispatchedMessages(sourceTx); const messages = this.getDispatchedMessages(sourceTx);
return Promise.all(messages.map((msg) => this.waitForProcessReceipt(msg))); return Promise.all(messages.map((msg) => this.waitForProcessReceipt(msg)));
} }
async waitForMessageProcessed(
sourceTx: ethers.ContractReceipt,
): Promise<void> {
const messages = this.getDispatchedMessages(sourceTx);
await Promise.all(
messages.map((msg) => this.waitForMessageWasProcessed(msg)),
);
}
} }

@ -205,6 +205,26 @@ export async function retryAsync<T>(
throw saveError; throw saveError;
} }
export async function pollAsync<T>(
runner: () => Promise<T>,
delayMs = 500,
maxAttempts: number | undefined = undefined,
) {
let attempts = 0;
let saveError;
while (!maxAttempts || attempts < maxAttempts) {
try {
const ret = await runner();
return ret;
} catch (error) {
saveError = error;
attempts += 1;
await sleep(delayMs);
}
}
throw saveError;
}
export function median(a: number[]): number { export function median(a: number[]): number {
const sorted = a.slice().sort(); const sorted = a.slice().sort();
const mid = Math.floor(sorted.length / 2); const mid = Math.floor(sorted.length / 2);

Loading…
Cancel
Save