Merge branch 'main' into traceDB

pull/3799/head
PeekPI 3 years ago committed by GitHub
commit 52ebb89552
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      cmd/harmony/flags.go
  2. 26
      cmd/harmony/main.go
  3. 6
      consensus/consensus.go
  4. 15
      consensus/consensus_service.go
  5. 11
      consensus/consensus_v2.go
  6. 3
      consensus/enums.go
  7. 11
      consensus/validator.go
  8. 13
      consensus/view_change.go
  9. 50
      core/headerchain.go
  10. 2
      go.mod
  11. 3
      hmy/hmy.go
  12. 1
      internal/configs/harmony/harmony.go
  13. 16
      node/api.go
  14. 1
      p2p/discovery/option.go
  15. 4
      p2p/discovery/option_test.go
  16. 4
      rpc/blockchain.go
  17. 1
      rpc/common/types.go

@ -238,6 +238,11 @@ var (
Usage: "run node in offline mode", Usage: "run node in offline mode",
DefValue: defaultConfig.General.IsOffline, DefValue: defaultConfig.General.IsOffline,
} }
isBackupFlag = cli.BoolFlag{
Name: "run.backup",
Usage: "run node in backup mode",
DefValue: defaultConfig.General.IsBackup,
}
dataDirFlag = cli.StringFlag{ dataDirFlag = cli.StringFlag{
Name: "datadir", Name: "datadir",
Usage: "directory of chain database", Usage: "directory of chain database",
@ -353,6 +358,10 @@ func applyGeneralFlags(cmd *cobra.Command, config *harmonyconfig.HarmonyConfig)
if cli.IsFlagChanged(cmd, taraceFlag) { if cli.IsFlagChanged(cmd, taraceFlag) {
config.General.TraceEnable = cli.GetBoolFlagValue(cmd, taraceFlag) config.General.TraceEnable = cli.GetBoolFlagValue(cmd, taraceFlag)
}
if cli.IsFlagChanged(cmd, isBackupFlag) {
config.General.IsBackup = cli.GetBoolFlagValue(cmd, isBackupFlag)
} }
} }

@ -688,15 +688,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
) )
nodeconfig.GetDefaultConfig().DBDir = nodeConfig.DBDir nodeconfig.GetDefaultConfig().DBDir = nodeConfig.DBDir
switch hc.General.NodeType { processNodeType(hc, currentNode, currentConsensus)
case nodeTypeExplorer:
nodeconfig.SetDefaultRole(nodeconfig.ExplorerNode)
currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode)
case nodeTypeValidator:
nodeconfig.SetDefaultRole(nodeconfig.Validator)
currentNode.NodeConfig.SetRole(nodeconfig.Validator)
}
currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID))) currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID)))
currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(shard.BeaconChainShardID)) currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(shard.BeaconChainShardID))
currentNode.NodeConfig.ConsensusPriKey = nodeConfig.ConsensusPriKey currentNode.NodeConfig.ConsensusPriKey = nodeConfig.ConsensusPriKey
@ -725,6 +717,22 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi
return currentNode return currentNode
} }
func processNodeType(hc harmonyconfig.HarmonyConfig, currentNode *node.Node, currentConsensus *consensus.Consensus) {
switch hc.General.NodeType {
case nodeTypeExplorer:
nodeconfig.SetDefaultRole(nodeconfig.ExplorerNode)
currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode)
case nodeTypeValidator:
nodeconfig.SetDefaultRole(nodeconfig.Validator)
currentNode.NodeConfig.SetRole(nodeconfig.Validator)
if hc.General.IsBackup {
currentConsensus.SetIsBackup(true)
}
}
}
func setupPrometheusService(node *node.Node, hc harmonyconfig.HarmonyConfig, sid uint32) { func setupPrometheusService(node *node.Node, hc harmonyconfig.HarmonyConfig, sid uint32) {
prometheusConfig := prometheus.Config{ prometheusConfig := prometheus.Config{
Enabled: hc.Prometheus.Enabled, Enabled: hc.Prometheus.Enabled,

@ -48,6 +48,8 @@ type Consensus struct {
phase FBFTPhase phase FBFTPhase
// current indicates what state a node is in // current indicates what state a node is in
current State current State
// isBackup declarative the node is in backup mode
isBackup bool
// 2 types of timeouts: normal and viewchange // 2 types of timeouts: normal and viewchange
consensusTimeout map[TimeoutType]*utils.Timeout consensusTimeout map[TimeoutType]*utils.Timeout
// Commits collected from validators. // Commits collected from validators.
@ -185,6 +187,10 @@ func (consensus *Consensus) SetBlockVerifier(verifier VerifyBlockFunc) {
consensus.vc.SetVerifyBlock(consensus.VerifyBlock) consensus.vc.SetVerifyBlock(consensus.VerifyBlock)
} }
func (consensus *Consensus) IsBackup() bool {
return consensus.isBackup
}
// New create a new Consensus record // New create a new Consensus record
func New( func New(
host p2p.Host, shard uint32, leader p2p.Peer, multiBLSPriKey multibls.PrivateKeys, host p2p.Host, shard uint32, leader p2p.Peer, multiBLSPriKey multibls.PrivateKeys,

@ -173,12 +173,25 @@ func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKe
// SetMode sets the mode of consensus // SetMode sets the mode of consensus
func (consensus *Consensus) SetMode(m Mode) { func (consensus *Consensus) SetMode(m Mode) {
if m == Normal && consensus.isBackup {
m = NormalBackup
}
consensus.getLogger().Debug(). consensus.getLogger().Debug().
Str("Mode", m.String()). Str("Mode", m.String()).
Msg("[SetMode]") Msg("[SetMode]")
consensus.current.SetMode(m) consensus.current.SetMode(m)
} }
// SetIsBackup sets the mode of consensus
func (consensus *Consensus) SetIsBackup(isBackup bool) {
consensus.getLogger().Debug().
Bool("IsBackup", isBackup).
Msg("[SetIsBackup]")
consensus.isBackup = isBackup
consensus.current.SetIsBackup(isBackup)
}
// Mode returns the mode of consensus // Mode returns the mode of consensus
func (consensus *Consensus) Mode() Mode { func (consensus *Consensus) Mode() Mode {
return consensus.current.Mode() return consensus.current.Mode()
@ -200,7 +213,7 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error {
if consensus.IgnoreViewIDCheck.IsSet() { if consensus.IgnoreViewIDCheck.IsSet() {
//in syncing mode, node accepts incoming messages without viewID/leaderKey checking //in syncing mode, node accepts incoming messages without viewID/leaderKey checking
//so only set mode to normal when new node enters consensus and need checking viewID //so only set mode to normal when new node enters consensus and need checking viewID
consensus.current.SetMode(Normal) consensus.SetMode(Normal)
consensus.SetViewIDs(msg.ViewID) consensus.SetViewIDs(msg.ViewID)
if !msg.HasSingleSender() { if !msg.HasSingleSender() {
return errors.New("Leader message can not have multiple sender keys") return errors.New("Leader message can not have multiple sender keys")

@ -92,10 +92,17 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb
return errors.Wrapf(err, "unable to parse consensus msg with type: %s", msg.Type) return errors.Wrapf(err, "unable to parse consensus msg with type: %s", msg.Type)
} }
canHandleViewChange := true
intendedForValidator, intendedForLeader := intendedForValidator, intendedForLeader :=
!consensus.IsLeader(), !consensus.IsLeader(),
consensus.IsLeader() consensus.IsLeader()
// if in backup normal mode, force ignore view change event and leader event.
if consensus.current.Mode() == NormalBackup {
canHandleViewChange = false
intendedForLeader = false
}
// Route message to handler // Route message to handler
switch t := msg.Type; true { switch t := msg.Type; true {
// Handle validator intended messages first // Handle validator intended messages first
@ -113,9 +120,9 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb
consensus.onCommit(fbftMsg) consensus.onCommit(fbftMsg)
// Handle view change messages // Handle view change messages
case t == msg_pb.MessageType_VIEWCHANGE: case t == msg_pb.MessageType_VIEWCHANGE && canHandleViewChange:
consensus.onViewChange(fbftMsg) consensus.onViewChange(fbftMsg)
case t == msg_pb.MessageType_NEWVIEW: case t == msg_pb.MessageType_NEWVIEW && canHandleViewChange:
consensus.onNewView(fbftMsg) consensus.onNewView(fbftMsg)
} }

@ -14,6 +14,8 @@ const (
Syncing Syncing
// Listening .. // Listening ..
Listening Listening
// NormalBackup Backup Node ..
NormalBackup
) )
// FBFTPhase : different phases of consensus // FBFTPhase : different phases of consensus
@ -32,6 +34,7 @@ var (
ViewChanging: "ViewChanging", ViewChanging: "ViewChanging",
Syncing: "Syncing", Syncing: "Syncing",
Listening: "Listening", Listening: "Listening",
NormalBackup: "NormalBackup",
} }
phaseNames = map[FBFTPhase]string{ phaseNames = map[FBFTPhase]string{
FBFTAnnounce: "Announce", FBFTAnnounce: "Announce",

@ -57,9 +57,14 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
return return
} }
consensus.prepare() consensus.prepare()
consensus.switchPhase("Announce", FBFTPrepare)
} }
func (consensus *Consensus) prepare() { func (consensus *Consensus) prepare() {
if consensus.IsBackup() {
return
}
priKeys := consensus.getPriKeysInCommittee() priKeys := consensus.getPriKeysInCommittee()
p2pMsgs := consensus.constructP2pMessages(msg_pb.MessageType_PREPARE, nil, priKeys) p2pMsgs := consensus.constructP2pMessages(msg_pb.MessageType_PREPARE, nil, priKeys)
@ -71,12 +76,14 @@ func (consensus *Consensus) prepare() {
Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). Str("blockHash", hex.EncodeToString(consensus.blockHash[:])).
Msg("[OnAnnounce] Sent Prepare Message!!") Msg("[OnAnnounce] Sent Prepare Message!!")
} }
consensus.switchPhase("Announce", FBFTPrepare)
} }
// sendCommitMessages send out commit messages to leader // sendCommitMessages send out commit messages to leader
func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) { func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) {
if consensus.IsBackup() {
return
}
priKeys := consensus.getPriKeysInCommittee() priKeys := consensus.getPriKeysInCommittee()
// Sign commit signature on the received block and construct the p2p messages // Sign commit signature on the received block and construct the p2p messages

@ -37,7 +37,8 @@ type State struct {
// it is the next view id // it is the next view id
viewChangingID uint64 viewChangingID uint64
viewMux sync.RWMutex viewMux sync.RWMutex
isBackup bool
} }
// Mode return the current node mode // Mode return the current node mode
@ -49,6 +50,10 @@ func (pm *State) Mode() Mode {
// SetMode set the node mode as required // SetMode set the node mode as required
func (pm *State) SetMode(s Mode) { func (pm *State) SetMode(s Mode) {
if s == Normal && pm.isBackup {
s = NormalBackup
}
pm.modeMux.Lock() pm.modeMux.Lock()
defer pm.modeMux.Unlock() defer pm.modeMux.Unlock()
pm.mode = s pm.mode = s
@ -95,6 +100,10 @@ func (pm *State) GetViewChangeDuraion() time.Duration {
return time.Duration(diff * diff * int64(viewChangeDuration)) return time.Duration(diff * diff * int64(viewChangeDuration))
} }
func (pm *State) SetIsBackup(isBackup bool) {
pm.isBackup = isBackup
}
// fallbackNextViewID return the next view ID and duration when there is an exception // fallbackNextViewID return the next view ID and duration when there is an exception
// to calculate the time-based viewId // to calculate the time-based viewId
func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) { func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) {
@ -229,7 +238,7 @@ func createTimeout() map[TimeoutType]*utils.Timeout {
// startViewChange start the view change process // startViewChange start the view change process
func (consensus *Consensus) startViewChange() { func (consensus *Consensus) startViewChange() {
if consensus.disableViewChange { if consensus.disableViewChange || consensus.IsBackup() {
return return
} }
consensus.mutex.Lock() consensus.mutex.Lock()

@ -39,9 +39,10 @@ import (
) )
const ( const (
headerCacheLimit = 512 headerCacheLimit = 2048 // with 2s/block, 2048 headers is roughly block produced in 1 hour.
tdCacheLimit = 1024 tdCacheLimit = 1024
numberCacheLimit = 2048 numberCacheLimit = 4096
canonicalCacheLimit = 4096
) )
// HeaderChain implements the basic block header chain logic that is shared by // HeaderChain implements the basic block header chain logic that is shared by
@ -58,9 +59,10 @@ type HeaderChain struct {
currentHeader atomic.Value // Current head of the header chain (may be above the block chain!) currentHeader atomic.Value // Current head of the header chain (may be above the block chain!)
currentHeaderHash common.Hash // Hash of the current head of the header chain (prevent recomputing all the time) currentHeaderHash common.Hash // Hash of the current head of the header chain (prevent recomputing all the time)
headerCache *lru.Cache // Cache for the most recent block headers headerCache *lru.Cache // Cache for the most recent block headers
tdCache *lru.Cache // Cache for the most recent block total difficulties tdCache *lru.Cache // Cache for the most recent block total difficulties
numberCache *lru.Cache // Cache for the most recent block numbers numberCache *lru.Cache // Cache for the most recent block numbers
canonicalCache *lru.Cache // number -> Hash
procInterrupt func() bool procInterrupt func() bool
@ -76,6 +78,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
headerCache, _ := lru.New(headerCacheLimit) headerCache, _ := lru.New(headerCacheLimit)
tdCache, _ := lru.New(tdCacheLimit) tdCache, _ := lru.New(tdCacheLimit)
numberCache, _ := lru.New(numberCacheLimit) numberCache, _ := lru.New(numberCacheLimit)
canonicalHash, _ := lru.New(canonicalCacheLimit)
// Seed a fast but crypto originating random generator // Seed a fast but crypto originating random generator
seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64)) seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
@ -84,14 +87,15 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
} }
hc := &HeaderChain{ hc := &HeaderChain{
config: config, config: config,
chainDb: chainDb, chainDb: chainDb,
headerCache: headerCache, headerCache: headerCache,
tdCache: tdCache, tdCache: tdCache,
numberCache: numberCache, numberCache: numberCache,
procInterrupt: procInterrupt, canonicalCache: canonicalHash,
rand: mrand.New(mrand.NewSource(seed.Int64())), procInterrupt: procInterrupt,
engine: engine, rand: mrand.New(mrand.NewSource(seed.Int64())),
engine: engine,
} }
hc.genesisHeader = hc.GetHeaderByNumber(0) hc.genesisHeader = hc.GetHeaderByNumber(0)
@ -191,6 +195,8 @@ func (hc *HeaderChain) WriteHeader(header *block.Header) (status WriteStatus, er
hc.headerCache.Add(hash, header) hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number) hc.numberCache.Add(hash, number)
// when writing headers, it will write to canonical by default
hc.canonicalCache.Add(number, hash)
return return
} }
@ -434,13 +440,26 @@ func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool {
// GetHeaderByNumber retrieves a block header from the database by number, // GetHeaderByNumber retrieves a block header from the database by number,
// caching it (associated with its hash) if found. // caching it (associated with its hash) if found.
func (hc *HeaderChain) GetHeaderByNumber(number uint64) *block.Header { func (hc *HeaderChain) GetHeaderByNumber(number uint64) *block.Header {
hash := rawdb.ReadCanonicalHash(hc.chainDb, number) hash := hc.getHashByNumber(number)
if hash == (common.Hash{}) { if hash == (common.Hash{}) {
return nil return nil
} }
return hc.GetHeader(hash, number) return hc.GetHeader(hash, number)
} }
func (hc *HeaderChain) getHashByNumber(number uint64) common.Hash {
// Since canonical chain is immutable, it's safe to read header
// hash by number from cache.
if hash, ok := hc.canonicalCache.Get(number); ok {
return hash.(common.Hash)
}
hash := rawdb.ReadCanonicalHash(hc.chainDb, number)
if hash != (common.Hash{}) {
hc.canonicalCache.Add(number, hash)
}
return hash
}
// CurrentHeader retrieves the current head header of the canonical chain. The // CurrentHeader retrieves the current head header of the canonical chain. The
// header is retrieved from the HeaderChain's internal cache. // header is retrieved from the HeaderChain's internal cache.
func (hc *HeaderChain) CurrentHeader() *block.Header { func (hc *HeaderChain) CurrentHeader() *block.Header {
@ -505,6 +524,7 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) error {
hc.headerCache.Purge() hc.headerCache.Purge()
hc.tdCache.Purge() hc.tdCache.Purge()
hc.numberCache.Purge() hc.numberCache.Purge()
hc.canonicalCache.Purge()
if hc.CurrentHeader() == nil { if hc.CurrentHeader() == nil {
hc.currentHeader.Store(hc.genesisHeader) hc.currentHeader.Store(hc.genesisHeader)

@ -34,7 +34,7 @@ require (
github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-crypto v0.1.0 github.com/libp2p/go-libp2p-crypto v0.1.0
github.com/libp2p/go-libp2p-discovery v0.5.0 github.com/libp2p/go-libp2p-discovery v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.12.1 github.com/libp2p/go-libp2p-kad-dht v0.12.2
github.com/libp2p/go-libp2p-pubsub v0.4.1 github.com/libp2p/go-libp2p-pubsub v0.4.1
github.com/multiformats/go-multiaddr v0.3.1 github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multiaddr-dns v0.3.1 github.com/multiformats/go-multiaddr-dns v0.3.1

@ -102,6 +102,8 @@ type NodeAPI interface {
ListBlockedPeer() []peer.ID ListBlockedPeer() []peer.ID
GetConsensusInternal() commonRPC.ConsensusInternal GetConsensusInternal() commonRPC.ConsensusInternal
IsBackup() bool
SetNodeBackupMode(isBackup bool) bool
// debug API // debug API
GetConsensusMode() string GetConsensusMode() string
@ -206,6 +208,7 @@ func (hmy *Harmony) GetNodeMetadata() commonRPC.NodeMetadata {
Role: cfg.Role().String(), Role: cfg.Role().String(),
DNSZone: cfg.DNSZone, DNSZone: cfg.DNSZone,
Archival: cfg.GetArchival(), Archival: cfg.GetArchival(),
IsBackup: hmy.NodeAPI.IsBackup(),
NodeBootTime: hmy.NodeAPI.GetNodeBootTime(), NodeBootTime: hmy.NodeAPI.GetNodeBootTime(),
PeerID: nodeconfig.GetPeerID(), PeerID: nodeconfig.GetPeerID(),
Consensus: consensusInternal, Consensus: consensusInternal,

@ -56,6 +56,7 @@ type GeneralConfig struct {
NoStaking bool NoStaking bool
ShardID int ShardID int
IsArchival bool IsArchival bool
IsBackup bool
IsBeaconArchival bool IsBeaconArchival bool
IsOffline bool IsOffline bool
DataDir string DataDir string

@ -143,6 +143,22 @@ func (node *Node) GetConsensusInternal() rpc_common.ConsensusInternal {
} }
} }
// IsBackup returns the node is in backup mode
func (node *Node) IsBackup() bool {
return node.Consensus.IsBackup()
}
// SetNodeBackupMode change node backup mode
func (node *Node) SetNodeBackupMode(isBackup bool) bool {
if node.Consensus.IsBackup() == isBackup {
return false
}
node.Consensus.SetIsBackup(isBackup)
node.Consensus.ResetViewChangeState()
return true
}
func (node *Node) GetConfig() rpc_common.Config { func (node *Node) GetConfig() rpc_common.Config {
return rpc_common.Config{ return rpc_common.Config{
HarmonyConfig: *node.HarmonyConfig, HarmonyConfig: *node.HarmonyConfig,

@ -33,6 +33,7 @@ func (opt DHTConfig) getLibp2pRawOptions() ([]libp2p_dht.Option, error) {
opts = append(opts, dsOption) opts = append(opts, dsOption)
} }
opts = append(opts, libp2p_dht.Concurrency(1))
return opts, nil return opts, nil
} }

@ -40,14 +40,14 @@ func TestDHTOption_getLibp2pRawOptions(t *testing.T) {
opt: DHTConfig{ opt: DHTConfig{
BootNodes: testAddrStr, BootNodes: testAddrStr,
}, },
expLen: 1, expLen: 2,
}, },
{ {
opt: DHTConfig{ opt: DHTConfig{
BootNodes: testAddrStr, BootNodes: testAddrStr,
DataStoreFile: &validPath, DataStoreFile: &validPath,
}, },
expLen: 2, expLen: 3,
}, },
{ {
opt: DHTConfig{ opt: DHTConfig{

@ -838,6 +838,10 @@ func isBlockGreaterThanLatest(hmy *hmy.Harmony, blockNum rpc.BlockNumber) bool {
return uint64(blockNum) > hmy.CurrentBlock().NumberU64() return uint64(blockNum) > hmy.CurrentBlock().NumberU64()
} }
func (s *PublicBlockchainService) SetNodeToBackupMode(ctx context.Context, isBackup bool) (bool, error) {
return s.hmy.NodeAPI.SetNodeBackupMode(isBackup), nil
}
func combineCacheKey(number uint64, version Version, blockArgs *rpc_common.BlockArgs) string { func combineCacheKey(number uint64, version Version, blockArgs *rpc_common.BlockArgs) string {
// no need format blockArgs.Signers[] as a part of cache key // no need format blockArgs.Signers[] as a part of cache key
// because it's not input from rpc caller, it's caculate with blockArgs.WithSigners // because it's not input from rpc caller, it's caculate with blockArgs.WithSigners

@ -64,6 +64,7 @@ type NodeMetadata struct {
Role string `json:"role"` Role string `json:"role"`
DNSZone string `json:"dns-zone"` DNSZone string `json:"dns-zone"`
Archival bool `json:"is-archival"` Archival bool `json:"is-archival"`
IsBackup bool `json:"is-backup"`
NodeBootTime int64 `json:"node-unix-start-time"` NodeBootTime int64 `json:"node-unix-start-time"`
PeerID peer.ID `json:"peerid"` PeerID peer.ID `json:"peerid"`
Consensus ConsensusInternal `json:"consensus"` Consensus ConsensusInternal `json:"consensus"`

Loading…
Cancel
Save