Compare commits

...

5 Commits

Author SHA1 Message Date
jenya 152c90f965 ++ 2 years ago
jenya 1f548bbc26 ++ 2 years ago
jenya d717b3f7ad api shard1 support 2 years ago
jenya a14d7b3ef4 ++ 2 years ago
jenya 1189c6f386 shard1 contract indexer, delete old internal transactions over 7 days 2 years ago
  1. 5
      mainnet.env.example
  2. 2525
      price-abi.json
  3. 46
      src/api/controllers/ERC1155.ts
  4. 26
      src/api/controllers/ERC20.ts
  5. 30
      src/api/controllers/ERC721.ts
  6. 2
      src/api/index.ts
  7. 25
      src/api/rest/routes/ERC1155.ts
  8. 15
      src/api/rest/routes/ERC20.ts
  9. 15
      src/api/rest/routes/ERC721.ts
  10. 6
      src/api/rest/server.ts
  11. 5
      src/config.ts
  12. 13
      src/indexer/index.ts
  13. 4
      src/indexer/indexer/LogIndexer.ts
  14. 12
      src/indexer/indexer/contracts/ContractIndexer.ts
  15. 3
      src/indexer/indexer/contracts/erc1155/ABI.ts
  16. 7
      src/indexer/indexer/contracts/erc1155/addContract.ts
  17. 7
      src/indexer/indexer/contracts/erc1155/onFinish.ts
  18. 18
      src/indexer/indexer/contracts/erc1155/trackEvents.ts
  19. 3
      src/indexer/indexer/contracts/erc20/ABI.ts
  20. 7
      src/indexer/indexer/contracts/erc20/addContract.ts
  21. 5
      src/indexer/indexer/contracts/erc20/onFinish.ts
  22. 11
      src/indexer/indexer/contracts/erc20/trackEvents.ts
  23. 3
      src/indexer/indexer/contracts/erc721/ABI.ts
  24. 7
      src/indexer/indexer/contracts/erc721/addContract.ts
  25. 5
      src/indexer/indexer/contracts/erc721/onFinish.ts
  26. 13
      src/indexer/indexer/contracts/erc721/trackEvents.ts
  27. 10
      src/indexer/indexer/contracts/utils/ABIManager.ts
  28. 6
      src/indexer/utils/EntityIterator/EntityIterator.ts
  29. 33
      src/indexer/utils/EntityIterator/entities.ts
  30. 8
      src/store/postgres/Address.ts
  31. 7
      src/store/postgres/InternalTransaction.ts
  32. 16
      src/store/postgres/index.ts

@ -2,14 +2,15 @@
CHAIN=mainnet
# api
API_IS_ENABLED=1
API_REST_IS_ENABLED=1
API_GRPC_IS_ENABLED=1
API_WS_IS_ENABLED=1
#shard ids served by api comma separated 0,1,2,3
API_SHARDS=0,1,2,3
# enable LRU memory cache
API_IS_CACHE_ENABLED=1
API_REST_ACCESS_KEY=12345
# API_IS_CACHE_ENABLED=0
# API_REST_ACCESS_KEY=12345
# Indexer
INDEXER_IS_ENABLED=1

File diff suppressed because one or more lines are too long

@ -4,61 +4,79 @@ import {withCache} from 'src/api/controllers/cache'
import {validator} from 'src/utils/validators/validators'
import {isAddress, isShard} from 'src/utils/validators'
export async function getAllERC1155(): Promise<IERC1155[] | null> {
export async function getAllERC1155(shardID: ShardID): Promise<IERC1155[] | null> {
validator({
shardID: isShard(shardID),
})
return await withCache(
['getAllERC1155', arguments],
() => stores[0].erc1155.getAllERC1155(),
[shardID, 'getAllERC1155', arguments],
() => stores[shardID].erc1155.getAllERC1155(),
1000 * 60 * 15
)
}
export async function getUserERC1155Balances(address: Address): Promise<IERC20Balance[] | null> {
export async function getUserERC1155Balances(
shardID: ShardID,
address: Address
): Promise<IERC20Balance[] | null> {
validator({
address: isAddress(address),
shardID: isShard(shardID),
})
return await withCache(
['getUserERC1155Balances', arguments],
() => stores[0].erc1155.getUserBalances(address),
[shardID, 'getUserERC1155Balances', arguments],
() => stores[shardID].erc1155.getUserBalances(address),
1000 * 2
)
}
export async function getTokenERC1155Balances(address: Address): Promise<IERC20Balance[] | null> {
export async function getTokenERC1155Balances(
shardID: ShardID,
address: Address
): Promise<IERC20Balance[] | null> {
validator({
shardID: isShard(shardID),
address: isAddress(address),
})
return await withCache(
['getTokenERC1155Balances', arguments],
() => stores[0].erc1155.getTokenBalances(address),
[shardID, 'getTokenERC1155Balances', arguments],
() => stores[shardID].erc1155.getTokenBalances(address),
1000 * 10
)
}
export async function getTokenERC1155Assets(address: Address): Promise<IERC20Balance[] | null> {
export async function getTokenERC1155Assets(
shardID: ShardID,
address: Address
): Promise<IERC20Balance[] | null> {
validator({
shardID: isShard(shardID),
address: isAddress(address),
})
return await withCache(
['getTokenERC1155Assets', arguments],
() => stores[0].erc1155.getTokenAssets(address),
[shardID, 'getTokenERC1155Assets', arguments],
() => stores[shardID].erc1155.getTokenAssets(address),
1000 * 60 * 5
)
}
export async function getTokenERC1155AssetDetails(
shardID: ShardID,
address: Address,
tokenID: string
): Promise<IERC20Balance[] | null> {
validator({
shardID: isShard(shardID),
address: isAddress(address),
})
return await withCache(
['getTokenERC1155Assets', arguments],
() => stores[0].erc1155.getTokenAssetDetails(address, tokenID),
[shardID, 'getTokenERC1155Assets', arguments],
() => stores[shardID].erc1155.getTokenAssetDetails(address, tokenID),
1000 * 60 * 5
)
}

@ -4,40 +4,50 @@ import {withCache} from 'src/api/controllers/cache'
import {validator} from 'src/utils/validators/validators'
import {isAddress, isShard, isOffset, isLimit} from 'src/utils/validators'
export async function getAllERC20(): Promise<IERC20[] | null> {
export async function getAllERC20(shardID: ShardID): Promise<IERC20[] | null> {
validator({
shardID: isShard(shardID),
})
return await withCache(
['getAllERC20', arguments],
() => stores[0].erc20.getAllERC20(),
[shardID, 'getAllERC20', arguments],
() => stores[shardID].erc20.getAllERC20(),
1000 * 60 * 5
)
}
export async function getUserERC20Balances(address: Address): Promise<IERC20Balance[] | null> {
export async function getUserERC20Balances(
shardID: ShardID,
address: Address
): Promise<IERC20Balance[] | null> {
validator({
shardID: isShard(shardID),
address: isAddress(address),
})
return await withCache(
['getAllERC20', arguments],
() => stores[0].erc20.getUserBalances(address),
[shardID, 'getAllERC20', arguments],
() => stores[shardID].erc20.getUserBalances(address),
1000 * 60 * 5
)
}
export async function getERC20TokenHolders(
shardID: ShardID,
address: Address,
limit = 100,
offset = 0
): Promise<IERC20Balance[] | null> {
validator({
shardID: isShard(shardID),
address: isAddress(address),
offset: isOffset(offset),
limit: isLimit(limit),
})
return await withCache(
['getERC20TokenHolders', arguments],
() => stores[0].erc20.getHolders(address, limit, offset),
[shardID, 'getERC20TokenHolders', arguments],
() => stores[shardID].erc20.getHolders(address, limit, offset),
1000 * 10
)
}

@ -4,34 +4,46 @@ import {withCache} from 'src/api/controllers/cache'
import {validator} from 'src/utils/validators/validators'
import {isAddress, isShard} from 'src/utils/validators'
export async function getAllERC721(): Promise<IERC20[] | null> {
export async function getAllERC721(shardID: ShardID): Promise<IERC20[] | null> {
validator({
shardID: isShard(shardID),
})
return await withCache(
['getAllERC721', arguments],
() => stores[0].erc721.getAllERC721(),
[shardID, 'getAllERC721', arguments],
() => stores[shardID].erc721.getAllERC721(),
1000 * 60 * 5
)
}
export async function getUserERC721Assets(address: Address): Promise<IERC20Balance[] | null> {
export async function getUserERC721Assets(
shardID: ShardID,
address: Address
): Promise<IERC20Balance[] | null> {
validator({
shardID: isShard(shardID),
address: isAddress(address),
})
return await withCache(
['getUserERC721Assets', arguments],
() => stores[0].erc721.getUserAssets(address),
[shardID, 'getUserERC721Assets', arguments],
() => stores[shardID].erc721.getUserAssets(address),
1000 * 2
)
}
export async function getTokenERC721Assets(address: Address): Promise<IERC20Balance[] | null> {
export async function getTokenERC721Assets(
shardID: ShardID,
address: Address
): Promise<IERC20Balance[] | null> {
validator({
shardID: isShard(shardID),
address: isAddress(address),
})
return await withCache(
['getTokenERC721Assets', arguments],
() => stores[0].erc721.getTokenAssets(address),
[shardID, 'getTokenERC721Assets', arguments],
() => stores[shardID].erc721.getTokenAssets(address),
1000 * 2
)
}

@ -11,6 +11,6 @@ export const api = async () => {
await warmUpCache()
await RESTServer()
await GRPCServer()
// await GRPCServer()
await webSocketServer()
}

@ -1,44 +1,51 @@
import {Response, Request, Router, NextFunction} from 'express'
import * as controllers from 'src/api/controllers'
import {catchAsync} from 'src/api/rest/utils'
import {ShardID} from 'src/types/blockchain'
export const erc1155Router = Router({mergeParams: true})
erc1155Router.get('/', catchAsync(getAllERC1155))
export async function getAllERC1155(req: Request, res: Response, next: NextFunction) {
const data = await controllers.getAllERC1155()
const {shardID} = req.params
const s = +shardID as ShardID
const data = await controllers.getAllERC1155(s)
next(data)
}
erc1155Router.get('/address/:address/balances', catchAsync(getUserERC1155Balances))
export async function getUserERC1155Balances(req: Request, res: Response, next: NextFunction) {
const {address} = req.params
const data = await controllers.getUserERC1155Balances(address)
const {address, shardID} = req.params
const s = +shardID as ShardID
const data = await controllers.getUserERC1155Balances(s, address)
next(data)
}
erc1155Router.get('/token/:address/balances', catchAsync(getTokenERC1155Balances))
export async function getTokenERC1155Balances(req: Request, res: Response, next: NextFunction) {
const {address} = req.params
const data = await controllers.getTokenERC1155Balances(address)
const {address, shardID} = req.params
const s = +shardID as ShardID
const data = await controllers.getTokenERC1155Balances(s, address)
next(data)
}
erc1155Router.get('/token/:address/assets', catchAsync(getTokenERC1155Assets))
export async function getTokenERC1155Assets(req: Request, res: Response, next: NextFunction) {
const {address} = req.params
const data = await controllers.getTokenERC1155Assets(address)
const {address, shardID} = req.params
const s = +shardID as ShardID
const data = await controllers.getTokenERC1155Assets(s, address)
next(data)
}
erc1155Router.get('/token/:address/asset/:tokenID', catchAsync(getTokenERC1155AssetDetails))
export async function getTokenERC1155AssetDetails(req: Request, res: Response, next: NextFunction) {
const {address, tokenID} = req.params
const data = await controllers.getTokenERC1155AssetDetails(address, tokenID)
const {address, tokenID, shardID} = req.params
const s = +shardID as ShardID
const data = await controllers.getTokenERC1155AssetDetails(s, address, tokenID)
next(data)
}

@ -1,33 +1,38 @@
import {Response, Request, Router, NextFunction} from 'express'
import * as controllers from 'src/api/controllers'
import {catchAsync} from 'src/api/rest/utils'
import {ShardID} from 'src/types/blockchain'
export const erc20Router = Router({mergeParams: true})
erc20Router.get('/', catchAsync(getAllERC20))
export async function getAllERC20(req: Request, res: Response, next: NextFunction) {
const data = await controllers.getAllERC20()
const {shardID} = req.params
const s = +shardID as ShardID
const data = await controllers.getAllERC20(s)
next(data)
}
erc20Router.get('/address/:address/balances', catchAsync(getUserERC20Balances))
export async function getUserERC20Balances(req: Request, res: Response, next: NextFunction) {
const {address} = req.params
const data = await controllers.getUserERC20Balances(address)
const {address, shardID} = req.params
const s = +shardID as ShardID
const data = await controllers.getUserERC20Balances(s, address)
next(data)
}
erc20Router.get('/token/:address/holders', catchAsync(getERC20TokenHolders))
export async function getERC20TokenHolders(req: Request, res: Response, next: NextFunction) {
const {address} = req.params
const {address, shardID} = req.params
const s = +shardID as ShardID
const {offset, limit} = req.query
const filter = {
offset: (+offset! as number) || 0,
limit: (+limit! as number) || 100,
}
const data = await controllers.getERC20TokenHolders(address, filter.limit, filter.offset)
const data = await controllers.getERC20TokenHolders(s, address, filter.limit, filter.offset)
next(data)
}

@ -1,28 +1,33 @@
import {Response, Request, Router, NextFunction} from 'express'
import * as controllers from 'src/api/controllers'
import {catchAsync} from 'src/api/rest/utils'
import {ShardID} from 'src/types/blockchain'
export const erc721Router = Router({mergeParams: true})
erc721Router.get('/', catchAsync(getAllERC721))
export async function getAllERC721(req: Request, res: Response, next: NextFunction) {
const data = await controllers.getAllERC721()
const {shardID} = req.params
const s = +shardID as ShardID
const data = await controllers.getAllERC721(s)
next(data)
}
erc721Router.get('/address/:address/balances', catchAsync(getUserERC721Assets))
export async function getUserERC721Assets(req: Request, res: Response, next: NextFunction) {
const {address} = req.params
const data = await controllers.getUserERC721Assets(address)
const {address, shardID} = req.params
const s = +shardID as ShardID
const data = await controllers.getUserERC721Assets(s, address)
next(data)
}
erc721Router.get('/token/:address/balances', catchAsync(getTokenERC721Assets))
export async function getTokenERC721Assets(req: Request, res: Response, next: NextFunction) {
const {address} = req.params
const data = await controllers.getTokenERC721Assets(address)
const {address, shardID} = req.params
const s = +shardID as ShardID
const data = await controllers.getTokenERC721Assets(s, address)
next(data)
}

@ -65,15 +65,15 @@ export const RESTServer = async () => {
mainRouter0.use('/address', addressRouter)
mainRouter0.use('/internalTransaction', internalTransactionRouter)
mainRouter0.use('/logs', logsRouter)
mainRouter0.use('/erc20', erc20Router, transport)
mainRouter0.use('/erc721', erc721Router, transport)
mainRouter0.use('/erc1155', erc1155Router, transport)
const routerWithShards0 = Router({mergeParams: true})
routerWithShards0.use('/shard/:shardID', mainRouter0, transport)
routerWithShards0.use('/signature', signatureRouter, transport)
routerWithShards0.use('/price', priceRouter, transport)
routerWithShards0.use('/metrics', metricsRouter, transport)
routerWithShards0.use('/erc20', erc20Router, transport)
routerWithShards0.use('/erc721', erc721Router, transport)
routerWithShards0.use('/erc1155', erc1155Router, transport)
routerWithShards0.use('/1wallet', oneWalletMetricsRouter, transport)
if (config.api.json_rpc.isEnabled) {

@ -93,12 +93,17 @@ export const config = {
isEnabled: toBool(process.env.INDEXER_IS_ENABLED || '0'),
isSyncingBlocksEnabled: toBool(process.env.INDEXER_BLOCKS_IS_ENABLED || '0'),
isSyncingLogsEnabled: toBool(process.env.INDEXER_LOGS_IS_ENABLED || '0'),
isSyncingLogsEnabledShard1: toBool(process.env.INDEXER_LOGS_IS_ENABLED_SHARD1 || '0'),
isSyncingContractsEnabled: toBool(process.env.INDEXER_CONTRACTS_IS_ENABLED || '0'),
isSyncingContractsEnabledShard1: toBool(process.env.INDEXER_CONTRACTS_IS_ENABLED_SHARD1 || '0'),
isSyncedThreshold: +(process.env.INDEXER_IS_SYNCED_THRESHOLD || 10),
trackContractTypes: getCommaSeparatedList(process.env.INDEXER_CONTRACTS_TYPES),
initialBlockSyncingHeight: +(process.env.INDEXER_INITIAL_BLOCK_SYNCING_HEIGHT || 0),
// set to the height where smart contracts were introduced on the chain
initialLogsSyncingHeight: +(process.env.INDEXER_LOG_INITIAL_BLOCK_SYNCING_HEIGHT || 3500000),
initialLogsSyncingHeightShard1: +(
process.env.INDEXER_LOG_INITIAL_BLOCK_SYNCING_HEIGHT_SHARD1 || 0
),
batchCount: +(process.env.INDEXER_BATCH_COUNT || 100),
blockIndexerBlockRange: +(process.env.BLOCK_INDEXER_BLOCK_RANGE || 10),
rpc: {

@ -40,15 +40,26 @@ export const indexer = async () => {
statsIndexer()
}
// todo dry
if (config.indexer.isSyncingLogsEnabled && config.indexer.shards.includes(0)) {
const logIndexer0 = new LogIndexer(0)
logIndexer0.loop()
}
if (config.indexer.isSyncingLogsEnabledShard1 && config.indexer.shards.includes(1)) {
const logIndexer1 = new LogIndexer(1)
logIndexer1.loop()
}
if (config.indexer.isSyncingContractsEnabled && config.indexer.shards.includes(0)) {
const contractIndexer0 = new ContractIndexer()
const contractIndexer0 = new ContractIndexer(0)
contractIndexer0.loop()
}
if (config.indexer.isSyncingContractsEnabledShard1 && config.indexer.shards.includes(1)) {
const contractIndexer1 = new ContractIndexer(1)
contractIndexer1.loop()
}
}
const checkChainID = async (shardID: ShardID) => {

@ -23,8 +23,8 @@ export class LogIndexer {
readonly store: PostgresStorage
constructor(shardID: ShardID) {
if (shardID !== 0) {
throw new Error('Only shard #0 is currently supported')
if (shardID !== 0 && shardID !== 1) {
throw new Error('Only shards #0 and #1 are currently supported')
}
this.l = logger(module, `shard${shardID}`)

@ -6,6 +6,7 @@ import {tasks} from './tasks'
import {ContractTracker} from 'src/indexer/indexer/contracts/types'
import {PostgresStorage} from 'src/store/postgres'
import {EntityIteratorEntities} from 'src/indexer/utils/EntityIterator/entities'
import {ShardID} from 'src/types'
const syncingIntervalMs = 1000 * 60 * 5
@ -14,9 +15,9 @@ export class ContractIndexer {
readonly ls: Record<string, LoggerModule>
readonly store: PostgresStorage
constructor() {
this.store = stores[0]
this.l = logger(module)
constructor(shardID: ShardID) {
this.store = stores[shardID]
this.l = logger(module, ':Shard' + shardID)
this.l.info(`Created [${tasks.map((t) => t.name).join(', ')}]`)
this.ls = tasks
.map((t) => t.name)
@ -39,7 +40,7 @@ export class ContractIndexer {
const {batchSize, process} = task.addContract
this.ls[task.name].info(`Syncing contracts from block ${startBlock}`)
const contractsIterator = EntityIterator('contracts', {
const contractsIterator = EntityIterator(this.store, 'contracts', {
batchSize,
index: startBlock,
})
@ -71,6 +72,7 @@ export class ContractIndexer {
)
const tokensIterator = EntityIterator(
this.store,
(task.tableName as EntityIteratorEntities) || (task.name as EntityIteratorEntities),
{
batchSize: 1,
@ -91,7 +93,7 @@ export class ContractIndexer {
const startBlock = latestSyncedBlock && latestSyncedBlock > 0 ? latestSyncedBlock + 1 : 0
let latestSyncedTokenBlock = latestSyncedBlock
const logsIterator = EntityIterator('logs', {
const logsIterator = EntityIterator(this.store, 'logs', {
batchSize,
index: startBlock,
address: token.address,

@ -1,5 +1,6 @@
import {ABIManager} from 'src/indexer/indexer/contracts/utils/ABIManager'
import ERC1155ABI from 'src/indexer/indexer/contracts/erc1155/ERC1155ABI.json'
import {IABI} from 'src/indexer/indexer/contracts/types'
import {ShardID} from 'src/types'
export const ABI = ABIManager(ERC1155ABI as IABI)
export const ABIFactory = (shardID: ShardID) => ABIManager(shardID, ERC1155ABI as IABI)

@ -2,10 +2,9 @@ import {Contract, IERC1155} from 'src/types'
import {validator, isUint, isLength} from 'src/utils/validators/validators'
import {logger} from 'src/logger'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
import {getByIPFSHash} from 'src/indexer/utils/ipfs/index'
const {hasAllSignatures, callAll} = ABI
const l = logger(module, 'erc1155')
// https://eips.ethereum.org/EIPS/eip-20
@ -30,6 +29,8 @@ const initialMeta = {
const initialEmptyMeta = JSON.stringify({})
export const addContract = async (store: PostgresStorage, contract: Contract) => {
const {hasAllSignatures, callAll} = ABIFactory(store.shardID)
if (!hasAllSignatures(expectedMethodsAndEvents, contract.bytecode)) {
return
}
@ -39,7 +40,7 @@ export const addContract = async (store: PostgresStorage, contract: Contract) =>
let metaJSON = initialEmptyMeta
try {
params = await callAll(contract.address, callableMethods)
params = await callAll(store.shardID, contract.address, callableMethods)
const prepareMeta = async () => {
meta = (await getByIPFSHash(params.contractURI)) || initialMeta

@ -1,5 +1,5 @@
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from 'src/indexer/indexer/contracts/erc1155/ABI'
import {ABIFactory} from 'src/indexer/indexer/contracts/erc1155/ABI'
import {logger} from 'src/logger'
import {Address, Filter, IERC20, IERC721TokenID} from 'src/types'
import nodeFetch from 'node-fetch'
@ -7,7 +7,6 @@ import {normalizeAddress} from 'src/utils/normalizeAddress'
import {getByIPFSHash} from 'src/indexer/utils/ipfs/index'
const l = logger(module, 'erc1155:assets')
const {call} = ABI
const filter: Filter = {
limit: 10,
@ -22,6 +21,8 @@ const filter: Filter = {
}
// update balances
export const updateAssets = async (store: PostgresStorage) => {
const {call} = ABIFactory(store.shardID)
l.info(`Updating assets`)
let count = 0
const tokensForUpdate = new Set<Address>()
@ -92,6 +93,8 @@ export const updateAssets = async (store: PostgresStorage) => {
}
export const updateBalances = async (store: PostgresStorage) => {
const {call} = ABIFactory(store.shardID)
l.info(`Updating balances`)
const tokensForUpdate = new Set<Address>()
let count = 0

@ -1,22 +1,14 @@
import {Log, IERC721, ContractEventType, ContractEvent} from 'src/types'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
import {logger} from 'src/logger'
const {getEntryByName, decodeLog, call} = ABI
import {zeroAddress} from 'src/indexer/indexer/contracts/utils/zeroAddress'
import {normalizeAddress} from 'src/utils/normalizeAddress'
import {logTime} from 'src/utils/logTime'
const l = logger(module, 'erc1155')
const transferEventName = ContractEventType.TransferSingle
const transferEvent = getEntryByName(transferEventName)!.signature
const approvalForAllSignature = getEntryByName(ContractEventType.ApprovalForAll)!.signature
// todo track transfer batch
const transferBatchEvent = getEntryByName('TransferBatch')!.signature
type IParams = {
token: IERC721
}
@ -34,6 +26,14 @@ mark ownership
*/
export const trackEvents = async (store: PostgresStorage, logs: Log[], {token}: IParams) => {
const {getEntryByName, decodeLog, call} = ABIFactory(store.shardID)
// todo track transfer batch
const transferBatchEvent = getEntryByName('TransferBatch')!.signature
const transferEventName = ContractEventType.TransferSingle
const transferEvent = getEntryByName(transferEventName)!.signature
const approvalForAllSignature = getEntryByName(ContractEventType.ApprovalForAll)!.signature
const filteredLogs = logs.filter(({topics}) => topics.includes(transferEvent))
if (filteredLogs.length > 0) {
const addressesToUpdate = new Set<{address: string; tokenAddress: string; tokenId: string}>() // unique addresses of senders and recipients

@ -1,5 +1,6 @@
import {ABIManager} from 'src/indexer/indexer/contracts/utils/ABIManager'
import ERC20ABI from 'src/indexer/indexer/contracts/erc20/ERC20ABI.json'
import {IABI} from 'src/indexer/indexer/contracts/types'
import {ShardID} from 'src/types'
export const ABI = ABIManager(ERC20ABI as IABI)
export const ABIFactory = (shardID: ShardID) => ABIManager(shardID, ERC20ABI as IABI)

@ -2,9 +2,8 @@ import {Address, ByteCode, Contract, IERC20} from 'src/types'
import {validator, isUint, isLength} from 'src/utils/validators/validators'
import {logger} from 'src/logger'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
const {hasAllSignatures, callAll} = ABI
const l = logger(module, 'erc20')
// https://eips.ethereum.org/EIPS/eip-20
@ -54,6 +53,8 @@ const getProxyAddress = async (store: PostgresStorage, erc20: IERC20) => {
}
export const addContract = async (store: PostgresStorage, contract: Contract) => {
const {hasAllSignatures, callAll} = ABIFactory(store.shardID)
if (!hasAllSignatures(expectedMethodsAndEvents, contract.bytecode)) {
return
}
@ -61,7 +62,7 @@ export const addContract = async (store: PostgresStorage, contract: Contract) =>
let params: Record<typeof callableMethods[number], string>
try {
params = await callAll(contract.address, callableMethods)
params = await callAll(store.shardID, contract.address, callableMethods)
validator({
decimals: () => isUint(+params.decimals),

@ -1,10 +1,9 @@
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from 'src/indexer/indexer/contracts/erc20/ABI'
import {ABIFactory} from 'src/indexer/indexer/contracts/erc20/ABI'
import {logger} from 'src/logger'
import {Address, Filter, IERC20} from 'src/types'
const l = logger(module, 'erc20:balance')
const {call} = ABI
const filter: Filter = {
limit: 100,
@ -19,6 +18,8 @@ const filter: Filter = {
}
// update balances
export const onFinish = async (store: PostgresStorage) => {
const {call} = ABIFactory(store.shardID)
l.info(`Updating balances`)
let count = 0
const tokensForUpdate = new Set<Address>()

@ -1,18 +1,14 @@
import {Log, IERC20, ContractEvent, ContractEventType} from 'src/types'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
import {logger} from 'src/logger'
const {getEntryByName, decodeLog} = ABI
import {zeroAddress} from 'src/indexer/indexer/contracts/utils/zeroAddress'
import {normalizeAddress} from 'src/utils/normalizeAddress'
import {logTime} from 'src/utils/logTime'
const l = logger(module, 'erc20')
const transferSignature = getEntryByName(ContractEventType.Transfer)!.signature
const approveSignature = getEntryByName(ContractEventType.Approval)!.signature
type IParams = {
token: IERC20
}
@ -24,6 +20,11 @@ type IParams = {
// todo filter out other topics
export const trackEvents = async (store: PostgresStorage, logs: Log[], {token}: IParams) => {
const {getEntryByName, decodeLog} = ABIFactory(store.shardID)
const transferSignature = getEntryByName(ContractEventType.Transfer)!.signature
const approveSignature = getEntryByName(ContractEventType.Approval)!.signature
const transferLogs = logs.filter(({topics}) => topics.includes(transferSignature))
if (transferLogs.length > 0) {

@ -1,5 +1,6 @@
import {ABIManager} from 'src/indexer/indexer/contracts/utils/ABIManager'
import ERC721ABI from 'src/indexer/indexer/contracts/erc721/ERC721ABI.json'
import {IABI} from 'src/indexer/indexer/contracts/types'
import {ShardID} from 'src/types'
export const ABI = ABIManager(ERC721ABI as IABI)
export const ABIFactory = (shardID: ShardID) => ABIManager(shardID, ERC721ABI as IABI)

@ -2,9 +2,8 @@ import {Contract, IERC721} from 'src/types'
import {validator, isUint, isLength} from 'src/utils/validators/validators'
import {logger} from 'src/logger'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
const {hasAllSignatures, callAll} = ABI
const l = logger(module, 'erc721')
// https://eips.ethereum.org/EIPS/eip-20
@ -25,6 +24,8 @@ const expectedMethodsAndEvents = [
const callableMethods = ['symbol', 'name']
export const addContract = async (store: PostgresStorage, contract: Contract) => {
const {hasAllSignatures, callAll} = ABIFactory(store.shardID)
if (!hasAllSignatures(expectedMethodsAndEvents, contract.bytecode)) {
return
}
@ -32,7 +33,7 @@ export const addContract = async (store: PostgresStorage, contract: Contract) =>
let params: Record<typeof callableMethods[number], string>
try {
params = await callAll(contract.address, callableMethods)
params = await callAll(store.shardID, contract.address, callableMethods)
validator({
name: () => isLength(params.name, {min: 3, max: 64}),

@ -1,12 +1,11 @@
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from 'src/indexer/indexer/contracts/erc721/ABI'
import {ABIFactory} from 'src/indexer/indexer/contracts/erc721/ABI'
import {logger} from 'src/logger'
import {Address, Filter, IERC20, IERC721TokenID} from 'src/types'
import nodeFetch from 'node-fetch'
import {normalizeAddress} from 'src/utils/normalizeAddress'
const l = logger(module, 'erc721:assets')
const {call} = ABI
const filter: Filter = {
limit: 10,
@ -21,6 +20,8 @@ const filter: Filter = {
}
// update balances
export const onFinish = async (store: PostgresStorage) => {
const {call} = ABIFactory(store.shardID)
l.info(`Updating assets`)
let count = 0
const tokensForUpdate = new Set<Address>()

@ -1,19 +1,14 @@
import {Log, IERC721, ContractEventType, ContractEvent} from 'src/types'
import {PostgresStorage} from 'src/store/postgres'
import {ABI} from './ABI'
import {ABIFactory} from './ABI'
import {logger} from 'src/logger'
const {getEntryByName, decodeLog, call} = ABI
import {zeroAddress} from 'src/indexer/indexer/contracts/utils/zeroAddress'
import {normalizeAddress} from 'src/utils/normalizeAddress'
import {logTime} from 'src/utils/logTime'
const l = logger(module, 'erc721')
const transferSignature = getEntryByName(ContractEventType.Transfer)!.signature
const approvalSignature = getEntryByName(ContractEventType.Approval)!.signature
const approvalForAllSignature = getEntryByName(ContractEventType.ApprovalForAll)!.signature
type IParams = {
token: IERC721
}
@ -25,6 +20,12 @@ type IParams = {
// todo filter out other topics
export const trackEvents = async (store: PostgresStorage, logs: Log[], {token}: IParams) => {
const {getEntryByName, decodeLog, call} = ABIFactory(store.shardID)
const transferSignature = getEntryByName(ContractEventType.Transfer)!.signature
const approvalSignature = getEntryByName(ContractEventType.Approval)!.signature
const approvalForAllSignature = getEntryByName(ContractEventType.ApprovalForAll)!.signature
const filteredLogs = logs.filter(({topics}) => topics.includes(transferSignature))
if (filteredLogs.length > 0) {
const addressesToUpdate = new Set<{address: string; tokenAddress: string; tokenId: string}>() // unique addresses of senders and recipients

@ -1,12 +1,12 @@
import {IABI} from 'src/indexer/indexer/contracts/types'
import {ByteCode, Address} from 'src/types'
import {ShardID} from 'src/types'
import Web3 from 'web3'
import * as RPCClient from 'src/indexer/rpc/client'
const web3 = new Web3()
export const ABIManager = (abi: IABI) => {
export const ABIManager = (shardID: ShardID, abi: IABI) => {
const entries = abi
.filter(({type}) => ['function', 'event'].includes(type))
.map((e) => {
@ -60,7 +60,7 @@ export const ABIManager = (abi: IABI) => {
}
const inputs = web3.eth.abi.encodeParameters(entry.inputs || [], params)
const response = await RPCClient.call(0, {
const response = await RPCClient.call(shardID, {
to: address,
data: entry.signature + inputs.slice(2),
})
@ -68,7 +68,7 @@ export const ABIManager = (abi: IABI) => {
return web3.eth.abi.decodeParameters(entry.outputs, response)['0']
}
const callAll = (address: Address, methodsNames: string[]) => {
const callAll = (shardID: ShardID, address: Address, methodsNames: string[]) => {
return Promise.all(
methodsNames.map(async (methodName) => {
const entry = getEntryByName(methodName)
@ -76,7 +76,7 @@ export const ABIManager = (abi: IABI) => {
throw new Error(`${methodName} not found`)
}
const response = await RPCClient.call(0, {
const response = await RPCClient.call(shardID, {
to: address,
data: entry.signature,
})

@ -1,12 +1,14 @@
import {EntityQueryCallbackParams} from './executors'
import {entityQueries, EntityIteratorEntities} from './entities'
import {entityQueriesFactory, EntityIteratorEntities} from './entities'
import {PostgresStorage} from 'src/store/postgres'
export async function* EntityIterator(
store: PostgresStorage,
entity: EntityIteratorEntities,
{index: initialIndex = 0, batchSize = 100, ...rest}: EntityQueryCallbackParams
) {
let index = initialIndex
const entityQueries = entityQueriesFactory(store)
const f = entityQueries[entity]
while (true) {

@ -10,6 +10,7 @@ import {
} from 'src/types'
import {listByBlockNumber, EntityQueryCallback, listByOffset, withEqual} from './executors'
import {stores} from 'src/store'
import {PostgresStorage} from 'src/store/postgres'
export type ContractIndexerTaskEntities = 'erc20' | 'erc721' | 'erc1155'
export type EntityIteratorEntities =
@ -20,20 +21,20 @@ export type EntityIteratorEntities =
| 'erc20BalancesNeedUpdate'
| ContractIndexerTaskEntities
// only shard #0
const store = stores[0]
export const entityQueries: Record<EntityIteratorEntities, EntityQueryCallback> = {
logs: listByBlockNumber<Log>(store.log.getLogs, [withEqual('address')]),
logsAll: listByBlockNumber<Log>(store.log.getLogs),
internalTransactions: listByBlockNumber<InternalTransaction>(
store.internalTransaction.getInternalTransactions
),
contracts: listByBlockNumber<Contract>(store.contract.getContracts),
erc20: listByOffset<IERC20>(store.erc20.getERC20),
erc721: listByOffset<IERC721>(store.erc721.getERC721),
erc1155: listByOffset<IERC1155>(store.erc1155.getERC1155),
erc20BalancesNeedUpdate: listByOffset<IERC20Balance>(store.erc20.getBalances, [
withEqual('needUpdate'),
]),
export const entityQueriesFactory = (store: PostgresStorage) => {
const entityQueries: Record<EntityIteratorEntities, EntityQueryCallback> = {
logs: listByBlockNumber<Log>(store.log.getLogs, [withEqual('address')]),
logsAll: listByBlockNumber<Log>(store.log.getLogs),
internalTransactions: listByBlockNumber<InternalTransaction>(
store.internalTransaction.getInternalTransactions
),
contracts: listByBlockNumber<Contract>(store.contract.getContracts),
erc20: listByOffset<IERC20>(store.erc20.getERC20),
erc721: listByOffset<IERC721>(store.erc721.getERC721),
erc1155: listByOffset<IERC1155>(store.erc1155.getERC1155),
erc20BalancesNeedUpdate: listByOffset<IERC20Balance>(store.erc20.getBalances, [
withEqual('needUpdate'),
]),
}
return entityQueries
}

@ -11,7 +11,7 @@ import {Query} from 'src/store/postgres/types'
import {fromSnakeToCamelResponse} from 'src/store/postgres/queryMapper'
import {buildSQLQuery} from 'src/store/postgres/filters'
const subQueryLimit = 100000
const subQueryLimit = 1000 // 100000
export class PostgresStorageAddress implements IStorageAddress {
query: Query
@ -103,7 +103,7 @@ export class PostgresStorageAddress implements IStorageAddress {
select it.*, t.input, t.timestamp from (
select * from (
(select * from internal_transactions it ${filterQuery} and it.from = $1 order by block_number desc limit $4)
union
union
(select * from internal_transactions it ${filterQuery} and it.to = $1 order by block_number desc limit $4)
) it
${filterQuery}
@ -196,11 +196,11 @@ export class PostgresStorageAddress implements IStorageAddress {
} else if (type === 'internal_transaction') {
const filterQuery = buildSQLQuery({filters: filter.filters})
const [{count}] = await this.query(
`
`
select count(t.*) from (
select * from (
(select * from internal_transactions it ${filterQuery} and it.from = $1)
union
union
(select * from internal_transactions it ${filterQuery} and it.to = $1)
) it
${filterQuery}

@ -17,6 +17,13 @@ export class PostgresStorageInternalTransaction implements IStorageInternalTrans
this.query = query
}
// remove old transactions every day
removeInternalTransactionsOlder7Days = async (toBlockNumber: number) => {
const res = await this.query(`delete from internal_transactions where block_number < $1`, [
toBlockNumber,
])
}
addInternalTransaction = async (tx: InternalTransaction) => {
const newTx = {
...tx,

@ -28,6 +28,8 @@ import {config} from 'src/config'
const defaultRetries = 2
const sleep = (time = 1000) => new Promise((r) => setTimeout(r, time))
const removeOldInternalTransactionsTaskPeriod = 1000 * 60 * 60 * 24
const blockNumber7DayOld = (7 * 24 * 60 * 60) / 2
export class PostgresStorage implements IStorage {
db: Pool
@ -96,6 +98,11 @@ export class PostgresStorage implements IStorage {
this.isStarted = true
this.isStarting = false
this.l.info('Done')
this.l.info(
`Start removing internal transactions task, period: ${removeOldInternalTransactionsTaskPeriod}`
)
this.removeOldInternalTransactionsTask()
}
async migrate() {
@ -161,6 +168,15 @@ export class PostgresStorage implements IStorage {
return {count}
}
removeOldInternalTransactionsTask = async () => {
const lastBlockNumber = await this.block.getLatestBlockNumber()
const toBlock = lastBlockNumber - blockNumber7DayOld
this.l.info(`Removing internal transactions where block number < ${toBlock}`)
await this.internalTransaction.removeInternalTransactionsOlder7Days(lastBlockNumber)
setTimeout(this.removeOldInternalTransactionsTask, removeOldInternalTransactionsTaskPeriod)
}
async stop() {
await this.db.end()
}

Loading…
Cancel
Save