Typed cache & Node cleanup. (#4409)

* Channels usage through methods.

* Fix retry count. Removed proposedBlock.

* keysToAddrs rewritten to lrucache.
pull/4412/head
Konstantin 2 years ago committed by GitHub
parent 49e984e268
commit d9bc7a8721
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      api/service/blockproposal/service.go
  2. 24
      consensus/consensus.go
  3. 4
      consensus/consensus_service.go
  4. 4
      consensus/consensus_test.go
  5. 12
      consensus/consensus_v2.go
  6. 5
      consensus/view_change.go
  7. 4
      core/evm.go
  8. 27
      internal/utils/lrucache/lrucache.go
  9. 44
      node/node.go
  10. 28
      node/node_newblock.go
  11. 2
      node/service_setup.go

@ -10,17 +10,15 @@ import (
type Service struct {
stopChan chan struct{}
stoppedChan chan struct{}
readySignal chan consensus.ProposalType
commitSigsChan chan []byte
c *consensus.Consensus
messageChan chan *msg_pb.Message
waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})
waitForConsensusReady func(c *consensus.Consensus, stopChan chan struct{}, stoppedChan chan struct{})
}
// New returns a block proposal service.
func New(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})) *Service {
func New(c *consensus.Consensus, waitForConsensusReady func(c *consensus.Consensus, stopChan chan struct{}, stoppedChan chan struct{})) *Service {
return &Service{
readySignal: readySignal,
commitSigsChan: commitSigsChan,
c: c,
waitForConsensusReady: waitForConsensusReady,
stopChan: make(chan struct{}),
stoppedChan: make(chan struct{}),
@ -34,7 +32,7 @@ func (s *Service) Start() error {
}
func (s *Service) run() {
s.waitForConsensusReady(s.readySignal, s.commitSigsChan, s.stopChan, s.stoppedChan)
s.waitForConsensusReady(s.c, s.stopChan, s.stoppedChan)
}
// Stop stops block proposal service.

@ -87,9 +87,9 @@ type Consensus struct {
// ViewChange struct
vc *viewChange
// Signal channel for proposing a new block and start new consensus
ReadySignal chan ProposalType
readySignal chan ProposalType
// Channel to send full commit signatures to finish new block proposal
CommitSigChannel chan []byte
commitSigChannel chan []byte
// The post-consensus job func passed from Node object
// Called when consensus on a new block is done
PostConsensusJob func(*types.Block) error
@ -139,6 +139,22 @@ func (consensus *Consensus) Blockchain() core.BlockChain {
return consensus.registry.GetBlockchain()
}
func (consensus *Consensus) ReadySignal(p ProposalType) {
consensus.readySignal <- p
}
func (consensus *Consensus) GetReadySignal() chan ProposalType {
return consensus.readySignal
}
func (consensus *Consensus) CommitSigChannel() chan []byte {
return consensus.commitSigChannel
}
func (consensus *Consensus) GetCommitSigChannel() chan []byte {
return consensus.commitSigChannel
}
// VerifyBlock is a function used to verify the block and keep trace of verified blocks.
func (consensus *Consensus) verifyBlock(block *types.Block) error {
if !consensus.FBFTLog.IsBlockVerified(block.Hash()) {
@ -274,8 +290,8 @@ func New(
consensus.SetCurBlockViewID(0)
consensus.ShardID = shard
consensus.SlashChan = make(chan slash.Record)
consensus.ReadySignal = make(chan ProposalType)
consensus.CommitSigChannel = make(chan []byte)
consensus.readySignal = make(chan ProposalType)
consensus.commitSigChannel = make(chan []byte)
// channel for receiving newly generated VDF
consensus.RndChannel = make(chan [vdfAndSeedSize]byte)
consensus.IgnoreViewIDCheck = abool.NewBool(false)

@ -458,10 +458,10 @@ func (consensus *Consensus) updateConsensusInformation() Mode {
if (oldLeader != nil && consensus.LeaderPubKey != nil &&
!consensus.LeaderPubKey.Object.IsEqual(oldLeader.Object)) && consensus.isLeader() {
go func() {
consensus.getLogger().Info().
consensus.GetLogger().Info().
Str("myKey", myPubKeys.SerializeToHexStr()).
Msg("[UpdateConsensusInformation] I am the New Leader")
consensus.ReadySignal <- SyncProposal
consensus.ReadySignal(SyncProposal)
}()
}
return Normal

@ -66,8 +66,8 @@ func TestConsensusInitialization(t *testing.T) {
assert.IsType(t, make(chan slash.Record), consensus.SlashChan)
assert.NotNil(t, consensus.SlashChan)
assert.IsType(t, make(chan ProposalType), consensus.ReadySignal)
assert.NotNil(t, consensus.ReadySignal)
assert.IsType(t, make(chan ProposalType), consensus.GetReadySignal())
assert.NotNil(t, consensus.GetReadySignal())
assert.IsType(t, make(chan [vdfAndSeedSize]byte), consensus.RndChannel)
assert.NotNil(t, consensus.RndChannel)

@ -254,13 +254,13 @@ func (consensus *Consensus) finalCommit() {
// No pipelining
go func() {
consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal")
consensus.ReadySignal <- SyncProposal
consensus.ReadySignal(SyncProposal)
}()
} else {
// pipelining
go func() {
select {
case consensus.CommitSigChannel <- commitSigAndBitmap:
case consensus.GetCommitSigChannel() <- commitSigAndBitmap:
case <-time.After(CommitSigSenderTimeout):
utils.Logger().Error().Err(err).Msg("[finalCommit] channel not received after 6s for commitSigAndBitmap")
}
@ -334,7 +334,7 @@ func (consensus *Consensus) StartChannel() {
consensus.start = true
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal")
consensus.mutex.Unlock()
consensus.ReadySignal <- SyncProposal
consensus.ReadySignal(SyncProposal)
return
}
consensus.mutex.Unlock()
@ -428,7 +428,7 @@ func (consensus *Consensus) BlockChannel(newBlock *types.Block) {
return
}
// Sleep to wait for the full block time
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time")
consensus.GetLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time")
time.AfterFunc(time.Until(consensus.NextBlockDue), func() {
consensus.StartFinalityCount()
consensus.mutex.Lock()
@ -587,7 +587,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
// Send signal to Node to propose the new block for consensus
consensus.getLogger().Info().Msg("[preCommitAndPropose] sending block proposal signal")
consensus.ReadySignal <- AsyncProposal
consensus.ReadySignal(AsyncProposal)
}()
return nil
@ -761,7 +761,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) {
if consensus.isLeader() && !consensus.getLeaderPubKey().Object.IsEqual(prev.Object) {
// leader changed
go func() {
consensus.ReadySignal <- SyncProposal
consensus.ReadySignal(SyncProposal)
}()
}
}

@ -422,10 +422,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
consensus.getLogger().Error().Err(err).Msg("[onViewChange] startNewView failed")
return
}
go func() {
consensus.ReadySignal <- SyncProposal
}()
go consensus.ReadySignal(SyncProposal)
return
}

@ -26,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/block"
consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/internal/params"
@ -39,9 +38,6 @@ import (
// ChainContext supports retrieving headers and consensus parameters from the
// current blockchain to be used during transaction processing.
type ChainContext interface {
// Engine retrieves the chain's consensus engine.
Engine() consensus_engine.Engine
// GetHeader returns the hash corresponding to their hash.
GetHeader(common.Hash, uint64) *block.Header

@ -0,0 +1,27 @@
package lrucache
import lru "github.com/hashicorp/golang-lru"
type Cache[K comparable, V any] struct {
cache *lru.Cache
}
func NewCache[K comparable, V any](size int) *Cache[K, V] {
c, _ := lru.New(size)
return &Cache[K, V]{
cache: c,
}
}
func (c *Cache[K, V]) Get(key K) (V, bool) {
v, ok := c.cache.Get(key)
if !ok {
var out V
return out, false
}
return v.(V), true
}
func (c *Cache[K, V]) Set(key K, value V) {
c.cache.Add(key, value)
}

@ -17,6 +17,7 @@ import (
"github.com/harmony-one/harmony/internal/shardchain/tikv_manage"
"github.com/harmony-one/harmony/internal/tikv"
"github.com/harmony-one/harmony/internal/tikv/redis_helper"
"github.com/harmony-one/harmony/internal/utils/lrucache"
"github.com/ethereum/go-ethereum/rlp"
harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony"
@ -132,8 +133,7 @@ type Node struct {
chainConfig params.ChainConfig
unixTimeAtNodeStart int64
// KeysToAddrs holds the addresses of bls keys run by the node
KeysToAddrs map[string]common.Address
keysToAddrsEpoch *big.Int
keysToAddrs *lrucache.Cache[uint64, map[string]common.Address]
keysToAddrsMutex sync.Mutex
// TransactionErrorSink contains error messages for any failed transaction, in memory only
TransactionErrorSink *types.TransactionErrorSink
@ -141,7 +141,6 @@ type Node struct {
BroadcastInvalidTx bool
// InSync flag indicates the node is in-sync or not
IsSynchronized *abool.AtomicBool
proposedBlock map[uint64]*types.Block
deciderCache *lru.Cache
committeeCache *lru.Cache
@ -1024,6 +1023,7 @@ func New(
TransactionErrorSink: types.NewTransactionErrorSink(),
crosslinks: crosslinks.New(),
syncID: GenerateSyncID(),
keysToAddrs: lrucache.NewCache[uint64, map[string]common.Address](10),
}
if consensusObj == nil {
panic("consensusObj is nil")
@ -1113,7 +1113,6 @@ func New(
node.committeeCache, _ = lru.New(16)
node.pendingCXReceipts = map[string]*types.CXReceiptsProof{}
node.proposedBlock = map[uint64]*types.Block{}
node.Consensus.VerifiedNewBlock = make(chan *types.Block, 1)
// the sequence number is the next block number to be added in consensus protocol, which is
// always one more than current chain header block
@ -1322,10 +1321,6 @@ func (node *Node) ShutDown() {
}
func (node *Node) populateSelfAddresses(epoch *big.Int) {
// reset the self addresses
node.KeysToAddrs = map[string]common.Address{}
node.keysToAddrsEpoch = epoch
shardID := node.Consensus.ShardID
shardState, err := node.Consensus.Blockchain().ReadShardState(epoch)
if err != nil {
@ -1344,7 +1339,7 @@ func (node *Node) populateSelfAddresses(epoch *big.Int) {
Msg("[PopulateSelfAddresses] failed to find shard committee")
return
}
keysToAddrs := map[string]common.Address{}
for _, blskey := range node.Consensus.GetPublicKeys() {
blsStr := blskey.Bytes.Hex()
shardkey := bls.FromLibBLSPublicKeyUnsafe(blskey.Object)
@ -1365,7 +1360,7 @@ func (node *Node) populateSelfAddresses(epoch *big.Int) {
Msg("[PopulateSelfAddresses] could not find address")
return
}
node.KeysToAddrs[blsStr] = *addr
keysToAddrs[blsStr] = *addr
utils.Logger().Debug().
Int64("epoch", epoch.Int64()).
Uint32("shard-id", shardID).
@ -1373,34 +1368,27 @@ func (node *Node) populateSelfAddresses(epoch *big.Int) {
Str("address", common2.MustAddressToBech32(*addr)).
Msg("[PopulateSelfAddresses]")
}
node.keysToAddrs.Set(epoch.Uint64(), keysToAddrs)
}
// GetAddressForBLSKey retrieves the ECDSA address associated with bls key for epoch
func (node *Node) GetAddressForBLSKey(blskey *bls_core.PublicKey, epoch *big.Int) common.Address {
// populate if first time setting or new epoch
node.keysToAddrsMutex.Lock()
defer node.keysToAddrsMutex.Unlock()
if node.keysToAddrsEpoch == nil || epoch.Cmp(node.keysToAddrsEpoch) != 0 {
node.populateSelfAddresses(epoch)
}
blsStr := blskey.SerializeToHexStr()
addr, ok := node.KeysToAddrs[blsStr]
if !ok {
return common.Address{}
}
return addr
return node.GetAddresses(epoch)[blskey.SerializeToHexStr()]
}
// GetAddresses retrieves all ECDSA addresses of the bls keys for epoch
func (node *Node) GetAddresses(epoch *big.Int) map[string]common.Address {
// populate if first time setting or new epoch
// populate if new epoch
if rs, ok := node.keysToAddrs.Get(epoch.Uint64()); ok {
return rs
}
node.keysToAddrsMutex.Lock()
defer node.keysToAddrsMutex.Unlock()
if node.keysToAddrsEpoch == nil || epoch.Cmp(node.keysToAddrsEpoch) != 0 {
node.populateSelfAddresses(epoch)
node.populateSelfAddresses(epoch)
node.keysToAddrsMutex.Unlock()
if rs, ok := node.keysToAddrs.Get(epoch.Uint64()); ok {
return rs
}
// self addresses map can never be nil
return node.KeysToAddrs
return make(map[string]common.Address)
}
// IsRunningBeaconChain returns whether the node is running on beacon chain.

@ -7,7 +7,6 @@ import (
"time"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/crypto/bls"
staking "github.com/harmony-one/harmony/staking/types"
@ -27,7 +26,7 @@ const (
// WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus.
// only leader will receive the ready signal
func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{}) {
func (node *Node) WaitForConsensusReadyV2(cs *consensus.Consensus, stopChan chan struct{}, stoppedChan chan struct{}) {
go func() {
// Setup stoppedChan
defer close(stoppedChan)
@ -47,12 +46,11 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
utils.Logger().Warn().
Msg("Consensus new block proposal: STOPPED!")
return
case proposalType := <-readySignal:
retryCount := 0
for node.Consensus != nil && node.Consensus.IsLeader() {
case proposalType := <-cs.GetReadySignal():
for retryCount := 0; retryCount < 3 && cs.IsLeader(); retryCount++ {
time.Sleep(SleepPeriod)
utils.Logger().Info().
Uint64("blockNum", node.Blockchain().CurrentBlock().NumberU64()+1).
Uint64("blockNum", cs.Blockchain().CurrentBlock().NumberU64()+1).
Bool("asyncProposal", proposalType == consensus.AsyncProposal).
Msg("PROPOSING NEW BLOCK ------------------------------------------------")
@ -71,14 +69,14 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
} else {
utils.Logger().Info().Msg("[ProposeNewBlock] Timeout waiting for commit sigs, reading directly from DB")
}
sigs, err := node.Consensus.BlockCommitSigs(node.Blockchain().CurrentBlock().NumberU64())
sigs, err := cs.BlockCommitSigs(cs.Blockchain().CurrentBlock().NumberU64())
if err != nil {
utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block")
} else {
newCommitSigsChan <- sigs
}
case commitSigs := <-commitSigsChan:
case commitSigs := <-cs.CommitSigChannel():
utils.Logger().Info().Msg("[ProposeNewBlock] received commit sigs asynchronously")
if len(commitSigs) > bls.BLSSignatureSizeInBytes {
newCommitSigsChan <- commitSigs
@ -86,12 +84,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
}
}()
newBlock, err := node.ProposeNewBlock(newCommitSigsChan)
if err == nil {
if blk, ok := node.proposedBlock[newBlock.NumberU64()]; ok {
utils.Logger().Info().Uint64("blockNum", newBlock.NumberU64()).Str("blockHash", blk.Hash().Hex()).
Msg("Block with the same number was already proposed, abort.")
}
utils.Logger().Info().
Uint64("blockNum", newBlock.NumberU64()).
Uint64("epoch", newBlock.Epoch().Uint64()).
@ -102,18 +95,11 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
Msg("=========Successfully Proposed New Block==========")
// Send the new block to Consensus so it can be confirmed.
node.proposedBlock[newBlock.NumberU64()] = newBlock
delete(node.proposedBlock, newBlock.NumberU64()-10)
node.Consensus.BlockChannel(newBlock)
cs.BlockChannel(newBlock)
break
} else {
retryCount++
utils.Logger().Err(err).Int("retryCount", retryCount).
Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!")
if retryCount > 3 {
// break to avoid repeated failures
break
}
continue
}
}

@ -19,7 +19,7 @@ func (node *Node) RegisterValidatorServices() {
// Register new block service.
node.serviceManager.Register(
service.BlockProposal,
blockproposal.New(node.Consensus.ReadySignal, node.Consensus.CommitSigChannel, node.WaitForConsensusReadyV2),
blockproposal.New(node.Consensus, node.WaitForConsensusReadyV2),
)
}

Loading…
Cancel
Save