Ricl rpc (#880)
* added readme
* more RPC APIs
* update for comments; fix travis issues
* update auto-generated host_mock.go
* pubsub WIP
* WIP
* added hmy/backend.go
* update to support newBlockFilter
* fix golint
* update readme; add log in travis_checker.sh
* add APIs
* update deploy.sh for duration
* enable sync by default
* testing getFilterChange
* fix ten-oneshard.txt
* add support for bloom indexer
* minior
* update
* [protobuf] generate the pb.go files

Signed-off-by: Leo Chen <leo@harmony.one>

* undo parameters
* remove bloombits
* fix bloom9_test
* remove service.go
parent 00d002caa6
commit 1db7381d28
@@ -1,119 +0,0 @@
package core

import (
    "context"
    "errors"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/params"
    "github.com/ethereum/go-ethereum/rpc"
    "github.com/harmony-one/harmony/accounts"
    "github.com/harmony-one/harmony/core/state"
    "github.com/harmony-one/harmony/core/types"
)

// HmyAPIBackend ...
type HmyAPIBackend struct {
    blockchain     *BlockChain
    txPool         *TxPool
    accountManager *accounts.Manager
    nodeAPI        NodeAPIFunctions
}

// NodeAPIFunctions is the list of functions from node used to call rpc apis.
type NodeAPIFunctions interface {
    AddPendingTransaction(newTx *types.Transaction)
}

// NewBackend ...
func NewBackend(blockchain *BlockChain, txPool *TxPool, accountManager *accounts.Manager, nodeAPI NodeAPIFunctions) *HmyAPIBackend {
    return &HmyAPIBackend{blockchain, txPool, accountManager, nodeAPI}
}

// ChainDb ...
func (b *HmyAPIBackend) ChainDb() ethdb.Database {
    return b.blockchain.db
}

// GetBlock ...
func (b *HmyAPIBackend) GetBlock(ctx context.Context, hash common.Hash) (*types.Block, error) {
    return b.blockchain.GetBlockByHash(hash), nil
}

// GetPoolTransaction ...
func (b *HmyAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
    return b.txPool.Get(hash)
}

// BlockByNumber ...
func (b *HmyAPIBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) {
    // Pending block is only known by the miner
    if blockNr == rpc.PendingBlockNumber {
        return nil, errors.New("not implemented")
    }
    // Otherwise resolve and return the block
    if blockNr == rpc.LatestBlockNumber {
        return b.blockchain.CurrentBlock(), nil
    }
    return b.blockchain.GetBlockByNumber(uint64(blockNr)), nil
}

// StateAndHeaderByNumber ...
func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.DB, *types.Header, error) {
    // Pending state is only known by the miner
    if blockNr == rpc.PendingBlockNumber {
        return nil, nil, errors.New("not implemented")
    }
    // Otherwise resolve the block number and return its state
    header, err := b.HeaderByNumber(ctx, blockNr)
    if header == nil || err != nil {
        return nil, nil, err
    }
    stateDb, err := b.blockchain.StateAt(header.Root)
    return stateDb, header, err
}

// HeaderByNumber ...
func (b *HmyAPIBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
    // Pending block is only known by the miner
    if blockNr == rpc.PendingBlockNumber {
        return nil, errors.New("not implemented")
    }
    // Otherwise resolve and return the block
    if blockNr == rpc.LatestBlockNumber {
        return b.blockchain.CurrentBlock().Header(), nil
    }
    return b.blockchain.GetHeaderByNumber(uint64(blockNr)), nil
}

// GetPoolNonce ...
func (b *HmyAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
    return b.txPool.State().GetNonce(addr), nil
}

// SendTx ...
func (b *HmyAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
    b.nodeAPI.AddPendingTransaction(signedTx)
    return nil
}

// ChainConfig ...
func (b *HmyAPIBackend) ChainConfig() *params.ChainConfig {
    return b.blockchain.chainConfig
}

// CurrentBlock ...
func (b *HmyAPIBackend) CurrentBlock() *types.Block {
    return types.NewBlockWithHeader(b.blockchain.CurrentHeader())
}

// AccountManager ...
func (b *HmyAPIBackend) AccountManager() *accounts.Manager {
    return b.accountManager
}

// GetReceipts ...
func (b *HmyAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
    return b.blockchain.GetReceiptsByHash(hash), nil
}
@@ -0,0 +1,496 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package core

import (
    "context"
    "encoding/binary"
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/event"
    "github.com/ethereum/go-ethereum/log"
    "github.com/harmony-one/harmony/core/rawdb"
    "github.com/harmony-one/harmony/core/types"
)

// ChainIndexerBackend defines the methods needed to process chain segments in
// the background and write the segment results into the database. These can be
// used to create filter blooms or CHTs.
type ChainIndexerBackend interface {
    // Reset initiates the processing of a new chain segment, potentially terminating
    // any partially completed operations (in case of a reorg).
    Reset(ctx context.Context, section uint64, prevHead common.Hash) error

    // Process crunches through the next header in the chain segment. The caller
    // will ensure a sequential order of headers.
    Process(ctx context.Context, header *types.Header) error

    // Commit finalizes the section metadata and stores it into the database.
    Commit() error
}

// ChainIndexerChain interface is used for connecting the indexer to a blockchain
type ChainIndexerChain interface {
    // CurrentHeader retrieves the latest locally known header.
    CurrentHeader() *types.Header

    // SubscribeChainHeadEvent subscribes to new head header notifications.
    SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
}

// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BloomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
// ChainHeadEventLoop in a goroutine.
//
// Further child ChainIndexers can be added which use the output of the parent
// section indexer. These child indexers receive new head notifications only
// after an entire section has been finished or in case of rollbacks that might
// affect already finished sections.
type ChainIndexer struct {
    chainDb  ethdb.Database      // Chain database to index the data from
    indexDb  ethdb.Database      // Prefixed table-view of the db to write index metadata into
    backend  ChainIndexerBackend // Background processor generating the index data content
    children []*ChainIndexer     // Child indexers to cascade chain updates to

    active    uint32          // Flag whether the event loop was started
    update    chan struct{}   // Notification channel that headers should be processed
    quit      chan chan error // Quit channel to tear down running goroutines
    ctx       context.Context
    ctxCancel func()

    sectionSize uint64 // Number of blocks in a single chain segment to process
    confirmsReq uint64 // Number of confirmations before processing a completed segment

    storedSections uint64 // Number of sections successfully indexed into the database
    knownSections  uint64 // Number of sections known to be complete (block wise)
    cascadedHead   uint64 // Block number of the last completed section cascaded to subindexers

    checkpointSections uint64      // Number of sections covered by the checkpoint
    checkpointHead     common.Hash // Section head belonging to the checkpoint

    throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources

    log  log.Logger
    lock sync.RWMutex
}

// NewChainIndexer creates a new chain indexer to do background processing on
// chain segments of a given size after a certain number of confirmations have
// passed. The throttling parameter might be used to prevent database thrashing.
func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
    c := &ChainIndexer{
        chainDb:     chainDb,
        indexDb:     indexDb,
        backend:     backend,
        update:      make(chan struct{}, 1),
        quit:        make(chan chan error),
        sectionSize: section,
        confirmsReq: confirm,
        throttling:  throttling,
        log:         log.New("type", kind),
    }
    // Initialize database dependent fields and start the updater
    c.loadValidSections()
    c.ctx, c.ctxCancel = context.WithCancel(context.Background())

    go c.updateLoop()

    return c
}

// AddCheckpoint adds a checkpoint. Sections are never processed and the chain
// is not expected to be available before this point. The indexer assumes that
// the backend has sufficient information available to process subsequent sections.
//
// Note: knownSections == 0 and storedSections == checkpointSections until
// syncing reaches the checkpoint
func (c *ChainIndexer) AddCheckpoint(section uint64, shead common.Hash) {
    c.lock.Lock()
    defer c.lock.Unlock()

    c.checkpointSections = section + 1
    c.checkpointHead = shead

    if section < c.storedSections {
        return
    }
    c.setSectionHead(section, shead)
    c.setValidSections(section + 1)
}

// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
func (c *ChainIndexer) Start(chain ChainIndexerChain) {
    events := make(chan ChainHeadEvent, 10)
    sub := chain.SubscribeChainHeadEvent(events)

    go c.eventLoop(chain.CurrentHeader(), events, sub)
}

// Close tears down all goroutines belonging to the indexer and returns any error
// that might have occurred internally.
func (c *ChainIndexer) Close() error {
    var errs []error

    c.ctxCancel()

    // Tear down the primary update loop
    errc := make(chan error)
    c.quit <- errc
    if err := <-errc; err != nil {
        errs = append(errs, err)
    }
    // If needed, tear down the secondary event loop
    if atomic.LoadUint32(&c.active) != 0 {
        c.quit <- errc
        if err := <-errc; err != nil {
            errs = append(errs, err)
        }
    }
    // Close all children
    for _, child := range c.children {
        if err := child.Close(); err != nil {
            errs = append(errs, err)
        }
    }
    // Return any failures
    switch {
    case len(errs) == 0:
        return nil

    case len(errs) == 1:
        return errs[0]

    default:
        return fmt.Errorf("%v", errs)
    }
}

// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) {
    // Mark the chain indexer as active, requiring an additional teardown
    atomic.StoreUint32(&c.active, 1)

    defer sub.Unsubscribe()

    // Fire the initial new head event to start any outstanding processing
    c.newHead(currentHeader.Number.Uint64(), false)

    var (
        prevHeader = currentHeader
        prevHash   = currentHeader.Hash()
    )
    for {
        select {
        case errc := <-c.quit:
            // Chain indexer terminating, report no failure and abort
            errc <- nil
            return

        case ev, ok := <-events:
            // Received a new event, ensure it's not nil (closing) and update
            if !ok {
                errc := <-c.quit
                errc <- nil
                return
            }
            header := ev.Block.Header()
            if header.ParentHash != prevHash {
                // Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
                // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?

                if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash {
                    if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
                        c.newHead(h.Number.Uint64(), true)
                    }
                }
            }
            c.newHead(header.Number.Uint64(), false)

            prevHeader, prevHash = header, header.Hash()
        }
    }
}

// newHead notifies the indexer about new chain heads and/or reorgs.
func (c *ChainIndexer) newHead(head uint64, reorg bool) {
    c.lock.Lock()
    defer c.lock.Unlock()

    // If a reorg happened, invalidate all sections until that point
    if reorg {
        // Revert the known section number to the reorg point
        known := head / c.sectionSize
        stored := known
        if known < c.checkpointSections {
            known = 0
        }
        if stored < c.checkpointSections {
            stored = c.checkpointSections
        }
        if known < c.knownSections {
            c.knownSections = known
        }
        // Revert the stored sections from the database to the reorg point
        if stored < c.storedSections {
            c.setValidSections(stored)
        }
        // Update the new head number to the finalized section end and notify children
        head = known * c.sectionSize

        if head < c.cascadedHead {
            c.cascadedHead = head
            for _, child := range c.children {
                child.newHead(c.cascadedHead, true)
            }
        }
        return
    }
    // No reorg, calculate the number of newly known sections and update if high enough
    var sections uint64
    if head >= c.confirmsReq {
        sections = (head + 1 - c.confirmsReq) / c.sectionSize
        if sections < c.checkpointSections {
            sections = 0
        }
        if sections > c.knownSections {
            if c.knownSections < c.checkpointSections {
                // syncing reached the checkpoint, verify section head
                syncedHead := rawdb.ReadCanonicalHash(c.chainDb, c.checkpointSections*c.sectionSize-1)
                if syncedHead != c.checkpointHead {
                    c.log.Error("Synced chain does not match checkpoint", "number", c.checkpointSections*c.sectionSize-1, "expected", c.checkpointHead, "synced", syncedHead)
                    return
                }
            }
            c.knownSections = sections

            select {
            case c.update <- struct{}{}:
            default:
            }
        }
    }
}
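
// Worked example (illustrative, not part of the source): with
// sectionSize = 4096 and confirmsReq = 256, a new head at block 8447 gives
// sections = (8447 + 1 - 256) / 4096 = 2, i.e. blocks [0, 8192) are buried
// deeply enough to be indexed; a third section only becomes known once the
// head reaches 3*4096 + 256 - 1 = 12543.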

// updateLoop is the main event loop of the indexer which pushes chain segments
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
    var (
        updating bool
        updated  time.Time
    )

    for {
        select {
        case errc := <-c.quit:
            // Chain indexer terminating, report no failure and abort
            errc <- nil
            return

        case <-c.update:
            // Section headers completed (or rolled back), update the index
            c.lock.Lock()
            if c.knownSections > c.storedSections {
                // Periodically print an upgrade log message to the user
                if time.Since(updated) > 8*time.Second {
                    if c.knownSections > c.storedSections+1 {
                        updating = true
                        c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
                    }
                    updated = time.Now()
                }
                // Cache the current section count and head to allow unlocking the mutex
                section := c.storedSections
                var oldHead common.Hash
                if section > 0 {
                    oldHead = c.SectionHead(section - 1)
                }
                // Process the newly defined section in the background
                c.lock.Unlock()
                newHead, err := c.processSection(section, oldHead)
                if err != nil {
                    select {
                    case <-c.ctx.Done():
                        <-c.quit <- nil
                        return
                    default:
                    }
                    c.log.Error("Section processing failed", "error", err)
                }
                c.lock.Lock()

                // If processing succeeded and no reorgs occurred, mark the section completed
                if err == nil && oldHead == c.SectionHead(section-1) {
                    c.setSectionHead(section, newHead)
                    c.setValidSections(section + 1)
                    if c.storedSections == c.knownSections && updating {
                        updating = false
                        c.log.Info("Finished upgrading chain index")
                    }
                    c.cascadedHead = c.storedSections*c.sectionSize - 1
                    for _, child := range c.children {
                        c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
                        child.newHead(c.cascadedHead, false)
                    }
                } else {
                    // If processing failed, don't retry until further notification
                    c.log.Debug("Chain index processing failed", "section", section, "err", err)
                    c.knownSections = c.storedSections
                }
            }
            // If there are still further sections to process, reschedule
            if c.knownSections > c.storedSections {
                time.AfterFunc(c.throttling, func() {
                    select {
                    case c.update <- struct{}{}:
                    default:
                    }
                })
            }
            c.lock.Unlock()
        }
    }
}

// processSection processes an entire section by calling backend functions while
// ensuring the continuity of the passed headers. Since the chain mutex is not
// held while processing, the continuity can be broken by a long reorg, in which
// case the function returns with an error.
func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
    c.log.Trace("Processing new chain section", "section", section)

    // Reset and partial processing
    if err := c.backend.Reset(c.ctx, section, lastHead); err != nil {
        c.setValidSections(0)
        return common.Hash{}, err
    }

    for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
        hash := rawdb.ReadCanonicalHash(c.chainDb, number)
        if hash == (common.Hash{}) {
            return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
        }
        header := rawdb.ReadHeader(c.chainDb, hash, number)
        if header == nil {
            return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4])
        } else if header.ParentHash != lastHead {
            return common.Hash{}, fmt.Errorf("chain reorged during section processing")
        }
        if err := c.backend.Process(c.ctx, header); err != nil {
            return common.Hash{}, err
        }
        lastHead = header.Hash()
    }
    if err := c.backend.Commit(); err != nil {
        return common.Hash{}, err
    }
    return lastHead, nil
}

// Sections returns the number of processed sections maintained by the indexer
// and also the information about the last header indexed for potential canonical
// verifications.
func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
    c.lock.Lock()
    defer c.lock.Unlock()

    return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
}

// AddChildIndexer adds a child ChainIndexer that can use the output of this one
func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) {
    c.lock.Lock()
    defer c.lock.Unlock()

    c.children = append(c.children, indexer)

    // Cascade any pending updates to new children too
    sections := c.storedSections
    if c.knownSections < sections {
        // if a section is "stored" but not "known" then it is a checkpoint without
        // available chain data so we should not cascade it yet
        sections = c.knownSections
    }
    if sections > 0 {
        indexer.newHead(sections*c.sectionSize-1, false)
    }
}

// loadValidSections reads the number of valid sections from the index database
// and caches it into the local state.
func (c *ChainIndexer) loadValidSections() {
    data, _ := c.indexDb.Get([]byte("count"))
    if len(data) == 8 {
        c.storedSections = binary.BigEndian.Uint64(data)
    }
}

// setValidSections writes the number of valid sections to the index database
func (c *ChainIndexer) setValidSections(sections uint64) {
    // Set the current number of valid sections in the database
    var data [8]byte
    binary.BigEndian.PutUint64(data[:], sections)
    c.indexDb.Put([]byte("count"), data[:])

    // Remove any reorged sections, caching the valid ones in the meantime
    for c.storedSections > sections {
        c.storedSections--
        c.removeSectionHead(c.storedSections)
    }
    c.storedSections = sections // needed if new > old
}

// SectionHead retrieves the last block hash of a processed section from the
// index database.
func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
    var data [8]byte
    binary.BigEndian.PutUint64(data[:], section)

    hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...))
    if len(hash) == len(common.Hash{}) {
        return common.BytesToHash(hash)
    }
    return common.Hash{}
}

// setSectionHead writes the last block hash of a processed section to the index
// database.
func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) {
    var data [8]byte
    binary.BigEndian.PutUint64(data[:], section)

    c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes())
}

// removeSectionHead removes the reference to a processed section from the index
// database.
func (c *ChainIndexer) removeSectionHead(section uint64) {
    var data [8]byte
    binary.BigEndian.PutUint64(data[:], section)

    c.indexDb.Delete(append([]byte("shead"), data[:]...))
}
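
The Reset/Process/Commit contract that ChainIndexer drives is easiest to see on a toy backend. The sketch below is illustrative only — countingBackend is hypothetical and not part of this change — but it is a complete ChainIndexerBackend: Reset is called once at the start of each section (and again after a reorg), Process once per header in canonical order, and Commit once when the section is full.

type countingBackend struct {
    count uint64 // headers processed in the current section
}

// Reset starts a new section, dropping any partially processed state.
func (b *countingBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error {
    b.count = 0
    return nil
}

// Process consumes the next header of the section, strictly in order.
func (b *countingBackend) Process(ctx context.Context, header *types.Header) error {
    b.count++
    return nil
}

// Commit would persist the section's results; the toy backend has none.
func (b *countingBackend) Commit() error {
    return nil
}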
@@ -0,0 +1,187 @@
package hmy

import (
    "context"
    "errors"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/bloombits"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/event"
    "github.com/ethereum/go-ethereum/params"
    "github.com/ethereum/go-ethereum/rpc"
    "github.com/harmony-one/harmony/accounts"
    "github.com/harmony-one/harmony/api/proto"
    "github.com/harmony-one/harmony/core"
    "github.com/harmony-one/harmony/core/state"
    "github.com/harmony-one/harmony/core/types"
)

// APIBackend is an implementation of Backend for a full client.
type APIBackend struct {
    hmy *Harmony
}

// ChainDb returns the chain database.
func (b *APIBackend) ChainDb() ethdb.Database {
    return b.hmy.chainDb
}

// GetBlock returns the block corresponding to the given hash.
func (b *APIBackend) GetBlock(ctx context.Context, hash common.Hash) (*types.Block, error) {
    return b.hmy.blockchain.GetBlockByHash(hash), nil
}

// GetPoolTransaction returns the pending transaction with the given hash.
func (b *APIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
    return b.hmy.txPool.Get(hash)
}

// BlockByNumber returns the block corresponding to the given block number.
func (b *APIBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) {
    // Pending block is only known by the miner
    if blockNr == rpc.PendingBlockNumber {
        return nil, errors.New("not implemented")
    }
    // Otherwise resolve and return the block
    if blockNr == rpc.LatestBlockNumber {
        return b.hmy.blockchain.CurrentBlock(), nil
    }
    return b.hmy.blockchain.GetBlockByNumber(uint64(blockNr)), nil
}

// StateAndHeaderByNumber returns the state and header for the given block number.
func (b *APIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.DB, *types.Header, error) {
    // Pending state is only known by the miner
    if blockNr == rpc.PendingBlockNumber {
        return nil, nil, errors.New("not implemented")
    }
    // Otherwise resolve the block number and return its state
    header, err := b.HeaderByNumber(ctx, blockNr)
    if header == nil || err != nil {
        return nil, nil, err
    }
    stateDb, err := b.hmy.blockchain.StateAt(header.Root)
    return stateDb, header, err
}

// HeaderByNumber returns the header corresponding to the given block number.
func (b *APIBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
    // Pending block is only known by the miner
    if blockNr == rpc.PendingBlockNumber {
        return nil, errors.New("not implemented")
    }
    // Otherwise resolve and return the block
    if blockNr == rpc.LatestBlockNumber {
        return b.hmy.blockchain.CurrentBlock().Header(), nil
    }
    return b.hmy.blockchain.GetHeaderByNumber(uint64(blockNr)), nil
}

// GetPoolNonce returns the next pool nonce of the given address.
func (b *APIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
    return b.hmy.txPool.State().GetNonce(addr), nil
}

// SendTx adds a signed transaction to the pending transaction queue.
func (b *APIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
    // return b.hmy.txPool.Add(ctx, signedTx)
    b.hmy.nodeAPI.AddPendingTransaction(signedTx)
    return nil // TODO(ricl): AddPendingTransaction should return error
}

// ChainConfig returns the chain configuration.
func (b *APIBackend) ChainConfig() *params.ChainConfig {
    return b.hmy.blockchain.Config()
}

// CurrentBlock returns a block built from the current chain head header.
func (b *APIBackend) CurrentBlock() *types.Block {
    return types.NewBlockWithHeader(b.hmy.blockchain.CurrentHeader())
}

// AccountManager returns the account manager.
func (b *APIBackend) AccountManager() *accounts.Manager {
    return b.hmy.accountManager
}

// GetReceipts returns the receipts of the block with the given hash.
func (b *APIBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
    return b.hmy.blockchain.GetReceiptsByHash(hash), nil
}

// EventMux returns the event type multiplexer.
func (b *APIBackend) EventMux() *event.TypeMux { return b.hmy.eventMux }

// BloomStatus returns the bloom section size and the number of processed sections.
func (b *APIBackend) BloomStatus() (uint64, uint64) {
    sections, _, _ := b.hmy.bloomIndexer.Sections()
    return params.BloomBitsBlocks, sections
}

// ProtocolVersion returns the supported protocol version.
func (b *APIBackend) ProtocolVersion() int {
    return proto.ProtocolVersion
}

// Filter related APIs

// GetLogs returns the logs of the block with the given hash.
func (b *APIBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) {
    // TODO(ricl): implement
    return nil, nil
}

// HeaderByHash returns the header corresponding to the given block hash.
func (b *APIBackend) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) {
    // TODO(ricl): implement
    return nil, nil
}

// ServiceFilter serves bloom bit retrievals for the given matcher session.
func (b *APIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
    // TODO(ricl): implement
}

// SubscribeNewTxsEvent subscribes to new transaction events.
func (b *APIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
    return b.hmy.TxPool().SubscribeNewTxsEvent(ch)
}

// SubscribeChainEvent subscribes to chain events.
func (b *APIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
    return b.hmy.BlockChain().SubscribeChainEvent(ch)
}

// SubscribeChainHeadEvent subscribes to chain head events.
func (b *APIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
    return b.hmy.BlockChain().SubscribeChainHeadEvent(ch)
}

// SubscribeChainSideEvent subscribes to chain side (fork) events.
func (b *APIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription {
    return b.hmy.BlockChain().SubscribeChainSideEvent(ch)
}

// SubscribeRemovedLogsEvent subscribes to removed-log events caused by reorgs.
func (b *APIBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
    return b.hmy.BlockChain().SubscribeRemovedLogsEvent(ch)
}

// SubscribeLogsEvent subscribes to new log events.
func (b *APIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
    return b.hmy.BlockChain().SubscribeLogsEvent(ch)
}

// GetPoolTransactions returns all currently pending transactions.
func (b *APIBackend) GetPoolTransactions() (types.Transactions, error) {
    pending, err := b.hmy.txPool.Pending()
    if err != nil {
        return nil, err
    }
    var txs types.Transactions
    for _, batch := range pending {
        txs = append(txs, batch...)
    }
    return txs, nil
}
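
APIBackend also satisfies the filters.Backend interface introduced later in this change, so it can back the public filter API directly. A minimal wiring sketch — the "eth" namespace and the rpc.API registration are assumptions, not part of this change:

// Illustrative wiring only.
filterAPI := filters.NewPublicFilterAPI(harmony.APIBackend, false) // full node: lightMode = false
api := rpc.API{Namespace: "eth", Version: "1.0", Service: filterAPI, Public: true}
_ = api // in practice, registered with the node's RPC server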
@@ -0,0 +1,64 @@
package hmy

import (
    "github.com/ethereum/go-ethereum/core/bloombits"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/event"
    "github.com/ethereum/go-ethereum/params"
    "github.com/harmony-one/harmony/accounts"
    "github.com/harmony-one/harmony/core"
    "github.com/harmony-one/harmony/core/types"
)

// Harmony implements the Harmony full node service.
type Harmony struct {
    // Channel for shutting down the service
    shutdownChan  chan bool                      // Channel for shutting down the Harmony
    bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests

    blockchain     *core.BlockChain
    txPool         *core.TxPool
    accountManager *accounts.Manager
    eventMux       *event.TypeMux
    // DB interfaces
    chainDb ethdb.Database // Block chain database

    bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
    APIBackend   *APIBackend

    nodeAPI NodeAPI
}

// NodeAPI is the set of node functions used by the RPC APIs.
type NodeAPI interface {
    AddPendingTransaction(newTx *types.Transaction)
    Blockchain() *core.BlockChain
    AccountManager() *accounts.Manager
}

// New creates a new Harmony object (including the
// initialisation of the common Harmony object)
func New(nodeAPI NodeAPI, txPool *core.TxPool, eventMux *event.TypeMux) (*Harmony, error) {
    chainDb := nodeAPI.Blockchain().ChainDB()
    hmy := &Harmony{
        shutdownChan:   make(chan bool),
        bloomRequests:  make(chan chan *bloombits.Retrieval),
        blockchain:     nodeAPI.Blockchain(),
        txPool:         txPool,
        accountManager: nodeAPI.AccountManager(),
        eventMux:       eventMux,
        chainDb:        chainDb,
        bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
        nodeAPI:        nodeAPI,
    }

    hmy.APIBackend = &APIBackend{hmy}

    return hmy, nil
}

// TxPool returns the transaction pool of the Harmony service.
func (s *Harmony) TxPool() *core.TxPool { return s.txPool }

// BlockChain returns the blockchain of the Harmony service.
func (s *Harmony) BlockChain() *core.BlockChain { return s.blockchain }
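
Any type with these three methods satisfies NodeAPI; the real implementation lives in the node package. A hypothetical stand-in, e.g. for tests (fakeNode is not part of this change):

// fakeNode is a hypothetical NodeAPI implementation for illustration.
type fakeNode struct {
    bc *core.BlockChain
    am *accounts.Manager
}

func (n *fakeNode) AddPendingTransaction(newTx *types.Transaction) {} // would enqueue for consensus
func (n *fakeNode) Blockchain() *core.BlockChain                   { return n.bc }
func (n *fakeNode) AccountManager() *accounts.Manager              { return n.am }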
@@ -0,0 +1,138 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package hmy

import (
    "context"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/common/bitutil"
    "github.com/ethereum/go-ethereum/core/bloombits"
    "github.com/ethereum/go-ethereum/core/rawdb"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/harmony-one/harmony/core"
    "github.com/harmony-one/harmony/core/types"
)

const (
    // bloomServiceThreads is the number of goroutines used globally by an Ethereum
    // instance to service bloombits lookups for all running filters.
    bloomServiceThreads = 16

    // bloomFilterThreads is the number of goroutines used locally per filter to
    // multiplex requests onto the global servicing goroutines.
    bloomFilterThreads = 3

    // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
    // in a single batch.
    bloomRetrievalBatch = 16

    // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
    // to accumulate before requesting an entire batch (avoiding hysteresis).
    bloomRetrievalWait = time.Duration(0)
)

// startBloomHandlers starts a batch of goroutines to accept bloom bit database
// retrievals from possibly a range of filters and serve the data to satisfy them.
func (hmy *Harmony) startBloomHandlers(sectionSize uint64) {
    for i := 0; i < bloomServiceThreads; i++ {
        go func() {
            for {
                select {
                case <-hmy.shutdownChan:
                    return

                case request := <-hmy.bloomRequests:
                    task := <-request
                    task.Bitsets = make([][]byte, len(task.Sections))
                    for i, section := range task.Sections {
                        head := rawdb.ReadCanonicalHash(hmy.chainDb, (section+1)*sectionSize-1)
                        if compVector, err := rawdb.ReadBloomBits(hmy.chainDb, task.Bit, section, head); err == nil {
                            if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil {
                                task.Bitsets[i] = blob
                            } else {
                                task.Error = err
                            }
                        } else {
                            task.Error = err
                        }
                    }
                    request <- task
                }
            }
        }()
    }
}

const (
    // bloomThrottling is the time to wait between processing two consecutive index
    // sections. It's useful during chain upgrades to prevent disk overload.
    bloomThrottling = 100 * time.Millisecond
)

// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
// for the Ethereum header bloom filters, permitting blazing fast filtering.
type BloomIndexer struct {
    size    uint64               // section size to generate bloombits for
    db      ethdb.Database       // database instance to write index data and metadata into
    gen     *bloombits.Generator // generator to rotate the bloom bits creating the bloom index
    section uint64               // Section is the section number being processed currently
    head    common.Hash          // Head is the hash of the last header processed
}

// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
// canonical chain for fast logs filtering.
func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *core.ChainIndexer {
    backend := &BloomIndexer{
        db:   db,
        size: size,
    }
    table := ethdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))

    return core.NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits")
}

// Reset implements core.ChainIndexerBackend, starting a new bloombits index
// section.
func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
    gen, err := bloombits.NewGenerator(uint(b.size))
    b.gen, b.section, b.head = gen, section, common.Hash{}
    return err
}

// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index.
func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error {
    b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
    b.head = header.Hash()
    return nil
}

// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
// writing it out into the database.
func (b *BloomIndexer) Commit() error {
    batch := b.db.NewBatch()
    for i := 0; i < types.BloomBitLength; i++ {
        bits, err := b.gen.Bitset(uint(i))
        if err != nil {
            return err
        }
        rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
    }
    return batch.Write()
}
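
startBloomHandlers implements a two-step channel handshake: a requester first sends a private channel over bloomRequests, then sends the retrieval task into that channel and reads it back once the Bitsets (or Error) have been populated. A sketch of the requester side — in practice this plumbing is driven by a bloombits matcher session rather than written by hand, and bloomRequests is package-internal:

// Illustrative requester for the handshake above.
request := make(chan *bloombits.Retrieval)
hmy.bloomRequests <- request                                      // announce a pending retrieval
request <- &bloombits.Retrieval{Bit: 5, Sections: []uint64{0, 1}} // hand over the task
task := <-request                                                 // Bitsets (or Error) now filled in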
@@ -0,0 +1,433 @@
package filters

import (
    "context"
    "fmt"
    "sync"
    "time"

    ethereum "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/event"
    "github.com/ethereum/go-ethereum/rpc"
    "github.com/harmony-one/harmony/core/types"
)

var (
    deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
)

// filter is a helper struct that holds meta information over the filter type
// and associated subscription in the event system.
type filter struct {
    typ      Type
    deadline *time.Timer // filter is inactive when deadline triggers
    hashes   []common.Hash
    crit     FilterCriteria
    logs     []*types.Log
    s        *Subscription // associated subscription in event system
}

// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such as blocks, transactions and logs.
type PublicFilterAPI struct {
    backend   Backend
    mux       *event.TypeMux
    quit      chan struct{}
    chainDb   ethdb.Database
    events    *EventSystem
    filtersMu sync.Mutex
    filters   map[rpc.ID]*filter
}

// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
    api := &PublicFilterAPI{
        backend: backend,
        mux:     backend.EventMux(),
        chainDb: backend.ChainDb(),
        events:  NewEventSystem(backend.EventMux(), backend, lightMode),
        filters: make(map[rpc.ID]*filter),
    }
    go api.timeoutLoop()

    return api
}

// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
// It is started when the API is created.
func (api *PublicFilterAPI) timeoutLoop() {
    ticker := time.NewTicker(5 * time.Minute)
    for {
        <-ticker.C
        api.filtersMu.Lock()
        for id, f := range api.filters {
            select {
            case <-f.deadline.C:
                f.s.Unsubscribe()
                delete(api.filters, id)
            default:
                continue
            }
        }
        api.filtersMu.Unlock()
    }
}

// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
// as transactions enter the pending state.
//
// It is part of the filter package because this filter can be used through the
// `eth_getFilterChanges` polling method that is also used for log filters.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
    var (
        pendingTxs   = make(chan []common.Hash)
        pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
    )

    api.filtersMu.Lock()
    api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
    api.filtersMu.Unlock()

    go func() {
        for {
            select {
            case ph := <-pendingTxs:
                api.filtersMu.Lock()
                if f, found := api.filters[pendingTxSub.ID]; found {
                    f.hashes = append(f.hashes, ph...)
                }
                api.filtersMu.Unlock()
            case <-pendingTxSub.Err():
                api.filtersMu.Lock()
                delete(api.filters, pendingTxSub.ID)
                api.filtersMu.Unlock()
                return
            }
        }
    }()

    return pendingTxSub.ID
}

// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and is signed by one of the accounts this node manages.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
    notifier, supported := rpc.NotifierFromContext(ctx)
    if !supported {
        return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
    }

    rpcSub := notifier.CreateSubscription()

    go func() {
        txHashes := make(chan []common.Hash, 128)
        pendingTxSub := api.events.SubscribePendingTxs(txHashes)

        for {
            select {
            case hashes := <-txHashes:
                // To keep the original behaviour, send a single tx hash in one notification.
                // TODO(rjl493456442) Send a batch of tx hashes in one notification
                for _, h := range hashes {
                    notifier.Notify(rpcSub.ID, h)
                }
            case <-rpcSub.Err():
                pendingTxSub.Unsubscribe()
                return
            case <-notifier.Closed():
                pendingTxSub.Unsubscribe()
                return
            }
        }
    }()

    return rpcSub, nil
}

// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
// It is part of the filter package since polling goes with eth_getFilterChanges.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
    var (
        headers   = make(chan *types.Header)
        headerSub = api.events.SubscribeNewHeads(headers)
    )

    api.filtersMu.Lock()
    api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
    api.filtersMu.Unlock()

    go func() {
        for {
            select {
            case h := <-headers:
                api.filtersMu.Lock()
                if f, found := api.filters[headerSub.ID]; found {
                    f.hashes = append(f.hashes, h.Hash())
                }
                api.filtersMu.Unlock()
            case <-headerSub.Err():
                api.filtersMu.Lock()
                delete(api.filters, headerSub.ID)
                api.filtersMu.Unlock()
                return
            }
        }
    }()

    return headerSub.ID
}

// NewHeads sends a notification each time a new (header) block is appended to the chain.
func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
    notifier, supported := rpc.NotifierFromContext(ctx)
    if !supported {
        return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
    }

    rpcSub := notifier.CreateSubscription()

    go func() {
        headers := make(chan *types.Header)
        headersSub := api.events.SubscribeNewHeads(headers)

        for {
            select {
            case h := <-headers:
                notifier.Notify(rpcSub.ID, h)
            case <-rpcSub.Err():
                headersSub.Unsubscribe()
                return
            case <-notifier.Closed():
                headersSub.Unsubscribe()
                return
            }
        }
    }()

    return rpcSub, nil
}

// GetFilterChanges returns the logs for the filter with the given id since
// the last time it was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
    api.filtersMu.Lock()
    defer api.filtersMu.Unlock()

    if f, found := api.filters[id]; found {
        if !f.deadline.Stop() {
            // timer expired but filter is not yet removed in timeout loop
            // receive timer value and reset timer
            <-f.deadline.C
        }
        f.deadline.Reset(deadline)

        switch f.typ {
        case PendingTransactionsSubscription, BlocksSubscription:
            hashes := f.hashes
            f.hashes = nil
            return returnHashes(hashes), nil
        case LogsSubscription:
            logs := f.logs
            f.logs = nil
            return returnLogs(logs), nil
        }
    }

    return []interface{}{}, fmt.Errorf("filter not found")
}

// returnHashes is a helper that will return an empty hash array in case the given hash array is nil,
// otherwise the given hashes array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
    if hashes == nil {
        return []common.Hash{}
    }
    return hashes
}

// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
// otherwise the given logs array is returned.
func returnLogs(logs []*types.Log) []*types.Log {
    if logs == nil {
        return []*types.Log{}
    }
    return logs
}

// Logs creates a subscription that fires for all new logs that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
    notifier, supported := rpc.NotifierFromContext(ctx)
    if !supported {
        return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
    }

    var (
        rpcSub      = notifier.CreateSubscription()
        matchedLogs = make(chan []*types.Log)
    )

    logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
    if err != nil {
        return nil, err
    }

    go func() {
        for {
            select {
            case logs := <-matchedLogs:
                for _, log := range logs {
                    notifier.Notify(rpcSub.ID, &log)
                }
            case <-rpcSub.Err(): // client sent an unsubscribe request
                logsSub.Unsubscribe()
                return
            case <-notifier.Closed(): // connection dropped
                logsSub.Unsubscribe()
                return
            }
        }
    }()

    return rpcSub, nil
}

// NewFilter creates a new filter and returns the filter id. It can be
// used to retrieve logs when the state changes. This method cannot be
// used to fetch logs that are already stored in the state.
//
// Default criteria for the from and to block are "latest".
// Using "latest" as block number will return logs for mined blocks.
// Using "pending" as block number returns logs for not yet mined (pending) blocks.
// In case logs are removed (chain reorg) previously returned logs are returned
// again but with the removed property set to true.
//
// In case "fromBlock" > "toBlock" an error is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
    logs := make(chan []*types.Log)
    logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
    if err != nil {
        return rpc.ID(""), err
    }

    api.filtersMu.Lock()
    api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub}
    api.filtersMu.Unlock()

    go func() {
        for {
            select {
            case l := <-logs:
                api.filtersMu.Lock()
                if f, found := api.filters[logsSub.ID]; found {
                    f.logs = append(f.logs, l...)
                }
                api.filtersMu.Unlock()
            case <-logsSub.Err():
                api.filtersMu.Lock()
                delete(api.filters, logsSub.ID)
                api.filtersMu.Unlock()
                return
            }
        }
    }()

    return logsSub.ID, nil
}

// GetLogs returns logs matching the given argument that are stored within the state.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
    var filter *Filter
    if crit.BlockHash != nil {
        // Block filter requested, construct a single-shot filter
        filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
    } else {
        // Convert the RPC block numbers into internal representations
        begin := rpc.LatestBlockNumber.Int64()
        if crit.FromBlock != nil {
            begin = crit.FromBlock.Int64()
        }
        end := rpc.LatestBlockNumber.Int64()
        if crit.ToBlock != nil {
            end = crit.ToBlock.Int64()
        }
        // Construct the range filter
        filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
    }
    // Run the filter and return all the logs
    logs, err := filter.Logs(ctx)
    if err != nil {
        return nil, err
    }
    return returnLogs(logs), err
}

// UninstallFilter removes the filter with the given filter id.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
    api.filtersMu.Lock()
    f, found := api.filters[id]
    if found {
        delete(api.filters, id)
    }
    api.filtersMu.Unlock()
    if found {
        f.s.Unsubscribe()
    }

    return found
}

// GetFilterLogs returns the logs for the filter with the given id.
// If the filter could not be found an empty array of logs is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) {
    api.filtersMu.Lock()
    f, found := api.filters[id]
    api.filtersMu.Unlock()

    if !found || f.typ != LogsSubscription {
        return nil, fmt.Errorf("filter not found")
    }

    var filter *Filter
    if f.crit.BlockHash != nil {
        // Block filter requested, construct a single-shot filter
        filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
    } else {
        // Convert the RPC block numbers into internal representations
        begin := rpc.LatestBlockNumber.Int64()
        if f.crit.FromBlock != nil {
            begin = f.crit.FromBlock.Int64()
        }
        end := rpc.LatestBlockNumber.Int64()
        if f.crit.ToBlock != nil {
            end = f.crit.ToBlock.Int64()
        }
        // Construct the range filter
        filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
    }
    // Run the filter and return all the logs
    logs, err := filter.Logs(ctx)
    if err != nil {
        return nil, err
    }
    return returnLogs(logs), nil
}
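
From a client's perspective these methods map onto the standard eth_* filter endpoints linked above. A sketch of the polling flow using go-ethereum's rpc client; the endpoint URL is an assumption, and error handling is elided:

// Illustrative client-side polling, not part of this change.
client, err := rpc.Dial("http://localhost:8545")
if err != nil {
    panic(err)
}
var id rpc.ID
client.Call(&id, "eth_newBlockFilter")           // install a block filter
var hashes []common.Hash
client.Call(&hashes, "eth_getFilterChanges", id) // poll for new block hashes
client.Call(nil, "eth_uninstallFilter", id)      // clean up when done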
@ -0,0 +1,350 @@ |
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package filters |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"math/big" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
"github.com/ethereum/go-ethereum/core/bloombits" |
||||
ethtypes "github.com/ethereum/go-ethereum/core/types" |
||||
"github.com/ethereum/go-ethereum/ethdb" |
||||
"github.com/ethereum/go-ethereum/event" |
||||
"github.com/ethereum/go-ethereum/rpc" |
||||
"github.com/harmony-one/harmony/core" |
||||
"github.com/harmony-one/harmony/core/types" |
||||
) |
||||
|
||||
// Backend provides the APIs needed for filter
|
||||
type Backend interface { |
||||
ChainDb() ethdb.Database |
||||
EventMux() *event.TypeMux |
||||
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) |
||||
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) |
||||
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) |
||||
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) |
||||
|
||||
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription |
||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription |
||||
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription |
||||
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription |
||||
|
||||
BloomStatus() (uint64, uint64) |
||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) |
||||
} |

// Filter can be used to retrieve and filter logs.
type Filter struct {
	backend Backend

	db        ethdb.Database
	addresses []common.Address
	topics    [][]common.Hash

	block      common.Hash // Block hash if filtering a single block
	begin, end int64       // Range interval if filtering multiple blocks

	matcher *bloombits.Matcher
}

// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
// figure out whether a particular block is interesting or not.
func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
	// Flatten the address and topic filter clauses into a single bloombits filter
	// system. Since the bloombits are not positional, nil topics are permitted,
	// which get flattened into a nil byte slice.
	var filters [][][]byte
	if len(addresses) > 0 {
		filter := make([][]byte, len(addresses))
		for i, address := range addresses {
			filter[i] = address.Bytes()
		}
		filters = append(filters, filter)
	}
	for _, topicList := range topics {
		filter := make([][]byte, len(topicList))
		for i, topic := range topicList {
			filter[i] = topic.Bytes()
		}
		filters = append(filters, filter)
	}
	size, _ := backend.BloomStatus()

	// Create a generic filter and convert it into a range filter
	filter := newFilter(backend, addresses, topics)

	filter.matcher = bloombits.NewMatcher(size, filters)
	filter.begin = begin
	filter.end = end

	return filter
}

// NewBlockFilter creates a new filter which directly inspects the contents of
// a block to figure out whether it is interesting or not.
func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
	// Create a generic filter and convert it into a block filter
	filter := newFilter(backend, addresses, topics)
	filter.block = block
	return filter
}

// newFilter creates a generic filter that can either filter based on a block hash,
// or based on range queries. The search criteria need to be explicitly set.
func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter {
	return &Filter{
		backend:   backend,
		addresses: addresses,
		topics:    topics,
		db:        backend.ChainDb(),
	}
}
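As a usage sketch, the two-step construction above (generic filter, then range parameters) looks like this from calling code inside the package; `backend` is assumed to be any Backend implementation, and the address and topic values are placeholders:

```go
// collectLogs scans blocks 0 through 999 for logs emitted by one contract
// address whose first topic matches a given event signature hash.
func collectLogs(ctx context.Context, backend Backend) ([]*types.Log, error) {
	addr := common.HexToAddress("0x00000000000000000000000000000000000000aa")                     // placeholder
	sig := common.HexToHash("0x00000000000000000000000000000000000000000000000000000000000000bb") // placeholder

	f := NewRangeFilter(backend, 0, 999, []common.Address{addr}, [][]common.Hash{{sig}})
	return f.Logs(ctx)
}
```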

// Logs searches the blockchain for matching log entries, returning all from the
// first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
	// If we're doing singleton block filtering, execute and return
	if f.block != (common.Hash{}) {
		header, err := f.backend.HeaderByHash(ctx, f.block)
		if err != nil {
			return nil, err
		}
		if header == nil {
			return nil, errors.New("unknown block")
		}
		return f.blockLogs(ctx, header)
	}
	// Figure out the limits of the filter range
	header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
	if header == nil {
		return nil, nil
	}
	head := header.Number.Uint64()

	if f.begin == -1 {
		f.begin = int64(head)
	}
	end := uint64(f.end)
	if f.end == -1 {
		end = head
	}
	// Gather all indexed logs, and finish with non-indexed ones
	var (
		logs []*types.Log
		err  error
	)
	size, sections := f.backend.BloomStatus()
	if indexed := sections * size; indexed > uint64(f.begin) {
		if indexed > end {
			logs, err = f.indexedLogs(ctx, end)
		} else {
			logs, err = f.indexedLogs(ctx, indexed-1)
		}
		if err != nil {
			return logs, err
		}
	}
	rest, err := f.unindexedLogs(ctx, end)
	logs = append(logs, rest...)
	return logs, err
}
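The indexed/unindexed split above hinges on BloomStatus(): with a section size of 4096 and, say, 10 completed sections, blocks 0 through 40959 can be served from the bloombits index, and everything past that falls back to per-block bloom checks. A tiny sketch of that boundary arithmetic, with the example values assumed:

```go
// indexedBoundary returns the first block number NOT covered by the bloombits
// index, given BloomStatus()-style values (section size, completed sections).
func indexedBoundary(size, sections uint64) uint64 {
	return size * sections // e.g. 4096 * 10 = 40960; blocks 0..40959 are indexed
}
```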

// indexedLogs returns the logs matching the filter criteria based on the bloom
// bits index, available locally or via the network.
func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
	// Create a matcher session and request servicing from the backend
	matches := make(chan uint64, 64)

	session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
	if err != nil {
		return nil, err
	}
	defer session.Close()

	f.backend.ServiceFilter(ctx, session)

	// Iterate over the matches until exhausted or context closed
	var logs []*types.Log

	for {
		select {
		case number, ok := <-matches:
			// Abort if all matches have been fulfilled
			if !ok {
				err := session.Error()
				if err == nil {
					f.begin = int64(end) + 1
				}
				return logs, err
			}
			f.begin = int64(number) + 1

			// Retrieve the suggested block and pull any truly matching logs
			header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
			if header == nil || err != nil {
				return logs, err
			}
			found, err := f.checkMatches(ctx, header)
			if err != nil {
				return logs, err
			}
			logs = append(logs, found...)

		case <-ctx.Done():
			return logs, ctx.Err()
		}
	}
}

// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
	var logs []*types.Log

	for ; f.begin <= int64(end); f.begin++ {
		header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
		if header == nil || err != nil {
			return logs, err
		}
		found, err := f.blockLogs(ctx, header)
		if err != nil {
			return logs, err
		}
		logs = append(logs, found...)
	}
	return logs, nil
}

// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
	if bloomFilter(header.Bloom, f.addresses, f.topics) {
		found, err := f.checkMatches(ctx, header)
		if err != nil {
			return logs, err
		}
		logs = append(logs, found...)
	}
	return logs, nil
}

// checkMatches checks if the receipts belonging to the given header contain any log events that
// match the filter criteria. This function is called when the bloom filter signals a potential match.
func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
	// Get the logs of the block
	logsList, err := f.backend.GetLogs(ctx, header.Hash())
	if err != nil {
		return nil, err
	}
	var unfiltered []*types.Log
	for _, logs := range logsList {
		unfiltered = append(unfiltered, logs...)
	}
	logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
	if len(logs) > 0 {
		// We have matching logs, check if we need to resolve full logs via the light client
		if logs[0].TxHash == (common.Hash{}) {
			receipts, err := f.backend.GetReceipts(ctx, header.Hash())
			if err != nil {
				return nil, err
			}
			unfiltered = unfiltered[:0]
			for _, receipt := range receipts {
				unfiltered = append(unfiltered, receipt.Logs...)
			}
			logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
		}
		return logs, nil
	}
	return nil, nil
}

func includes(addresses []common.Address, a common.Address) bool {
	for _, addr := range addresses {
		if addr == a {
			return true
		}
	}

	return false
}

// filterLogs creates a slice of logs matching the given criteria.
func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
	var ret []*types.Log
Logs:
	for _, log := range logs {
		if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
			continue
		}
		if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
			continue
		}

		if len(addresses) > 0 && !includes(addresses, log.Address) {
			continue
		}
		// If the number of topics filtered for is greater than the number of topics in the log, skip.
		if len(topics) > len(log.Topics) {
			continue Logs
		}
		for i, sub := range topics {
			match := len(sub) == 0 // empty rule set == wildcard
			for _, topic := range sub {
				if log.Topics[i] == topic {
					match = true
					break
				}
			}
			if !match {
				continue Logs
			}
		}
		ret = append(ret, log)
	}
	return ret
}
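The positional topic rules are easy to misread: the outer slice is matched position by position, and an empty inner slice acts as a wildcard for that position. A small in-package sketch (filterLogs is unexported) with hypothetical hash values:

```go
func topicMatchingSketch() int {
	t0 := common.HexToHash("0x01") // hypothetical event signature
	t1 := common.HexToHash("0x02") // hypothetical indexed argument

	logs := []*types.Log{
		{Topics: []common.Hash{t0, t1}}, // matches: t0 in position 0, wildcard in position 1
		{Topics: []common.Hash{t1}},     // skipped: fewer topics than filter positions
	}
	// Position 0 must equal t0; position 1 is an empty rule set, i.e. a wildcard.
	matched := filterLogs(logs, nil, nil, nil, [][]common.Hash{{t0}, {}})
	return len(matched) // 1
}
```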

func bloomFilter(bloom ethtypes.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
	if len(addresses) > 0 {
		var included bool
		for _, addr := range addresses {
			if types.BloomLookup(bloom, addr) {
				included = true
				break
			}
		}
		if !included {
			return false
		}
	}

	for _, sub := range topics {
		included := len(sub) == 0 // empty rule set == wildcard
		for _, topic := range sub {
			if types.BloomLookup(bloom, topic) {
				included = true
				break
			}
		}
		if !included {
			return false
		}
	}
	return true
}
@@ -0,0 +1,136 @@
package filters

import (
	"encoding/json"
	"errors"
	"fmt"
	"math/big"

	ethereum "github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/rpc"
)

// FilterCriteria represents a request to create a new filter.
// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
type FilterCriteria ethereum.FilterQuery

// UnmarshalJSON sets *args fields with given data.
func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
	type input struct {
		BlockHash *common.Hash     `json:"blockHash"`
		FromBlock *rpc.BlockNumber `json:"fromBlock"`
		ToBlock   *rpc.BlockNumber `json:"toBlock"`
		Addresses interface{}      `json:"address"`
		Topics    []interface{}    `json:"topics"`
	}

	var raw input
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}

	if raw.BlockHash != nil {
		if raw.FromBlock != nil || raw.ToBlock != nil {
			// BlockHash is mutually exclusive with FromBlock/ToBlock criteria
			return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other")
		}
		args.BlockHash = raw.BlockHash
	} else {
		if raw.FromBlock != nil {
			args.FromBlock = big.NewInt(raw.FromBlock.Int64())
		}

		if raw.ToBlock != nil {
			args.ToBlock = big.NewInt(raw.ToBlock.Int64())
		}
	}

	args.Addresses = []common.Address{}

	if raw.Addresses != nil {
		// raw.Addresses can contain a single address or an array of addresses
		switch rawAddr := raw.Addresses.(type) {
		case []interface{}:
			for i, addr := range rawAddr {
				if strAddr, ok := addr.(string); ok {
					addr, err := decodeAddress(strAddr)
					if err != nil {
						return fmt.Errorf("invalid address at index %d: %v", i, err)
					}
					args.Addresses = append(args.Addresses, addr)
				} else {
					return fmt.Errorf("non-string address at index %d", i)
				}
			}
		case string:
			addr, err := decodeAddress(rawAddr)
			if err != nil {
				return fmt.Errorf("invalid address: %v", err)
			}
			args.Addresses = []common.Address{addr}
		default:
			return errors.New("invalid addresses in query")
		}
	}

	// topics is an array consisting of strings and/or arrays of strings.
	// JSON null values are converted to common.Hash{} and ignored by the filter manager.
	if len(raw.Topics) > 0 {
		args.Topics = make([][]common.Hash, len(raw.Topics))
		for i, t := range raw.Topics {
			switch topic := t.(type) {
			case nil:
				// ignore topic when matching logs

			case string:
				// match specific topic
				top, err := decodeTopic(topic)
				if err != nil {
					return err
				}
				args.Topics[i] = []common.Hash{top}

			case []interface{}:
				// or case e.g. [null, "topic0", "topic1"]
				for _, rawTopic := range topic {
					if rawTopic == nil {
						// null component, match all
						args.Topics[i] = nil
						break
					}
					if topic, ok := rawTopic.(string); ok {
						parsed, err := decodeTopic(topic)
						if err != nil {
							return err
						}
						args.Topics[i] = append(args.Topics[i], parsed)
					} else {
						return fmt.Errorf("invalid topic(s)")
					}
				}
			default:
				return fmt.Errorf("invalid topic(s)")
			}
		}
	}

	return nil
}

func decodeAddress(s string) (common.Address, error) {
	b, err := hexutil.Decode(s)
	if err == nil && len(b) != common.AddressLength {
		err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength)
	}
	return common.BytesToAddress(b), err
}

func decodeTopic(s string) (common.Hash, error) {
	b, err := hexutil.Decode(s)
	if err == nil && len(b) != common.HashLength {
		err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength)
	}
	return common.BytesToHash(b), err
}
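To illustrate the decoding rules above, here is a small in-package sketch feeding a typical filter payload through UnmarshalJSON; the address and topic hex strings are placeholders:

```go
func criteriaSketch() error {
	payload := []byte(`{
		"fromBlock": "0x1",
		"toBlock":   "latest",
		"address":   "0x00000000000000000000000000000000000000aa",
		"topics":    [null, "0x00000000000000000000000000000000000000000000000000000000000000bb"]
	}`)

	var crit FilterCriteria
	if err := json.Unmarshal(payload, &crit); err != nil {
		return err
	}
	// crit.FromBlock == 1, crit.ToBlock == -1 (rpc.LatestBlockNumber),
	// crit.Addresses has one entry, crit.Topics[0] == nil (a wildcard),
	// and crit.Topics[1] pins the second topic position.
	return nil
}
```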
@@ -0,0 +1,506 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package filters implements an ethereum filtering system for block,
// transactions and log events.
package filters

import (
	"context"
	"fmt"
	"sync"
	"time"

	ethereum "github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rpc"
	"github.com/harmony-one/harmony/core"
	"github.com/harmony-one/harmony/core/rawdb"
	"github.com/harmony-one/harmony/core/types"
)

// Type determines the kind of filter and is used to put the filter in to
// the correct bucket when added.
type Type byte

const (
	// UnknownSubscription indicates an unknown subscription type
	UnknownSubscription Type = iota
	// LogsSubscription queries for new or removed (chain reorg) logs
	LogsSubscription
	// PendingLogsSubscription queries for logs in pending blocks
	PendingLogsSubscription
	// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
	MinedAndPendingLogsSubscription
	// PendingTransactionsSubscription queries tx hashes for pending
	// transactions entering the pending state
	PendingTransactionsSubscription
	// BlocksSubscription queries hashes for blocks that are imported
	BlocksSubscription
	// LastIndexSubscription keeps track of the last index
	LastIndexSubscription
)

const (
	// txChanSize is the size of channel listening to NewTxsEvent.
	// The number is referenced from the size of tx pool.
	txChanSize = 4096
	// rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
	rmLogsChanSize = 10
	// logsChanSize is the size of channel listening to LogsEvent.
	logsChanSize = 10
	// chainEvChanSize is the size of channel listening to ChainEvent.
	chainEvChanSize = 10
)

type subscription struct {
	id        rpc.ID
	typ       Type
	created   time.Time
	logsCrit  ethereum.FilterQuery
	logs      chan []*types.Log
	hashes    chan []common.Hash
	headers   chan *types.Header
	installed chan struct{} // closed when the filter is installed
	err       chan error    // closed when the filter is uninstalled
}

// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscriptions that match the subscription criteria.
type EventSystem struct {
	mux       *event.TypeMux
	backend   Backend
	lightMode bool
	lastHead  *types.Header

	// Subscriptions
	txsSub        event.Subscription         // Subscription for new transaction event
	logsSub       event.Subscription         // Subscription for new log event
	rmLogsSub     event.Subscription         // Subscription for removed log event
	chainSub      event.Subscription         // Subscription for new chain event
	pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event

	// Channels
	install   chan *subscription         // install filter for event notification
	uninstall chan *subscription         // remove filter for event notification
	txsCh     chan core.NewTxsEvent      // Channel to receive new transactions event
	logsCh    chan []*types.Log          // Channel to receive new log event
	rmLogsCh  chan core.RemovedLogsEvent // Channel to receive removed log event
	chainCh   chan core.ChainEvent       // Channel to receive new chain event
}

// NewEventSystem creates a new manager that listens for events on the given mux,
// parses and filters them. The work loop holds its own index that is used to
// forward events to filters.
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
	m := &EventSystem{
		mux:       mux,
		backend:   backend,
		lightMode: lightMode,
		install:   make(chan *subscription),
		uninstall: make(chan *subscription),
		txsCh:     make(chan core.NewTxsEvent, txChanSize),
		logsCh:    make(chan []*types.Log, logsChanSize),
		rmLogsCh:  make(chan core.RemovedLogsEvent, rmLogsChanSize),
		chainCh:   make(chan core.ChainEvent, chainEvChanSize),
	}

	// Subscribe events
	m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
	m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
	m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
	m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
	// TODO(rjl493456442): use feed to subscribe pending log event
	m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})

	// Make sure none of the subscriptions are empty
	if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
		m.pendingLogSub.Closed() {
		log.Crit("Subscribe for event system failed")
	}

	go m.eventLoop()
	return m
}

// Subscription is created when the client registers itself for a particular event.
type Subscription struct {
	ID        rpc.ID
	f         *subscription
	es        *EventSystem
	unsubOnce sync.Once
}

// Err returns a channel that is closed when unsubscribed.
func (sub *Subscription) Err() <-chan error {
	return sub.f.err
}

// Unsubscribe uninstalls the subscription from the event broadcast loop.
func (sub *Subscription) Unsubscribe() {
	sub.unsubOnce.Do(func() {
	uninstallLoop:
		for {
			// write uninstall request and consume logs/hashes. This prevents
			// the eventLoop broadcast method from deadlocking when writing to the
			// filter event channel while the subscription loop is waiting for
			// this method to return (and thus not reading these events).
			select {
			case sub.es.uninstall <- sub.f:
				break uninstallLoop
			case <-sub.f.logs:
			case <-sub.f.hashes:
			case <-sub.f.headers:
			}
		}

		// wait for filter to be uninstalled in work loop before returning
		// this ensures that the manager won't use the event channel which
		// will probably be closed by the client asap after this method returns.
		<-sub.Err()
	})
}

// subscribe installs the subscription in the event broadcast loop.
func (es *EventSystem) subscribe(sub *subscription) *Subscription {
	es.install <- sub
	<-sub.installed
	return &Subscription{ID: sub.id, f: sub, es: es}
}

// SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. Default value for the from and to
// block is "latest". If the fromBlock > toBlock an error is returned.
func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) {
	var from, to rpc.BlockNumber
	if crit.FromBlock == nil {
		from = rpc.LatestBlockNumber
	} else {
		from = rpc.BlockNumber(crit.FromBlock.Int64())
	}
	if crit.ToBlock == nil {
		to = rpc.LatestBlockNumber
	} else {
		to = rpc.BlockNumber(crit.ToBlock.Int64())
	}

	// only interested in pending logs
	if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
		return es.subscribePendingLogs(crit, logs), nil
	}
	// only interested in new mined logs
	if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
		return es.subscribeLogs(crit, logs), nil
	}
	// only interested in mined logs within a specific block range
	if from >= 0 && to >= 0 && to >= from {
		return es.subscribeLogs(crit, logs), nil
	}
	// interested in mined logs from a specific block number, new logs and pending logs
	if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
		return es.subscribeMinedPendingLogs(crit, logs), nil
	}
	// interested in logs from a specific block number to new mined blocks
	if from >= 0 && to == rpc.LatestBlockNumber {
		return es.subscribeLogs(crit, logs), nil
	}
	return nil, fmt.Errorf("invalid from and to block combination: from > to")
}
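A sketch of the common case above (from and to both unset, so both default to "latest"): stream matching logs until the subscription is torn down. `es` is assumed to be an EventSystem wired to a live backend, and the address is a placeholder:

```go
func streamLogs(es *EventSystem) {
	ch := make(chan []*types.Log)
	sub, err := es.SubscribeLogs(ethereum.FilterQuery{
		Addresses: []common.Address{
			common.HexToAddress("0x00000000000000000000000000000000000000aa"), // placeholder
		},
	}, ch)
	if err != nil {
		return // invalid from/to block combination
	}
	defer sub.Unsubscribe()

	for {
		select {
		case logs := <-ch:
			fmt.Printf("received %d matching logs\n", len(logs))
		case <-sub.Err():
			return // filter was uninstalled
		}
	}
}
```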

// subscribeMinedPendingLogs creates a subscription that returns mined and
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
	sub := &subscription{
		id:        rpc.NewID(),
		typ:       MinedAndPendingLogsSubscription,
		logsCrit:  crit,
		created:   time.Now(),
		logs:      logs,
		hashes:    make(chan []common.Hash),
		headers:   make(chan *types.Header),
		installed: make(chan struct{}),
		err:       make(chan error),
	}
	return es.subscribe(sub)
}

// subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
	sub := &subscription{
		id:        rpc.NewID(),
		typ:       LogsSubscription,
		logsCrit:  crit,
		created:   time.Now(),
		logs:      logs,
		hashes:    make(chan []common.Hash),
		headers:   make(chan *types.Header),
		installed: make(chan struct{}),
		err:       make(chan error),
	}
	return es.subscribe(sub)
}

// subscribePendingLogs creates a subscription that writes pending logs matching
// the given criteria to the given logs channel.
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
	sub := &subscription{
		id:        rpc.NewID(),
		typ:       PendingLogsSubscription,
		logsCrit:  crit,
		created:   time.Now(),
		logs:      logs,
		hashes:    make(chan []common.Hash),
		headers:   make(chan *types.Header),
		installed: make(chan struct{}),
		err:       make(chan error),
	}
	return es.subscribe(sub)
}

// SubscribeNewHeads creates a subscription that writes the header of a block that is
// imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
	sub := &subscription{
		id:        rpc.NewID(),
		typ:       BlocksSubscription,
		created:   time.Now(),
		logs:      make(chan []*types.Log),
		hashes:    make(chan []common.Hash),
		headers:   headers,
		installed: make(chan struct{}),
		err:       make(chan error),
	}
	return es.subscribe(sub)
}

// SubscribePendingTxs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
	sub := &subscription{
		id:        rpc.NewID(),
		typ:       PendingTransactionsSubscription,
		created:   time.Now(),
		logs:      make(chan []*types.Log),
		hashes:    hashes,
		headers:   make(chan *types.Header),
		installed: make(chan struct{}),
		err:       make(chan error),
	}
	return es.subscribe(sub)
}

type filterIndex map[Type]map[rpc.ID]*subscription

// broadcast event to filters that match criteria.
func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
	if ev == nil {
		return
	}

	switch e := ev.(type) {
	case []*types.Log:
		if len(e) > 0 {
			for _, f := range filters[LogsSubscription] {
				if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
					f.logs <- matchedLogs
				}
			}
		}
	case core.RemovedLogsEvent:
		for _, f := range filters[LogsSubscription] {
			if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
				f.logs <- matchedLogs
			}
		}
	case *event.TypeMuxEvent:
		if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
			for _, f := range filters[PendingLogsSubscription] {
				if e.Time.After(f.created) {
					if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
						f.logs <- matchedLogs
					}
				}
			}
		}
	case core.NewTxsEvent:
		hashes := make([]common.Hash, 0, len(e.Txs))
		for _, tx := range e.Txs {
			hashes = append(hashes, tx.Hash())
		}
		for _, f := range filters[PendingTransactionsSubscription] {
			f.hashes <- hashes
		}
	case core.ChainEvent:
		for _, f := range filters[BlocksSubscription] {
			f.headers <- e.Block.Header()
		}
		if es.lightMode && len(filters[LogsSubscription]) > 0 {
			es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
				for _, f := range filters[LogsSubscription] {
					if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
						f.logs <- matchedLogs
					}
				}
			})
		}
	}
}

func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
	oldh := es.lastHead
	es.lastHead = newHeader
	if oldh == nil {
		return
	}
	newh := newHeader
	// find common ancestor, create list of rolled back and new block hashes
	var oldHeaders, newHeaders []*types.Header
	for oldh.Hash() != newh.Hash() {
		if oldh.Number.Uint64() >= newh.Number.Uint64() {
			oldHeaders = append(oldHeaders, oldh)
			oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
		}
		if oldh.Number.Uint64() < newh.Number.Uint64() {
			newHeaders = append(newHeaders, newh)
			newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
			if newh == nil {
				// happens when CHT syncing, nothing to do
				newh = oldh
			}
		}
	}
	// roll back old blocks
	for _, h := range oldHeaders {
		callBack(h, true)
	}
	// check new blocks (array is in reverse order)
	for i := len(newHeaders) - 1; i >= 0; i-- {
		callBack(newHeaders[i], false)
	}
}

// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
	if bloomFilter(header.Bloom, addresses, topics) {
		// Get the logs of the block
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		defer cancel()
		logsList, err := es.backend.GetLogs(ctx, header.Hash())
		if err != nil {
			return nil
		}
		var unfiltered []*types.Log
		for _, logs := range logsList {
			for _, log := range logs {
				logcopy := *log
				logcopy.Removed = remove
				unfiltered = append(unfiltered, &logcopy)
			}
		}
		logs := filterLogs(unfiltered, nil, nil, addresses, topics)
		if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) {
			// We have matching but non-derived logs
			receipts, err := es.backend.GetReceipts(ctx, header.Hash())
			if err != nil {
				return nil
			}
			unfiltered = unfiltered[:0]
			for _, receipt := range receipts {
				for _, log := range receipt.Logs {
					logcopy := *log
					logcopy.Removed = remove
					unfiltered = append(unfiltered, &logcopy)
				}
			}
			logs = filterLogs(unfiltered, nil, nil, addresses, topics)
		}
		return logs
	}
	return nil
}

// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
	// Ensure all subscriptions get cleaned up
	defer func() {
		es.pendingLogSub.Unsubscribe()
		es.txsSub.Unsubscribe()
		es.logsSub.Unsubscribe()
		es.rmLogsSub.Unsubscribe()
		es.chainSub.Unsubscribe()
	}()

	index := make(filterIndex)
	for i := UnknownSubscription; i < LastIndexSubscription; i++ {
		index[i] = make(map[rpc.ID]*subscription)
	}

	for {
		select {
		// Handle subscribed events
		case ev := <-es.txsCh:
			es.broadcast(index, ev)
		case ev := <-es.logsCh:
			es.broadcast(index, ev)
		case ev := <-es.rmLogsCh:
			es.broadcast(index, ev)
		case ev := <-es.chainCh:
			es.broadcast(index, ev)
		case ev, active := <-es.pendingLogSub.Chan():
			if !active { // system stopped
				return
			}
			es.broadcast(index, ev)

		case f := <-es.install:
			if f.typ == MinedAndPendingLogsSubscription {
				// this type covers both the logs and the pending logs buckets
				index[LogsSubscription][f.id] = f
				index[PendingLogsSubscription][f.id] = f
			} else {
				index[f.typ][f.id] = f
			}
			close(f.installed)

		case f := <-es.uninstall:
			if f.typ == MinedAndPendingLogsSubscription {
				// this type covers both the logs and the pending logs buckets
				delete(index[LogsSubscription], f.id)
				delete(index[PendingLogsSubscription], f.id)
			} else {
				delete(index[f.typ], f.id)
			}
			close(f.err)

		// System stopped
		case <-es.txsSub.Err():
			return
		case <-es.logsSub.Err():
			return
		case <-es.rmLogsSub.Err():
			return
		case <-es.chainSub.Err():
			return
		}
	}
}
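Putting the pieces together, a node constructs one EventSystem and hands out subscriptions from it. A minimal in-package sketch; `backend` is assumed to be a live Backend, and lightMode is false since a full node serves logs directly:

```go
func runHeadSubscription(backend Backend) {
	mux := new(event.TypeMux)
	es := NewEventSystem(mux, backend, false)

	headers := make(chan *types.Header)
	sub := es.SubscribeNewHeads(headers)
	defer sub.Unsubscribe()

	// Print each imported block's number until the subscription ends.
	for {
		select {
		case header := <-headers:
			fmt.Println("new block:", header.Number)
		case <-sub.Err():
			return
		}
	}
}
```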