Implement staggered provider triggering

Implement pagination log fetching
Improve tests and fix bugs
pull/35/head
J M Rossy 2 years ago
parent cf52a1244d
commit 4479944d3b
  1. 58
      src/features/providers/SmartProvider.test.ts
  2. 165
      src/features/providers/SmartProvider.ts
  3. 10
      src/utils/number.ts
  4. 4
      src/utils/timeout.ts
  5. 4
      src/utils/typeof.ts

@ -7,17 +7,28 @@ import { logger } from '../../utils/logger';
import { HyperlaneSmartProvider, ProviderMethod } from './SmartProvider';
jest.setTimeout(30000);
jest.setTimeout(40000);
const MIN_BLOCK_NUM = 8000000;
const MIN_BLOCK_NUM = 8900000;
const DEFAULT_ACCOUNT = '0x9d525E28Fe5830eE92d7Aa799c4D21590567B595';
const WETH_CONTRACT = '0xb4fbf271143f4fbf7b91a5ded31805e42b2208d6';
const WETH_TRANSFER_TOPIC0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef';
const TRANSFER_TX_HASH = '0x45a586f90ffd5d0f8e618f0f3703b14c2c9e4611af6231d6fed32c62776b6c1b';
const goerliRpcConfig = {
...chainMetadata.goerli.publicRpcUrls[0],
pagination: {
blocks: 1000,
from: MIN_BLOCK_NUM,
},
};
const justExplorersConfig: ChainMetadata = { ...chainMetadata.goerli, publicRpcUrls: [] };
const justRpcsConfig: ChainMetadata = { ...chainMetadata.goerli, blockExplorers: [] };
const combinedConfig: ChainMetadata = { ...chainMetadata.goerli };
const justRpcsConfig: ChainMetadata = {
...chainMetadata.goerli,
publicRpcUrls: [goerliRpcConfig],
blockExplorers: [],
};
const combinedConfig: ChainMetadata = { ...chainMetadata.goerli, publicRpcUrls: [goerliRpcConfig] };
const configs: [string, ChainMetadata][] = [
['Just Explorers', justExplorersConfig],
['Just RPCs', justRpcsConfig],
@ -99,15 +110,42 @@ describe('SmartProvider', () => {
});
itDoesIfSupported(ProviderMethod.GetLogs, async () => {
const result = await provider.getLogs({
logger.debug('Testing logs with no from/to range');
const result1 = await provider.getLogs({
address: WETH_CONTRACT,
topics: [WETH_TRANSFER_TOPIC0],
});
// console.log(result);
console.log(JSON.stringify(result.slice(0, 20)));
logger.debug('Logs found', result.length);
expect(result.length).toBeGreaterThan(100);
expect(areAddressesEqual(result[0].address, WETH_CONTRACT)).toBeTruthy();
logger.debug('Logs found', result1.length);
expect(result1.length).toBeGreaterThan(100);
expect(areAddressesEqual(result1[0].address, WETH_CONTRACT)).toBeTruthy();
logger.debug('Testing logs with small from/to range');
const result2 = await provider.getLogs({
address: WETH_CONTRACT,
topics: [WETH_TRANSFER_TOPIC0],
fromBlock: MIN_BLOCK_NUM,
toBlock: MIN_BLOCK_NUM + 100,
});
expect(result2.length).toBeGreaterThan(10);
expect(areAddressesEqual(result2[0].address, WETH_CONTRACT)).toBeTruthy();
try {
logger.debug('Testing logs with too large from/to range');
const result3 = await provider.getLogs({
address: WETH_CONTRACT,
topics: [WETH_TRANSFER_TOPIC0],
fromBlock: MIN_BLOCK_NUM,
toBlock: 'latest',
});
if (config === justRpcsConfig) {
expect(false).toBe('Should throw error about minQueryable');
} else {
expect(result3.length).toBeGreaterThan(10);
expect(areAddressesEqual(result3[0].address, WETH_CONTRACT)).toBeTruthy();
}
} catch (error: any) {
expect(error.message).toMatch(/(.*)too many queries(.*)/);
}
});
itDoesIfSupported(ProviderMethod.EstimateGas, async () => {

@ -1,9 +1,15 @@
import { providers, utils } from 'ethers';
import { BigNumber, providers, utils } from 'ethers';
import { ChainMetadata, ExplorerFamily, objFilter } from '@hyperlane-xyz/sdk';
import { logger } from '../../utils/logger';
import { isBigNumberish } from '../../utils/number';
import { chunk } from '../../utils/string';
import { sleep } from '../../utils/timeout';
import { isNullish } from '../../utils/typeof';
const PROVIDER_STAGGER_DELAY_MS = 1000; // 1 seconds
const PROVIDER_TIMEOUT_MARKER = '__PROVIDER_TIMEOUT__';
type RpcConfigWithConnectionInfo = ChainMetadata['publicRpcUrls'][number] & {
connection?: utils.ConnectionInfo;
@ -85,40 +91,64 @@ export class HyperlaneSmartProvider extends providers.BaseProvider implements IP
return chainMetadataToProviderNetwork(this.chainMetadata);
}
/**
* This perform method will trigger any providers that support the method
* one at a time in preferential order. If one is slow to respond, the next is triggered.
*/
async perform(method: string, params: { [name: string]: any }): Promise<any> {
const allProviders = [...this.explorerProviders, ...this.rpcProviders];
if (!allProviders.length) throw new Error('No providers available');
if (!this.supportedMethods.includes(method as ProviderMethod))
throw new Error(`No providers available for method ${method}`);
const supportedProviders = allProviders.filter((p) =>
p.supportedMethods.includes(method as ProviderMethod),
);
if (!supportedProviders.length) throw new Error(`No providers available for method ${method}`);
let index = 0;
const maxIndex = supportedProviders.length - 1;
const providerResultPromises: Promise<any>[] = [];
// TODO consider implementing quorum and/or retry logic here similar to FallbackProvider/RetryProvider
// TODO trigger next provider if current one takes too long
// This will help spread load across providers and ease rate limiting
for (const provider of supportedProviders) {
const providerUrl =
provider instanceof providers.JsonRpcProvider ? provider.connection.url : provider.baseUrl;
try {
const result = await provider.perform(method, params);
if (result === null || result === undefined) {
throw new Error(`Nullish result from provider using url: ${providerUrl}`);
while (true) {
if (index <= maxIndex) {
// Trigger the next provider in line
const provider = supportedProviders[index];
const providerUrl = provider.getBaseUrl();
const resultPromise = provider.perform(method, params);
providerResultPromises.push(resultPromise);
const timeoutPromise = sleep(PROVIDER_STAGGER_DELAY_MS, PROVIDER_TIMEOUT_MARKER);
const result = await Promise.race([resultPromise, timeoutPromise]);
if (isNullish(result)) {
logger.error(
`Nullish result from provider using ${providerUrl}. Triggering next available provider`,
);
index += 1;
} else if (result === PROVIDER_TIMEOUT_MARKER) {
logger.warn(
`Slow response from provider using ${providerUrl}. Triggering next available provider`,
);
index += 1;
} else {
// Result looks good
return result;
}
} else {
// All providers already triggered, wait for one to complete
const timeoutPromise = sleep(PROVIDER_STAGGER_DELAY_MS * 12, PROVIDER_TIMEOUT_MARKER);
const result = await Promise.race([...providerResultPromises, timeoutPromise]);
if (isNullish(result) || result === PROVIDER_TIMEOUT_MARKER) {
throw new Error(`All providers failed or timed out for method ${method}`);
} else {
return result;
}
return result;
} catch (error) {
logger.error('Error from provider using url:', providerUrl, error);
}
}
throw new Error(`All providers failed for method ${method}`);
}
}
// Used for crude rate-limiting of explorer queries without API keys
const hostToLastQueried: Record<string, number> = {};
const ETHERSCAN_THROTTLE_TIME = 5100; // 5.1 seconds
const ETHERSCAN_THROTTLE_TIME = 5200; // 5.2 seconds
export class HyperlaneEtherscanProvider
extends providers.EtherscanProvider
@ -165,7 +195,10 @@ export class HyperlaneEtherscanProvider
try {
const lastExplorerQuery = hostToLastQueried[hostname] || 0;
const waitTime = ETHERSCAN_THROTTLE_TIME - (Date.now() - lastExplorerQuery);
if (waitTime > 0) await sleep(waitTime);
if (waitTime > 0) {
logger.debug(`HyperlaneEtherscanProvider waiting ${waitTime}ms to avoid rate limit`);
await sleep(waitTime);
}
const result = await super.fetch(module, params, post);
return result;
} finally {
@ -174,25 +207,111 @@ export class HyperlaneEtherscanProvider
}
async perform(method: string, params: any): Promise<any> {
logger.debug('HyperlaneEtherscanProvider performing method:', method);
if (!this.supportedMethods.includes(method as ProviderMethod))
throw new Error(`Unsupported method ${method}`);
return super.perform(method, params);
}
}
const NUM_LOG_BLOCK_RANGES_TO_QUERY = 10;
const NUM_PARALLEL_LOG_QUERIES = 5;
export class HyperlaneJsonRpcProvider
extends providers.StaticJsonRpcProvider
implements IProviderMethods
{
public readonly supportedMethods = AllProviderMethods;
constructor(rpcConfig: RpcConfigWithConnectionInfo, network: providers.Network) {
constructor(public readonly rpcConfig: RpcConfigWithConnectionInfo, network: providers.Network) {
super(rpcConfig.connection ?? rpcConfig.http, network);
}
send(method: string, params: Array<any>): Promise<any> {
// TODO implement smart chunking here based on rpcConfig values
return super.send(method, params);
async perform(method: string, params: any): Promise<any> {
logger.debug('HyperlaneJsonRpcProvider performing method:', method);
if (method === ProviderMethod.GetLogs) {
return this.performGetLogs(params);
} else {
return super.perform(method, params);
}
}
async performGetLogs(params: { filter: providers.Filter }) {
const deferToSuper = () => super.perform(ProviderMethod.GetLogs, params);
const paginationOptions = this.rpcConfig.pagination;
if (!paginationOptions || !params.filter) return deferToSuper();
const { fromBlock, toBlock, address, topics } = params.filter;
// TODO update when sdk is updated
const { blocks: maxBlockRange, from: minBlockNumber } = paginationOptions;
if (!maxBlockRange && isNullish(minBlockNumber)) return deferToSuper();
const currentBlockNumber = await super.perform(ProviderMethod.GetBlockNumber, null);
let endBlock: number;
if (isNullish(toBlock) || toBlock === 'latest') {
endBlock = currentBlockNumber;
} else if (isBigNumberish(toBlock)) {
endBlock = BigNumber.from(toBlock).toNumber();
} else {
return deferToSuper();
}
const minQueryable = maxBlockRange
? endBlock - maxBlockRange * NUM_LOG_BLOCK_RANGES_TO_QUERY + 1
: 0;
let startBlock: number;
if (fromBlock === 'earliest') {
startBlock = 0;
} else if (isBigNumberish(fromBlock)) {
startBlock = BigNumber.from(fromBlock).toNumber();
} else if (isNullish(fromBlock)) {
startBlock = Math.max(minQueryable, minBlockNumber ?? 0);
} else {
return deferToSuper();
}
if (startBlock >= endBlock)
throw new Error(`Invalid range ${startBlock} - ${endBlock}: start >= end`);
if (minBlockNumber && startBlock < minBlockNumber)
throw new Error(`Invalid start ${startBlock}: below rpc minBlockNumber ${minBlockNumber}`);
if (startBlock < minQueryable) {
throw new Error(`Invalid range ${startBlock} - ${endBlock}: requires too many queries`);
}
const blockChunkRange = maxBlockRange || endBlock - startBlock;
const blockChunks: [number, number][] = [];
for (let from = startBlock; from <= endBlock; from += blockChunkRange) {
const to = Math.min(from + blockChunkRange - 1, endBlock);
blockChunks.push([from, to]);
}
let combinedResults: Array<providers.Log> = [];
const requestChunks = chunk(blockChunks, NUM_PARALLEL_LOG_QUERIES);
for (const reqChunk of requestChunks) {
const resultPromises = reqChunk.map(
(blockChunk) =>
super.perform(ProviderMethod.GetLogs, {
filter: {
address,
topics,
fromBlock: BigNumber.from(blockChunk[0]).toHexString(),
toBlock: BigNumber.from(blockChunk[1]).toHexString(),
},
}) as Promise<Array<providers.Log>>,
);
const results = await Promise.all(resultPromises);
combinedResults = [...combinedResults, ...results.flat()];
}
return combinedResults;
}
getBaseUrl(): string {
return this.connection.url;
}
}

@ -1,6 +1,7 @@
import { BigNumber, BigNumberish } from 'ethers';
import { logger } from './logger';
import { isNullish } from './typeof';
export function tryToDecimalNumber(value: BigNumberish) {
try {
@ -16,3 +17,12 @@ export function toDecimalNumber(value: BigNumberish) {
if (result === null || result === undefined) throw new Error(`Error parsing hex number ${value}`);
return result;
}
export function isBigNumberish(value: any): value is BigNumberish {
try {
if (isNullish(value)) return false;
return BigNumber.from(value)._isBigNumber;
} catch (error) {
return false;
}
}

@ -40,8 +40,8 @@ export async function fetchWithTimeout(
return response;
}
export function sleep(milliseconds: number) {
return new Promise((resolve) => setTimeout(() => resolve(true), milliseconds));
export function sleep(milliseconds: number, resolveValue: any = true) {
return new Promise((resolve) => setTimeout(() => resolve(resolveValue), milliseconds));
}
export const PROMISE_TIMEOUT = '__promise_timeout__';

@ -0,0 +1,4 @@
export function isNullish<T>(val: T) {
if (val === null || val === undefined) return true;
else return false;
}
Loading…
Cancel
Save