shard_contract_support
jenya 2 years ago
parent 1189c6f386
commit a14d7b3ef4
  1. 11
      src/indexer/index.ts
  2. 4
      src/indexer/indexer/LogIndexer.ts
  3. 12
      src/indexer/indexer/contracts/ContractIndexer.ts
  4. 3
      src/indexer/indexer/contracts/erc1155/ABI.ts
  5. 7
      src/indexer/indexer/contracts/erc1155/addContract.ts
  6. 7
      src/indexer/indexer/contracts/erc1155/onFinish.ts
  7. 18
      src/indexer/indexer/contracts/erc1155/trackEvents.ts
  8. 3
      src/indexer/indexer/contracts/erc20/ABI.ts
  9. 7
      src/indexer/indexer/contracts/erc20/addContract.ts
  10. 4
      src/indexer/indexer/contracts/erc20/onFinish.ts
  11. 11
      src/indexer/indexer/contracts/erc20/trackEvents.ts
  12. 3
      src/indexer/indexer/contracts/erc721/ABI.ts
  13. 7
      src/indexer/indexer/contracts/erc721/addContract.ts
  14. 5
      src/indexer/indexer/contracts/erc721/onFinish.ts
  15. 13
      src/indexer/indexer/contracts/erc721/trackEvents.ts
  16. 10
      src/indexer/indexer/contracts/utils/ABIManager.ts
  17. 6
      src/indexer/utils/EntityIterator/EntityIterator.ts
  18. 33
      src/indexer/utils/EntityIterator/entities.ts

@ -40,23 +40,24 @@ export const indexer = async () => {
statsIndexer()
}
// todo dry
if (config.indexer.isSyncingLogsEnabled && config.indexer.shards.includes(0)) {
const logIndexer0 = new LogIndexer(0)
logIndexer0.loop()
}
if (config.indexer.isSyncingLogsEnabledShard1 && config.indexer.shards.includes(0)) {
const logIndexer1 = new LogIndexer(0)
if (config.indexer.isSyncingLogsEnabledShard1 && config.indexer.shards.includes(1)) {
const logIndexer1 = new LogIndexer(1)
logIndexer1.loop()
}
if (config.indexer.isSyncingContractsEnabled && config.indexer.shards.includes(0)) {
const contractIndexer0 = new ContractIndexer()
const contractIndexer0 = new ContractIndexer(0)
contractIndexer0.loop()
}
if (config.indexer.isSyncingContractsEnabledShard1 && config.indexer.shards.includes(0)) {
const contractIndexer1 = new ContractIndexer()
if (config.indexer.isSyncingContractsEnabledShard1 && config.indexer.shards.includes(1)) {
const contractIndexer1 = new ContractIndexer(1)
contractIndexer1.loop()
}
}

@ -23,8 +23,8 @@ export class LogIndexer {
readonly store: PostgresStorage
constructor(shardID: ShardID) {
if (shardID !== 0) {
throw new Error('Only shard #0 is currently supported')
if (shardID !== 0 && shardID !== 1) {
throw new Error('Only shards #0 and #1 are currently supported')
}
this.l = logger(module, `shard${shardID}`)

@ -6,6 +6,7 @@ import {tasks} from './tasks'
import {ContractTracker} from 'src/indexer/indexer/contracts/types'
import {PostgresStorage} from 'src/store/postgres'
import {EntityIteratorEntities} from 'src/indexer/utils/EntityIterator/entities'
import {ShardID} from 'src/types'
const syncingIntervalMs = 1000 * 60 * 5
@ -14,9 +15,9 @@ export class ContractIndexer {
readonly ls: Record<string, LoggerModule>
readonly store: PostgresStorage
constructor() {
this.store = stores[0]
this.l = logger(module)
constructor(shardID: ShardID) {
this.store = stores[shardID]
this.l = logger(module, ':Shard' + shardID)
this.l.info(`Created [${tasks.map((t) => t.name).join(', ')}]`)
this.ls = tasks
.map((t) => t.name)
@ -39,7 +40,7 @@ export class ContractIndexer {
const {batchSize, process} = task.addContract
this.ls[task.name].info(`Syncing contracts from block ${startBlock}`)
const contractsIterator = EntityIterator('contracts', {
const contractsIterator = EntityIterator(this.store, 'contracts', {
batchSize,
index: startBlock,
})
@ -71,6 +72,7 @@ export class ContractIndexer {
)
const tokensIterator = EntityIterator(
this.store,
(task.tableName as EntityIteratorEntities) || (task.name as EntityIteratorEntities),
{
batchSize: 1,
@ -91,7 +93,7 @@ export class ContractIndexer {
const startBlock = latestSyncedBlock && latestSyncedBlock > 0 ? latestSyncedBlock + 1 : 0
let latestSyncedTokenBlock = latestSyncedBlock
const logsIterator = EntityIterator('logs', {
const logsIterator = EntityIterator(this.store, 'logs', {
batchSize,
index: startBlock,
address: token.address,

@ -1,5 +1,6 @@
import {ABIManager} from 'src/indexer/indexer/contracts/utils/ABIManager'
import ERC1155ABI from 'src/indexer/indexer/contracts/erc1155/ERC1155ABI.json'
import {IABI} from 'src/indexer/indexer/contracts/types'
import {ShardID} from 'src/types'
export const ABI = ABIManager(ERC1155ABI as IABI)
export const ABIFactory = (shardID: ShardID) => ABIManager(shardID, ERC1155ABI as IABI)

@ -2,10 +2,9 @@ import {Contract, IERC1155} from 'src/types'
import {validator, isUint, isLength} from 'src/utils/validators/validators'
import {logger} from 'src/logger'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
import {getByIPFSHash} from 'src/indexer/utils/ipfs/index'
const {hasAllSignatures, callAll} = ABI
const l = logger(module, 'erc1155')
// https://eips.ethereum.org/EIPS/eip-20
@ -30,6 +29,8 @@ const initialMeta = {
const initialEmptyMeta = JSON.stringify({})
export const addContract = async (store: PostgresStorage, contract: Contract) => {
const {hasAllSignatures, callAll} = ABIFactory(store.shardID)
if (!hasAllSignatures(expectedMethodsAndEvents, contract.bytecode)) {
return
}
@ -39,7 +40,7 @@ export const addContract = async (store: PostgresStorage, contract: Contract) =>
let metaJSON = initialEmptyMeta
try {
params = await callAll(contract.address, callableMethods)
params = await callAll(store.shardID, contract.address, callableMethods)
const prepareMeta = async () => {
meta = (await getByIPFSHash(params.contractURI)) || initialMeta

@ -1,5 +1,5 @@
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from 'src/indexer/indexer/contracts/erc1155/ABI'
import {ABIFactory} from 'src/indexer/indexer/contracts/erc1155/ABI'
import {logger} from 'src/logger'
import {Address, Filter, IERC20, IERC721TokenID} from 'src/types'
import nodeFetch from 'node-fetch'
@ -7,7 +7,6 @@ import {normalizeAddress} from 'src/utils/normalizeAddress'
import {getByIPFSHash} from 'src/indexer/utils/ipfs/index'
const l = logger(module, 'erc1155:assets')
const {call} = ABI
const filter: Filter = {
limit: 10,
@ -22,6 +21,8 @@ const filter: Filter = {
}
// update balances
export const updateAssets = async (store: PostgresStorage) => {
const {call} = ABIFactory(store.shardID)
l.info(`Updating assets`)
let count = 0
const tokensForUpdate = new Set<Address>()
@ -92,6 +93,8 @@ export const updateAssets = async (store: PostgresStorage) => {
}
export const updateBalances = async (store: PostgresStorage) => {
const {call} = ABIFactory(store.shardID)
l.info(`Updating balances`)
const tokensForUpdate = new Set<Address>()
let count = 0

@ -1,22 +1,14 @@
import {Log, IERC721, ContractEventType, ContractEvent} from 'src/types'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
import {logger} from 'src/logger'
const {getEntryByName, decodeLog, call} = ABI
import {zeroAddress} from 'src/indexer/indexer/contracts/utils/zeroAddress'
import {normalizeAddress} from 'src/utils/normalizeAddress'
import {logTime} from 'src/utils/logTime'
const l = logger(module, 'erc1155')
const transferEventName = ContractEventType.TransferSingle
const transferEvent = getEntryByName(transferEventName)!.signature
const approvalForAllSignature = getEntryByName(ContractEventType.ApprovalForAll)!.signature
// todo track transfer batch
const transferBatchEvent = getEntryByName('TransferBatch')!.signature
type IParams = {
token: IERC721
}
@ -34,6 +26,14 @@ mark ownership
*/
export const trackEvents = async (store: PostgresStorage, logs: Log[], {token}: IParams) => {
const {getEntryByName, decodeLog, call} = ABIFactory(store.shardID)
// todo track transfer batch
const transferBatchEvent = getEntryByName('TransferBatch')!.signature
const transferEventName = ContractEventType.TransferSingle
const transferEvent = getEntryByName(transferEventName)!.signature
const approvalForAllSignature = getEntryByName(ContractEventType.ApprovalForAll)!.signature
const filteredLogs = logs.filter(({topics}) => topics.includes(transferEvent))
if (filteredLogs.length > 0) {
const addressesToUpdate = new Set<{address: string; tokenAddress: string; tokenId: string}>() // unique addresses of senders and recipients

@ -1,5 +1,6 @@
import {ABIManager} from 'src/indexer/indexer/contracts/utils/ABIManager'
import ERC20ABI from 'src/indexer/indexer/contracts/erc20/ERC20ABI.json'
import {IABI} from 'src/indexer/indexer/contracts/types'
import {ShardID} from 'src/types'
export const ABI = ABIManager(ERC20ABI as IABI)
export const ABIFactory = (shardID: ShardID) => ABIManager(shardID, ERC20ABI as IABI)

@ -2,9 +2,8 @@ import {Address, ByteCode, Contract, IERC20} from 'src/types'
import {validator, isUint, isLength} from 'src/utils/validators/validators'
import {logger} from 'src/logger'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
const {hasAllSignatures, callAll} = ABI
const l = logger(module, 'erc20')
// https://eips.ethereum.org/EIPS/eip-20
@ -54,6 +53,8 @@ const getProxyAddress = async (store: PostgresStorage, erc20: IERC20) => {
}
export const addContract = async (store: PostgresStorage, contract: Contract) => {
const {hasAllSignatures, callAll} = ABIFactory(store.shardID)
if (!hasAllSignatures(expectedMethodsAndEvents, contract.bytecode)) {
return
}
@ -61,7 +62,7 @@ export const addContract = async (store: PostgresStorage, contract: Contract) =>
let params: Record<typeof callableMethods[number], string>
try {
params = await callAll(contract.address, callableMethods)
params = await callAll(store.shardID, contract.address, callableMethods)
validator({
decimals: () => isUint(+params.decimals),

@ -1,10 +1,9 @@
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from 'src/indexer/indexer/contracts/erc20/ABI'
import {ABIFactory} from 'src/indexer/indexer/contracts/erc20/ABI'
import {logger} from 'src/logger'
import {Address, Filter, IERC20} from 'src/types'
const l = logger(module, 'erc20:balance')
const {call} = ABI
const filter: Filter = {
limit: 100,
@ -19,6 +18,7 @@ const filter: Filter = {
}
// update balances
export const onFinish = async (store: PostgresStorage) => {
const {call} = ABIFactory(store.shardID)
l.info(`Updating balances`)
let count = 0
const tokensForUpdate = new Set<Address>()

@ -1,18 +1,14 @@
import {Log, IERC20, ContractEvent, ContractEventType} from 'src/types'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
import {logger} from 'src/logger'
const {getEntryByName, decodeLog} = ABI
import {zeroAddress} from 'src/indexer/indexer/contracts/utils/zeroAddress'
import {normalizeAddress} from 'src/utils/normalizeAddress'
import {logTime} from 'src/utils/logTime'
const l = logger(module, 'erc20')
const transferSignature = getEntryByName(ContractEventType.Transfer)!.signature
const approveSignature = getEntryByName(ContractEventType.Approval)!.signature
type IParams = {
token: IERC20
}
@ -24,6 +20,11 @@ type IParams = {
// todo filter out other topics
export const trackEvents = async (store: PostgresStorage, logs: Log[], {token}: IParams) => {
const {getEntryByName, decodeLog} = ABIFactory(store.shardID)
const transferSignature = getEntryByName(ContractEventType.Transfer)!.signature
const approveSignature = getEntryByName(ContractEventType.Approval)!.signature
const transferLogs = logs.filter(({topics}) => topics.includes(transferSignature))
if (transferLogs.length > 0) {

@ -1,5 +1,6 @@
import {ABIManager} from 'src/indexer/indexer/contracts/utils/ABIManager'
import ERC721ABI from 'src/indexer/indexer/contracts/erc721/ERC721ABI.json'
import {IABI} from 'src/indexer/indexer/contracts/types'
import {ShardID} from 'src/types'
export const ABI = ABIManager(ERC721ABI as IABI)
export const ABIFactory = (shardID: ShardID) => ABIManager(shardID, ERC721ABI as IABI)

@ -2,9 +2,8 @@ import {Contract, IERC721} from 'src/types'
import {validator, isUint, isLength} from 'src/utils/validators/validators'
import {logger} from 'src/logger'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
const {hasAllSignatures, callAll} = ABI
const l = logger(module, 'erc721')
// https://eips.ethereum.org/EIPS/eip-20
@ -25,6 +24,8 @@ const expectedMethodsAndEvents = [
const callableMethods = ['symbol', 'name']
export const addContract = async (store: PostgresStorage, contract: Contract) => {
const {hasAllSignatures, callAll} = ABIFactory(store.shardID)
if (!hasAllSignatures(expectedMethodsAndEvents, contract.bytecode)) {
return
}
@ -32,7 +33,7 @@ export const addContract = async (store: PostgresStorage, contract: Contract) =>
let params: Record<typeof callableMethods[number], string>
try {
params = await callAll(contract.address, callableMethods)
params = await callAll(store.shardID, contract.address, callableMethods)
validator({
name: () => isLength(params.name, {min: 3, max: 64}),

@ -1,12 +1,11 @@
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from 'src/indexer/indexer/contracts/erc721/ABI'
import {ABIFactory} from 'src/indexer/indexer/contracts/erc721/ABI'
import {logger} from 'src/logger'
import {Address, Filter, IERC20, IERC721TokenID} from 'src/types'
import nodeFetch from 'node-fetch'
import {normalizeAddress} from 'src/utils/normalizeAddress'
const l = logger(module, 'erc721:assets')
const {call} = ABI
const filter: Filter = {
limit: 10,
@ -21,6 +20,8 @@ const filter: Filter = {
}
// update balances
export const onFinish = async (store: PostgresStorage) => {
const {call} = ABIFactory(store.shardID)
l.info(`Updating assets`)
let count = 0
const tokensForUpdate = new Set<Address>()

@ -1,19 +1,14 @@
import {Log, IERC721, ContractEventType, ContractEvent} from 'src/types'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
import {logger} from 'src/logger'
const {getEntryByName, decodeLog, call} = ABI
import {zeroAddress} from 'src/indexer/indexer/contracts/utils/zeroAddress'
import {normalizeAddress} from 'src/utils/normalizeAddress'
import {logTime} from 'src/utils/logTime'
const l = logger(module, 'erc721')
const transferSignature = getEntryByName(ContractEventType.Transfer)!.signature
const approvalSignature = getEntryByName(ContractEventType.Approval)!.signature
const approvalForAllSignature = getEntryByName(ContractEventType.ApprovalForAll)!.signature
type IParams = {
token: IERC721
}
@ -25,6 +20,12 @@ type IParams = {
// todo filter out other topics
export const trackEvents = async (store: PostgresStorage, logs: Log[], {token}: IParams) => {
const {getEntryByName, decodeLog, call} = ABIFactory(store.shardID)
const transferSignature = getEntryByName(ContractEventType.Transfer)!.signature
const approvalSignature = getEntryByName(ContractEventType.Approval)!.signature
const approvalForAllSignature = getEntryByName(ContractEventType.ApprovalForAll)!.signature
const filteredLogs = logs.filter(({topics}) => topics.includes(transferSignature))
if (filteredLogs.length > 0) {
const addressesToUpdate = new Set<{address: string; tokenAddress: string; tokenId: string}>() // unique addresses of senders and recipients

@ -1,12 +1,12 @@
import {IABI} from 'src/indexer/indexer/contracts/types'
import {ByteCode, Address} from 'src/types'
import {ShardID} from 'src/types'
import Web3 from 'web3'
import * as RPCClient from 'src/indexer/rpc/client'
const web3 = new Web3()
export const ABIManager = (abi: IABI) => {
export const ABIManager = (shardID: ShardID, abi: IABI) => {
const entries = abi
.filter(({type}) => ['function', 'event'].includes(type))
.map((e) => {
@ -60,7 +60,7 @@ export const ABIManager = (abi: IABI) => {
}
const inputs = web3.eth.abi.encodeParameters(entry.inputs || [], params)
const response = await RPCClient.call(0, {
const response = await RPCClient.call(shardID, {
to: address,
data: entry.signature + inputs.slice(2),
})
@ -68,7 +68,7 @@ export const ABIManager = (abi: IABI) => {
return web3.eth.abi.decodeParameters(entry.outputs, response)['0']
}
const callAll = (address: Address, methodsNames: string[]) => {
const callAll = (shardID: ShardID, address: Address, methodsNames: string[]) => {
return Promise.all(
methodsNames.map(async (methodName) => {
const entry = getEntryByName(methodName)
@ -76,7 +76,7 @@ export const ABIManager = (abi: IABI) => {
throw new Error(`${methodName} not found`)
}
const response = await RPCClient.call(0, {
const response = await RPCClient.call(shardID, {
to: address,
data: entry.signature,
})

@ -1,12 +1,14 @@
import {EntityQueryCallbackParams} from './executors'
import {entityQueries, EntityIteratorEntities} from './entities'
import {entityQueriesFactory, EntityIteratorEntities} from './entities'
import {PostgresStorage} from 'src/store/postgres'
export async function* EntityIterator(
store: PostgresStorage,
entity: EntityIteratorEntities,
{index: initialIndex = 0, batchSize = 100, ...rest}: EntityQueryCallbackParams
) {
let index = initialIndex
const entityQueries = entityQueriesFactory(store)
const f = entityQueries[entity]
while (true) {

@ -10,6 +10,7 @@ import {
} from 'src/types'
import {listByBlockNumber, EntityQueryCallback, listByOffset, withEqual} from './executors'
import {stores} from 'src/store'
import {PostgresStorage} from 'src/store/postgres'
export type ContractIndexerTaskEntities = 'erc20' | 'erc721' | 'erc1155'
export type EntityIteratorEntities =
@ -20,20 +21,20 @@ export type EntityIteratorEntities =
| 'erc20BalancesNeedUpdate'
| ContractIndexerTaskEntities
// only shard #0
const store = stores[0]
export const entityQueries: Record<EntityIteratorEntities, EntityQueryCallback> = {
logs: listByBlockNumber<Log>(store.log.getLogs, [withEqual('address')]),
logsAll: listByBlockNumber<Log>(store.log.getLogs),
internalTransactions: listByBlockNumber<InternalTransaction>(
store.internalTransaction.getInternalTransactions
),
contracts: listByBlockNumber<Contract>(store.contract.getContracts),
erc20: listByOffset<IERC20>(store.erc20.getERC20),
erc721: listByOffset<IERC721>(store.erc721.getERC721),
erc1155: listByOffset<IERC1155>(store.erc1155.getERC1155),
erc20BalancesNeedUpdate: listByOffset<IERC20Balance>(store.erc20.getBalances, [
withEqual('needUpdate'),
]),
export const entityQueriesFactory = (store: PostgresStorage) => {
const entityQueries: Record<EntityIteratorEntities, EntityQueryCallback> = {
logs: listByBlockNumber<Log>(store.log.getLogs, [withEqual('address')]),
logsAll: listByBlockNumber<Log>(store.log.getLogs),
internalTransactions: listByBlockNumber<InternalTransaction>(
store.internalTransaction.getInternalTransactions
),
contracts: listByBlockNumber<Contract>(store.contract.getContracts),
erc20: listByOffset<IERC20>(store.erc20.getERC20),
erc721: listByOffset<IERC721>(store.erc721.getERC721),
erc1155: listByOffset<IERC1155>(store.erc1155.getERC1155),
erc20BalancesNeedUpdate: listByOffset<IERC20Balance>(store.erc20.getBalances, [
withEqual('needUpdate'),
]),
}
return entityQueries
}

Loading…
Cancel
Save