diff --git a/core/api_backend.go b/core/api_backend.go deleted file mode 100644 index e8b433e30..000000000 --- a/core/api_backend.go +++ /dev/null @@ -1,119 +0,0 @@ -package core - -import ( - "context" - "errors" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rpc" - "github.com/harmony-one/harmony/accounts" - "github.com/harmony-one/harmony/core/state" - "github.com/harmony-one/harmony/core/types" -) - -// HmyAPIBackend ... -type HmyAPIBackend struct { - blockchain *BlockChain - txPool *TxPool - accountManager *accounts.Manager - nodeAPI NodeAPIFunctions -} - -// NodeAPIFunctions is the list of functions from node used to call rpc apis. -type NodeAPIFunctions interface { - AddPendingTransaction(newTx *types.Transaction) -} - -// NewBackend ... -func NewBackend(blockchain *BlockChain, txPool *TxPool, accountManager *accounts.Manager, nodeAPI NodeAPIFunctions) *HmyAPIBackend { - return &HmyAPIBackend{blockchain, txPool, accountManager, nodeAPI} -} - -// ChainDb ... -func (b *HmyAPIBackend) ChainDb() ethdb.Database { - return b.blockchain.db -} - -// GetBlock ... -func (b *HmyAPIBackend) GetBlock(ctx context.Context, hash common.Hash) (*types.Block, error) { - return b.blockchain.GetBlockByHash(hash), nil -} - -// GetPoolTransaction ... -func (b *HmyAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction { - return b.txPool.Get(hash) -} - -// BlockByNumber ... -func (b *HmyAPIBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) { - // Pending block is only known by the miner - if blockNr == rpc.PendingBlockNumber { - return nil, errors.New("not implemented") - } - // Otherwise resolve and return the block - if blockNr == rpc.LatestBlockNumber { - return b.blockchain.CurrentBlock(), nil - } - return b.blockchain.GetBlockByNumber(uint64(blockNr)), nil -} - -// StateAndHeaderByNumber ... -func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.DB, *types.Header, error) { - // Pending state is only known by the miner - if blockNr == rpc.PendingBlockNumber { - return nil, nil, errors.New("not implemented") - } - // Otherwise resolve the block number and return its state - header, err := b.HeaderByNumber(ctx, blockNr) - if header == nil || err != nil { - return nil, nil, err - } - stateDb, err := b.blockchain.StateAt(header.Root) - return stateDb, header, err -} - -// HeaderByNumber ... -func (b *HmyAPIBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { - // Pending block is only known by the miner - if blockNr == rpc.PendingBlockNumber { - return nil, errors.New("not implemented") - } - // Otherwise resolve and return the block - if blockNr == rpc.LatestBlockNumber { - return b.blockchain.CurrentBlock().Header(), nil - } - return b.blockchain.GetHeaderByNumber(uint64(blockNr)), nil -} - -// GetPoolNonce ... -func (b *HmyAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { - return b.txPool.State().GetNonce(addr), nil -} - -// SendTx ... -func (b *HmyAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - b.nodeAPI.AddPendingTransaction(signedTx) - return nil -} - -// ChainConfig ... -func (b *HmyAPIBackend) ChainConfig() *params.ChainConfig { - return b.blockchain.chainConfig -} - -// CurrentBlock ... -func (b *HmyAPIBackend) CurrentBlock() *types.Block { - return types.NewBlockWithHeader(b.blockchain.CurrentHeader()) -} - -// AccountManager ... -func (b *HmyAPIBackend) AccountManager() *accounts.Manager { - return b.accountManager -} - -// GetReceipts ... -func (b *HmyAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { - return b.blockchain.GetReceiptsByHash(hash), nil -} diff --git a/core/blockchain.go b/core/blockchain.go index 6789c4c58..5a47ffcdd 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1780,3 +1780,10 @@ func (bc *BlockChain) StoreEpochBlockNumber( } return nil } + +// ChainDB ... +// TODO(ricl): in eth, this is not exposed. I expose it here because I need it in Harmony object. +// In eth, chainDB is initialized within Ethereum object +func (bc *BlockChain) ChainDB() ethdb.Database { + return bc.db +} diff --git a/core/chain_indexer.go b/core/chain_indexer.go new file mode 100644 index 000000000..db6195292 --- /dev/null +++ b/core/chain_indexer.go @@ -0,0 +1,496 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package core + +import ( + "context" + "encoding/binary" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/harmony-one/harmony/core/rawdb" + "github.com/harmony-one/harmony/core/types" +) + +// ChainIndexerBackend defines the methods needed to process chain segments in +// the background and write the segment results into the database. These can be +// used to create filter blooms or CHTs. +type ChainIndexerBackend interface { + // Reset initiates the processing of a new chain segment, potentially terminating + // any partially completed operations (in case of a reorg). + Reset(ctx context.Context, section uint64, prevHead common.Hash) error + + // Process crunches through the next header in the chain segment. The caller + // will ensure a sequential order of headers. + Process(ctx context.Context, header *types.Header) error + + // Commit finalizes the section metadata and stores it into the database. + Commit() error +} + +// ChainIndexerChain interface is used for connecting the indexer to a blockchain +type ChainIndexerChain interface { + // CurrentHeader retrieves the latest locally known header. + CurrentHeader() *types.Header + + // SubscribeChainHeadEvent subscribes to new head header notifications. + SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription +} + +// ChainIndexer does a post-processing job for equally sized sections of the +// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is +// connected to the blockchain through the event system by starting a +// ChainHeadEventLoop in a goroutine. +// +// Further child ChainIndexers can be added which use the output of the parent +// section indexer. These child indexers receive new head notifications only +// after an entire section has been finished or in case of rollbacks that might +// affect already finished sections. +type ChainIndexer struct { + chainDb ethdb.Database // Chain database to index the data from + indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into + backend ChainIndexerBackend // Background processor generating the index data content + children []*ChainIndexer // Child indexers to cascade chain updates to + + active uint32 // Flag whether the event loop was started + update chan struct{} // Notification channel that headers should be processed + quit chan chan error // Quit channel to tear down running goroutines + ctx context.Context + ctxCancel func() + + sectionSize uint64 // Number of blocks in a single chain segment to process + confirmsReq uint64 // Number of confirmations before processing a completed segment + + storedSections uint64 // Number of sections successfully indexed into the database + knownSections uint64 // Number of sections known to be complete (block wise) + cascadedHead uint64 // Block number of the last completed section cascaded to subindexers + + checkpointSections uint64 // Number of sections covered by the checkpoint + checkpointHead common.Hash // Section head belonging to the checkpoint + + throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources + + log log.Logger + lock sync.RWMutex +} + +// NewChainIndexer creates a new chain indexer to do background processing on +// chain segments of a given size after certain number of confirmations passed. +// The throttling parameter might be used to prevent database thrashing. +func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer { + c := &ChainIndexer{ + chainDb: chainDb, + indexDb: indexDb, + backend: backend, + update: make(chan struct{}, 1), + quit: make(chan chan error), + sectionSize: section, + confirmsReq: confirm, + throttling: throttling, + log: log.New("type", kind), + } + // Initialize database dependent fields and start the updater + c.loadValidSections() + c.ctx, c.ctxCancel = context.WithCancel(context.Background()) + + go c.updateLoop() + + return c +} + +// AddCheckpoint adds a checkpoint. Sections are never processed and the chain +// is not expected to be available before this point. The indexer assumes that +// the backend has sufficient information available to process subsequent sections. +// +// Note: knownSections == 0 and storedSections == checkpointSections until +// syncing reaches the checkpoint +func (c *ChainIndexer) AddCheckpoint(section uint64, shead common.Hash) { + c.lock.Lock() + defer c.lock.Unlock() + + c.checkpointSections = section + 1 + c.checkpointHead = shead + + if section < c.storedSections { + return + } + c.setSectionHead(section, shead) + c.setValidSections(section + 1) +} + +// Start creates a goroutine to feed chain head events into the indexer for +// cascading background processing. Children do not need to be started, they +// are notified about new events by their parents. +func (c *ChainIndexer) Start(chain ChainIndexerChain) { + events := make(chan ChainHeadEvent, 10) + sub := chain.SubscribeChainHeadEvent(events) + + go c.eventLoop(chain.CurrentHeader(), events, sub) +} + +// Close tears down all goroutines belonging to the indexer and returns any error +// that might have occurred internally. +func (c *ChainIndexer) Close() error { + var errs []error + + c.ctxCancel() + + // Tear down the primary update loop + errc := make(chan error) + c.quit <- errc + if err := <-errc; err != nil { + errs = append(errs, err) + } + // If needed, tear down the secondary event loop + if atomic.LoadUint32(&c.active) != 0 { + c.quit <- errc + if err := <-errc; err != nil { + errs = append(errs, err) + } + } + // Close all children + for _, child := range c.children { + if err := child.Close(); err != nil { + errs = append(errs, err) + } + } + // Return any failures + switch { + case len(errs) == 0: + return nil + + case len(errs) == 1: + return errs[0] + + default: + return fmt.Errorf("%v", errs) + } +} + +// eventLoop is a secondary - optional - event loop of the indexer which is only +// started for the outermost indexer to push chain head events into a processing +// queue. +func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) { + // Mark the chain indexer as active, requiring an additional teardown + atomic.StoreUint32(&c.active, 1) + + defer sub.Unsubscribe() + + // Fire the initial new head event to start any outstanding processing + c.newHead(currentHeader.Number.Uint64(), false) + + var ( + prevHeader = currentHeader + prevHash = currentHeader.Hash() + ) + for { + select { + case errc := <-c.quit: + // Chain indexer terminating, report no failure and abort + errc <- nil + return + + case ev, ok := <-events: + // Received a new event, ensure it's not nil (closing) and update + if !ok { + errc := <-c.quit + errc <- nil + return + } + header := ev.Block.Header() + if header.ParentHash != prevHash { + // Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then) + // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly? + + if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash { + if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, header); h != nil { + c.newHead(h.Number.Uint64(), true) + } + } + } + c.newHead(header.Number.Uint64(), false) + + prevHeader, prevHash = header, header.Hash() + } + } +} + +// newHead notifies the indexer about new chain heads and/or reorgs. +func (c *ChainIndexer) newHead(head uint64, reorg bool) { + c.lock.Lock() + defer c.lock.Unlock() + + // If a reorg happened, invalidate all sections until that point + if reorg { + // Revert the known section number to the reorg point + known := head / c.sectionSize + stored := known + if known < c.checkpointSections { + known = 0 + } + if stored < c.checkpointSections { + stored = c.checkpointSections + } + if known < c.knownSections { + c.knownSections = known + } + // Revert the stored sections from the database to the reorg point + if stored < c.storedSections { + c.setValidSections(stored) + } + // Update the new head number to the finalized section end and notify children + head = known * c.sectionSize + + if head < c.cascadedHead { + c.cascadedHead = head + for _, child := range c.children { + child.newHead(c.cascadedHead, true) + } + } + return + } + // No reorg, calculate the number of newly known sections and update if high enough + var sections uint64 + if head >= c.confirmsReq { + sections = (head + 1 - c.confirmsReq) / c.sectionSize + if sections < c.checkpointSections { + sections = 0 + } + if sections > c.knownSections { + if c.knownSections < c.checkpointSections { + // syncing reached the checkpoint, verify section head + syncedHead := rawdb.ReadCanonicalHash(c.chainDb, c.checkpointSections*c.sectionSize-1) + if syncedHead != c.checkpointHead { + c.log.Error("Synced chain does not match checkpoint", "number", c.checkpointSections*c.sectionSize-1, "expected", c.checkpointHead, "synced", syncedHead) + return + } + } + c.knownSections = sections + + select { + case c.update <- struct{}{}: + default: + } + } + } +} + +// updateLoop is the main event loop of the indexer which pushes chain segments +// down into the processing backend. +func (c *ChainIndexer) updateLoop() { + var ( + updating bool + updated time.Time + ) + + for { + select { + case errc := <-c.quit: + // Chain indexer terminating, report no failure and abort + errc <- nil + return + + case <-c.update: + // Section headers completed (or rolled back), update the index + c.lock.Lock() + if c.knownSections > c.storedSections { + // Periodically print an upgrade log message to the user + if time.Since(updated) > 8*time.Second { + if c.knownSections > c.storedSections+1 { + updating = true + c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) + } + updated = time.Now() + } + // Cache the current section count and head to allow unlocking the mutex + section := c.storedSections + var oldHead common.Hash + if section > 0 { + oldHead = c.SectionHead(section - 1) + } + // Process the newly defined section in the background + c.lock.Unlock() + newHead, err := c.processSection(section, oldHead) + if err != nil { + select { + case <-c.ctx.Done(): + <-c.quit <- nil + return + default: + } + c.log.Error("Section processing failed", "error", err) + } + c.lock.Lock() + + // If processing succeeded and no reorgs occcurred, mark the section completed + if err == nil && oldHead == c.SectionHead(section-1) { + c.setSectionHead(section, newHead) + c.setValidSections(section + 1) + if c.storedSections == c.knownSections && updating { + updating = false + c.log.Info("Finished upgrading chain index") + } + c.cascadedHead = c.storedSections*c.sectionSize - 1 + for _, child := range c.children { + c.log.Trace("Cascading chain index update", "head", c.cascadedHead) + child.newHead(c.cascadedHead, false) + } + } else { + // If processing failed, don't retry until further notification + c.log.Debug("Chain index processing failed", "section", section, "err", err) + c.knownSections = c.storedSections + } + } + // If there are still further sections to process, reschedule + if c.knownSections > c.storedSections { + time.AfterFunc(c.throttling, func() { + select { + case c.update <- struct{}{}: + default: + } + }) + } + c.lock.Unlock() + } + } +} + +// processSection processes an entire section by calling backend functions while +// ensuring the continuity of the passed headers. Since the chain mutex is not +// held while processing, the continuity can be broken by a long reorg, in which +// case the function returns with an error. +func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) { + c.log.Trace("Processing new chain section", "section", section) + + // Reset and partial processing + + if err := c.backend.Reset(c.ctx, section, lastHead); err != nil { + c.setValidSections(0) + return common.Hash{}, err + } + + for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { + hash := rawdb.ReadCanonicalHash(c.chainDb, number) + if hash == (common.Hash{}) { + return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number) + } + header := rawdb.ReadHeader(c.chainDb, hash, number) + if header == nil { + return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4]) + } else if header.ParentHash != lastHead { + return common.Hash{}, fmt.Errorf("chain reorged during section processing") + } + if err := c.backend.Process(c.ctx, header); err != nil { + return common.Hash{}, err + } + lastHead = header.Hash() + } + if err := c.backend.Commit(); err != nil { + return common.Hash{}, err + } + return lastHead, nil +} + +// Sections returns the number of processed sections maintained by the indexer +// and also the information about the last header indexed for potential canonical +// verifications. +func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { + c.lock.Lock() + defer c.lock.Unlock() + + return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1) +} + +// AddChildIndexer adds a child ChainIndexer that can use the output of this one +func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) { + c.lock.Lock() + defer c.lock.Unlock() + + c.children = append(c.children, indexer) + + // Cascade any pending updates to new children too + sections := c.storedSections + if c.knownSections < sections { + // if a section is "stored" but not "known" then it is a checkpoint without + // available chain data so we should not cascade it yet + sections = c.knownSections + } + if sections > 0 { + indexer.newHead(sections*c.sectionSize-1, false) + } +} + +// loadValidSections reads the number of valid sections from the index database +// and caches is into the local state. +func (c *ChainIndexer) loadValidSections() { + data, _ := c.indexDb.Get([]byte("count")) + if len(data) == 8 { + c.storedSections = binary.BigEndian.Uint64(data) + } +} + +// setValidSections writes the number of valid sections to the index database +func (c *ChainIndexer) setValidSections(sections uint64) { + // Set the current number of valid sections in the database + var data [8]byte + binary.BigEndian.PutUint64(data[:], sections) + c.indexDb.Put([]byte("count"), data[:]) + + // Remove any reorged sections, caching the valids in the mean time + for c.storedSections > sections { + c.storedSections-- + c.removeSectionHead(c.storedSections) + } + c.storedSections = sections // needed if new > old +} + +// SectionHead retrieves the last block hash of a processed section from the +// index database. +func (c *ChainIndexer) SectionHead(section uint64) common.Hash { + var data [8]byte + binary.BigEndian.PutUint64(data[:], section) + + hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...)) + if len(hash) == len(common.Hash{}) { + return common.BytesToHash(hash) + } + return common.Hash{} +} + +// setSectionHead writes the last block hash of a processed section to the index +// database. +func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) { + var data [8]byte + binary.BigEndian.PutUint64(data[:], section) + + c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes()) +} + +// removeSectionHead removes the reference to a processed section from the index +// database. +func (c *ChainIndexer) removeSectionHead(section uint64) { + var data [8]byte + binary.BigEndian.PutUint64(data[:], section) + + c.indexDb.Delete(append([]byte("shead"), data[:]...)) +} diff --git a/core/types/block.go b/core/types/block.go index f7db428f7..7f6c2737d 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "golang.org/x/crypto/sha3" @@ -75,7 +76,7 @@ type Header struct { Root common.Hash `json:"stateRoot" gencodec:"required"` TxHash common.Hash `json:"transactionsRoot" gencodec:"required"` ReceiptHash common.Hash `json:"receiptsRoot" gencodec:"required"` - Bloom Bloom `json:"logsBloom" gencodec:"required"` + Bloom ethtypes.Bloom `json:"logsBloom" gencodec:"required"` Difficulty *big.Int `json:"difficulty" gencodec:"required"` Number *big.Int `json:"number" gencodec:"required"` GasLimit uint64 `json:"gasLimit" gencodec:"required"` @@ -351,7 +352,7 @@ func (b *Block) Nonce() uint64 { return binary.BigEndian.Uint64(b.header.Nonce[: func (b *Block) ShardID() uint32 { return b.header.ShardID } // Bloom returns header bloom. -func (b *Block) Bloom() Bloom { return b.header.Bloom } +func (b *Block) Bloom() ethtypes.Bloom { return b.header.Bloom } // Coinbase returns header coinbase. func (b *Block) Coinbase() common.Address { return b.header.Coinbase } diff --git a/core/types/bloom9.go b/core/types/bloom9.go index 5b4ca44b1..0af7e8410 100644 --- a/core/types/bloom9.go +++ b/core/types/bloom9.go @@ -17,10 +17,9 @@ package types import ( - "fmt" "math/big" - "github.com/ethereum/go-ethereum/common/hexutil" + ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" ) @@ -36,66 +35,16 @@ const ( BloomBitLength = 8 * BloomByteLength ) -// Bloom represents a 2048 bit bloom filter. -type Bloom [BloomByteLength]byte - // BytesToBloom converts a byte slice to a bloom filter. // It panics if b is not of suitable size. -func BytesToBloom(b []byte) Bloom { - var bloom Bloom +func BytesToBloom(b []byte) ethtypes.Bloom { + var bloom ethtypes.Bloom bloom.SetBytes(b) return bloom } -// SetBytes sets the content of b to the given bytes. -// It panics if d is not of suitable size. -func (b *Bloom) SetBytes(d []byte) { - if len(b) < len(d) { - panic(fmt.Sprintf("bloom bytes too big %d %d", len(b), len(d))) - } - copy(b[BloomByteLength-len(d):], d) -} - -// Add adds d to the filter. Future calls of Test(d) will return true. -func (b *Bloom) Add(d *big.Int) { - bin := new(big.Int).SetBytes(b[:]) - bin.Or(bin, bloom9(d.Bytes())) - b.SetBytes(bin.Bytes()) -} - -// Big converts b to a big integer. -func (b Bloom) Big() *big.Int { - return new(big.Int).SetBytes(b[:]) -} - -// Bytes returns bytes of Bloom. -func (b Bloom) Bytes() []byte { - return b[:] -} - -// Test tests if an input may belong to the bloom. -func (b Bloom) Test(test *big.Int) bool { - return BloomLookup(b, test) -} - -// TestBytes tests if the input represented by test []byte may belong to the bloom. -func (b Bloom) TestBytes(test []byte) bool { - return b.Test(new(big.Int).SetBytes(test)) - -} - -// MarshalText encodes b as a hex string with 0x prefix. -func (b Bloom) MarshalText() ([]byte, error) { - return hexutil.Bytes(b[:]).MarshalText() -} - -// UnmarshalText b as a hex string with 0x prefix. -func (b *Bloom) UnmarshalText(input []byte) error { - return hexutil.UnmarshalFixedText("Bloom", input, b[:]) -} - // CreateBloom creates a Bloom given the receipts. -func CreateBloom(receipts Receipts) Bloom { +func CreateBloom(receipts Receipts) ethtypes.Bloom { bin := new(big.Int) for _, receipt := range receipts { bin.Or(bin, LogsBloom(receipt.Logs)) @@ -135,7 +84,7 @@ func bloom9(b []byte) *big.Int { var Bloom9 = bloom9 // BloomLookup checks if a topic may belong to the Bloom. -func BloomLookup(bin Bloom, topic bytesBacked) bool { +func BloomLookup(bin ethtypes.Bloom, topic bytesBacked) bool { bloom := bin.Big() cmp := bloom9(topic.Bytes()) diff --git a/core/types/bloom9_test.go b/core/types/bloom9_test.go index a28ac0e7a..718a2078d 100644 --- a/core/types/bloom9_test.go +++ b/core/types/bloom9_test.go @@ -19,6 +19,8 @@ package types import ( "math/big" "testing" + + ethtypes "github.com/ethereum/go-ethereum/core/types" ) func TestBloom(t *testing.T) { @@ -33,7 +35,7 @@ func TestBloom(t *testing.T) { "lo", } - var bloom Bloom + var bloom ethtypes.Bloom for _, data := range positive { bloom.Add(new(big.Int).SetBytes([]byte(data))) } diff --git a/core/types/gen_header_json.go b/core/types/gen_header_json.go index d2c655941..7684102f0 100644 --- a/core/types/gen_header_json.go +++ b/core/types/gen_header_json.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + ethtypes "github.com/ethereum/go-ethereum/core/types" ) var _ = (*headerMarshaling)(nil) @@ -20,7 +21,7 @@ func (h Header) MarshalJSON() ([]byte, error) { Root common.Hash `json:"stateRoot" gencodec:"required"` TxHash common.Hash `json:"transactionsRoot" gencodec:"required"` ReceiptHash common.Hash `json:"receiptsRoot" gencodec:"required"` - Bloom Bloom `json:"logsBloom" gencodec:"required"` + Bloom ethtypes.Bloom `json:"logsBloom" gencodec:"required"` Difficulty *hexutil.Big `json:"difficulty" gencodec:"required"` Number *hexutil.Big `json:"number" gencodec:"required"` GasLimit hexutil.Uint64 `json:"gasLimit" gencodec:"required"` @@ -57,7 +58,7 @@ func (h *Header) UnmarshalJSON(input []byte) error { Root *common.Hash `json:"stateRoot" gencodec:"required"` TxHash *common.Hash `json:"transactionsRoot" gencodec:"required"` ReceiptHash *common.Hash `json:"receiptsRoot" gencodec:"required"` - Bloom *Bloom `json:"logsBloom" gencodec:"required"` + Bloom *ethtypes.Bloom `json:"logsBloom" gencodec:"required"` Difficulty *hexutil.Big `json:"difficulty" gencodec:"required"` Number *hexutil.Big `json:"number" gencodec:"required"` GasLimit *hexutil.Uint64 `json:"gasLimit" gencodec:"required"` diff --git a/core/types/gen_receipt_json.go b/core/types/gen_receipt_json.go index 5c807a4cc..ca9c9c361 100644 --- a/core/types/gen_receipt_json.go +++ b/core/types/gen_receipt_json.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + ethtypes "github.com/ethereum/go-ethereum/core/types" ) var _ = (*receiptMarshaling)(nil) @@ -18,7 +19,7 @@ func (r Receipt) MarshalJSON() ([]byte, error) { PostState hexutil.Bytes `json:"root"` Status hexutil.Uint64 `json:"status"` CumulativeGasUsed hexutil.Uint64 `json:"cumulativeGasUsed" gencodec:"required"` - Bloom Bloom `json:"logsBloom" gencodec:"required"` + Bloom ethtypes.Bloom `json:"logsBloom" gencodec:"required"` Logs []*Log `json:"logs" gencodec:"required"` TxHash common.Hash `json:"transactionHash" gencodec:"required"` ContractAddress common.Address `json:"contractAddress"` @@ -42,7 +43,7 @@ func (r *Receipt) UnmarshalJSON(input []byte) error { PostState *hexutil.Bytes `json:"root"` Status *hexutil.Uint64 `json:"status"` CumulativeGasUsed *hexutil.Uint64 `json:"cumulativeGasUsed" gencodec:"required"` - Bloom *Bloom `json:"logsBloom" gencodec:"required"` + Bloom *ethtypes.Bloom `json:"logsBloom" gencodec:"required"` Logs []*Log `json:"logs" gencodec:"required"` TxHash *common.Hash `json:"transactionHash" gencodec:"required"` ContractAddress *common.Address `json:"contractAddress"` diff --git a/core/types/receipt.go b/core/types/receipt.go index aee69a66f..93babd3ef 100644 --- a/core/types/receipt.go +++ b/core/types/receipt.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" ) @@ -45,11 +46,11 @@ const ( // Receipt represents the results of a transaction. type Receipt struct { // Consensus fields - PostState []byte `json:"root"` - Status uint64 `json:"status"` - CumulativeGasUsed uint64 `json:"cumulativeGasUsed" gencodec:"required"` - Bloom Bloom `json:"logsBloom" gencodec:"required"` - Logs []*Log `json:"logs" gencodec:"required"` + PostState []byte `json:"root"` + Status uint64 `json:"status"` + CumulativeGasUsed uint64 `json:"cumulativeGasUsed" gencodec:"required"` + Bloom ethtypes.Bloom `json:"logsBloom" gencodec:"required"` + Logs []*Log `json:"logs" gencodec:"required"` // Implementation fields (don't reorder!) TxHash common.Hash `json:"transactionHash" gencodec:"required"` @@ -68,14 +69,14 @@ type receiptMarshaling struct { type receiptRLP struct { PostStateOrStatus []byte CumulativeGasUsed uint64 - Bloom Bloom + Bloom ethtypes.Bloom Logs []*Log } type receiptStorageRLP struct { PostStateOrStatus []byte CumulativeGasUsed uint64 - Bloom Bloom + Bloom ethtypes.Bloom TxHash common.Hash ContractAddress common.Address Logs []*LogForStorage diff --git a/hmy/api_backend.go b/hmy/api_backend.go new file mode 100644 index 000000000..f87a46a7a --- /dev/null +++ b/hmy/api_backend.go @@ -0,0 +1,187 @@ +package hmy + +import ( + "context" + "errors" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" + "github.com/harmony-one/harmony/accounts" + "github.com/harmony-one/harmony/api/proto" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/state" + "github.com/harmony-one/harmony/core/types" +) + +// APIBackend An implementation of Backend. Full client. +type APIBackend struct { + hmy *Harmony +} + +// ChainDb ... +func (b *APIBackend) ChainDb() ethdb.Database { + return b.hmy.chainDb +} + +// GetBlock ... +func (b *APIBackend) GetBlock(ctx context.Context, hash common.Hash) (*types.Block, error) { + return b.hmy.blockchain.GetBlockByHash(hash), nil +} + +// GetPoolTransaction ... +func (b *APIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction { + return b.hmy.txPool.Get(hash) +} + +// BlockByNumber ... +func (b *APIBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) { + // Pending block is only known by the miner + if blockNr == rpc.PendingBlockNumber { + return nil, errors.New("not implemented") + } + // Otherwise resolve and return the block + if blockNr == rpc.LatestBlockNumber { + return b.hmy.blockchain.CurrentBlock(), nil + } + return b.hmy.blockchain.GetBlockByNumber(uint64(blockNr)), nil +} + +// StateAndHeaderByNumber ... +func (b *APIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.DB, *types.Header, error) { + // Pending state is only known by the miner + if blockNr == rpc.PendingBlockNumber { + return nil, nil, errors.New("not implemented") + } + // Otherwise resolve the block number and return its state + header, err := b.HeaderByNumber(ctx, blockNr) + if header == nil || err != nil { + return nil, nil, err + } + stateDb, err := b.hmy.blockchain.StateAt(header.Root) + return stateDb, header, err +} + +// HeaderByNumber ... +func (b *APIBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { + // Pending block is only known by the miner + if blockNr == rpc.PendingBlockNumber { + return nil, errors.New("not implemented") + } + // Otherwise resolve and return the block + if blockNr == rpc.LatestBlockNumber { + return b.hmy.blockchain.CurrentBlock().Header(), nil + } + return b.hmy.blockchain.GetHeaderByNumber(uint64(blockNr)), nil +} + +// GetPoolNonce ... +func (b *APIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { + return b.hmy.txPool.State().GetNonce(addr), nil +} + +// SendTx ... +func (b *APIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { + // return b.hmy.txPool.Add(ctx, signedTx) + b.hmy.nodeAPI.AddPendingTransaction(signedTx) + return nil // TODO(ricl): AddPendingTransaction should return error +} + +// ChainConfig ... +func (b *APIBackend) ChainConfig() *params.ChainConfig { + return b.hmy.blockchain.Config() +} + +// CurrentBlock ... +func (b *APIBackend) CurrentBlock() *types.Block { + return types.NewBlockWithHeader(b.hmy.blockchain.CurrentHeader()) +} + +// AccountManager ... +func (b *APIBackend) AccountManager() *accounts.Manager { + return b.hmy.accountManager +} + +// GetReceipts ... +func (b *APIBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { + return b.hmy.blockchain.GetReceiptsByHash(hash), nil +} + +// EventMux ... +func (b *APIBackend) EventMux() *event.TypeMux { return b.hmy.eventMux } + +// BloomStatus ... +func (b *APIBackend) BloomStatus() (uint64, uint64) { + sections, _, _ := b.hmy.bloomIndexer.Sections() + return params.BloomBitsBlocks, sections +} + +// ProtocolVersion ... +func (b *APIBackend) ProtocolVersion() int { + return proto.ProtocolVersion +} + +// Filter related APIs + +// GetLogs ... +func (b *APIBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) { + // TODO(ricl): implement + return nil, nil +} + +// HeaderByHash ... +func (b *APIBackend) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) { + // TODO(ricl): implement + return nil, nil +} + +// ServiceFilter ... +func (b *APIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { + // TODO(ricl): implement +} + +// SubscribeNewTxsEvent ... +func (b *APIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { + return b.hmy.TxPool().SubscribeNewTxsEvent(ch) +} + +// SubscribeChainEvent ... +func (b *APIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { + return b.hmy.BlockChain().SubscribeChainEvent(ch) +} + +// SubscribeChainHeadEvent ... +func (b *APIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return b.hmy.BlockChain().SubscribeChainHeadEvent(ch) +} + +// SubscribeChainSideEvent ... +func (b *APIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { + return b.hmy.BlockChain().SubscribeChainSideEvent(ch) +} + +// SubscribeRemovedLogsEvent ... +func (b *APIBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { + return b.hmy.BlockChain().SubscribeRemovedLogsEvent(ch) +} + +// SubscribeLogsEvent ... +func (b *APIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.hmy.BlockChain().SubscribeLogsEvent(ch) +} + +// GetPoolTransactions ... +func (b *APIBackend) GetPoolTransactions() (types.Transactions, error) { + pending, err := b.hmy.txPool.Pending() + if err != nil { + return nil, err + } + var txs types.Transactions + for _, batch := range pending { + txs = append(txs, batch...) + } + return txs, nil +} diff --git a/hmy/backend.go b/hmy/backend.go new file mode 100644 index 000000000..b92b6b3c8 --- /dev/null +++ b/hmy/backend.go @@ -0,0 +1,64 @@ +package hmy + +import ( + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" + "github.com/harmony-one/harmony/accounts" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" +) + +// Harmony implements the Harmony full node service. +type Harmony struct { + // Channel for shutting down the service + shutdownChan chan bool // Channel for shutting down the Harmony + bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests + + blockchain *core.BlockChain + txPool *core.TxPool + accountManager *accounts.Manager + eventMux *event.TypeMux + // DB interfaces + chainDb ethdb.Database // Block chain database + + bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports + APIBackend *APIBackend + + nodeAPI NodeAPI +} + +// NodeAPI is the list of functions from node used to call rpc apis. +type NodeAPI interface { + AddPendingTransaction(newTx *types.Transaction) + Blockchain() *core.BlockChain + AccountManager() *accounts.Manager +} + +// New creates a new Harmony object (including the +// initialisation of the common Harmony object) +func New(nodeAPI NodeAPI, txPool *core.TxPool, eventMux *event.TypeMux) (*Harmony, error) { + chainDb := nodeAPI.Blockchain().ChainDB() + hmy := &Harmony{ + shutdownChan: make(chan bool), + bloomRequests: make(chan chan *bloombits.Retrieval), + blockchain: nodeAPI.Blockchain(), + txPool: txPool, + accountManager: nodeAPI.AccountManager(), + eventMux: eventMux, + chainDb: chainDb, + bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), + nodeAPI: nodeAPI, + } + + hmy.APIBackend = &APIBackend{hmy} + + return hmy, nil +} + +// TxPool ... +func (s *Harmony) TxPool() *core.TxPool { return s.txPool } + +// BlockChain ... +func (s *Harmony) BlockChain() *core.BlockChain { return s.blockchain } diff --git a/hmy/bloombits.go b/hmy/bloombits.go new file mode 100644 index 000000000..adb23adb2 --- /dev/null +++ b/hmy/bloombits.go @@ -0,0 +1,138 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package hmy + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/bitutil" + "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" +) + +const ( + // bloomServiceThreads is the number of goroutines used globally by an Ethereum + // instance to service bloombits lookups for all running filters. + bloomServiceThreads = 16 + + // bloomFilterThreads is the number of goroutines used locally per filter to + // multiplex requests onto the global servicing goroutines. + bloomFilterThreads = 3 + + // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service + // in a single batch. + bloomRetrievalBatch = 16 + + // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests + // to accumulate request an entire batch (avoiding hysteresis). + bloomRetrievalWait = time.Duration(0) +) + +// startBloomHandlers starts a batch of goroutines to accept bloom bit database +// retrievals from possibly a range of filters and serving the data to satisfy. +func (hmy *Harmony) startBloomHandlers(sectionSize uint64) { + for i := 0; i < bloomServiceThreads; i++ { + go func() { + for { + select { + case <-hmy.shutdownChan: + return + + case request := <-hmy.bloomRequests: + task := <-request + task.Bitsets = make([][]byte, len(task.Sections)) + for i, section := range task.Sections { + head := rawdb.ReadCanonicalHash(hmy.chainDb, (section+1)*sectionSize-1) + if compVector, err := rawdb.ReadBloomBits(hmy.chainDb, task.Bit, section, head); err == nil { + if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil { + task.Bitsets[i] = blob + } else { + task.Error = err + } + } else { + task.Error = err + } + } + request <- task + } + } + }() + } +} + +const ( + // bloomThrottling is the time to wait between processing two consecutive index + // sections. It's useful during chain upgrades to prevent disk overload. + bloomThrottling = 100 * time.Millisecond +) + +// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index +// for the Ethereum header bloom filters, permitting blazing fast filtering. +type BloomIndexer struct { + size uint64 // section size to generate bloombits for + db ethdb.Database // database instance to write index data and metadata into + gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index + section uint64 // Section is the section number being processed currently + head common.Hash // Head is the hash of the last header processed +} + +// NewBloomIndexer returns a chain indexer that generates bloom bits data for the +// canonical chain for fast logs filtering. +func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *core.ChainIndexer { + backend := &BloomIndexer{ + db: db, + size: size, + } + table := ethdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix)) + + return core.NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits") +} + +// Reset implements core.ChainIndexerBackend, starting a new bloombits index +// section. +func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { + gen, err := bloombits.NewGenerator(uint(b.size)) + b.gen, b.section, b.head = gen, section, common.Hash{} + return err +} + +// Process implements core.ChainIndexerBackend, adding a new header's bloom into +// the index. +func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error { + b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) + b.head = header.Hash() + return nil +} + +// Commit implements core.ChainIndexerBackend, finalizing the bloom section and +// writing it out into the database. +func (b *BloomIndexer) Commit() error { + batch := b.db.NewBatch() + for i := 0; i < types.BloomBitLength; i++ { + bits, err := b.gen.Bitset(uint(i)) + if err != nil { + return err + } + rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits)) + } + return batch.Write() +} diff --git a/internal/hmyapi/README.md b/internal/hmyapi/README.md index a62a4aa79..8e33d8b8c 100644 --- a/internal/hmyapi/README.md +++ b/internal/hmyapi/README.md @@ -42,6 +42,7 @@ * [x] hmy_getTransactionByBlockHashAndIndex - get transaction object of block by block hash and index number * [x] hmy_getTransactionByBlockNumberAndIndex - get transaction object of block by block number and index number * [ ] hmy_sign - sign message using node specific sign method. +* [ ] hmy_pendingTransactions - returns the pending transactions list. ### Contract related * [ ] hmy_call - call contract method @@ -53,14 +54,13 @@ * ~~[ ] hmy_compileSerpent~~ - DEPRECATED ### Subscribes -* [ ] hmy_pendingTransactions - pending transaction subscriber * [ ] hmy_getLogs - log subscriber -* [ ] hmy_newFilter - creates a filter object, based on filter options -* [ ] hmy_newBlockFilter - creates a filter in the node, to notify when a new block arrives -* [ ] hmy_newPendingTransactionFilter - creates a filter in the node, to notify when new pending transactions arrive +* [x] hmy_newFilter - creates a filter object, based on filter options +* [x] hmy_newBlockFilter - creates a filter in the node, to notify when a new block arrives +* [x] hmy_newPendingTransactionFilter - creates a filter in the node, to notify when new pending transactions arrive * [ ] hmy_getFilterChanges - polling method for a filter * [ ] hmy_getFilterLogs - returns an array of all logs matching filter with given id. -* [ ] hmy_uninstallFilter - uninstalls a filter with given id +* [x] hmy_uninstallFilter - uninstalls a filter with given id ### Others, not very important for current stage of work @@ -74,6 +74,10 @@ * [ ] db_getString * [ ] db_putHex * [ ] db_getHex + +### SHH Whisper Protocol +The ``shh`` is for the whisper protocol to communicate p2p and broadcast + * [ ] shh_post * [ ] shh_version * [ ] shh_newIdentity @@ -83,18 +87,4 @@ * [ ] shh_newFilter * [ ] shh_uninstallFilter * [ ] shh_getFilterChanges -* [ ] shh_getMessages - - - - - - - - - - - - - - +* [ ] shh_getMessages \ No newline at end of file diff --git a/internal/hmyapi/backend.go b/internal/hmyapi/backend.go index 0df6594c4..70517e037 100644 --- a/internal/hmyapi/backend.go +++ b/internal/hmyapi/backend.go @@ -1,12 +1,63 @@ package hmyapi import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" + "github.com/harmony-one/harmony/accounts" "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/state" + "github.com/harmony-one/harmony/core/types" ) +// Backend interface provides the common API services (that are provided by +// both full and light clients) with access to necessary functions. +// implementations: +// * hmy/api_backend.go +type Backend interface { + // General Ethereum API + // Downloader() *downloader.Downloader + ProtocolVersion() int + // SuggestPrice(ctx context.Context) (*big.Int, error) + ChainDb() ethdb.Database + EventMux() *event.TypeMux + AccountManager() *accounts.Manager + // ExtRPCEnabled() bool + // RPCGasCap() *big.Int // global gas cap for eth_call over rpc: DoS protection + + // BlockChain API + // SetHead(number uint64) + HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) + BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) + StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.DB, *types.Header, error) + GetBlock(ctx context.Context, blockHash common.Hash) (*types.Block, error) + GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) + // GetTd(blockHash common.Hash) *big.Int + // GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription + SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription + + // TxPool API + SendTx(ctx context.Context, signedTx *types.Transaction) error + // GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) + GetPoolTransactions() (types.Transactions, error) + GetPoolTransaction(txHash common.Hash) *types.Transaction + GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) + // Stats() (pending int, queued int) + // TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + + ChainConfig() *params.ChainConfig + CurrentBlock() *types.Block +} + // GetAPIs returns all the APIs. -func GetAPIs(b *core.HmyAPIBackend) []rpc.API { +func GetAPIs(b Backend) []rpc.API { nonceLock := new(AddrLocker) return []rpc.API{ { diff --git a/internal/hmyapi/blockchain.go b/internal/hmyapi/blockchain.go index 9672c6add..820df6097 100644 --- a/internal/hmyapi/blockchain.go +++ b/internal/hmyapi/blockchain.go @@ -6,17 +6,16 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" - "github.com/harmony-one/harmony/core" ) // PublicBlockChainAPI provides an API to access the Harmony blockchain. // It offers only methods that operate on public data that is freely available to anyone. type PublicBlockChainAPI struct { - b *core.HmyAPIBackend + b Backend } // NewPublicBlockChainAPI creates a new Harmony blockchain API. -func NewPublicBlockChainAPI(b *core.HmyAPIBackend) *PublicBlockChainAPI { +func NewPublicBlockChainAPI(b Backend) *PublicBlockChainAPI { return &PublicBlockChainAPI{b} } diff --git a/internal/hmyapi/debug.go b/internal/hmyapi/debug.go index ed7e2c3e0..223a8edff 100644 --- a/internal/hmyapi/debug.go +++ b/internal/hmyapi/debug.go @@ -5,17 +5,16 @@ import ( "errors" "github.com/ethereum/go-ethereum/log" - "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" ) // DebugAPI Internal JSON RPC for debugging purpose type DebugAPI struct { - b *core.HmyAPIBackend + b Backend } // NewDebugAPI Creates a new DebugAPI instance -func NewDebugAPI(b *core.HmyAPIBackend) *DebugAPI { +func NewDebugAPI(b Backend) *DebugAPI { return &DebugAPI{b} } diff --git a/internal/hmyapi/filters/api.go b/internal/hmyapi/filters/api.go new file mode 100644 index 000000000..7ffec68d2 --- /dev/null +++ b/internal/hmyapi/filters/api.go @@ -0,0 +1,433 @@ +package filters + +import ( + "context" + "fmt" + "sync" + "time" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rpc" + "github.com/harmony-one/harmony/core/types" +) + +var ( + deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline +) + +// filter is a helper struct that holds meta information over the filter type +// and associated subscription in the event system. +type filter struct { + typ Type + deadline *time.Timer // filter is inactive when deadline triggers + hashes []common.Hash + crit FilterCriteria + logs []*types.Log + s *Subscription // associated subscription in event system +} + +// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various +// information related to the Ethereum protocol such als blocks, transactions and logs. +type PublicFilterAPI struct { + backend Backend + mux *event.TypeMux + quit chan struct{} + chainDb ethdb.Database + events *EventSystem + filtersMu sync.Mutex + filters map[rpc.ID]*filter +} + +// NewPublicFilterAPI returns a new PublicFilterAPI instance. +func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { + api := &PublicFilterAPI{ + backend: backend, + mux: backend.EventMux(), + chainDb: backend.ChainDb(), + events: NewEventSystem(backend.EventMux(), backend, lightMode), + filters: make(map[rpc.ID]*filter), + } + go api.timeoutLoop() + + return api +} + +// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. +// Tt is started when the api is created. +func (api *PublicFilterAPI) timeoutLoop() { + ticker := time.NewTicker(5 * time.Minute) + for { + <-ticker.C + api.filtersMu.Lock() + for id, f := range api.filters { + select { + case <-f.deadline.C: + f.s.Unsubscribe() + delete(api.filters, id) + default: + continue + } + } + api.filtersMu.Unlock() + } +} + +// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// as transactions enter the pending state. +// +// It is part of the filter package because this filter can be used through the +// `eth_getFilterChanges` polling method that is also used for log filters. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter +func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { + var ( + pendingTxs = make(chan []common.Hash) + pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) + ) + + api.filtersMu.Lock() + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case ph := <-pendingTxs: + api.filtersMu.Lock() + if f, found := api.filters[pendingTxSub.ID]; found { + f.hashes = append(f.hashes, ph...) + } + api.filtersMu.Unlock() + case <-pendingTxSub.Err(): + api.filtersMu.Lock() + delete(api.filters, pendingTxSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return pendingTxSub.ID +} + +// NewPendingTransactions creates a subscription that is triggered each time a transaction +// enters the transaction pool and was signed from one of the transactions this nodes manages. +func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + txHashes := make(chan []common.Hash, 128) + pendingTxSub := api.events.SubscribePendingTxs(txHashes) + + for { + select { + case hashes := <-txHashes: + // To keep the original behaviour, send a single tx hash in one notification. + // TODO(rjl493456442) Send a batch of tx hashes in one notification + for _, h := range hashes { + notifier.Notify(rpcSub.ID, h) + } + case <-rpcSub.Err(): + pendingTxSub.Unsubscribe() + return + case <-notifier.Closed(): + pendingTxSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// NewBlockFilter creates a filter that fetches blocks that are imported into the chain. +// It is part of the filter package since polling goes with eth_getFilterChanges. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter +func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { + var ( + headers = make(chan *types.Header) + headerSub = api.events.SubscribeNewHeads(headers) + ) + + api.filtersMu.Lock() + api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case h := <-headers: + api.filtersMu.Lock() + if f, found := api.filters[headerSub.ID]; found { + f.hashes = append(f.hashes, h.Hash()) + } + api.filtersMu.Unlock() + case <-headerSub.Err(): + api.filtersMu.Lock() + delete(api.filters, headerSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return headerSub.ID +} + +// NewHeads send a notification each time a new (header) block is appended to the chain. +func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + headers := make(chan *types.Header) + headersSub := api.events.SubscribeNewHeads(headers) + + for { + select { + case h := <-headers: + notifier.Notify(rpcSub.ID, h) + case <-rpcSub.Err(): + headersSub.Unsubscribe() + return + case <-notifier.Closed(): + headersSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// GetFilterChanges returns the logs for the filter with the given id since +// last time it was called. This can be used for polling. +// +// For pending transaction and block filters the result is []common.Hash. +// (pending)Log filters return []Log. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges +func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { + api.filtersMu.Lock() + defer api.filtersMu.Unlock() + + if f, found := api.filters[id]; found { + if !f.deadline.Stop() { + // timer expired but filter is not yet removed in timeout loop + // receive timer value and reset timer + <-f.deadline.C + } + f.deadline.Reset(deadline) + + switch f.typ { + case PendingTransactionsSubscription, BlocksSubscription: + hashes := f.hashes + f.hashes = nil + return returnHashes(hashes), nil + case LogsSubscription: + logs := f.logs + f.logs = nil + return returnLogs(logs), nil + } + } + + return []interface{}{}, fmt.Errorf("filter not found") +} + +// returnHashes is a helper that will return an empty hash array case the given hash array is nil, +// otherwise the given hashes array is returned. +func returnHashes(hashes []common.Hash) []common.Hash { + if hashes == nil { + return []common.Hash{} + } + return hashes +} + +// returnLogs is a helper that will return an empty log array in case the given logs array is nil, +// otherwise the given logs array is returned. +func returnLogs(logs []*types.Log) []*types.Log { + if logs == nil { + return []*types.Log{} + } + return logs +} + +// Logs creates a subscription that fires for all new log that match the given filter criteria. +func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + var ( + rpcSub = notifier.CreateSubscription() + matchedLogs = make(chan []*types.Log) + ) + + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs) + if err != nil { + return nil, err + } + + go func() { + + for { + select { + case logs := <-matchedLogs: + for _, log := range logs { + notifier.Notify(rpcSub.ID, &log) + } + case <-rpcSub.Err(): // client send an unsubscribe request + logsSub.Unsubscribe() + return + case <-notifier.Closed(): // connection dropped + logsSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// NewFilter creates a new filter and returns the filter id. It can be +// used to retrieve logs when the state changes. This method cannot be +// used to fetch logs that are already stored in the state. +// +// Default criteria for the from and to block are "latest". +// Using "latest" as block number will return logs for mined blocks. +// Using "pending" as block number returns logs for not yet mined (pending) blocks. +// In case logs are removed (chain reorg) previously returned logs are returned +// again but with the removed property set to true. +// +// In case "fromBlock" > "toBlock" an error is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter +func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { + logs := make(chan []*types.Log) + logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs) + if err != nil { + return rpc.ID(""), err + } + + api.filtersMu.Lock() + api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case l := <-logs: + api.filtersMu.Lock() + if f, found := api.filters[logsSub.ID]; found { + f.logs = append(f.logs, l...) + } + api.filtersMu.Unlock() + case <-logsSub.Err(): + api.filtersMu.Lock() + delete(api.filters, logsSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return logsSub.ID, nil +} + +// GetLogs returns logs matching the given argument that are stored within the state. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs +func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) { + var filter *Filter + if crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if crit.FromBlock != nil { + begin = crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if crit.ToBlock != nil { + end = crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics) + } + // Run the filter and return all the logs + logs, err := filter.Logs(ctx) + if err != nil { + return nil, err + } + return returnLogs(logs), err +} + +// UninstallFilter removes the filter with the given filter id. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter +func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { + api.filtersMu.Lock() + f, found := api.filters[id] + if found { + delete(api.filters, id) + } + api.filtersMu.Unlock() + if found { + f.s.Unsubscribe() + } + + return found +} + +// GetFilterLogs returns the logs for the filter with the given id. +// If the filter could not be found an empty array of logs is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs +func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) { + api.filtersMu.Lock() + f, found := api.filters[id] + api.filtersMu.Unlock() + + if !found || f.typ != LogsSubscription { + return nil, fmt.Errorf("filter not found") + } + + var filter *Filter + if f.crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if f.crit.FromBlock != nil { + begin = f.crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if f.crit.ToBlock != nil { + end = f.crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) + } + // Run the filter and return all the logs + logs, err := filter.Logs(ctx) + if err != nil { + return nil, err + } + return returnLogs(logs), nil +} diff --git a/internal/hmyapi/filters/filter.go b/internal/hmyapi/filters/filter.go new file mode 100644 index 000000000..ee36badd9 --- /dev/null +++ b/internal/hmyapi/filters/filter.go @@ -0,0 +1,350 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package filters + +import ( + "context" + "errors" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/bloombits" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rpc" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" +) + +// Backend provides the APIs needed for filter +type Backend interface { + ChainDb() ethdb.Database + EventMux() *event.TypeMux + HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) + HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) + GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) + GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) + + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription + SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + + BloomStatus() (uint64, uint64) + ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) +} + +// Filter can be used to retrieve and filter logs. +type Filter struct { + backend Backend + + db ethdb.Database + addresses []common.Address + topics [][]common.Hash + + block common.Hash // Block hash if filtering a single block + begin, end int64 // Range interval if filtering multiple blocks + + matcher *bloombits.Matcher +} + +// NewRangeFilter creates a new filter which uses a bloom filter on blocks to +// figure out whether a particular block is interesting or not. +func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { + // Flatten the address and topic filter clauses into a single bloombits filter + // system. Since the bloombits are not positional, nil topics are permitted, + // which get flattened into a nil byte slice. + var filters [][][]byte + if len(addresses) > 0 { + filter := make([][]byte, len(addresses)) + for i, address := range addresses { + filter[i] = address.Bytes() + } + filters = append(filters, filter) + } + for _, topicList := range topics { + filter := make([][]byte, len(topicList)) + for i, topic := range topicList { + filter[i] = topic.Bytes() + } + filters = append(filters, filter) + } + size, _ := backend.BloomStatus() + + // Create a generic filter and convert it into a range filter + filter := newFilter(backend, addresses, topics) + + filter.matcher = bloombits.NewMatcher(size, filters) + filter.begin = begin + filter.end = end + + return filter +} + +// NewBlockFilter creates a new filter which directly inspects the contents of +// a block to figure out whether it is interesting or not. +func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { + // Create a generic filter and convert it into a block filter + filter := newFilter(backend, addresses, topics) + filter.block = block + return filter +} + +// newFilter creates a generic filter that can either filter based on a block hash, +// or based on range queries. The search criteria needs to be explicitly set. +func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter { + return &Filter{ + backend: backend, + addresses: addresses, + topics: topics, + db: backend.ChainDb(), + } +} + +// Logs searches the blockchain for matching log entries, returning all from the +// first block that contains matches, updating the start of the filter accordingly. +func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { + // If we're doing singleton block filtering, execute and return + if f.block != (common.Hash{}) { + header, err := f.backend.HeaderByHash(ctx, f.block) + if err != nil { + return nil, err + } + if header == nil { + return nil, errors.New("unknown block") + } + return f.blockLogs(ctx, header) + } + // Figure out the limits of the filter range + header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + if header == nil { + return nil, nil + } + head := header.Number.Uint64() + + if f.begin == -1 { + f.begin = int64(head) + } + end := uint64(f.end) + if f.end == -1 { + end = head + } + // Gather all indexed logs, and finish with non indexed ones + var ( + logs []*types.Log + err error + ) + size, sections := f.backend.BloomStatus() + if indexed := sections * size; indexed > uint64(f.begin) { + if indexed > end { + logs, err = f.indexedLogs(ctx, end) + } else { + logs, err = f.indexedLogs(ctx, indexed-1) + } + if err != nil { + return logs, err + } + } + rest, err := f.unindexedLogs(ctx, end) + logs = append(logs, rest...) + return logs, err +} + +// indexedLogs returns the logs matching the filter criteria based on the bloom +// bits indexed available locally or via the network. +func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { + // Create a matcher session and request servicing from the backend + matches := make(chan uint64, 64) + + session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches) + if err != nil { + return nil, err + } + defer session.Close() + + f.backend.ServiceFilter(ctx, session) + + // Iterate over the matches until exhausted or context closed + var logs []*types.Log + + for { + select { + case number, ok := <-matches: + // Abort if all matches have been fulfilled + if !ok { + err := session.Error() + if err == nil { + f.begin = int64(end) + 1 + } + return logs, err + } + f.begin = int64(number) + 1 + + // Retrieve the suggested block and pull any truly matching logs + header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) + if header == nil || err != nil { + return logs, err + } + found, err := f.checkMatches(ctx, header) + if err != nil { + return logs, err + } + logs = append(logs, found...) + + case <-ctx.Done(): + return logs, ctx.Err() + } + } +} + +// indexedLogs returns the logs matching the filter criteria based on raw block +// iteration and bloom matching. +func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { + var logs []*types.Log + + for ; f.begin <= int64(end); f.begin++ { + header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) + if header == nil || err != nil { + return logs, err + } + found, err := f.blockLogs(ctx, header) + if err != nil { + return logs, err + } + logs = append(logs, found...) + } + return logs, nil +} + +// blockLogs returns the logs matching the filter criteria within a single block. +func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { + if bloomFilter(header.Bloom, f.addresses, f.topics) { + found, err := f.checkMatches(ctx, header) + if err != nil { + return logs, err + } + logs = append(logs, found...) + } + return logs, nil +} + +// checkMatches checks if the receipts belonging to the given header contain any log events that +// match the filter criteria. This function is called when the bloom filter signals a potential match. +func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { + // Get the logs of the block + logsList, err := f.backend.GetLogs(ctx, header.Hash()) + if err != nil { + return nil, err + } + var unfiltered []*types.Log + for _, logs := range logsList { + unfiltered = append(unfiltered, logs...) + } + logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) + if len(logs) > 0 { + // We have matching logs, check if we need to resolve full logs via the light client + if logs[0].TxHash == (common.Hash{}) { + receipts, err := f.backend.GetReceipts(ctx, header.Hash()) + if err != nil { + return nil, err + } + unfiltered = unfiltered[:0] + for _, receipt := range receipts { + unfiltered = append(unfiltered, receipt.Logs...) + } + logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) + } + return logs, nil + } + return nil, nil +} + +func includes(addresses []common.Address, a common.Address) bool { + for _, addr := range addresses { + if addr == a { + return true + } + } + + return false +} + +// filterLogs creates a slice of logs matching the given criteria. +func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log { + var ret []*types.Log +Logs: + for _, log := range logs { + if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber { + continue + } + if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber { + continue + } + + if len(addresses) > 0 && !includes(addresses, log.Address) { + continue + } + // If the to filtered topics is greater than the amount of topics in logs, skip. + if len(topics) > len(log.Topics) { + continue Logs + } + for i, sub := range topics { + match := len(sub) == 0 // empty rule set == wildcard + for _, topic := range sub { + if log.Topics[i] == topic { + match = true + break + } + } + if !match { + continue Logs + } + } + ret = append(ret, log) + } + return ret +} + +func bloomFilter(bloom ethtypes.Bloom, addresses []common.Address, topics [][]common.Hash) bool { + if len(addresses) > 0 { + var included bool + for _, addr := range addresses { + if types.BloomLookup(bloom, addr) { + included = true + break + } + } + if !included { + return false + } + } + + for _, sub := range topics { + included := len(sub) == 0 // empty rule set == wildcard + for _, topic := range sub { + if types.BloomLookup(bloom, topic) { + included = true + break + } + } + if !included { + return false + } + } + return true +} diff --git a/internal/hmyapi/filters/filter_criteria.go b/internal/hmyapi/filters/filter_criteria.go new file mode 100644 index 000000000..2de7b554f --- /dev/null +++ b/internal/hmyapi/filters/filter_criteria.go @@ -0,0 +1,136 @@ +package filters + +import ( + "encoding/json" + "errors" + "fmt" + "math/big" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/rpc" +) + +// FilterCriteria represents a request to create a new filter. +// Same as ethereum.FilterQuery but with UnmarshalJSON() method. +type FilterCriteria ethereum.FilterQuery + +// UnmarshalJSON sets *args fields with given data. +func (args *FilterCriteria) UnmarshalJSON(data []byte) error { + type input struct { + BlockHash *common.Hash `json:"blockHash"` + FromBlock *rpc.BlockNumber `json:"fromBlock"` + ToBlock *rpc.BlockNumber `json:"toBlock"` + Addresses interface{} `json:"address"` + Topics []interface{} `json:"topics"` + } + + var raw input + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + if raw.BlockHash != nil { + if raw.FromBlock != nil || raw.ToBlock != nil { + // BlockHash is mutually exclusive with FromBlock/ToBlock criteria + return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other") + } + args.BlockHash = raw.BlockHash + } else { + if raw.FromBlock != nil { + args.FromBlock = big.NewInt(raw.FromBlock.Int64()) + } + + if raw.ToBlock != nil { + args.ToBlock = big.NewInt(raw.ToBlock.Int64()) + } + } + + args.Addresses = []common.Address{} + + if raw.Addresses != nil { + // raw.Address can contain a single address or an array of addresses + switch rawAddr := raw.Addresses.(type) { + case []interface{}: + for i, addr := range rawAddr { + if strAddr, ok := addr.(string); ok { + addr, err := decodeAddress(strAddr) + if err != nil { + return fmt.Errorf("invalid address at index %d: %v", i, err) + } + args.Addresses = append(args.Addresses, addr) + } else { + return fmt.Errorf("non-string address at index %d", i) + } + } + case string: + addr, err := decodeAddress(rawAddr) + if err != nil { + return fmt.Errorf("invalid address: %v", err) + } + args.Addresses = []common.Address{addr} + default: + return errors.New("invalid addresses in query") + } + } + + // topics is an array consisting of strings and/or arrays of strings. + // JSON null values are converted to common.Hash{} and ignored by the filter manager. + if len(raw.Topics) > 0 { + args.Topics = make([][]common.Hash, len(raw.Topics)) + for i, t := range raw.Topics { + switch topic := t.(type) { + case nil: + // ignore topic when matching logs + + case string: + // match specific topic + top, err := decodeTopic(topic) + if err != nil { + return err + } + args.Topics[i] = []common.Hash{top} + + case []interface{}: + // or case e.g. [null, "topic0", "topic1"] + for _, rawTopic := range topic { + if rawTopic == nil { + // null component, match all + args.Topics[i] = nil + break + } + if topic, ok := rawTopic.(string); ok { + parsed, err := decodeTopic(topic) + if err != nil { + return err + } + args.Topics[i] = append(args.Topics[i], parsed) + } else { + return fmt.Errorf("invalid topic(s)") + } + } + default: + return fmt.Errorf("invalid topic(s)") + } + } + } + + return nil +} + +func decodeAddress(s string) (common.Address, error) { + b, err := hexutil.Decode(s) + if err == nil && len(b) != common.AddressLength { + err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength) + } + return common.BytesToAddress(b), err +} + +func decodeTopic(s string) (common.Hash, error) { + b, err := hexutil.Decode(s) + if err == nil && len(b) != common.HashLength { + err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength) + } + return common.BytesToHash(b), err +} diff --git a/internal/hmyapi/filters/filter_system.go b/internal/hmyapi/filters/filter_system.go new file mode 100644 index 000000000..470378e9e --- /dev/null +++ b/internal/hmyapi/filters/filter_system.go @@ -0,0 +1,506 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package filters implements an ethereum filtering system for block, +// transactions and log events. +package filters + +import ( + "context" + "fmt" + "sync" + "time" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/rawdb" + "github.com/harmony-one/harmony/core/types" +) + +// Type determines the kind of filter and is used to put the filter in to +// the correct bucket when added. +type Type byte + +const ( + // UnknownSubscription indicates an unknown subscription type + UnknownSubscription Type = iota + // LogsSubscription queries for new or removed (chain reorg) logs + LogsSubscription + // PendingLogsSubscription queries for logs in pending blocks + PendingLogsSubscription + // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. + MinedAndPendingLogsSubscription + // PendingTransactionsSubscription queries tx hashes for pending + // transactions entering the pending state + PendingTransactionsSubscription + // BlocksSubscription queries hashes for blocks that are imported + BlocksSubscription + // LastIndexSubscription keeps track of the last index + LastIndexSubscription +) + +const ( + + // txChanSize is the size of channel listening to NewTxsEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // rmLogsChanSize is the size of channel listening to RemovedLogsEvent. + rmLogsChanSize = 10 + // logsChanSize is the size of channel listening to LogsEvent. + logsChanSize = 10 + // chainEvChanSize is the size of channel listening to ChainEvent. + chainEvChanSize = 10 +) + +type subscription struct { + id rpc.ID + typ Type + created time.Time + logsCrit ethereum.FilterQuery + logs chan []*types.Log + hashes chan []common.Hash + headers chan *types.Header + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled +} + +// EventSystem creates subscriptions, processes events and broadcasts them to the +// subscription which match the subscription criteria. +type EventSystem struct { + mux *event.TypeMux + backend Backend + lightMode bool + lastHead *types.Header + + // Subscriptions + txsSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + chainSub event.Subscription // Subscription for new chain event + pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event + + // Channels + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event +} + +// NewEventSystem creates a new manager that listens for event on the given mux, +// parses and filters them. It uses the all map to retrieve filter changes. The +// work loop holds its own index that is used to forward events to filters. +// +// The returned manager has a loop that needs to be stopped with the Stop function +// or by stopping the given mux. +func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem { + m := &EventSystem{ + mux: mux, + backend: backend, + lightMode: lightMode, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), + } + + // Subscribe events + m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh) + m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) + m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) + m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) + // TODO(rjl493456442): use feed to subscribe pending log event + m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) + + // Make sure none of the subscriptions are empty + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || + m.pendingLogSub.Closed() { + log.Crit("Subscribe for event system failed") + } + + go m.eventLoop() + return m +} + +// Subscription is created when the client registers itself for a particular event. +type Subscription struct { + ID rpc.ID + f *subscription + es *EventSystem + unsubOnce sync.Once +} + +// Err returns a channel that is closed when unsubscribed. +func (sub *Subscription) Err() <-chan error { + return sub.f.err +} + +// Unsubscribe uninstalls the subscription from the event broadcast loop. +func (sub *Subscription) Unsubscribe() { + sub.unsubOnce.Do(func() { + uninstallLoop: + for { + // write uninstall request and consume logs/hashes. This prevents + // the eventLoop broadcast method to deadlock when writing to the + // filter event channel while the subscription loop is waiting for + // this method to return (and thus not reading these events). + select { + case sub.es.uninstall <- sub.f: + break uninstallLoop + case <-sub.f.logs: + case <-sub.f.hashes: + case <-sub.f.headers: + } + } + + // wait for filter to be uninstalled in work loop before returning + // this ensures that the manager won't use the event channel which + // will probably be closed by the client asap after this method returns. + <-sub.Err() + }) +} + +// subscribe installs the subscription in the event broadcast loop. +func (es *EventSystem) subscribe(sub *subscription) *Subscription { + es.install <- sub + <-sub.installed + return &Subscription{ID: sub.id, f: sub, es: es} +} + +// SubscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. Default value for the from and to +// block is "latest". If the fromBlock > toBlock an error is returned. +func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) { + var from, to rpc.BlockNumber + if crit.FromBlock == nil { + from = rpc.LatestBlockNumber + } else { + from = rpc.BlockNumber(crit.FromBlock.Int64()) + } + if crit.ToBlock == nil { + to = rpc.LatestBlockNumber + } else { + to = rpc.BlockNumber(crit.ToBlock.Int64()) + } + + // only interested in pending logs + if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber { + return es.subscribePendingLogs(crit, logs), nil + } + // only interested in new mined logs + if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + // only interested in mined logs within a specific block range + if from >= 0 && to >= 0 && to >= from { + return es.subscribeLogs(crit, logs), nil + } + // interested in mined logs from a specific block number, new logs and pending logs + if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber { + return es.subscribeMinedPendingLogs(crit, logs), nil + } + // interested in logs from a specific block number to new mined blocks + if from >= 0 && to == rpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + return nil, fmt.Errorf("invalid from and to block combination: from > to") +} + +// subscribeMinedPendingLogs creates a subscription that returned mined and +// pending logs that match the given criteria. +func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: MinedAndPendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// subscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. +func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: LogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// subscribePendingLogs creates a subscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan []common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribeNewHeads creates a subscription that writes the header of a block that is +// imported in the chain. +func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: BlocksSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: make(chan []common.Hash), + headers: headers, + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +// SubscribePendingTxs creates a subscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + hashes: hashes, + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + +type filterIndex map[Type]map[rpc.ID]*subscription + +// broadcast event to filters that match criteria. +func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { + if ev == nil { + return + } + + switch e := ev.(type) { + case []*types.Log: + if len(e) > 0 { + for _, f := range filters[LogsSubscription] { + if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + } + case core.RemovedLogsEvent: + for _, f := range filters[LogsSubscription] { + if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + case *event.TypeMuxEvent: + if muxe, ok := e.Data.(core.PendingLogsEvent); ok { + for _, f := range filters[PendingLogsSubscription] { + if e.Time.After(f.created) { + if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + } + } + case core.NewTxsEvent: + hashes := make([]common.Hash, 0, len(e.Txs)) + for _, tx := range e.Txs { + hashes = append(hashes, tx.Hash()) + } + for _, f := range filters[PendingTransactionsSubscription] { + f.hashes <- hashes + } + case core.ChainEvent: + for _, f := range filters[BlocksSubscription] { + f.headers <- e.Block.Header() + } + if es.lightMode && len(filters[LogsSubscription]) > 0 { + es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { + for _, f := range filters[LogsSubscription] { + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } + }) + } + } +} + +func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) { + oldh := es.lastHead + es.lastHead = newHeader + if oldh == nil { + return + } + newh := newHeader + // find common ancestor, create list of rolled back and new block hashes + var oldHeaders, newHeaders []*types.Header + for oldh.Hash() != newh.Hash() { + if oldh.Number.Uint64() >= newh.Number.Uint64() { + oldHeaders = append(oldHeaders, oldh) + oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1) + } + if oldh.Number.Uint64() < newh.Number.Uint64() { + newHeaders = append(newHeaders, newh) + newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1) + if newh == nil { + // happens when CHT syncing, nothing to do + newh = oldh + } + } + } + // roll back old blocks + for _, h := range oldHeaders { + callBack(h, true) + } + // check new blocks (array is in reverse order) + for i := len(newHeaders) - 1; i >= 0; i-- { + callBack(newHeaders[i], false) + } +} + +// filter logs of a single header in light client mode +func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log { + if bloomFilter(header.Bloom, addresses, topics) { + // Get the logs of the block + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + logsList, err := es.backend.GetLogs(ctx, header.Hash()) + if err != nil { + return nil + } + var unfiltered []*types.Log + for _, logs := range logsList { + for _, log := range logs { + logcopy := *log + logcopy.Removed = remove + unfiltered = append(unfiltered, &logcopy) + } + } + logs := filterLogs(unfiltered, nil, nil, addresses, topics) + if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) { + // We have matching but non-derived logs + receipts, err := es.backend.GetReceipts(ctx, header.Hash()) + if err != nil { + return nil + } + unfiltered = unfiltered[:0] + for _, receipt := range receipts { + for _, log := range receipt.Logs { + logcopy := *log + logcopy.Removed = remove + unfiltered = append(unfiltered, &logcopy) + } + } + logs = filterLogs(unfiltered, nil, nil, addresses, topics) + } + return logs + } + return nil +} + +// eventLoop (un)installs filters and processes mux events. +func (es *EventSystem) eventLoop() { + // Ensure all subscriptions get cleaned up + defer func() { + es.pendingLogSub.Unsubscribe() + es.txsSub.Unsubscribe() + es.logsSub.Unsubscribe() + es.rmLogsSub.Unsubscribe() + es.chainSub.Unsubscribe() + }() + + index := make(filterIndex) + for i := UnknownSubscription; i < LastIndexSubscription; i++ { + index[i] = make(map[rpc.ID]*subscription) + } + + for { + select { + // Handle subscribed events + case ev := <-es.txsCh: + es.broadcast(index, ev) + case ev := <-es.logsCh: + es.broadcast(index, ev) + case ev := <-es.rmLogsCh: + es.broadcast(index, ev) + case ev := <-es.chainCh: + es.broadcast(index, ev) + case ev, active := <-es.pendingLogSub.Chan(): + if !active { // system stopped + return + } + es.broadcast(index, ev) + + case f := <-es.install: + if f.typ == MinedAndPendingLogsSubscription { + // the type are logs and pending logs subscriptions + index[LogsSubscription][f.id] = f + index[PendingLogsSubscription][f.id] = f + } else { + index[f.typ][f.id] = f + } + close(f.installed) + + case f := <-es.uninstall: + if f.typ == MinedAndPendingLogsSubscription { + // the type are logs and pending logs subscriptions + delete(index[LogsSubscription], f.id) + delete(index[PendingLogsSubscription], f.id) + } else { + delete(index[f.typ], f.id) + } + close(f.err) + + // System stopped + case <-es.txsSub.Err(): + return + case <-es.logsSub.Err(): + return + case <-es.rmLogsSub.Err(): + return + case <-es.chainSub.Err(): + return + } + } +} diff --git a/internal/hmyapi/harmony.go b/internal/hmyapi/harmony.go index f0a890d99..eb9e988a5 100644 --- a/internal/hmyapi/harmony.go +++ b/internal/hmyapi/harmony.go @@ -6,17 +6,16 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/harmony-one/harmony/api/proto" - "github.com/harmony-one/harmony/core" ) // PublicHarmonyAPI provides an API to access Harmony related information. // It offers only methods that operate on public data that is freely available to anyone. type PublicHarmonyAPI struct { - b *core.HmyAPIBackend + b Backend } // NewPublicHarmonyAPI ... -func NewPublicHarmonyAPI(b *core.HmyAPIBackend) *PublicHarmonyAPI { +func NewPublicHarmonyAPI(b Backend) *PublicHarmonyAPI { return &PublicHarmonyAPI{b} } diff --git a/internal/hmyapi/private_account.go b/internal/hmyapi/private_account.go index 5cab19bac..2620cf8f4 100644 --- a/internal/hmyapi/private_account.go +++ b/internal/hmyapi/private_account.go @@ -6,8 +6,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/harmony-one/harmony/accounts" - "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/hmy" ) // PrivateAccountAPI provides an API to access accounts managed by this node. @@ -16,7 +16,7 @@ import ( type PrivateAccountAPI struct { am *accounts.Manager nonceLock *AddrLocker - b *core.HmyAPIBackend + b *hmy.APIBackend } // NewAccount will create a new account and returns the address for the new account. diff --git a/internal/hmyapi/sendtxargs.go b/internal/hmyapi/sendtxargs.go index c4ca405d1..79bb3b31b 100644 --- a/internal/hmyapi/sendtxargs.go +++ b/internal/hmyapi/sendtxargs.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" ) @@ -28,7 +27,7 @@ type SendTxArgs struct { } // setDefaults is a helper function that fills in default values for unspecified tx fields. -func (args *SendTxArgs) setDefaults(ctx context.Context, b *core.HmyAPIBackend) error { +func (args *SendTxArgs) setDefaults(ctx context.Context, b Backend) error { if args.Gas == nil { args.Gas = new(hexutil.Uint64) *(*uint64)(args.Gas) = 90000 diff --git a/internal/hmyapi/transactionpool.go b/internal/hmyapi/transactionpool.go index c5f9f3106..c42b48170 100644 --- a/internal/hmyapi/transactionpool.go +++ b/internal/hmyapi/transactionpool.go @@ -8,19 +8,18 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/harmony-one/harmony/accounts" - "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/rawdb" "github.com/harmony-one/harmony/core/types" ) // PublicTransactionPoolAPI exposes methods for the RPC interface type PublicTransactionPoolAPI struct { - b *core.HmyAPIBackend + b Backend nonceLock *AddrLocker } // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. -func NewPublicTransactionPoolAPI(b *core.HmyAPIBackend, nonceLock *AddrLocker) *PublicTransactionPoolAPI { +func NewPublicTransactionPoolAPI(b Backend, nonceLock *AddrLocker) *PublicTransactionPoolAPI { return &PublicTransactionPoolAPI{b, nonceLock} } @@ -184,3 +183,30 @@ func (s *PublicTransactionPoolAPI) GetTransactionReceipt(ctx context.Context, ha } return fields, nil } + +// PendingTransactions returns the transactions that are in the transaction pool +// and have a from address that is one of the accounts this node manages. +func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, error) { + pending, err := s.b.GetPoolTransactions() + if err != nil { + return nil, err + } + accounts := make(map[common.Address]struct{}) + for _, wallet := range s.b.AccountManager().Wallets() { + for _, account := range wallet.Accounts() { + accounts[account.Address] = struct{}{} + } + } + transactions := make([]*RPCTransaction, 0, len(pending)) + for _, tx := range pending { + var signer types.Signer = types.HomesteadSigner{} + if tx.Protected() { + signer = types.NewEIP155Signer(tx.ChainID()) + } + from, _ := types.Sender(signer, tx) + if _, exists := accounts[from]; exists { + transactions = append(transactions, newRPCPendingTransaction(tx)) + } + } + return transactions, nil +} diff --git a/internal/hmyapi/types.go b/internal/hmyapi/types.go index 96c17e2bc..5ce8903a5 100644 --- a/internal/hmyapi/types.go +++ b/internal/hmyapi/types.go @@ -5,6 +5,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/harmony-one/harmony/core/types" ) @@ -69,7 +70,7 @@ type RPCBlock struct { ParentHash common.Hash `json:"parentHash"` Nonce types.BlockNonce `json:"nonce"` MixHash common.Hash `json:"mixHash"` - LogsBloom types.Bloom `json:"logsBloom"` + LogsBloom ethtypes.Bloom `json:"logsBloom"` StateRoot common.Hash `json:"stateRoot"` Miner common.Address `json:"miner"` Difficulty *hexutil.Big `json:"difficulty"` diff --git a/internal/hmyapi/util.go b/internal/hmyapi/util.go index cb1cfde1e..40a0e63b1 100644 --- a/internal/hmyapi/util.go +++ b/internal/hmyapi/util.go @@ -6,12 +6,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" ) // SubmitTransaction is a helper function that submits tx to txPool and logs a message. -func SubmitTransaction(ctx context.Context, b *core.HmyAPIBackend, tx *types.Transaction) (common.Hash, error) { +func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) { if err := b.SendTx(ctx, tx); err != nil { return common.Hash{}, err } diff --git a/node/node.go b/node/node.go index bd2bc5b1f..0d95eb7e1 100644 --- a/node/node.go +++ b/node/node.go @@ -492,3 +492,8 @@ func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) { return nodeConfig, chanPeer } + +// AccountManager ... +func (node *Node) AccountManager() *accounts.Manager { + return node.accountManager +} diff --git a/node/rpc.go b/node/rpc.go index a5ba77a36..d684f3764 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -6,10 +6,12 @@ import ( "strconv" "strings" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" - "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/hmy" "github.com/harmony-one/harmony/internal/hmyapi" + "github.com/harmony-one/harmony/internal/hmyapi/filters" ) const ( @@ -36,15 +38,16 @@ var ( wsModules = []string{"net", "web3"} wsOrigins = []string{"*"} - apiBackend *core.HmyAPIBackend + harmony *hmy.Harmony ) // StartRPC start RPC service func (node *Node) StartRPC(nodePort string) error { // Gather all the possible APIs to surface - apiBackend = core.NewBackend(node.Blockchain(), node.TxPool, node.accountManager, node) + harmony, _ = hmy.New(node, node.TxPool, new(event.TypeMux)) + + apis := node.APIs() - apis := hmyapi.GetAPIs(apiBackend) for _, service := range node.serviceManager.GetServices() { apis = append(apis, service.APIs()...) } @@ -131,3 +134,20 @@ func (node *Node) stopWS() { wsHandler = nil } } + +// APIs return the collection of RPC services the ethereum package offers. +// NOTE, some of these services probably need to be moved to somewhere else. +func (node *Node) APIs() []rpc.API { + // Gather all the possible APIs to surface + apis := hmyapi.GetAPIs(harmony.APIBackend) + + // Append all the local APIs and return + return append(apis, []rpc.API{ + { + Namespace: "hmy", + Version: "1.0", + Service: filters.NewPublicFilterAPI(harmony.APIBackend, false), + Public: true, + }, + }...) +} diff --git a/scripts/travis_checker.sh b/scripts/travis_checker.sh index 792570d80..16e819e6d 100755 --- a/scripts/travis_checker.sh +++ b/scripts/travis_checker.sh @@ -68,6 +68,7 @@ then then echo "All generated files seem up to date." else + echo "go generate FAILED!" echo "go generate changed working tree contents!" "${progdir}/print_file.sh" "${gogenerate_status_diff}" "git status diff" ok=false diff --git a/test/configs/ten-oneshard.txt b/test/configs/ten-oneshard.txt index cf674290b..af8d1add9 100644 --- a/test/configs/ten-oneshard.txt +++ b/test/configs/ten-oneshard.txt @@ -1,4 +1,4 @@ -127.0.0.1 9000 leader 0 +127.0.0.1 9000 validator 0 127.0.0.1 9001 validator 0 127.0.0.1 9002 validator 0 127.0.0.1 9003 validator 0