Factor out quorum logic from consensus, undo prior consolidation of values under core (#1717)

* [consensus] Factor out enums to core/values, begin factor out of consensus mechanisms

* [consensus] Make Mechanism explicit

* [consensus] Add ViewChange to QuorumPhase

* Update core/values/consensus.go

Co-Authored-By: Eugene Kim <ek@harmony.one>

* Update core/values/consensus.go

Co-Authored-By: Eugene Kim <ek@harmony.one>

* [mainnet-release] Address code comments

* [staking][consensus][project] Remove txgen, factor out consensus

* [consensus] Factor out PublicKeys

* [txgen] Bring back txgen

* [project] Undo prior consolidation of error values under core

* [consensus] Update tests using quorum decider

* [consensus] Fix overlooked resets during refactor

* [consensus] Fix wrong check of quorum phase

* [consensus] Address leftover TODO for prepare count

* [consensus] Simplfy reset switch

* [consensus] Fix mistake of wrong ReadSignature in ViewChange, need sender, not node PubKey
pull/1731/head
Edgar Aroutiounian 5 years ago committed by GitHub
parent 8b5a3235fd
commit 9f00923ac3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      cmd/client/txgen/main.go
  2. 6
      cmd/harmony/main.go
  3. 47
      consensus/README.md
  4. 90
      consensus/consensus.go
  5. 5
      consensus/consensus_leader_msg.go
  6. 23
      consensus/consensus_leader_msg_test.go
  7. 114
      consensus/consensus_service.go
  8. 20
      consensus/consensus_service_test.go
  9. 7
      consensus/consensus_test.go
  10. 301
      consensus/consensus_v2.go
  11. 15
      consensus/consensus_validator_msg_test.go
  12. 21
      consensus/consensus_viewchange_msg.go
  13. 55
      consensus/enums.go
  14. 128
      consensus/fbft_log.go
  15. 23
      consensus/fbft_log_test.go
  16. 235
      consensus/quorum/quorum.go
  17. 248
      consensus/view_change.go
  18. 6
      core/block_validator.go
  19. 4
      core/blockchain.go
  20. 40
      core/error.go
  21. 4
      core/gaspool.go
  22. 5
      core/state_transition.go
  23. 65
      core/tx_pool.go
  24. 21
      core/tx_pool_test.go
  25. 3
      core/types/block.go
  26. 79
      core/values/error.go
  27. 4
      internal/chain/engine.go
  28. 2
      internal/configs/node/config.go
  29. 10
      internal/hmyapi/transactionpool.go
  30. 4
      internal/utils/bytes.go
  31. 2
      internal/utils/singleton.go
  32. 4
      node/node.go
  33. 31
      node/node.md
  34. 37
      node/node_explorer.go
  35. 4
      node/node_genesis.go
  36. 15
      node/node_handler_test.go
  37. 28
      node/node_test.go
  38. 0
      numeric/decimal.go
  39. 0
      numeric/decimal_test.go
  40. 2
      shard/shard_state.go
  41. 14
      specs/test/testplan.md
  42. 2
      staking/types/commission.go
  43. 22
      staking/types/delegation.go
  44. 16
      staking/types/messages.go
  45. 8
      staking/types/sign.go
  46. 7
      staking/types/transaction.go
  47. 2
      staking/types/validator.go

@ -11,6 +11,7 @@ import (
"time"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/shardchain"
@ -98,23 +99,25 @@ func setUpTXGen() *node.Node {
fmt.Fprintf(os.Stderr, "Error :%v \n", err)
os.Exit(1)
}
consensusObj, err := consensus.New(myhost, uint32(shardID), p2p.Peer{}, nil)
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensusObj, err := consensus.New(myhost, uint32(shardID), p2p.Peer{}, nil, decider)
chainDBFactory := &shardchain.MemDBFactory{}
txGen := node.New(myhost, consensusObj, chainDBFactory, false) //Changed it : no longer archival node.
txGen.Client = client.NewClient(txGen.GetHost(), uint32(shardID))
consensusObj.ChainReader = txGen.Blockchain()
consensusObj.PublicKeys = nil
genesisShardingConfig := core.ShardingSchedule.InstanceForEpoch(big.NewInt(core.GenesisEpoch))
startIdx := 0
endIdx := startIdx + genesisShardingConfig.NumNodesPerShard()
pubs := []*bls2.PublicKey{}
for _, acct := range genesis.HarmonyAccounts[startIdx:endIdx] {
pub := &bls2.PublicKey{}
if err := pub.DeserializeHexStr(acct.BlsPublicKey); err != nil {
fmt.Printf("Can not deserialize public key. err: %v", err)
os.Exit(1)
}
consensusObj.PublicKeys = append(consensusObj.PublicKeys, pub)
pubs = append(pubs, pub)
}
consensusObj.Decider.UpdateParticipants(pubs)
txGen.NodeConfig.SetRole(nodeconfig.ClientNode)
if shardID == 0 {
txGen.NodeConfig.SetShardGroupID(nodeconfig.GroupIDBeacon)

@ -19,6 +19,7 @@ import (
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/blsgen"
"github.com/harmony-one/harmony/internal/common"
@ -289,7 +290,10 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// Consensus object.
// TODO: consensus object shouldn't start here
// TODO(minhdoan): During refactoring, found out that the peers list is actually empty. Need to clean up the logic of consensus later.
currentConsensus, err := consensus.New(myHost, nodeConfig.ShardID, p2p.Peer{}, nodeConfig.ConsensusPriKey)
decider := quorum.NewDecider(quorum.SuperMajorityVote)
currentConsensus, err := consensus.New(
myHost, nodeConfig.ShardID, p2p.Peer{}, nodeConfig.ConsensusPriKey, decider,
)
currentConsensus.SelfAddress = common.ParseAddr(initialAccount.Address)
if err != nil {

@ -13,38 +13,45 @@ and scalable to traditional PBFT. For brevity, we will still call the whole proc
To reach the consensus of the next block, there are 3 phases: announce(i.e. pre-prepare in PBFT), prepare and commit.
* Announce(leader): The leader broadcasts ANNOUNCE message along with candidate of the next block.
* Prepare(validator): The validator will validate the block sent by leader and send PREPARE message; if the block is invalid, the validator will propose view change. If the prepare timeout, the validator will also propose view change.
* Prepared(leader): The leader will collect 2f+1 PREPARE message including itself and broadcast PREPARED message with the aggregated signature
* Commit(validator): The validator will check the validity of aggregated signature (# of signatures >= 2f+1) and send COMMIT message; if the commit timeout, the validator will also propose view change.
* Committed(leader): The leader will collect 2f+1 COMMIT message including itself and broadcast COMMITTED message with the aggregated signature
* Finalize(leader and validators): Both the leader and validators will finalize the block into blockchain together with 2f+1 aggregated signatures.
- Announce(leader): The leader broadcasts ANNOUNCE message along with candidate of the next block.
- Prepare(validator): The validator will validate the block sent by leader and send PREPARE message;
if the block is invalid, the validator will propose view change. If the prepare timeout, the validator will also propose view change.
- Prepared(leader): The leader will collect 2f+1 PREPARE message including itself and broadcast PREPARED message with the aggregated signature
- Commit(validator): The validator will check the validity of aggregated signature (# of signatures >= 2f+1) and
send COMMIT message; if the commit timeout, the validator will also propose view change.
- Committed(leader): The leader will collect 2f+1 COMMIT message including itself and broadcast COMMITTED message with the aggregated signature
- Finalize(leader and validators): Both the leader and validators will finalize the block into blockchain together with 2f+1 aggregated signatures.
### View changing mode
* ViewChange(validator): whenever a validator receives invalid block/signature from the leader, it should send VIEWCHANGE message with view v+1 together with its own prepared message(>=2f+1 aggregated prepare signatures) from previous views.
* NewView(new leader): when the new leader (uniquely determined) collect enough (2f+1) view change messages, it broadcasts the NEWVIEW message with aggregated VIEWCHANGE signatures.
* During the view changing process, if the new leader not send NEWVIEW message on time, the validator will propose ViewChange for the next view v+2 and so on...
- ViewChange(validator): whenever a validator receives invalid block/signature from the leader,
it should send VIEWCHANGE message with view v+1 together with its own prepared message(>=2f+1 aggregated prepare signatures) from previous views.
- NewView(new leader): when the new leader (uniquely determined) collect enough (2f+1) view change
messages, it broadcasts the NEWVIEW message with aggregated VIEWCHANGE signatures.
- During the view changing process, if the new leader not send NEWVIEW message on time, the
validator will propose ViewChange for the next view v+2 and so on...
## State Machine
The whole process of PBFT can be described as a state machine. We don't separate the roles of leader and validators, instead we use PbftState structure to describe the role and phase of a given node who is joining the consensus process. When a node receives a new message from its peer, its state will be updated. i.e. pbft_state --(upon receive new PbftMessage)--> new_pbft_state. Thus the most nature and clear way is to describe the whole process as state machine.
The whole process of PBFT can be described as a state machine. We don't separate the roles of leader
and validators, instead we use PBFTState structure to describe the role and phase of a given node
who is joining the consensus process. When a node receives a new message from its peer, its state will be updated. i.e. pbft_state --(upon
receive new PBFTMessage)-->
new_pbft_state. Thus the most nature and clear way is to describe the whole process as state machine.
```
// PbftState holds the state of a node in PBFT process
type PbftState struct {
```golang
// PBFTState holds the state of a node in PBFT process
type PBFTState struct {
IsLeader bool
phase PbftPhase // Announce, Prepare(d), Commit(ted)
phase PBFTPhase // Announce, Prepare(d), Commit(ted)
...
}
// PbftLog stores the data in PBFT process, it will be used in different phases in order to determine whether a new PbftMessage is valid or not.
type PbftLog struct {
// PBFTLog stores the data in PBFT process, it will be used in different phases in order to determine whether a new PBFTMessage is valid or not.
type PBFTLog struct {
blocks []*types.Block
messages []*PbftMessage
messages []*PBFTMessage
}
// entry point and main loop;
@ -83,6 +90,4 @@ func (consensus *Consensus) Start(stopChan chan struct{}, stoppedChan chan struc
}
}
```

@ -1,5 +1,4 @@
// Package consensus implements the Cosi PBFT consensus
package consensus // consensus
package consensus
import (
"fmt"
@ -9,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
@ -24,17 +24,20 @@ const (
// Consensus is the main struct with all states and data related to consensus process.
type Consensus struct {
// PbftLog stores the pbft messages and blocks during PBFT process
PbftLog *PbftLog
// phase: different phase of PBFT protocol: pre-prepare, prepare, commit, finish etc
phase PbftPhase
// mode: indicate a node is in normal or viewchanging mode
mode PbftMode
Decider quorum.Decider
// FBFTLog stores the pbft messages and blocks during FBFT process
FBFTLog *FBFTLog
// phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc
phase FBFTPhase
// current indicates what state a node is in
current State
// epoch: current epoch number
epoch uint64
// blockNum: the next blockNumber that PBFT is going to agree on, should be equal to the blockNumber of next block
// blockNum: the next blockNumber that FBFT is going to agree on,
// should be equal to the blockNumber of next block
blockNum uint64
// channel to receive consensus message
MsgChan chan []byte
@ -49,17 +52,17 @@ type Consensus struct {
consensusTimeout map[TimeoutType]*utils.Timeout
// Commits collected from validators.
prepareSigs map[string]*bls.Sign // key is the bls public key
commitSigs map[string]*bls.Sign // key is the bls public key
aggregatedPrepareSig *bls.Sign
aggregatedCommitSig *bls.Sign
prepareBitmap *bls_cosi.Mask
commitBitmap *bls_cosi.Mask
// Commits collected from view change
bhpSigs map[string]*bls.Sign // bhpSigs: blockHashPreparedSigs is the signature on m1 type message
nilSigs map[string]*bls.Sign // nilSigs: there is no prepared message when view change, it's signature on m2 type (i.e. nil) messages
viewIDSigs map[string]*bls.Sign // viewIDSigs: every validator sign on |viewID|blockHash| in view changing message
// bhpSigs: blockHashPreparedSigs is the signature on m1 type message
bhpSigs map[string]*bls.Sign
// nilSigs: there is no prepared message when view change,
// it's signature on m2 type (i.e. nil) messages
nilSigs map[string]*bls.Sign
bhpBitmap *bls_cosi.Mask
nilBitmap *bls_cosi.Mask
viewIDBitmap *bls_cosi.Mask
@ -79,8 +82,6 @@ type Consensus struct {
// Leader's address
leader p2p.Peer
// Public keys of the committee including leader and validators
PublicKeys []*bls.PublicKey
CommitteePublicKeys map[string]bool
pubKeyLock sync.Mutex
@ -131,9 +132,11 @@ type Consensus struct {
// will trigger state syncing when blockNum is low
blockNumLowChan chan struct{}
// Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf randomnesses) to consensus. The first 32 bytes are randomness, the rest is for bitmap.
// Channel for DRG protocol to send pRnd (preimage of randomness resulting from combined vrf
// randomnesses) to consensus. The first 32 bytes are randomness, the rest is for bitmap.
PRndChannel chan []byte
// Channel for DRG protocol to send VDF. The first 516 bytes are the VDF/Proof and the last 32 bytes are the seed for deriving VDF
// Channel for DRG protocol to send VDF. The first 516 bytes are the VDF/Proof and the last 32
// bytes are the seed for deriving VDF
RndChannel chan [vdfAndSeedSize]byte
pendingRnds [][vdfAndSeedSize]byte // A list of pending randomness
@ -188,25 +191,9 @@ func (consensus *Consensus) WaitForSyncing() {
<-consensus.blockNumLowChan
}
// Quorum returns the consensus quorum of the current committee (2f+1).
func (consensus *Consensus) Quorum() int {
return len(consensus.PublicKeys)*2/3 + 1
}
// PreviousQuorum returns the quorum size of previous epoch
func (consensus *Consensus) PreviousQuorum() int {
return consensus.numPrevPubKeys*2/3 + 1
}
// VdfSeedSize returns the number of VRFs for VDF computation
func (consensus *Consensus) VdfSeedSize() int {
return len(consensus.PublicKeys) * 2 / 3
}
// RewardThreshold returns the threshold to stop accepting commit messages
// when leader receives enough signatures for block reward
func (consensus *Consensus) RewardThreshold() int {
return len(consensus.PublicKeys) * 9 / 10
return int(consensus.Decider.ParticipantsCount()) * 2 / 3
}
// GetBlockReward returns last node block reward
@ -214,37 +201,42 @@ func (consensus *Consensus) GetBlockReward() *big.Int {
return consensus.lastBlockReward
}
// New creates a new Consensus object
// TODO: put shardId into chain reader's chain config
func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKey) (*Consensus, error) {
// New create a new Consensus record
func New(
host p2p.Host, shard uint32, leader p2p.Peer, blsPriKey *bls.SecretKey,
Decider quorum.Decider,
) (*Consensus, error) {
consensus := Consensus{}
consensus.Decider = Decider
consensus.host = host
consensus.msgSender = NewMessageSender(host)
consensus.blockNumLowChan = make(chan struct{})
// pbft related
consensus.PbftLog = NewPbftLog()
consensus.phase = Announce
consensus.mode = PbftMode{mode: Normal}
// pbft timeout
// FBFT related
consensus.FBFTLog = NewFBFTLog()
consensus.phase = FBFTAnnounce
consensus.current = State{mode: Normal}
// FBFT timeout
consensus.consensusTimeout = createTimeout()
consensus.prepareSigs = map[string]*bls.Sign{}
consensus.commitSigs = map[string]*bls.Sign{}
consensus.CommitteePublicKeys = make(map[string]bool)
consensus.validators.Store(leader.ConsensusPubKey.SerializeToHexStr(), leader)
if blsPriKey != nil {
consensus.priKey = blsPriKey
consensus.PubKey = blsPriKey.GetPublicKey()
utils.Logger().Info().Str("publicKey", consensus.PubKey.SerializeToHexStr()).Msg("My Public Key")
utils.Logger().Info().
Str("publicKey", consensus.PubKey.SerializeToHexStr()).Msg("My Public Key")
} else {
utils.Logger().Error().Msg("the bls key is nil")
return nil, fmt.Errorf("nil bls key, aborting")
}
// viewID has to be initialized as the height of the blockchain during initialization
// as it was displayed on explorer as Height right now
// viewID has to be initialized as the height of
// the blockchain during initialization as it was
// displayed on explorer as Height right now
consensus.viewID = 0
consensus.ShardID = ShardID
consensus.ShardID = shard
consensus.MsgChan = make(chan []byte)
consensus.syncReadyChan = make(chan struct{})
consensus.syncNotReadyChan = make(chan struct{})
@ -254,6 +246,6 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe
// channel for receiving newly generated VDF
consensus.RndChannel = make(chan [vdfAndSeedSize]byte)
consensus.uniqueIDInstance = utils.GetUniqueValidatorIDInstance()
memprofiling.GetMemProfiling().Add("consensus.pbftLog", consensus.PbftLog)
memprofiling.GetMemProfiling().Add("consensus.FBFTLog", consensus.FBFTLog)
return &consensus, nil
}

@ -6,6 +6,7 @@ import (
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
)
@ -49,7 +50,7 @@ func (consensus *Consensus) constructPreparedMessage() ([]byte, *bls.Sign) {
buffer := bytes.NewBuffer([]byte{})
// 96 bytes aggregated signature
aggSig := bls_cosi.AggregateSig(consensus.GetPrepareSigsArray())
aggSig := bls_cosi.AggregateSig(consensus.Decider.ReadAllSignatures(quorum.Prepare))
buffer.Write(aggSig.Serialize())
// Bitmap
@ -82,7 +83,7 @@ func (consensus *Consensus) constructCommittedMessage() ([]byte, *bls.Sign) {
buffer := bytes.NewBuffer([]byte{})
// 96 bytes aggregated signature
aggSig := bls_cosi.AggregateSig(consensus.GetCommitSigsArray())
aggSig := bls_cosi.AggregateSig(consensus.Decider.ReadAllSignatures(quorum.Commit))
buffer.Write(aggSig.Serialize())
// Bitmap

@ -3,12 +3,13 @@ package consensus
import (
"testing"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/ctxerror"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
@ -21,9 +22,12 @@ func TestConstructAnnounceMessage(test *testing.T) {
if err != nil {
test.Fatalf("newhost failure: %v", err)
}
consensus, err := New(host, 0, leader, bls.RandPrivateKey())
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := New(
host, values.BeaconChainShardID, leader, bls.RandPrivateKey(), decider,
)
if err != nil {
test.Fatalf("Cannot craeate consensus: %v", err)
test.Fatalf("Cannot create consensus: %v", err)
}
consensus.blockHash = [32]byte{}
@ -51,7 +55,10 @@ func TestConstructPreparedMessage(test *testing.T) {
if err != nil {
test.Fatalf("newhost failure: %v", err)
}
consensus, err := New(host, 0, leader, bls.RandPrivateKey())
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := New(
host, values.BeaconChainShardID, leader, bls.RandPrivateKey(), decider,
)
if err != nil {
test.Fatalf("Cannot craeate consensus: %v", err)
}
@ -59,8 +66,8 @@ func TestConstructPreparedMessage(test *testing.T) {
consensus.blockHash = [32]byte{}
message := "test string"
consensus.prepareSigs[leaderPubKey.SerializeToHexStr()] = leaderPriKey.Sign(message)
consensus.prepareSigs[validatorPubKey.SerializeToHexStr()] = validatorPriKey.Sign(message)
consensus.Decider.AddSignature(quorum.Prepare, leaderPubKey, leaderPriKey.Sign(message))
consensus.Decider.AddSignature(quorum.Prepare, validatorPubKey, validatorPriKey.Sign(message))
// According to RJ these failures are benign.
if err := consensus.prepareBitmap.SetKey(leaderPubKey, true); err != nil {
test.Log(ctxerror.New("prepareBitmap.SetKey").WithCause(err))

@ -8,26 +8,23 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/crypto/hash"
"github.com/harmony-one/harmony/internal/chain"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
"github.com/rs/zerolog"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/block"
consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/hash"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/profiler"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
"github.com/rs/zerolog"
)
// WaitForNewRandomness listens to the RndChannel to receive new VDF randomness.
@ -120,34 +117,30 @@ func (consensus *Consensus) GetViewID() uint64 {
// DebugPrintPublicKeys print all the PublicKeys in string format in Consensus
func (consensus *Consensus) DebugPrintPublicKeys() {
var keys []string
for _, k := range consensus.PublicKeys {
keys = append(keys, hex.EncodeToString(k.Serialize()))
}
keys := consensus.Decider.DumpParticipants()
utils.Logger().Debug().Strs("PublicKeys", keys).Int("count", len(keys)).Msgf("Debug Public Keys")
}
// UpdatePublicKeys updates the PublicKeys variable, protected by a mutex
func (consensus *Consensus) UpdatePublicKeys(pubKeys []*bls.PublicKey) int {
func (consensus *Consensus) UpdatePublicKeys(pubKeys []*bls.PublicKey) int64 {
consensus.pubKeyLock.Lock()
consensus.PublicKeys = append(pubKeys[:0:0], pubKeys...)
consensus.Decider.UpdateParticipants(pubKeys)
consensus.CommitteePublicKeys = map[string]bool{}
utils.Logger().Info().Msg("My Committee updated")
for i, pubKey := range consensus.PublicKeys {
utils.Logger().Info().Int("index", i).Str("BlsPubKey", pubKey.SerializeToHexStr()).Msg("Member")
consensus.CommitteePublicKeys[pubKey.SerializeToHexStr()] = true
for i, pubKey := range consensus.Decider.DumpParticipants() {
utils.Logger().Info().Int("index", i).Str("BlsPubKey", pubKey).Msg("Member")
consensus.CommitteePublicKeys[pubKey] = true
}
// TODO: use pubkey to identify leader rather than p2p.Peer.
consensus.leader = p2p.Peer{ConsensusPubKey: pubKeys[0]}
consensus.LeaderPubKey = pubKeys[0]
utils.Logger().Info().Str("info", consensus.LeaderPubKey.SerializeToHexStr()).Msg("My Leader")
utils.Logger().Info().
Str("info", consensus.LeaderPubKey.SerializeToHexStr()).Msg("My Leader")
consensus.pubKeyLock.Unlock()
// reset states after update public keys
consensus.ResetState()
consensus.ResetViewChangeState()
return len(consensus.PublicKeys)
return consensus.Decider.ParticipantsCount()
}
// NewFaker returns a faker consensus.
@ -191,24 +184,6 @@ func (consensus *Consensus) GetValidatorPeers() []p2p.Peer {
return validatorPeers
}
// GetPrepareSigsArray returns the signatures for prepare as a array
func (consensus *Consensus) GetPrepareSigsArray() []*bls.Sign {
sigs := []*bls.Sign{}
for _, sig := range consensus.prepareSigs {
sigs = append(sigs, sig)
}
return sigs
}
// GetCommitSigsArray returns the signatures for commit as a array
func (consensus *Consensus) GetCommitSigsArray() []*bls.Sign {
sigs := []*bls.Sign{}
for _, sig := range consensus.commitSigs {
sigs = append(sigs, sig)
}
return sigs
}
// GetBhpSigsArray returns the signatures for prepared message in viewchange
func (consensus *Consensus) GetBhpSigsArray() []*bls.Sign {
sigs := []*bls.Sign{}
@ -227,29 +202,19 @@ func (consensus *Consensus) GetNilSigsArray() []*bls.Sign {
return sigs
}
// GetViewIDSigsArray returns the signatures for viewID in viewchange
func (consensus *Consensus) GetViewIDSigsArray() []*bls.Sign {
sigs := []*bls.Sign{}
for _, sig := range consensus.viewIDSigs {
sigs = append(sigs, sig)
}
return sigs
}
// ResetState resets the state of the consensus
func (consensus *Consensus) ResetState() {
consensus.getLogger().Debug().
Str("Phase", consensus.phase.String()).
Msg("[ResetState] Resetting consensus state")
consensus.switchPhase(Announce, true)
consensus.switchPhase(FBFTAnnounce, true)
consensus.blockHash = [32]byte{}
consensus.blockHeader = []byte{}
consensus.block = []byte{}
consensus.prepareSigs = map[string]*bls.Sign{}
consensus.commitSigs = map[string]*bls.Sign{}
prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil)
commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil)
consensus.Decider.Reset([]quorum.Phase{quorum.Prepare, quorum.Commit})
members := consensus.Decider.Participants()
prepareBitmap, _ := bls_cosi.NewMask(members, nil)
commitBitmap, _ := bls_cosi.NewMask(members, nil)
consensus.prepareBitmap = prepareBitmap
consensus.commitBitmap = commitBitmap
consensus.aggregatedPrepareSig = nil
@ -336,13 +301,13 @@ func (consensus *Consensus) SetViewID(height uint64) {
}
// SetMode sets the mode of consensus
func (consensus *Consensus) SetMode(mode Mode) {
consensus.mode.SetMode(mode)
func (consensus *Consensus) SetMode(m Mode) {
consensus.current.SetMode(m)
}
// Mode returns the mode of consensus
func (consensus *Consensus) Mode() Mode {
return consensus.mode.Mode()
return consensus.current.Mode()
}
// RegisterPRndChannel registers the channel for receiving randomness preimage from DRG protocol
@ -356,14 +321,14 @@ func (consensus *Consensus) RegisterRndChannel(rndChannel chan [548]byte) {
}
// Check viewID, caller's responsibility to hold lock when change ignoreViewIDCheck
func (consensus *Consensus) checkViewID(msg *PbftMessage) error {
func (consensus *Consensus) checkViewID(msg *FBFTMessage) error {
// just ignore consensus check for the first time when node join
if consensus.ignoreViewIDCheck {
//in syncing mode, node accepts incoming messages without viewID/leaderKey checking
//so only set mode to normal when new node enters consensus and need checking viewID
consensus.mode.SetMode(Normal)
consensus.current.SetMode(Normal)
consensus.viewID = msg.ViewID
consensus.mode.SetViewID(msg.ViewID)
consensus.current.SetViewID(msg.ViewID)
consensus.LeaderPubKey = msg.SenderPubkey
consensus.ignoreViewIDCheck = false
consensus.consensusTimeout[timeoutConsensus].Start()
@ -399,12 +364,16 @@ func (consensus *Consensus) SetEpochNum(epoch uint64) {
}
// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls.Sign, *bls_cosi.Mask, error) {
func (consensus *Consensus) ReadSignatureBitmapPayload(
recvPayload []byte, offset int,
) (*bls.Sign, *bls_cosi.Mask, error) {
if offset+96 > len(recvPayload) {
return nil, nil, errors.New("payload not have enough length")
}
sigAndBitmapPayload := recvPayload[offset:]
return chain.ReadSignatureBitmapByPublicKeys(sigAndBitmapPayload, consensus.PublicKeys)
return chain.ReadSignatureBitmapByPublicKeys(
sigAndBitmapPayload, consensus.Decider.Participants(),
)
}
func (consensus *Consensus) reportMetrics(block types.Block) {
@ -435,7 +404,7 @@ func (consensus *Consensus) reportMetrics(block types.Block) {
"key": hex.EncodeToString(consensus.PubKey.Serialize()),
"tps": tps,
"txCount": numOfTxs,
"nodeCount": len(consensus.PublicKeys) + 1,
"nodeCount": consensus.Decider.ParticipantsCount() + 1,
"latestBlockHash": hex.EncodeToString(consensus.blockHash[:]),
"latestTxHashes": txHashes,
"blockLatency": int(timeElapsed / time.Millisecond),
@ -450,7 +419,7 @@ func (consensus *Consensus) getLogger() *zerolog.Logger {
Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.viewID).
Interface("phase", consensus.phase).
Str("mode", consensus.mode.Mode().String()).
Str("mode", consensus.current.Mode().String()).
Logger()
return &logger
}
@ -499,8 +468,8 @@ func (consensus *Consensus) getLeaderPubKeyFromCoinbase(header *block.Header) (*
// (b) node in committed but has any err during processing: Syncing mode
// (c) node in committed and everything looks good: Normal mode
func (consensus *Consensus) UpdateConsensusInformation() Mode {
var pubKeys []*bls.PublicKey
var hasError bool
pubKeys := []*bls.PublicKey{}
hasError := false
header := consensus.ChainReader.CurrentHeader()
@ -513,7 +482,8 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
if core.IsEpochLastBlockByHeader(header) {
// increase epoch by one if it's the last block
consensus.SetEpochNum(epoch.Uint64() + 1)
consensus.getLogger().Info().Uint64("headerNum", header.Number().Uint64()).Msg("[UpdateConsensusInformation] Epoch updated for next epoch")
consensus.getLogger().Info().Uint64("headerNum", header.Number().Uint64()).
Msg("[UpdateConsensusInformation] Epoch updated for next epoch")
nextEpoch := new(big.Int).Add(epoch, common.Big1)
pubKeys = core.CalculatePublicKeys(nextEpoch, header.ShardID())
} else {
@ -522,7 +492,8 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
}
if len(pubKeys) == 0 {
consensus.getLogger().Warn().Msg("[UpdateConsensusInformation] PublicKeys is Nil")
consensus.getLogger().Warn().
Msg("[UpdateConsensusInformation] PublicKeys is Nil")
hasError = true
}
@ -536,7 +507,8 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
if !core.IsEpochLastBlockByHeader(header) && header.Number().Uint64() != 0 {
leaderPubKey, err := consensus.getLeaderPubKeyFromCoinbase(header)
if err != nil || leaderPubKey == nil {
consensus.getLogger().Debug().Err(err).Msg("[SYNC] Unable to get leaderPubKey from coinbase")
consensus.getLogger().Debug().Err(err).
Msg("[SYNC] Unable to get leaderPubKey from coinbase")
consensus.ignoreViewIDCheck = true
hasError = true
} else {

@ -4,9 +4,10 @@ import (
"bytes"
"testing"
"github.com/harmony-one/harmony/crypto/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
@ -20,7 +21,10 @@ func TestPopulateMessageFields(t *testing.T) {
t.Fatalf("newhost failure: %v", err)
}
blsPriKey := bls.RandPrivateKey()
consensus, err := New(host, 0, leader, blsPriKey)
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := New(
host, values.BeaconChainShardID, leader, blsPriKey, decider,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
@ -54,7 +58,10 @@ func TestSignAndMarshalConsensusMessage(t *testing.T) {
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus, err := New(host, 0, leader, bls.RandPrivateKey())
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := New(
host, values.BeaconChainShardID, leader, bls.RandPrivateKey(), decider,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
@ -79,7 +86,10 @@ func TestSetViewID(t *testing.T) {
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus, err := New(host, 0, leader, bls.RandPrivateKey())
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := New(
host, values.BeaconChainShardID, leader, bls.RandPrivateKey(), decider,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}

@ -3,6 +3,8 @@ package consensus
import (
"testing"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
@ -16,7 +18,10 @@ func TestNew(test *testing.T) {
if err != nil {
test.Fatalf("newhost failure: %v", err)
}
consensus, err := New(host, 0, leader, bls.RandPrivateKey())
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := New(
host, values.BeaconChainShardID, leader, bls.RandPrivateKey(), decider,
)
if err != nil {
test.Fatalf("Cannot craeate consensus: %v", err)
}

@ -10,11 +10,10 @@ import (
"github.com/ethereum/go-ethereum/rlp"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/vdf/src/vdf_go"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
vrf_bls "github.com/harmony-one/harmony/crypto/vrf/bls"
@ -23,6 +22,7 @@ import (
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/vdf/src/vdf_go"
)
// handleMessageUpdate will update the consensus state according to received message
@ -37,14 +37,14 @@ func (consensus *Consensus) handleMessageUpdate(payload []byte) {
return
}
// when node is in ViewChanging mode, it still accepts normal message into PbftLog to avoid possible trap forever
// but drop PREPARE and COMMIT which are message types for leader
if consensus.mode.Mode() == ViewChanging && (msg.Type == msg_pb.MessageType_PREPARE || msg.Type == msg_pb.MessageType_COMMIT) {
// when node is in ViewChanging mode, it still accepts normal messages into FBFTLog
// in order to avoid possible trap forever but drop PREPARE and COMMIT
// which are message types specifically for a node acting as leader
switch {
case (consensus.current.Mode() == ViewChanging) &&
(msg.Type == msg_pb.MessageType_PREPARE || msg.Type == msg_pb.MessageType_COMMIT):
return
}
// listening mode will skip consensus process
if consensus.mode.Mode() == Listening {
case consensus.current.Mode() == Listening:
return
}
@ -105,38 +105,44 @@ func (consensus *Consensus) announce(block *types.Block) {
consensus.blockHeader = encodedBlockHeader
msgToSend := consensus.constructAnnounceMessage()
// save announce message to PbftLog
// save announce message to FBFTLog
msgPayload, _ := proto.GetConsensusMessagePayload(msgToSend)
// TODO(chao): don't unmarshall the message here and direclty pass the original object.
msg := &msg_pb.Message{}
_ = protobuf.Unmarshal(msgPayload, msg)
pbftMsg, err := ParsePbftMessage(msg)
FPBTMsg, err := ParseFBFTMessage(msg)
if err != nil {
utils.Logger().Warn().Err(err).Msg("[Announce] Unable to parse pbft message")
utils.Logger().Warn().Err(err).Msg("[Announce] Unable to parse FPBT message")
return
}
// TODO(chao): review pbft log data structure
consensus.PbftLog.AddMessage(pbftMsg)
// TODO(chao): review FPBT log data structure
consensus.FBFTLog.AddMessage(FPBTMsg)
utils.Logger().Debug().
Str("MsgBlockHash", pbftMsg.BlockHash.Hex()).
Uint64("MsgViewID", pbftMsg.ViewID).
Uint64("MsgBlockNum", pbftMsg.BlockNum).
Msg("[Announce] Added Announce message in pbftLog")
consensus.PbftLog.AddBlock(block)
Str("MsgBlockHash", FPBTMsg.BlockHash.Hex()).
Uint64("MsgViewID", FPBTMsg.ViewID).
Uint64("MsgBlockNum", FPBTMsg.BlockNum).
Msg("[Announce] Added Announce message in FPBT")
consensus.FBFTLog.AddBlock(block)
// Leader sign the block hash itself
consensus.prepareSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(consensus.blockHash[:])
consensus.Decider.AddSignature(
quorum.Prepare, consensus.PubKey, consensus.priKey.SignHash(consensus.blockHash[:]),
)
if err := consensus.prepareBitmap.SetKey(consensus.PubKey, true); err != nil {
utils.Logger().Warn().Err(err).Msg("[Announce] Leader prepareBitmap SetKey failed")
return
}
// Construct broadcast p2p message
if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithRetry(
consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
utils.Logger().Warn().
Str("groupID", string(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)))).
Str("groupID", string(nodeconfig.NewGroupIDByShardID(
nodeconfig.ShardID(consensus.ShardID),
))).
Msg("[Announce] Cannot send announce message")
} else {
utils.Logger().Info().
@ -147,14 +153,14 @@ func (consensus *Consensus) announce(block *types.Block) {
utils.Logger().Debug().
Str("From", consensus.phase.String()).
Str("To", Prepare.String()).
Str("To", FBFTPrepare.String()).
Msg("[Announce] Switching phase")
consensus.switchPhase(Prepare, true)
consensus.switchPhase(FBFTPrepare, true)
}
func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
utils.Logger().Debug().Msg("[OnAnnounce] Receive announce message")
if consensus.IsLeader() && consensus.mode.Mode() == Normal {
if consensus.IsLeader() && consensus.current.Mode() == Normal {
return
}
@ -163,7 +169,8 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
utils.Logger().Error().Err(err).Msg("[OnAnnounce] VerifySenderKey failed")
return
}
if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck {
if !senderKey.IsEqual(consensus.LeaderPubKey) &&
consensus.current.Mode() == Normal && !consensus.ignoreViewIDCheck {
utils.Logger().Warn().
Str("senderKey", senderKey.SerializeToHexStr()).
Str("leaderKey", consensus.LeaderPubKey.SerializeToHexStr()).
@ -175,7 +182,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
return
}
recvMsg, err := ParsePbftMessage(msg)
recvMsg, err := ParseFBFTMessage(msg)
if err != nil {
utils.Logger().Error().
Err(err).
@ -205,7 +212,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
Msg("[OnAnnounce] BlockNum does not match")
return
}
if consensus.mode.Mode() == Normal {
if consensus.current.Mode() == Normal {
if err = chain.Engine.VerifyHeader(consensus.ChainReader, header, true); err != nil {
utils.Logger().Warn().
Err(err).
@ -233,9 +240,12 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
}
}
logMsgs := consensus.PbftLog.GetMessagesByTypeSeqView(msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID)
logMsgs := consensus.FBFTLog.GetMessagesByTypeSeqView(
msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID,
)
if len(logMsgs) > 0 {
if logMsgs[0].BlockHash != recvMsg.BlockHash && logMsgs[0].SenderPubkey.IsEqual(recvMsg.SenderPubkey) {
if logMsgs[0].BlockHash != recvMsg.BlockHash &&
logMsgs[0].SenderPubkey.IsEqual(recvMsg.SenderPubkey) {
utils.Logger().Debug().
Str("leaderKey", consensus.LeaderPubKey.SerializeToHexStr()).
Msg("[OnAnnounce] Leader is malicious")
@ -251,7 +261,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
Msg("[OnAnnounce] Announce message Added")
consensus.PbftLog.AddMessage(recvMsg)
consensus.FBFTLog.AddMessage(recvMsg)
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
@ -259,13 +269,13 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
consensus.blockHash = recvMsg.BlockHash
// we have already added message and block, skip check viewID and send prepare message if is in ViewChanging mode
if consensus.mode.Mode() == ViewChanging {
if consensus.current.Mode() == ViewChanging {
utils.Logger().Debug().Msg("[OnAnnounce] Still in ViewChanging Mode, Exiting !!")
return
}
if consensus.checkViewID(recvMsg) != nil {
if consensus.mode.Mode() == Normal {
if consensus.current.Mode() == Normal {
utils.Logger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
@ -293,9 +303,9 @@ func (consensus *Consensus) prepare() {
}
utils.Logger().Debug().
Str("From", consensus.phase.String()).
Str("To", Prepare.String()).
Str("To", FBFTPrepare.String()).
Msg("[Announce] Switching Phase")
consensus.switchPhase(Prepare, true)
consensus.switchPhase(FBFTPrepare, true)
}
// TODO: move to consensus_leader.go later
@ -314,7 +324,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
return
}
recvMsg, err := ParsePbftMessage(msg)
recvMsg, err := ParseFBFTMessage(msg)
if err != nil {
utils.Logger().Error().Err(err).Msg("[OnPrepare] Unparseable validator message")
return
@ -329,7 +339,9 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
return
}
if !consensus.PbftLog.HasMatchingViewAnnounce(consensus.blockNum, consensus.viewID, recvMsg.BlockHash) {
if !consensus.FBFTLog.HasMatchingViewAnnounce(
consensus.blockNum, consensus.viewID, recvMsg.BlockHash,
) {
utils.Logger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
@ -338,24 +350,24 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
//return
}
validatorPubKey := recvMsg.SenderPubkey.SerializeToHexStr()
validatorPubKey := recvMsg.SenderPubkey
prepareSig := recvMsg.Payload
prepareSigs := consensus.prepareSigs
prepareBitmap := consensus.prepareBitmap
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
logger := utils.Logger().With().Str("validatorPubKey", validatorPubKey).Logger()
if len(prepareSigs) >= consensus.Quorum() {
logger := utils.Logger().With().
Str("validatorPubKey", validatorPubKey.SerializeToHexStr()).Logger()
if consensus.Decider.IsQuorumAchieved(quorum.Prepare) {
// already have enough signatures
logger.Debug().Msg("[OnPrepare] Received Additional Prepare Message")
return
}
// proceed only when the message is not received before
_, ok := prepareSigs[validatorPubKey]
if ok {
logger.Debug().Msg("[OnPrepare] Already Received prepare message from the validator")
signed := consensus.Decider.ReadSignature(quorum.Prepare, validatorPubKey)
if signed != nil {
logger.Debug().
Msg("[OnPrepare] Already Received prepare message from the validator")
return
}
@ -363,7 +375,8 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
var sign bls.Sign
err = sign.Deserialize(prepareSig)
if err != nil {
utils.Logger().Error().Err(err).Msg("[OnPrepare] Failed to deserialize bls signature")
utils.Logger().Error().Err(err).
Msg("[OnPrepare] Failed to deserialize bls signature")
return
}
if !sign.VerifyHash(recvMsg.SenderPubkey, consensus.blockHash[:]) {
@ -371,16 +384,18 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
return
}
logger = logger.With().Int("NumReceivedSoFar", len(prepareSigs)).Int("PublicKeys", len(consensus.PublicKeys)).Logger()
logger = logger.With().
Int64("NumReceivedSoFar", consensus.Decider.SignatoriesCount(quorum.Prepare)).
Int64("PublicKeys", consensus.Decider.ParticipantsCount()).Logger()
logger.Info().Msg("[OnPrepare] Received New Prepare Signature")
prepareSigs[validatorPubKey] = &sign
consensus.Decider.AddSignature(quorum.Prepare, validatorPubKey, &sign)
// Set the bitmap indicating that this validator signed.
if err := prepareBitmap.SetKey(recvMsg.SenderPubkey, true); err != nil {
utils.Logger().Warn().Err(err).Msg("[OnPrepare] prepareBitmap.SetKey failed")
return
}
if len(prepareSigs) >= consensus.Quorum() {
if consensus.Decider.IsQuorumAchieved(quorum.Prepare) {
logger.Debug().Msg("[OnPrepare] Received Enough Prepare Signatures")
// Construct and broadcast prepared message
msgToSend, aggSig := consensus.constructPreparedMessage()
@ -391,24 +406,32 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
msgPayload, _ := proto.GetConsensusMessagePayload(msgToSend)
msg := &msg_pb.Message{}
_ = protobuf.Unmarshal(msgPayload, msg)
pbftMsg, err := ParsePbftMessage(msg)
FBFTMsg, err := ParseFBFTMessage(msg)
if err != nil {
utils.Logger().Warn().Err(err).Msg("[OnPrepare] Unable to parse pbft message")
return
}
consensus.PbftLog.AddMessage(pbftMsg)
consensus.FBFTLog.AddMessage(FBFTMsg)
// Leader add commit phase signature
blockNumHash := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumHash, consensus.blockNum)
commitPayload := append(blockNumHash, consensus.blockHash[:]...)
consensus.commitSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(commitPayload)
consensus.Decider.AddSignature(
quorum.Commit, consensus.PubKey, consensus.priKey.SignHash(commitPayload),
)
if err := consensus.commitBitmap.SetKey(consensus.PubKey, true); err != nil {
utils.Logger().Debug().Msg("[OnPrepare] Leader commit bitmap set failed")
return
}
if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_PREPARED, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithRetry(
consensus.blockNum,
msg_pb.MessageType_PREPARED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
host.ConstructP2pMessage(byte(17), msgToSend),
); err != nil {
utils.Logger().Warn().Msg("[OnPrepare] Cannot send prepared message")
} else {
utils.Logger().Debug().
@ -417,20 +440,21 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
Msg("[OnPrepare] Sent Prepared Message!!")
}
consensus.msgSender.StopRetry(msg_pb.MessageType_ANNOUNCE)
consensus.msgSender.StopRetry(msg_pb.MessageType_COMMITTED) // Stop retry committed msg of last consensus
// Stop retry committed msg of last consensus
consensus.msgSender.StopRetry(msg_pb.MessageType_COMMITTED)
utils.Logger().Debug().
Str("From", consensus.phase.String()).
Str("To", Commit.String()).
Str("To", FBFTCommit.String()).
Msg("[OnPrepare] Switching phase")
consensus.switchPhase(Commit, true)
consensus.switchPhase(FBFTCommit, true)
}
return
}
func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
utils.Logger().Debug().Msg("[OnPrepared] Received Prepared message")
if consensus.IsLeader() && consensus.mode.Mode() == Normal {
if consensus.IsLeader() && consensus.current.Mode() == Normal {
return
}
@ -439,7 +463,8 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
utils.Logger().Debug().Err(err).Msg("[OnPrepared] VerifySenderKey failed")
return
}
if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck {
if !senderKey.IsEqual(consensus.LeaderPubKey) &&
consensus.current.Mode() == Normal && !consensus.ignoreViewIDCheck {
utils.Logger().Warn().Msg("[OnPrepared] SenderKey not match leader PubKey")
return
}
@ -448,7 +473,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
return
}
recvMsg, err := ParsePbftMessage(msg)
recvMsg, err := ParseFBFTMessage(msg)
if err != nil {
utils.Logger().Debug().Err(err).Msg("[OnPrepared] Unparseable validator message")
return
@ -470,10 +495,11 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
utils.Logger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!!")
return
}
if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() {
prepareCount := consensus.Decider.SignatoriesCount(quorum.Prepare)
if count := utils.CountOneBits(mask.Bitmap); count < prepareCount {
utils.Logger().Debug().
Int("Need", consensus.Quorum()).
Int("Got", count).
Int64("Need", prepareCount).
Int64("Got", count).
Msg("Not enough signatures in the Prepared msg")
return
}
@ -513,8 +539,9 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
Msg("[OnPrepared] BlockHash not match")
return
}
if consensus.mode.Mode() == Normal {
if err := chain.Engine.VerifyHeader(consensus.ChainReader, blockObj.Header(), true); err != nil {
if consensus.current.Mode() == Normal {
err := chain.Engine.VerifyHeader(consensus.ChainReader, blockObj.Header(), true)
if err != nil {
utils.Logger().Error().
Err(err).
Str("inChain", consensus.ChainReader.CurrentHeader().Number().String()).
@ -530,9 +557,9 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
}
}
consensus.PbftLog.AddBlock(&blockObj)
consensus.FBFTLog.AddBlock(&blockObj)
recvMsg.Block = []byte{} // save memory space
consensus.PbftLog.AddMessage(recvMsg)
consensus.FBFTLog.AddMessage(recvMsg)
utils.Logger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
@ -543,13 +570,13 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
defer consensus.mutex.Unlock()
consensus.tryCatchup()
if consensus.mode.Mode() == ViewChanging {
if consensus.current.Mode() == ViewChanging {
utils.Logger().Debug().Msg("[OnPrepared] Still in ViewChanging mode, Exiting!!")
return
}
if consensus.checkViewID(recvMsg) != nil {
if consensus.mode.Mode() == Normal {
if consensus.current.Mode() == Normal {
utils.Logger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
Uint64("MsgBlockNum", recvMsg.BlockNum).
@ -603,9 +630,9 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
utils.Logger().Debug().
Str("From", consensus.phase.String()).
Str("To", Commit.String()).
Str("To", FBFTCommit.String()).
Msg("[OnPrepared] Switching phase")
consensus.switchPhase(Commit, true)
consensus.switchPhase(FBFTCommit, true)
return
}
@ -626,7 +653,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
return
}
recvMsg, err := ParsePbftMessage(msg)
recvMsg, err := ParseFBFTMessage(msg)
if err != nil {
utils.Logger().Debug().Err(err).Msg("[OnCommit] Parse pbft message failed")
return
@ -642,7 +669,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
return
}
if !consensus.PbftLog.HasMatchingAnnounce(consensus.blockNum, recvMsg.BlockHash) {
if !consensus.FBFTLog.HasMatchingAnnounce(consensus.blockNum, recvMsg.BlockHash) {
utils.Logger().Debug().
Hex("MsgBlockHash", recvMsg.BlockHash[:]).
Uint64("MsgBlockNum", recvMsg.BlockNum).
@ -651,7 +678,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
return
}
if !consensus.PbftLog.HasMatchingPrepared(consensus.blockNum, recvMsg.BlockHash) {
if !consensus.FBFTLog.HasMatchingPrepared(consensus.blockNum, recvMsg.BlockHash) {
utils.Logger().Debug().
Hex("blockHash", recvMsg.BlockHash[:]).
Uint64("blockNum", consensus.blockNum).
@ -659,32 +686,30 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
return
}
validatorPubKey := recvMsg.SenderPubkey.SerializeToHexStr()
validatorPubKey := recvMsg.SenderPubkey
commitSig := recvMsg.Payload
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
logger := utils.Logger().With().Str("validatorPubKey", validatorPubKey).Logger()
logger := utils.Logger().With().
Str("validatorPubKey", validatorPubKey.SerializeToHexStr()).Logger()
if !consensus.IsValidatorInCommittee(recvMsg.SenderPubkey) {
logger.Error().Msg("[OnCommit] Invalid validator")
return
}
commitSigs := consensus.commitSigs
commitBitmap := consensus.commitBitmap
// proceed only when the message is not received before
_, ok := commitSigs[validatorPubKey]
if ok {
logger.Debug().Msg("[OnCommit] Already received commit message from the validator")
signed := consensus.Decider.ReadSignature(quorum.Commit, validatorPubKey)
if signed != nil {
logger.Debug().
Msg("[OnCommit] Already received commit message from the validator")
return
}
// has to be called before verifying signature
quorumWasMet := len(commitSigs) >= consensus.Quorum()
quorumWasMet := consensus.Decider.IsQuorumAchieved(quorum.Commit)
// Verify the signature on commitPayload is correct
var sign bls.Sign
err = sign.Deserialize(commitSig)
@ -701,18 +726,18 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
return
}
logger = logger.With().Int("numReceivedSoFar", len(commitSigs)).Logger()
logger = logger.With().
Int64("numReceivedSoFar", consensus.Decider.SignatoriesCount(quorum.Commit)).
Logger()
logger.Info().Msg("[OnCommit] Received new commit message")
commitSigs[validatorPubKey] = &sign
consensus.Decider.AddSignature(quorum.Commit, validatorPubKey, &sign)
// Set the bitmap indicating that this validator signed.
if err := commitBitmap.SetKey(recvMsg.SenderPubkey, true); err != nil {
utils.Logger().Warn().Err(err).Msg("[OnCommit] commitBitmap.SetKey failed")
return
}
quorumIsMet := len(commitSigs) >= consensus.Quorum()
rewardThresholdIsMet := len(commitSigs) >= consensus.RewardThreshold()
quorumIsMet := consensus.Decider.IsQuorumAchieved(quorum.Commit)
if !quorumWasMet && quorumIsMet {
logger.Info().Msg("[OnCommit] 2/3 Enough commits received")
go func(viewID uint64) {
@ -724,7 +749,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
consensus.msgSender.StopRetry(msg_pb.MessageType_PREPARED)
}
if rewardThresholdIsMet {
if consensus.Decider.IsRewardThresholdAchieved() {
go func(viewID uint64) {
consensus.commitFinishChan <- viewID
logger.Info().Msg("[OnCommit] 90% Enough commits received")
@ -733,11 +758,11 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
}
func (consensus *Consensus) finalizeCommits() {
utils.Logger().Info().Int("NumCommits", len(consensus.commitSigs)).Msg("[Finalizing] Finalizing Block")
utils.Logger().Info().
Int64("NumCommits", consensus.Decider.SignatoriesCount(quorum.Commit)).
Msg("[Finalizing] Finalizing Block")
beforeCatchupNum := consensus.blockNum
//beforeCatchupViewID := consensus.viewID
// Construct committed message
msgToSend, aggSig := consensus.constructCommittedMessage()
consensus.aggregatedCommitSig = aggSig // this may not needed
@ -746,16 +771,16 @@ func (consensus *Consensus) finalizeCommits() {
msgPayload, _ := proto.GetConsensusMessagePayload(msgToSend)
msg := &msg_pb.Message{}
_ = protobuf.Unmarshal(msgPayload, msg)
pbftMsg, err := ParsePbftMessage(msg)
pbftMsg, err := ParseFBFTMessage(msg)
if err != nil {
utils.Logger().Warn().Err(err).Msg("[FinalizeCommits] Unable to parse pbft message")
return
}
consensus.PbftLog.AddMessage(pbftMsg)
consensus.FBFTLog.AddMessage(pbftMsg)
consensus.ChainReader.WriteLastCommits(pbftMsg.Payload)
// find correct block content
block := consensus.PbftLog.GetBlockByHash(consensus.blockHash)
block := consensus.FBFTLog.GetBlockByHash(consensus.blockHash)
if block == nil {
utils.Logger().Warn().
Str("blockHash", hex.EncodeToString(consensus.blockHash[:])).
@ -771,7 +796,12 @@ func (consensus *Consensus) finalizeCommits() {
}
// if leader success finalize the block, send committed message to validators
if err := consensus.msgSender.SendWithRetry(block.NumberU64(), msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
if err := consensus.msgSender.SendWithRetry(
block.NumberU64(),
msg_pb.MessageType_COMMITTED, []nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
utils.Logger().Warn().Err(err).Msg("[Finalizing] Cannot send committed message")
} else {
utils.Logger().Info().
@ -800,7 +830,7 @@ func (consensus *Consensus) finalizeCommits() {
Uint64("blockNum", block.NumberU64()).
Uint64("ViewId", block.Header().ViewID().Uint64()).
Str("blockHash", block.Hash().String()).
Int("index", consensus.getIndexOfPubKey(consensus.PubKey)).
Int("index", consensus.Decider.IndexOf(consensus.PubKey)).
Msg("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!")
// Print to normal log too
utils.GetLogInstance().Info("HOORAY!!!!!!! CONSENSUS REACHED!!!!!!!", "BlockNum", block.NumberU64())
@ -812,7 +842,7 @@ func (consensus *Consensus) finalizeCommits() {
func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
utils.Logger().Debug().Msg("[OnCommitted] Receive committed message")
if consensus.IsLeader() && consensus.mode.Mode() == Normal {
if consensus.IsLeader() && consensus.current.Mode() == Normal {
return
}
@ -821,7 +851,8 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
utils.Logger().Warn().Err(err).Msg("[OnCommitted] verifySenderKey failed")
return
}
if !senderKey.IsEqual(consensus.LeaderPubKey) && consensus.mode.Mode() == Normal && !consensus.ignoreViewIDCheck {
if !senderKey.IsEqual(consensus.LeaderPubKey) &&
consensus.current.Mode() == Normal && !consensus.ignoreViewIDCheck {
utils.Logger().Warn().Msg("[OnCommitted] senderKey not match leader PubKey")
return
}
@ -830,7 +861,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
return
}
recvMsg, err := ParsePbftMessage(msg)
recvMsg, err := ParseFBFTMessage(msg)
if err != nil {
utils.Logger().Warn().Msg("[OnCommitted] unable to parse msg")
return
@ -850,14 +881,21 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
return
}
// check has 2f+1 signatures
if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() {
switch consensus.Decider.Policy() {
case quorum.SuperMajorityVote:
threshold := consensus.Decider.QuorumThreshold()
if count := utils.CountOneBits(mask.Bitmap); int64(count) < threshold {
utils.Logger().Warn().
Int("need", consensus.Quorum()).
Int("got", count).
Int64("need", threshold).
Int64("got", count).
Msg("[OnCommitted] Not enough signature in committed msg")
return
}
case quorum.SuperMajorityStake:
// Come back to thinking about this situation for Bitmap
}
// check has 2f+1 signatures
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, recvMsg.BlockNum)
@ -869,7 +907,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
return
}
consensus.PbftLog.AddMessage(recvMsg)
consensus.FBFTLog.AddMessage(recvMsg)
consensus.ChainReader.WriteLastCommits(recvMsg.Payload)
utils.Logger().Debug().
Uint64("MsgViewID", recvMsg.ViewID).
@ -887,7 +925,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
go func() {
select {
case consensus.blockNumLowChan <- struct{}{}:
consensus.mode.SetMode(Syncing)
consensus.current.SetMode(Syncing)
for _, v := range consensus.consensusTimeout {
v.Stop()
}
@ -903,7 +941,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
// }
consensus.tryCatchup()
if consensus.mode.Mode() == ViewChanging {
if consensus.current.Mode() == ViewChanging {
utils.Logger().Debug().Msg("[OnCommitted] Still in ViewChanging mode, Exiting!!")
return
}
@ -925,7 +963,7 @@ func (consensus *Consensus) LastCommitSig() ([]byte, []byte, error) {
}
lastCommits, err := consensus.ChainReader.ReadLastCommits()
if err != nil || len(lastCommits) < 96 {
msgs := consensus.PbftLog.GetMessagesByTypeSeq(msg_pb.MessageType_COMMITTED, consensus.blockNum-1)
msgs := consensus.FBFTLog.GetMessagesByTypeSeq(msg_pb.MessageType_COMMITTED, consensus.blockNum-1)
if len(msgs) != 1 {
return nil, nil, ctxerror.New("GetLastCommitSig failed with wrong number of committed message", "numCommittedMsg", len(msgs))
}
@ -950,7 +988,7 @@ func (consensus *Consensus) tryCatchup() {
// }
currentBlockNum := consensus.blockNum
for {
msgs := consensus.PbftLog.GetMessagesByTypeSeq(msg_pb.MessageType_COMMITTED, consensus.blockNum)
msgs := consensus.FBFTLog.GetMessagesByTypeSeq(msg_pb.MessageType_COMMITTED, consensus.blockNum)
if len(msgs) == 0 {
break
}
@ -961,7 +999,7 @@ func (consensus *Consensus) tryCatchup() {
}
utils.Logger().Info().Msg("[TryCatchup] committed message found")
block := consensus.PbftLog.GetBlockByHash(msgs[0].BlockHash)
block := consensus.FBFTLog.GetBlockByHash(msgs[0].BlockHash)
if block == nil {
break
}
@ -979,8 +1017,8 @@ func (consensus *Consensus) tryCatchup() {
}
utils.Logger().Info().Msg("[TryCatchup] block found to commit")
preparedMsgs := consensus.PbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, msgs[0].BlockNum, msgs[0].BlockHash)
msg := consensus.PbftLog.FindMessageByMaxViewID(preparedMsgs)
preparedMsgs := consensus.FBFTLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, msgs[0].BlockNum, msgs[0].BlockHash)
msg := consensus.FBFTLog.FindMessageByMaxViewID(preparedMsgs)
if msg == nil {
break
}
@ -1011,16 +1049,17 @@ func (consensus *Consensus) tryCatchup() {
Uint64("From", currentBlockNum).
Uint64("To", consensus.blockNum).
Msg("[TryCatchup] Caught up!")
consensus.switchPhase(Announce, true)
consensus.switchPhase(FBFTAnnounce, true)
}
// catup up and skip from view change trap
if currentBlockNum < consensus.blockNum && consensus.mode.Mode() == ViewChanging {
consensus.mode.SetMode(Normal)
if currentBlockNum < consensus.blockNum &&
consensus.current.Mode() == ViewChanging {
consensus.current.SetMode(Normal)
consensus.consensusTimeout[timeoutViewChange].Stop()
}
// clean up old log
consensus.PbftLog.DeleteBlocksLessThan(consensus.blockNum - 1)
consensus.PbftLog.DeleteMessagesLessThan(consensus.blockNum - 1)
consensus.FBFTLog.DeleteBlocksLessThan(consensus.blockNum - 1)
consensus.FBFTLog.DeleteMessagesLessThan(consensus.blockNum - 1)
}
// Start waits for the next new block and run consensus
@ -1057,7 +1096,8 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
continue
}
for k, v := range consensus.consensusTimeout {
if consensus.mode.Mode() == Syncing || consensus.mode.Mode() == Listening {
if consensus.current.Mode() == Syncing ||
consensus.current.Mode() == Listening {
v.Stop()
}
if !v.CheckExpire() {
@ -1069,7 +1109,7 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
break
} else {
utils.Logger().Debug().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!")
viewID := consensus.mode.ViewID()
viewID := consensus.current.ViewID()
consensus.startViewChange(viewID + 1)
break
}
@ -1078,12 +1118,12 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
consensus.SetBlockNum(consensus.ChainReader.CurrentHeader().Number().Uint64() + 1)
consensus.SetViewID(consensus.ChainReader.CurrentHeader().ViewID().Uint64() + 1)
mode := consensus.UpdateConsensusInformation()
consensus.mode.SetMode(mode)
consensus.current.SetMode(mode)
utils.Logger().Info().Str("Mode", mode.String()).Msg("Node is in sync")
case <-consensus.syncNotReadyChan:
consensus.SetBlockNum(consensus.ChainReader.CurrentHeader().Number().Uint64() + 1)
consensus.mode.SetMode(Syncing)
consensus.current.SetMode(Syncing)
utils.Logger().Info().Msg("Node is out of sync")
case newBlock := <-blockChannel:
@ -1173,7 +1213,7 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan
utils.Logger().Debug().
Int("numTxs", len(newBlock.Transactions())).
Time("startTime", startTime).
Int("publicKeys", len(consensus.PublicKeys)).
Int64("publicKeys", consensus.Decider.ParticipantsCount()).
Msg("[ConsensusMainLoop] STARTING CONSENSUS")
consensus.announce(newBlock)
@ -1219,15 +1259,16 @@ func (consensus *Consensus) GenerateVrfAndProof(newBlock *types.Block, vrfBlockN
// ValidateVrfAndProof validates a VRF/Proof from hash of previous block
func (consensus *Consensus) ValidateVrfAndProof(headerObj *block.Header) bool {
vrfPk := vrf_bls.NewVRFVerifier(consensus.LeaderPubKey)
var blockHash [32]byte
previousHeader := consensus.ChainReader.GetHeaderByNumber(headerObj.Number().Uint64() - 1)
previousHeader := consensus.ChainReader.GetHeaderByNumber(
headerObj.Number().Uint64() - 1,
)
previousHash := previousHeader.Hash()
copy(blockHash[:], previousHash[:])
vrfProof := [96]byte{}
copy(vrfProof[:], headerObj.Vrf()[32:])
hash, err := vrfPk.ProofToHash(blockHash[:], vrfProof[:])
if err != nil {
consensus.getLogger().Warn().
Err(err).
@ -1243,7 +1284,9 @@ func (consensus *Consensus) ValidateVrfAndProof(headerObj *block.Header) bool {
return false
}
vrfBlockNumbers, _ := consensus.ChainReader.ReadEpochVrfBlockNums(headerObj.Epoch())
vrfBlockNumbers, _ := consensus.ChainReader.ReadEpochVrfBlockNums(
headerObj.Epoch(),
)
consensus.getLogger().Info().
Str("MsgBlockNum", headerObj.Number().String()).
Int("Number of VRF", len(vrfBlockNumbers)).

@ -3,11 +3,12 @@ package consensus
import (
"testing"
"github.com/harmony-one/harmony/crypto/bls"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
@ -20,7 +21,10 @@ func TestConstructPrepareMessage(test *testing.T) {
if err != nil {
test.Fatalf("newhost failure: %v", err)
}
consensus, err := New(host, 0, leader, bls.RandPrivateKey())
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := New(
host, values.BeaconChainShardID, leader, bls.RandPrivateKey(), decider,
)
if err != nil {
test.Fatalf("Cannot craeate consensus: %v", err)
}
@ -48,7 +52,10 @@ func TestConstructCommitMessage(test *testing.T) {
if err != nil {
test.Fatalf("newhost failure: %v", err)
}
consensus, err := New(host, 0, leader, bls.RandPrivateKey())
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := New(
host, values.BeaconChainShardID, leader, bls.RandPrivateKey(), decider,
)
if err != nil {
test.Fatalf("Cannot craeate consensus: %v", err)
}

@ -5,6 +5,7 @@ import (
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
)
@ -20,7 +21,7 @@ func (consensus *Consensus) constructViewChangeMessage() []byte {
}
vcMsg := message.GetViewchange()
vcMsg.ViewId = consensus.mode.ViewID()
vcMsg.ViewId = consensus.current.ViewID()
vcMsg.BlockNum = consensus.blockNum
vcMsg.ShardId = consensus.ShardID
// sender address
@ -29,8 +30,10 @@ func (consensus *Consensus) constructViewChangeMessage() []byte {
// next leader key already updated
vcMsg.LeaderPubkey = consensus.LeaderPubKey.Serialize()
preparedMsgs := consensus.PbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, consensus.blockNum, consensus.blockHash)
preparedMsg := consensus.PbftLog.FindMessageByMaxViewID(preparedMsgs)
preparedMsgs := consensus.FBFTLog.GetMessagesByTypeSeqHash(
msg_pb.MessageType_PREPARED, consensus.blockNum, consensus.blockHash,
)
preparedMsg := consensus.FBFTLog.FindMessageByMaxViewID(preparedMsgs)
var msgToSign []byte
if preparedMsg == nil {
@ -55,7 +58,7 @@ func (consensus *Consensus) constructViewChangeMessage() []byte {
}
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, consensus.mode.ViewID())
binary.LittleEndian.PutUint64(viewIDBytes, consensus.current.ViewID())
sign1 := consensus.priKey.SignHash(viewIDBytes)
if sign1 != nil {
vcMsg.ViewidSig = sign1.Serialize()
@ -65,7 +68,8 @@ func (consensus *Consensus) constructViewChangeMessage() []byte {
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message)
if err != nil {
utils.Logger().Error().Err(err).Msg("[constructViewChangeMessage] failed to sign and marshal the viewchange message")
utils.Logger().Error().Err(err).
Msg("[constructViewChangeMessage] failed to sign and marshal the viewchange message")
}
return proto.ConstructConsensusMessage(marshaledMessage)
}
@ -81,7 +85,7 @@ func (consensus *Consensus) constructNewViewMessage() []byte {
}
vcMsg := message.GetViewchange()
vcMsg.ViewId = consensus.mode.ViewID()
vcMsg.ViewId = consensus.current.ViewID()
vcMsg.BlockNum = consensus.blockNum
vcMsg.ShardId = consensus.ShardID
// sender address
@ -96,7 +100,7 @@ func (consensus *Consensus) constructNewViewMessage() []byte {
vcMsg.M2Bitmap = consensus.nilBitmap.Bitmap
}
sig3arr := consensus.GetViewIDSigsArray()
sig3arr := consensus.Decider.ReadAllSignatures(quorum.ViewChange)
utils.Logger().Debug().Int("len", len(sig3arr)).Msg("[constructNewViewMessage] M3 (ViewID) type signatures")
// even we check here for safty, m3 type signatures must >= 2f+1
if len(sig3arr) > 0 {
@ -107,7 +111,8 @@ func (consensus *Consensus) constructNewViewMessage() []byte {
marshaledMessage, err := consensus.signAndMarshalConsensusMessage(message)
if err != nil {
utils.Logger().Error().Err(err).Msg("[constructNewViewMessage] failed to sign and marshal the new view message")
utils.Logger().Error().Err(err).
Msg("[constructNewViewMessage] failed to sign and marshal the new view message")
}
return proto.ConstructConsensusMessage(marshaledMessage)
}

@ -0,0 +1,55 @@
package consensus
import "fmt"
// Mode is the current
type Mode byte
const (
// Normal ..
Normal Mode = iota
// ViewChanging ..
ViewChanging
// Syncing ..
Syncing
// Listening ..
Listening
)
// FBFTPhase : different phases of consensus
type FBFTPhase byte
// Enum for FBFTPhase
const (
FBFTAnnounce FBFTPhase = iota
FBFTPrepare
FBFTCommit
)
var (
modeNames = map[Mode]string{
Normal: "Normal",
ViewChanging: "ViewChanging",
Syncing: "Syncing",
Listening: "Listening",
}
phaseNames = map[FBFTPhase]string{
FBFTAnnounce: "Announce",
FBFTPrepare: "Prepare",
FBFTCommit: "Commit",
}
)
func (m Mode) String() string {
if name, ok := modeNames[m]; ok {
return name
}
return fmt.Sprintf("Mode %+v", byte(m))
}
func (p FBFTPhase) String() string {
if name, ok := phaseNames[p]; ok {
return name
}
return fmt.Sprintf("FBFTPhase %+v", byte(p))
}

@ -13,16 +13,16 @@ import (
"github.com/harmony-one/harmony/internal/utils"
)
// PbftLog represents the log stored by a node during PBFT process
type PbftLog struct {
blocks mapset.Set //store blocks received in PBFT
messages mapset.Set // store messages received in PBFT
// FBFTLog represents the log stored by a node during FBFT process
type FBFTLog struct {
blocks mapset.Set //store blocks received in FBFT
messages mapset.Set // store messages received in FBFT
maxLogSize uint32
mutex sync.Mutex
}
// PbftMessage is the record of pbft messages received by a node during PBFT process
type PbftMessage struct {
// FBFTMessage is the record of pbft messages received by a node during FBFT process
type FBFTMessage struct {
MessageType msg_pb.MessageType
ViewID uint64
BlockNum uint64
@ -39,32 +39,32 @@ type PbftMessage struct {
M3Bitmap *bls_cosi.Mask
}
// NewPbftLog returns new instance of PbftLog
func NewPbftLog() *PbftLog {
// NewFBFTLog returns new instance of FBFTLog
func NewFBFTLog() *FBFTLog {
blocks := mapset.NewSet()
messages := mapset.NewSet()
logSize := maxLogSize
pbftLog := PbftLog{blocks: blocks, messages: messages, maxLogSize: logSize}
pbftLog := FBFTLog{blocks: blocks, messages: messages, maxLogSize: logSize}
return &pbftLog
}
// Blocks return the blocks stored in the log
func (log *PbftLog) Blocks() mapset.Set {
func (log *FBFTLog) Blocks() mapset.Set {
return log.blocks
}
// Messages return the messages stored in the log
func (log *PbftLog) Messages() mapset.Set {
func (log *FBFTLog) Messages() mapset.Set {
return log.messages
}
// AddBlock add a new block into the log
func (log *PbftLog) AddBlock(block *types.Block) {
func (log *FBFTLog) AddBlock(block *types.Block) {
log.blocks.Add(block)
}
// GetBlockByHash returns the block matches the given block hash
func (log *PbftLog) GetBlockByHash(hash common.Hash) *types.Block {
func (log *FBFTLog) GetBlockByHash(hash common.Hash) *types.Block {
var found *types.Block
it := log.Blocks().Iterator()
for block := range it.C {
@ -77,7 +77,7 @@ func (log *PbftLog) GetBlockByHash(hash common.Hash) *types.Block {
}
// GetBlocksByNumber returns the blocks match the given block number
func (log *PbftLog) GetBlocksByNumber(number uint64) []*types.Block {
func (log *FBFTLog) GetBlocksByNumber(number uint64) []*types.Block {
found := []*types.Block{}
it := log.Blocks().Iterator()
for block := range it.C {
@ -89,7 +89,7 @@ func (log *PbftLog) GetBlocksByNumber(number uint64) []*types.Block {
}
// DeleteBlocksLessThan deletes blocks less than given block number
func (log *PbftLog) DeleteBlocksLessThan(number uint64) {
func (log *FBFTLog) DeleteBlocksLessThan(number uint64) {
found := mapset.NewSet()
it := log.Blocks().Iterator()
for block := range it.C {
@ -101,7 +101,7 @@ func (log *PbftLog) DeleteBlocksLessThan(number uint64) {
}
// DeleteBlockByNumber deletes block of specific number
func (log *PbftLog) DeleteBlockByNumber(number uint64) {
func (log *FBFTLog) DeleteBlockByNumber(number uint64) {
found := mapset.NewSet()
it := log.Blocks().Iterator()
for block := range it.C {
@ -113,11 +113,11 @@ func (log *PbftLog) DeleteBlockByNumber(number uint64) {
}
// DeleteMessagesLessThan deletes messages less than given block number
func (log *PbftLog) DeleteMessagesLessThan(number uint64) {
func (log *FBFTLog) DeleteMessagesLessThan(number uint64) {
found := mapset.NewSet()
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*PbftMessage).BlockNum < number {
if msg.(*FBFTMessage).BlockNum < number {
found.Add(msg)
}
}
@ -125,85 +125,85 @@ func (log *PbftLog) DeleteMessagesLessThan(number uint64) {
}
// AddMessage adds a pbft message into the log
func (log *PbftLog) AddMessage(msg *PbftMessage) {
func (log *FBFTLog) AddMessage(msg *FBFTMessage) {
log.messages.Add(msg)
}
// GetMessagesByTypeSeqViewHash returns pbft messages with matching type, blockNum, viewID and blockHash
func (log *PbftLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNum uint64, viewID uint64, blockHash common.Hash) []*PbftMessage {
found := []*PbftMessage{}
func (log *FBFTLog) GetMessagesByTypeSeqViewHash(typ msg_pb.MessageType, blockNum uint64, viewID uint64, blockHash common.Hash) []*FBFTMessage {
found := []*FBFTMessage{}
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*PbftMessage).MessageType == typ && msg.(*PbftMessage).BlockNum == blockNum && msg.(*PbftMessage).ViewID == viewID && msg.(*PbftMessage).BlockHash == blockHash {
found = append(found, msg.(*PbftMessage))
if msg.(*FBFTMessage).MessageType == typ && msg.(*FBFTMessage).BlockNum == blockNum && msg.(*FBFTMessage).ViewID == viewID && msg.(*FBFTMessage).BlockHash == blockHash {
found = append(found, msg.(*FBFTMessage))
}
}
return found
}
// GetMessagesByTypeSeq returns pbft messages with matching type, blockNum
func (log *PbftLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*PbftMessage {
found := []*PbftMessage{}
func (log *FBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage {
found := []*FBFTMessage{}
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*PbftMessage).MessageType == typ && msg.(*PbftMessage).BlockNum == blockNum {
found = append(found, msg.(*PbftMessage))
if msg.(*FBFTMessage).MessageType == typ && msg.(*FBFTMessage).BlockNum == blockNum {
found = append(found, msg.(*FBFTMessage))
}
}
return found
}
// GetMessagesByTypeSeqHash returns pbft messages with matching type, blockNum
func (log *PbftLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*PbftMessage {
found := []*PbftMessage{}
func (log *FBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage {
found := []*FBFTMessage{}
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*PbftMessage).MessageType == typ && msg.(*PbftMessage).BlockNum == blockNum && msg.(*PbftMessage).BlockHash == blockHash {
found = append(found, msg.(*PbftMessage))
if msg.(*FBFTMessage).MessageType == typ && msg.(*FBFTMessage).BlockNum == blockNum && msg.(*FBFTMessage).BlockHash == blockHash {
found = append(found, msg.(*FBFTMessage))
}
}
return found
}
// HasMatchingAnnounce returns whether the log contains announce type message with given blockNum, blockHash
func (log *PbftLog) HasMatchingAnnounce(blockNum uint64, blockHash common.Hash) bool {
func (log *FBFTLog) HasMatchingAnnounce(blockNum uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqHash(msg_pb.MessageType_ANNOUNCE, blockNum, blockHash)
return len(found) >= 1
}
// HasMatchingViewAnnounce returns whether the log contains announce type message with given blockNum, viewID and blockHash
func (log *PbftLog) HasMatchingViewAnnounce(blockNum uint64, viewID uint64, blockHash common.Hash) bool {
func (log *FBFTLog) HasMatchingViewAnnounce(blockNum uint64, viewID uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_ANNOUNCE, blockNum, viewID, blockHash)
return len(found) >= 1
}
// HasMatchingPrepared returns whether the log contains prepared message with given blockNum, viewID and blockHash
func (log *PbftLog) HasMatchingPrepared(blockNum uint64, blockHash common.Hash) bool {
func (log *FBFTLog) HasMatchingPrepared(blockNum uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqHash(msg_pb.MessageType_PREPARED, blockNum, blockHash)
return len(found) >= 1
}
// HasMatchingViewPrepared returns whether the log contains prepared message with given blockNum, viewID and blockHash
func (log *PbftLog) HasMatchingViewPrepared(blockNum uint64, viewID uint64, blockHash common.Hash) bool {
func (log *FBFTLog) HasMatchingViewPrepared(blockNum uint64, viewID uint64, blockHash common.Hash) bool {
found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_PREPARED, blockNum, viewID, blockHash)
return len(found) >= 1
}
// GetMessagesByTypeSeqView returns pbft messages with matching type, blockNum and viewID
func (log *PbftLog) GetMessagesByTypeSeqView(typ msg_pb.MessageType, blockNum uint64, viewID uint64) []*PbftMessage {
found := []*PbftMessage{}
func (log *FBFTLog) GetMessagesByTypeSeqView(typ msg_pb.MessageType, blockNum uint64, viewID uint64) []*FBFTMessage {
found := []*FBFTMessage{}
it := log.Messages().Iterator()
for msg := range it.C {
if msg.(*PbftMessage).MessageType != typ || msg.(*PbftMessage).BlockNum != blockNum || msg.(*PbftMessage).ViewID != viewID {
if msg.(*FBFTMessage).MessageType != typ || msg.(*FBFTMessage).BlockNum != blockNum || msg.(*FBFTMessage).ViewID != viewID {
continue
}
found = append(found, msg.(*PbftMessage))
found = append(found, msg.(*FBFTMessage))
}
return found
}
// FindMessageByMaxViewID returns the message that has maximum ViewID
func (log *PbftLog) FindMessageByMaxViewID(msgs []*PbftMessage) *PbftMessage {
func (log *FBFTLog) FindMessageByMaxViewID(msgs []*FBFTMessage) *FBFTMessage {
if len(msgs) == 0 {
return nil
}
@ -218,9 +218,9 @@ func (log *PbftLog) FindMessageByMaxViewID(msgs []*PbftMessage) *PbftMessage {
return msgs[maxIdx]
}
// ParsePbftMessage parses PBFT message into PbftMessage structure
func ParsePbftMessage(msg *msg_pb.Message) (*PbftMessage, error) {
pbftMsg := PbftMessage{}
// ParseFBFTMessage parses FBFT message into FBFTMessage structure
func ParseFBFTMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
pbftMsg := FBFTMessage{}
pbftMsg.MessageType = msg.GetType()
consensusMsg := msg.GetConsensus()
@ -241,9 +241,9 @@ func ParsePbftMessage(msg *msg_pb.Message) (*PbftMessage, error) {
return &pbftMsg, nil
}
// ParseViewChangeMessage parses view change message into PbftMessage structure
func ParseViewChangeMessage(msg *msg_pb.Message) (*PbftMessage, error) {
pbftMsg := PbftMessage{}
// ParseViewChangeMessage parses view change message into FBFTMessage structure
func ParseViewChangeMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
pbftMsg := FBFTMessage{}
pbftMsg.MessageType = msg.GetType()
if pbftMsg.MessageType != msg_pb.MessageType_VIEWCHANGE {
return nil, fmt.Errorf("ParseViewChangeMessage: incorrect message type %s", pbftMsg.MessageType)
@ -286,27 +286,27 @@ func ParseViewChangeMessage(msg *msg_pb.Message) (*PbftMessage, error) {
return &pbftMsg, nil
}
// ParseNewViewMessage parses new view message into PbftMessage structure
func (consensus *Consensus) ParseNewViewMessage(msg *msg_pb.Message) (*PbftMessage, error) {
pbftMsg := PbftMessage{}
pbftMsg.MessageType = msg.GetType()
// ParseNewViewMessage parses new view message into FBFTMessage structure
func (consensus *Consensus) ParseNewViewMessage(msg *msg_pb.Message) (*FBFTMessage, error) {
FBFTMsg := FBFTMessage{}
FBFTMsg.MessageType = msg.GetType()
if pbftMsg.MessageType != msg_pb.MessageType_NEWVIEW {
return nil, fmt.Errorf("ParseNewViewMessage: incorrect message type %s", pbftMsg.MessageType)
if FBFTMsg.MessageType != msg_pb.MessageType_NEWVIEW {
return nil, fmt.Errorf("ParseNewViewMessage: incorrect message type %s", FBFTMsg.MessageType)
}
vcMsg := msg.GetViewchange()
pbftMsg.ViewID = vcMsg.ViewId
pbftMsg.BlockNum = vcMsg.BlockNum
pbftMsg.Payload = make([]byte, len(vcMsg.Payload))
copy(pbftMsg.Payload[:], vcMsg.Payload[:])
FBFTMsg.ViewID = vcMsg.ViewId
FBFTMsg.BlockNum = vcMsg.BlockNum
FBFTMsg.Payload = make([]byte, len(vcMsg.Payload))
copy(FBFTMsg.Payload[:], vcMsg.Payload[:])
pubKey, err := bls_cosi.BytesToBlsPublicKey(vcMsg.SenderPubkey)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to parse senderpubkey")
return nil, err
}
pbftMsg.SenderPubkey = pubKey
FBFTMsg.SenderPubkey = pubKey
if len(vcMsg.M3Aggsigs) > 0 {
m3Sig := bls.Sign{}
@ -315,14 +315,14 @@ func (consensus *Consensus) ParseNewViewMessage(msg *msg_pb.Message) (*PbftMessa
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the multi signature for M3 viewID signature")
return nil, err
}
m3mask, err := bls_cosi.NewMask(consensus.PublicKeys, nil)
m3mask, err := bls_cosi.NewMask(consensus.Decider.Participants(), nil)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to create mask for multi signature")
return nil, err
}
m3mask.SetMask(vcMsg.M3Bitmap)
pbftMsg.M3AggSig = &m3Sig
pbftMsg.M3Bitmap = m3mask
FBFTMsg.M3AggSig = &m3Sig
FBFTMsg.M3Bitmap = m3mask
}
if len(vcMsg.M2Aggsigs) > 0 {
@ -332,15 +332,15 @@ func (consensus *Consensus) ParseNewViewMessage(msg *msg_pb.Message) (*PbftMessa
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to deserialize the multi signature for M2 aggregated signature")
return nil, err
}
m2mask, err := bls_cosi.NewMask(consensus.PublicKeys, nil)
m2mask, err := bls_cosi.NewMask(consensus.Decider.Participants(), nil)
if err != nil {
utils.Logger().Warn().Err(err).Msg("ParseViewChangeMessage failed to create mask for multi signature")
return nil, err
}
m2mask.SetMask(vcMsg.M2Bitmap)
pbftMsg.M2AggSig = &m2Sig
pbftMsg.M2Bitmap = m2mask
FBFTMsg.M2AggSig = &m2Sig
FBFTMsg.M2Bitmap = m2mask
}
return &pbftMsg, nil
return &FBFTMsg, nil
}

@ -6,6 +6,8 @@ import (
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/harmony/api/proto"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
@ -19,9 +21,12 @@ func constructAnnounceMessage(t *testing.T) []byte {
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus, err := New(host, 0, leader, bls.RandPrivateKey())
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := New(
host, values.BeaconChainShardID, leader, bls.RandPrivateKey(), decider,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
t.Fatalf("Cannot create consensus: %v", err)
}
consensus.blockHash = [32]byte{}
@ -39,21 +44,21 @@ func getConsensusMessage(payload []byte) (*msg_pb.Message, error) {
return msg, nil
}
func TestParsePbftMessage(t *testing.T) {
func TestParseFBFTMessage(t *testing.T) {
payload := constructAnnounceMessage(t)
msg, err := getConsensusMessage(payload)
if err != nil {
t.Error("create consensus message error")
}
_, err = ParsePbftMessage(msg)
_, err = ParseFBFTMessage(msg)
if err != nil {
t.Error("unable to parse PbftMessage")
t.Error("unable to parse FBFTMessage")
}
}
func TestGetMessagesByTypeSeqViewHash(t *testing.T) {
pbftMsg := PbftMessage{MessageType: msg_pb.MessageType_ANNOUNCE, BlockNum: 2, ViewID: 3, BlockHash: [32]byte{01, 02}}
log := NewPbftLog()
pbftMsg := FBFTMessage{MessageType: msg_pb.MessageType_ANNOUNCE, BlockNum: 2, ViewID: 3, BlockHash: [32]byte{01, 02}}
log := NewFBFTLog()
log.AddMessage(&pbftMsg)
found := log.GetMessagesByTypeSeqViewHash(msg_pb.MessageType_ANNOUNCE, 2, 3, [32]byte{01, 02})
@ -68,8 +73,8 @@ func TestGetMessagesByTypeSeqViewHash(t *testing.T) {
}
func TestHasMatchingAnnounce(t *testing.T) {
pbftMsg := PbftMessage{MessageType: msg_pb.MessageType_ANNOUNCE, BlockNum: 2, ViewID: 3, BlockHash: [32]byte{01, 02}}
log := NewPbftLog()
pbftMsg := FBFTMessage{MessageType: msg_pb.MessageType_ANNOUNCE, BlockNum: 2, ViewID: 3, BlockHash: [32]byte{01, 02}}
log := NewFBFTLog()
log.AddMessage(&pbftMsg)
found := log.HasMatchingViewAnnounce(2, 3, [32]byte{01, 02})
if !found {

@ -0,0 +1,235 @@
package quorum
import (
"github.com/harmony-one/bls/ffi/go/bls"
)
// Phase is a phase that needs quorum to proceed
type Phase byte
const (
// Prepare ..
Prepare Phase = iota
// Commit ..
Commit
// ViewChange ..
ViewChange
)
// Policy is the rule we used to decide is quorum achieved
type Policy byte
const (
// SuperMajorityVote is a 2/3s voting mechanism, pre-PoS
SuperMajorityVote Policy = iota
// SuperMajorityStake is 2/3s of total staked amount for epoch
SuperMajorityStake
)
// ParticipantTracker ..
type ParticipantTracker interface {
Participants() []*bls.PublicKey
IndexOf(*bls.PublicKey) int
ParticipantsCount() int64
NextAfter(*bls.PublicKey) (bool, *bls.PublicKey)
UpdateParticipants(pubKeys []*bls.PublicKey)
DumpParticipants() []string
}
// SignatoryTracker ..
type SignatoryTracker interface {
ParticipantTracker
AddSignature(p Phase, PubKey *bls.PublicKey, sig *bls.Sign)
// Caller assumes concurrency protection
SignatoriesCount(Phase) int64
Reset([]Phase)
}
// SignatureReader ..
type SignatureReader interface {
SignatoryTracker
ReadAllSignatures(Phase) []*bls.Sign
ReadSignature(p Phase, PubKey *bls.PublicKey) *bls.Sign
}
// These maps represent the signatories (validators), keys are BLS public keys
// and values are BLS private key signed signatures
type cIdentities struct {
// Public keys of the committee including leader and validators
publicKeys []*bls.PublicKey
prepare map[string]*bls.Sign
commit map[string]*bls.Sign
// viewIDSigs: every validator
// sign on |viewID|blockHash| in view changing message
viewID map[string]*bls.Sign
}
func (s *cIdentities) IndexOf(pubKey *bls.PublicKey) int {
idx := -1
for k, v := range s.publicKeys {
if v.IsEqual(pubKey) {
idx = k
}
}
return idx
}
func (s *cIdentities) NextAfter(pubKey *bls.PublicKey) (bool, *bls.PublicKey) {
found := false
idx := s.IndexOf(pubKey)
if idx != -1 {
found = true
}
idx = (idx + 1) % int(s.ParticipantsCount())
return found, s.publicKeys[idx]
}
func (s *cIdentities) Participants() []*bls.PublicKey {
return s.publicKeys
}
func (s *cIdentities) UpdateParticipants(pubKeys []*bls.PublicKey) {
s.publicKeys = append(pubKeys[:0:0], pubKeys...)
}
func (s *cIdentities) DumpParticipants() []string {
keys := make([]string, len(s.publicKeys))
for i := 0; i < len(s.publicKeys); i++ {
keys[i] = s.publicKeys[i].SerializeToHexStr()
}
return keys
}
func (s *cIdentities) ParticipantsCount() int64 {
return int64(len(s.publicKeys))
}
func (s *cIdentities) SignatoriesCount(p Phase) int64 {
switch p {
case Prepare:
return int64(len(s.prepare))
case Commit:
return int64(len(s.commit))
case ViewChange:
return int64(len(s.viewID))
default:
return 0
}
}
func (s *cIdentities) AddSignature(p Phase, PubKey *bls.PublicKey, sig *bls.Sign) {
hex := PubKey.SerializeToHexStr()
switch p {
case Prepare:
s.prepare[hex] = sig
case Commit:
s.commit[hex] = sig
case ViewChange:
s.viewID[hex] = sig
}
}
func (s *cIdentities) Reset(ps []Phase) {
for _, p := range ps {
switch m := map[string]*bls.Sign{}; p {
case Prepare:
s.prepare = m
case Commit:
s.commit = m
case ViewChange:
s.viewID = m
}
}
}
func (s *cIdentities) ReadSignature(p Phase, PubKey *bls.PublicKey) *bls.Sign {
m := map[string]*bls.Sign{}
hex := PubKey.SerializeToHexStr()
switch p {
case Prepare:
m = s.prepare
case Commit:
m = s.commit
case ViewChange:
m = s.viewID
}
payload, ok := m[hex]
if !ok {
return nil
}
return payload
}
func (s *cIdentities) ReadAllSignatures(p Phase) []*bls.Sign {
sigs := []*bls.Sign{}
m := map[string]*bls.Sign{}
switch p {
case Prepare:
m = s.prepare
case Commit:
m = s.commit
case ViewChange:
m = s.viewID
}
for _, sig := range m {
sigs = append(sigs, sig)
}
return sigs
}
func newMapBackedSignatureReader() SignatureReader {
return &cIdentities{
[]*bls.PublicKey{}, map[string]*bls.Sign{},
map[string]*bls.Sign{}, map[string]*bls.Sign{},
}
}
// Decider ..
type Decider interface {
SignatureReader
Policy() Policy
IsQuorumAchieved(Phase) bool
QuorumThreshold() int64
IsRewardThresholdAchieved() bool
}
type uniformVoteWeight struct {
SignatureReader
}
// NewDecider ..
func NewDecider(p Policy) Decider {
switch p {
case SuperMajorityVote:
return &uniformVoteWeight{newMapBackedSignatureReader()}
// case SuperMajorityStake:
default:
// Should not be possible
return nil
}
}
// Policy ..
func (v *uniformVoteWeight) Policy() Policy {
return SuperMajorityVote
}
// IsQuorumAchieved ..
func (v *uniformVoteWeight) IsQuorumAchieved(p Phase) bool {
return v.SignatoriesCount(p) >= v.QuorumThreshold()
}
// QuorumThreshold ..
func (v *uniformVoteWeight) QuorumThreshold() int64 {
return v.ParticipantsCount()*2/3 + 1
}
// RewardThreshold ..
func (v *uniformVoteWeight) IsRewardThresholdAchieved() bool {
return v.SignatoriesCount(Commit) >= (v.ParticipantsCount() * 9 / 10)
}

@ -9,135 +9,79 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/bls/ffi/go/bls"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
bls_cosi "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p/host"
)
// PbftPhase PBFT phases: pre-prepare, prepare and commit
type PbftPhase int
// Enum for PbftPhase
const (
Announce PbftPhase = iota
Prepare
Commit
)
// Mode determines whether a node is in normal or viewchanging mode
type Mode int
// Enum for node Mode
const (
Normal Mode = iota
ViewChanging
Syncing
Listening
)
// PbftMode contains mode and viewID of viewchanging
type PbftMode struct {
// State contains current mode and current viewID
type State struct {
mode Mode
viewID uint64
mux sync.Mutex
}
// Mode return the current node mode
func (pm *PbftMode) Mode() Mode {
func (pm *State) Mode() Mode {
return pm.mode
}
//String print mode string
func (mode Mode) String() string {
if mode == Normal {
return "Normal"
} else if mode == ViewChanging {
return "ViewChanging"
} else if mode == Syncing {
return "Syncing"
} else if mode == Listening {
return "Listening"
}
return "Unknown"
}
// String print phase string
func (phase PbftPhase) String() string {
if phase == Announce {
return "Announce"
} else if phase == Prepare {
return "Prepare"
} else if phase == Commit {
return "Commit"
}
return "Unknown"
}
// SetMode set the node mode as required
func (pm *PbftMode) SetMode(m Mode) {
func (pm *State) SetMode(s Mode) {
pm.mux.Lock()
defer pm.mux.Unlock()
pm.mode = m
pm.mode = s
}
// ViewID return the current viewchanging id
func (pm *PbftMode) ViewID() uint64 {
func (pm *State) ViewID() uint64 {
return pm.viewID
}
// SetViewID sets the viewchanging id accordingly
func (pm *PbftMode) SetViewID(viewID uint64) {
func (pm *State) SetViewID(viewID uint64) {
pm.mux.Lock()
defer pm.mux.Unlock()
pm.viewID = viewID
}
// GetViewID returns the current viewchange viewID
func (pm *PbftMode) GetViewID() uint64 {
func (pm *State) GetViewID() uint64 {
return pm.viewID
}
// switchPhase will switch PbftPhase to nextPhase if the desirePhase equals the nextPhase
func (consensus *Consensus) switchPhase(desirePhase PbftPhase, override bool) {
// switchPhase will switch FBFTPhase to nextPhase if the desirePhase equals the nextPhase
func (consensus *Consensus) switchPhase(desired FBFTPhase, override bool) {
if override {
consensus.phase = desirePhase
consensus.phase = desired
return
}
var nextPhase PbftPhase
var nextPhase FBFTPhase
switch consensus.phase {
case Announce:
nextPhase = Prepare
case Prepare:
nextPhase = Commit
case Commit:
nextPhase = Announce
}
if nextPhase == desirePhase {
case FBFTAnnounce:
nextPhase = FBFTPrepare
case FBFTPrepare:
nextPhase = FBFTCommit
case FBFTCommit:
nextPhase = FBFTAnnounce
}
if nextPhase == desired {
consensus.phase = nextPhase
}
}
// GetNextLeaderKey uniquely determine who is the leader for given viewID
func (consensus *Consensus) GetNextLeaderKey() *bls.PublicKey {
idx := consensus.getIndexOfPubKey(consensus.LeaderPubKey)
if idx == -1 {
wasFound, next := consensus.Decider.NextAfter(consensus.LeaderPubKey)
if !wasFound {
utils.Logger().Warn().
Str("key", consensus.LeaderPubKey.SerializeToHexStr()).
Msg("GetNextLeaderKey: currentLeaderKey not found")
}
idx = (idx + 1) % len(consensus.PublicKeys)
return consensus.PublicKeys[idx]
}
func (consensus *Consensus) getIndexOfPubKey(pubKey *bls.PublicKey) int {
for k, v := range consensus.PublicKeys {
if v.IsEqual(pubKey) {
return k
}
}
return -1
return next
}
// ResetViewChangeState reset the state for viewchange
@ -145,18 +89,18 @@ func (consensus *Consensus) ResetViewChangeState() {
utils.Logger().Debug().
Str("Phase", consensus.phase.String()).
Msg("[ResetViewChangeState] Resetting view change state")
consensus.mode.SetMode(Normal)
bhpBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil)
nilBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil)
viewIDBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, nil)
consensus.current.SetMode(Normal)
members := consensus.Decider.Participants()
bhpBitmap, _ := bls_cosi.NewMask(members, nil)
nilBitmap, _ := bls_cosi.NewMask(members, nil)
viewIDBitmap, _ := bls_cosi.NewMask(members, nil)
consensus.bhpBitmap = bhpBitmap
consensus.nilBitmap = nilBitmap
consensus.viewIDBitmap = viewIDBitmap
consensus.m1Payload = []byte{}
consensus.bhpSigs = map[string]*bls.Sign{}
consensus.nilSigs = map[string]*bls.Sign{}
consensus.viewIDSigs = map[string]*bls.Sign{}
consensus.Decider.Reset([]quorum.Phase{quorum.ViewChange})
}
func createTimeout() map[TimeoutType]*utils.Timeout {
@ -174,8 +118,8 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
}
consensus.consensusTimeout[timeoutConsensus].Stop()
consensus.consensusTimeout[timeoutBootstrap].Stop()
consensus.mode.SetMode(ViewChanging)
consensus.mode.SetViewID(viewID)
consensus.current.SetMode(ViewChanging)
consensus.current.SetViewID(viewID)
consensus.LeaderPubKey = consensus.GetNextLeaderKey()
diff := viewID - consensus.viewID
@ -187,12 +131,16 @@ func (consensus *Consensus) startViewChange(viewID uint64) {
Msg("[startViewChange]")
msgToSend := consensus.constructViewChangeMessage()
consensus.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
consensus.host.SendMessageToGroups([]nodeconfig.GroupID{
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
host.ConstructP2pMessage(byte(17), msgToSend),
)
consensus.consensusTimeout[timeoutViewChange].SetDuration(duration)
consensus.consensusTimeout[timeoutViewChange].Start()
utils.Logger().Debug().
Uint64("ViewChangingID", consensus.mode.ViewID()).
Uint64("ViewChangingID", consensus.current.ViewID()).
Msg("[startViewChange] start view change timer")
}
@ -207,10 +155,10 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
return
}
if len(consensus.viewIDSigs) >= consensus.Quorum() {
if consensus.Decider.IsQuorumAchieved(quorum.ViewChange) {
utils.Logger().Debug().
Int("have", len(consensus.viewIDSigs)).
Int("need", consensus.Quorum()).
Int64("have", consensus.Decider.SignatoriesCount(quorum.ViewChange)).
Int64("need", consensus.Decider.QuorumThreshold()).
Str("validatorPubKey", recvMsg.SenderPubkey.SerializeToHexStr()).
Msg("[onViewChange] Received Enough View Change Messages")
return
@ -238,9 +186,10 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
return
}
if consensus.mode.Mode() == ViewChanging && consensus.mode.ViewID() > recvMsg.ViewID {
if consensus.current.Mode() == ViewChanging &&
consensus.current.ViewID() > recvMsg.ViewID {
utils.Logger().Warn().
Uint64("MyViewChangingID", consensus.mode.ViewID()).
Uint64("MyViewChangingID", consensus.current.ViewID()).
Uint64("MsgViewChangingID", recvMsg.ViewID).
Msg("[onViewChange] ViewChanging ID Is Low")
return
@ -259,8 +208,10 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
_, ok2 := consensus.bhpSigs[consensus.PubKey.SerializeToHexStr()]
if !(ok1 || ok2) {
// add own signature for newview message
preparedMsgs := consensus.PbftLog.GetMessagesByTypeSeq(msg_pb.MessageType_PREPARED, recvMsg.BlockNum)
preparedMsg := consensus.PbftLog.FindMessageByMaxViewID(preparedMsgs)
preparedMsgs := consensus.FBFTLog.GetMessagesByTypeSeq(
msg_pb.MessageType_PREPARED, recvMsg.BlockNum,
)
preparedMsg := consensus.FBFTLog.FindMessageByMaxViewID(preparedMsgs)
if preparedMsg == nil {
utils.Logger().Debug().Msg("[onViewChange] add my M2(NIL) type messaage")
consensus.nilSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(NIL)
@ -273,11 +224,13 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
}
}
// add self m3 type message signature and bitmap
_, ok3 := consensus.viewIDSigs[consensus.PubKey.SerializeToHexStr()]
if !ok3 {
signature := consensus.Decider.ReadSignature(quorum.ViewChange, consensus.PubKey)
if signature == nil {
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
consensus.viewIDSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(viewIDBytes)
consensus.Decider.AddSignature(
quorum.ViewChange, consensus.PubKey, consensus.priKey.SignHash(viewIDBytes),
)
consensus.viewIDBitmap.SetKey(consensus.PubKey, true)
}
@ -329,10 +282,9 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
return
}
// check has 2f+1 signature in m1 type message
if count := utils.CountOneBits(mask.Bitmap); count < consensus.Quorum() {
utils.Logger().Debug().
Int("need", consensus.Quorum()).
Int("have", count).
need := consensus.Decider.QuorumThreshold()
if count := utils.CountOneBits(mask.Bitmap); count < need {
utils.Logger().Debug().Int64("need", need).Int64("have", count).
Msg("[onViewChange] M1 Payload Not Have Enough Signature")
return
}
@ -349,14 +301,18 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
if len(consensus.m1Payload) == 0 {
consensus.m1Payload = append(recvMsg.Payload[:0:0], recvMsg.Payload...)
// create prepared message for new leader
preparedMsg := PbftMessage{MessageType: msg_pb.MessageType_PREPARED, ViewID: recvMsg.ViewID, BlockNum: recvMsg.BlockNum}
preparedMsg := FBFTMessage{
MessageType: msg_pb.MessageType_PREPARED,
ViewID: recvMsg.ViewID,
BlockNum: recvMsg.BlockNum,
}
preparedMsg.BlockHash = common.Hash{}
copy(preparedMsg.BlockHash[:], recvMsg.Payload[:32])
preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32)
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkey = consensus.PubKey
utils.Logger().Info().Msg("[onViewChange] New Leader Prepared Message Added")
consensus.PbftLog.AddMessage(&preparedMsg)
consensus.FBFTLog.AddMessage(&preparedMsg)
}
}
utils.Logger().Debug().
@ -367,8 +323,8 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
}
// check and add viewID (m3 type) message signature
_, ok := consensus.viewIDSigs[senderKey.SerializeToHexStr()]
if ok {
sig := consensus.Decider.ReadSignature(quorum.ViewChange, senderKey)
if sig != nil {
utils.Logger().Debug().
Str("validatorPubKey", senderKey.SerializeToHexStr()).
Msg("[onViewChange] Already Received M3(ViewID) message from the validator")
@ -385,16 +341,17 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
utils.Logger().Debug().
Str("validatorPubKey", senderKey.SerializeToHexStr()).
Msg("[onViewChange] Add M3 (ViewID) type message")
consensus.viewIDSigs[senderKey.SerializeToHexStr()] = recvMsg.ViewidSig
consensus.viewIDBitmap.SetKey(recvMsg.SenderPubkey, true) // Set the bitmap indicating that this validator signed.
consensus.Decider.AddSignature(quorum.ViewChange, senderKey, recvMsg.ViewidSig)
// Set the bitmap indicating that this validator signed.
consensus.viewIDBitmap.SetKey(recvMsg.SenderPubkey, true)
utils.Logger().Debug().
Int("numSigs", len(consensus.viewIDSigs)).
Int("needed", consensus.Quorum()).
Int64("numSigs", consensus.Decider.SignatoriesCount(quorum.ViewChange)).
Int64("needed", consensus.Decider.QuorumThreshold()).
Msg("[onViewChange]")
// received enough view change messages, change state to normal consensus
if len(consensus.viewIDSigs) >= consensus.Quorum() {
consensus.mode.SetMode(Normal)
if consensus.Decider.IsQuorumAchieved(quorum.ViewChange) {
consensus.current.SetMode(Normal)
consensus.LeaderPubKey = consensus.PubKey
consensus.ResetState()
if len(consensus.m1Payload) == 0 {
@ -404,30 +361,36 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
} else {
utils.Logger().Debug().
Str("From", consensus.phase.String()).
Str("To", Commit.String()).
Str("To", FBFTCommit.String()).
Msg("[OnViewChange] Switching phase")
consensus.switchPhase(Commit, true)
consensus.switchPhase(FBFTCommit, true)
copy(consensus.blockHash[:], consensus.m1Payload[:32])
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil {
utils.Logger().Error().Err(err).Msg("[onViewChange] ReadSignatureBitmapPayload Fail")
utils.Logger().Error().Err(err).
Msg("[onViewChange] ReadSignatureBitmapPayload Fail")
return
}
consensus.aggregatedPrepareSig = aggSig
consensus.prepareBitmap = mask
// Leader sign and add commit message
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, consensus.blockNum)
commitPayload := append(blockNumBytes, consensus.blockHash[:]...)
consensus.commitSigs[consensus.PubKey.SerializeToHexStr()] = consensus.priKey.SignHash(commitPayload)
consensus.Decider.AddSignature(
quorum.Commit, consensus.PubKey, consensus.priKey.SignHash(commitPayload),
)
if err = consensus.commitBitmap.SetKey(consensus.PubKey, true); err != nil {
utils.Logger().Debug().Msg("[OnViewChange] New Leader commit bitmap set failed")
utils.Logger().Debug().
Msg("[OnViewChange] New Leader commit bitmap set failed")
return
}
}
consensus.mode.SetViewID(recvMsg.ViewID)
consensus.current.SetViewID(recvMsg.ViewID)
msgToSend := consensus.constructNewViewMessage()
utils.Logger().Warn().
@ -441,7 +404,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.consensusTimeout[timeoutViewChange].Stop()
consensus.consensusTimeout[timeoutConsensus].Start()
utils.Logger().Debug().
Uint64("viewChangingID", consensus.mode.ViewID()).
Uint64("viewChangingID", consensus.current.ViewID()).
Msg("[onViewChange] New Leader Start Consensus Timer and Stop View Change Timer")
utils.Logger().Debug().
Str("myKey", consensus.PubKey.SerializeToHexStr()).
@ -482,10 +445,9 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
// check total number of sigs >= 2f+1
if count := utils.CountOneBits(m3Mask.Bitmap); count < consensus.Quorum() {
utils.Logger().Debug().
Int("need", consensus.Quorum()).
Int("have", count).
need := consensus.Decider.QuorumThreshold()
if count := utils.CountOneBits(m3Mask.Bitmap); count < need {
utils.Logger().Debug().Int64("need", need).Int64("have", count).
Msg("[onNewView] Not Have Enough M3 (ViewID) Signature")
return
}
@ -504,45 +466,54 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
utils.Logger().Debug().Msg("[onNewView] M2AggSig (NIL) is Not Empty")
m2Sig := recvMsg.M2AggSig
if !m2Sig.VerifyHash(m2Mask.AggregatePublic, NIL) {
utils.Logger().Warn().Msg("[onNewView] Unable to Verify Aggregated Signature of M2 (NIL) payload")
utils.Logger().Warn().
Msg("[onNewView] Unable to Verify Aggregated Signature of M2 (NIL) payload")
return
}
}
// check when M3 sigs > M2 sigs, then M1 (recvMsg.Payload) should not be empty
if m2Mask == nil || m2Mask.Bitmap == nil || (m2Mask != nil && m2Mask.Bitmap != nil && utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) {
if m2Mask == nil || m2Mask.Bitmap == nil ||
(m2Mask != nil && m2Mask.Bitmap != nil &&
utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) {
if len(recvMsg.Payload) <= 32 {
utils.Logger().Debug().Msg("[onNewView] M1 (prepared) Type Payload Not Have Enough Length")
utils.Logger().Debug().
Msg("[onNewView] M1 (prepared) Type Payload Not Have Enough Length")
return
}
// m1 is not empty, check it's valid
blockHash := recvMsg.Payload[:32]
aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32)
if err != nil {
utils.Logger().Error().Err(err).Msg("[onNewView] ReadSignatureBitmapPayload Failed")
utils.Logger().Error().Err(err).
Msg("[onNewView] ReadSignatureBitmapPayload Failed")
return
}
if !aggSig.VerifyHash(mask.AggregatePublic, blockHash) {
utils.Logger().Warn().Msg("[onNewView] Failed to Verify Signature for M1 (prepare) message")
utils.Logger().Warn().
Msg("[onNewView] Failed to Verify Signature for M1 (prepare) message")
return
}
copy(consensus.blockHash[:], blockHash)
consensus.aggregatedPrepareSig = aggSig
consensus.prepareBitmap = mask
// create prepared message from newview
preparedMsg := PbftMessage{MessageType: msg_pb.MessageType_PREPARED, ViewID: recvMsg.ViewID, BlockNum: recvMsg.BlockNum}
preparedMsg := FBFTMessage{
MessageType: msg_pb.MessageType_PREPARED,
ViewID: recvMsg.ViewID,
BlockNum: recvMsg.BlockNum,
}
preparedMsg.BlockHash = common.Hash{}
copy(preparedMsg.BlockHash[:], blockHash[:])
preparedMsg.Payload = make([]byte, len(recvMsg.Payload)-32)
copy(preparedMsg.Payload[:], recvMsg.Payload[32:])
preparedMsg.SenderPubkey = senderKey
consensus.PbftLog.AddMessage(&preparedMsg)
consensus.FBFTLog.AddMessage(&preparedMsg)
}
// newView message verified success, override my state
consensus.viewID = recvMsg.ViewID
consensus.mode.SetViewID(recvMsg.ViewID)
consensus.current.SetViewID(recvMsg.ViewID)
consensus.LeaderPubKey = senderKey
consensus.ResetViewChangeState()
@ -568,9 +539,9 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
consensus.host.SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
utils.Logger().Debug().
Str("From", consensus.phase.String()).
Str("To", Commit.String()).
Str("To", FBFTCommit.String()).
Msg("[OnViewChange] Switching phase")
consensus.switchPhase(Commit, true)
consensus.switchPhase(FBFTCommit, true)
} else {
consensus.ResetState()
utils.Logger().Info().Msg("onNewView === announce")
@ -578,7 +549,8 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
utils.Logger().Debug().
Str("newLeaderKey", consensus.LeaderPubKey.SerializeToHexStr()).
Msg("new leader changed")
utils.Logger().Debug().Msg("validator start consensus timer and stop view change timer")
utils.Logger().Debug().
Msg("validator start consensus timer and stop view change timer")
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.consensusTimeout[timeoutViewChange].Stop()
}

@ -24,11 +24,11 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/internal/ctxerror"
consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/params"
)
@ -58,7 +58,7 @@ func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engin
func (v *BlockValidator) ValidateBody(block *types.Block) error {
// Check whether the block's known, and if not, that it's linkable
if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) {
return values.ErrKnownBlock
return ErrKnownBlock
}
if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {

@ -40,7 +40,6 @@ import (
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/params"
@ -52,7 +51,6 @@ import (
var (
// blockInsertTimer
blockInsertTimer = metrics.NewRegisteredTimer("chain/inserts", nil)
// ErrNoGenesis is the error when there is no genesis.
ErrNoGenesis = errors.New("Genesis not found in chain")
)
@ -1249,7 +1247,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
err = bc.Validator().ValidateBody(block)
}
switch {
case err == values.ErrKnownBlock:
case err == ErrKnownBlock:
// Block and state both already known. However if the current block is below
// this number we did a rollback and we should reimport it nonetheless.
if bc.CurrentBlock().NumberU64() >= block.NumberU64() {

@ -0,0 +1,40 @@
// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"github.com/pkg/errors"
)
var (
// ErrKnownBlock is returned when a block to import is already known locally.
ErrKnownBlock = errors.New("block already known")
// ErrGasLimitReached is returned by the gas pool if the amount of gas required
// by a transaction is higher than what's left in the block.
ErrGasLimitReached = errors.New("gas limit reached")
// ErrBlacklistedHash is returned if a block to import is on the blacklist.
ErrBlacklistedHash = errors.New("blacklisted hash")
// ErrNonceTooHigh is returned if the nonce of a transaction is higher than the
// next one expected based on the local chain.
ErrNonceTooHigh = errors.New("nonce too high")
// ErrShardStateNotMatch is returned if the calculated shardState hash not equal that in the block header
ErrShardStateNotMatch = errors.New("shard state root hash not match")
)

@ -19,8 +19,6 @@ package core
import (
"fmt"
"math"
"github.com/harmony-one/harmony/core/values"
)
// GasPool tracks the amount of gas available during execution of the transactions
@ -40,7 +38,7 @@ func (gp *GasPool) AddGas(amount uint64) *GasPool {
// available and returns an error otherwise.
func (gp *GasPool) SubGas(amount uint64) error {
if uint64(*gp) < amount {
return values.ErrGasLimitReached
return ErrGasLimitReached
}
*(*uint64)(gp) -= amount
return nil

@ -22,7 +22,6 @@ import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
@ -171,9 +170,9 @@ func (st *StateTransition) preCheck() error {
nonce := st.state.GetNonce(st.msg.From())
if nonce < st.msg.Nonce() {
return values.ErrNonceTooHigh
return ErrNonceTooHigh
} else if nonce > st.msg.Nonce() {
return values.ErrNonceTooLow
return ErrNonceTooLow
}
}
return st.buyGas()

@ -18,6 +18,7 @@ package core
import (
"context"
"errors"
"fmt"
"math"
"math/big"
@ -29,11 +30,11 @@ import (
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/metrics"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
)
@ -42,6 +43,44 @@ const (
chainHeadChanSize = 10
)
var (
// ErrInvalidSender is returned if the transaction contains an invalid signature.
ErrInvalidSender = errors.New("invalid sender")
// ErrNonceTooLow is returned if the nonce of a transaction is lower than the
// one present in the local chain.
ErrNonceTooLow = errors.New("nonce too low")
// ErrUnderpriced is returned if a transaction's gas price is below the minimum
// configured for the transaction pool.
ErrUnderpriced = errors.New("transaction underpriced")
// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
// ErrInsufficientFunds is returned if the total cost of executing a transaction
// is higher than the balance of the user's account.
ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")
// ErrIntrinsicGas is returned if the transaction is specified to use less gas
// than required to start the invocation.
ErrIntrinsicGas = errors.New("intrinsic gas too low")
// ErrGasLimit is returned if a transaction's requested gas limit exceeds the
// maximum allowance of the current block.
ErrGasLimit = errors.New("exceeds block gas limit")
// ErrNegativeValue is a sanity error to ensure noone is able to specify a
// transaction with a negative value.
ErrNegativeValue = errors.New("negative value")
// ErrOversizedData is returned if the input data of a transaction is greater
// than some meaningful limit a user might use. This is not a consensus error
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")
)
var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
@ -563,42 +602,42 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Heuristic limit, reject transactions over 32KB to prevent DOS attacks
if tx.Size() > 32*1024 {
return values.ErrOversizedData
return ErrOversizedData
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
return values.ErrNegativeValue
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.currentMaxGas < tx.Gas() {
return values.ErrGasLimit
return ErrGasLimit
}
// Make sure the transaction is signed properly
from, err := types.Sender(pool.signer, tx)
if err != nil {
return values.ErrInvalidSender
return ErrInvalidSender
}
// Drop non-local transactions under our own minimal accepted gas price
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return values.ErrUnderpriced
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() {
return values.ErrNonceTooLow
return ErrNonceTooLow
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return values.ErrInsufficientFunds
return ErrInsufficientFunds
}
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if err != nil {
return err
}
if tx.Gas() < intrGas {
return values.ErrIntrinsicGas
return ErrIntrinsicGas
}
return nil
}
@ -634,7 +673,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
Str("price", tx.GasPrice().String()).
Msg("Discarding underpriced transaction")
underpricedTxCounter.Inc(1)
return false, values.ErrUnderpriced
return false, ErrUnderpriced
}
// New transaction is better than our worse ones, make room for it
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
@ -654,7 +693,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardCounter.Inc(1)
return false, values.ErrReplaceUnderpriced
return false, ErrReplaceUnderpriced
}
// New transaction is better, replace old one
if old != nil {
@ -720,7 +759,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
if !inserted {
// An older transaction was better, discard this
queuedDiscardCounter.Inc(1)
return false, values.ErrReplaceUnderpriced
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {

@ -34,7 +34,6 @@ import (
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/internal/params"
)
@ -235,27 +234,27 @@ func TestInvalidTransactions(t *testing.T) {
from, _ := deriveSender(tx)
pool.currentState.AddBalance(from, big.NewInt(1))
if err := pool.AddRemote(tx); err != values.ErrInsufficientFunds {
t.Error("expected", values.ErrInsufficientFunds)
if err := pool.AddRemote(tx); err != ErrInsufficientFunds {
t.Error("expected", ErrInsufficientFunds)
}
balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(new(big.Int).SetUint64(tx.Gas()), tx.GasPrice()))
pool.currentState.AddBalance(from, balance)
if err := pool.AddRemote(tx); err != values.ErrIntrinsicGas {
t.Error("expected", values.ErrIntrinsicGas, "got", err)
if err := pool.AddRemote(tx); err != ErrIntrinsicGas {
t.Error("expected", ErrIntrinsicGas, "got", err)
}
pool.currentState.SetNonce(from, 1)
pool.currentState.AddBalance(from, big.NewInt(0xffffffffffffff))
tx = transaction(0, 100000, key)
if err := pool.AddRemote(tx); err != values.ErrNonceTooLow {
t.Error("expected", values.ErrNonceTooLow)
if err := pool.AddRemote(tx); err != ErrNonceTooLow {
t.Error("expected", ErrNonceTooLow)
}
tx = transaction(1, 100000, key)
pool.gasPrice = big.NewInt(1000)
if err := pool.AddRemote(tx); err != values.ErrUnderpriced {
t.Error("expected", values.ErrUnderpriced, "got", err)
if err := pool.AddRemote(tx); err != ErrUnderpriced {
t.Error("expected", ErrUnderpriced, "got", err)
}
if err := pool.AddLocal(tx); err != nil {
t.Error("expected", nil, "got", err)
@ -325,8 +324,8 @@ func TestTransactionNegativeValue(t *testing.T) {
tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, 0, big.NewInt(-1), 100, big.NewInt(1), nil), types.HomesteadSigner{}, key)
from, _ := deriveSender(tx)
pool.currentState.AddBalance(from, big.NewInt(1))
if err := pool.AddRemote(tx); err != values.ErrNegativeValue {
t.Error("expected", values.ErrNegativeValue, "got", err)
if err := pool.AddRemote(tx); err != ErrNegativeValue {
t.Error("expected", ErrNegativeValue, "got", err)
}
}

@ -186,7 +186,6 @@ type Block struct {
header *block.Header
uncles []*block.Header
transactions Transactions
stakingTransactions staking.StakingTransactions
incomingReceipts CXReceiptsProofs
// caches
@ -350,7 +349,7 @@ func (b *Block) Transactions() Transactions {
// StakingTransactions returns stakingTransactions.
func (b *Block) StakingTransactions() staking.StakingTransactions {
return b.stakingTransactions
return staking.StakingTransactions{}
}
// IncomingReceipts returns verified outgoing receipts

@ -1,79 +0,0 @@
// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package values
import (
"github.com/pkg/errors"
)
var (
// ErrKnownBlock is returned when a block to import is already known locally.
ErrKnownBlock = errors.New("block already known")
// ErrGasLimitReached is returned by the gas pool if the amount of gas required
// by a transaction is higher than what's left in the block.
ErrGasLimitReached = errors.New("gas limit reached")
// ErrBlacklistedHash is returned if a block to import is on the blacklist.
ErrBlacklistedHash = errors.New("blacklisted hash")
// ErrNonceTooLow is returned if the nonce of a transaction is lower than the
// one present in the local chain.
ErrNonceTooLow = errors.New("nonce too low")
// ErrNonceTooHigh is returned if the nonce of a transaction is higher than the
// next one expected based on the local chain.
ErrNonceTooHigh = errors.New("nonce too high")
// ErrShardStateNotMatch is returned if the calculated shardState hash not equal that in the block header
ErrShardStateNotMatch = errors.New("shard state root hash not match")
// ErrInvalidChainID when ChainID of signer does not match that of running node
ErrInvalidChainID = errors.New("invalid chain id for signer")
// ErrInvalidSender is returned if the transaction contains an invalid signature.
ErrInvalidSender = errors.New("invalid sender")
// ErrUnderpriced is returned if a transaction's gas price is below the minimum
// configured for the transaction pool.
ErrUnderpriced = errors.New("transaction underpriced")
// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
// ErrInsufficientFunds is returned if the total cost of executing a transaction
// is higher than the balance of the user's account.
ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")
// ErrIntrinsicGas is returned if the transaction is specified to use less gas
// than required to start the invocation.
ErrIntrinsicGas = errors.New("intrinsic gas too low")
// ErrGasLimit is returned if a transaction's requested gas limit exceeds the
// maximum allowance of the current block.
ErrGasLimit = errors.New("exceeds block gas limit")
// ErrNegativeValue is a sanity error to ensure noone is able to specify a
// transaction with a negative value.
ErrNegativeValue = errors.New("negative value")
// ErrOversizedData is returned if the input data of a transaction is greater
// than some meaningful limit a user might use. This is not a consensus error
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")
)

@ -132,7 +132,7 @@ func (e *engineImpl) VerifySeal(chain engine.ChainReader, header *block.Header)
return errors.Wrapf(err,
"cannot calculate quorum for block %s", header.Number())
}
if count := utils.CountOneBits(mask.Bitmap); count < parentQuorum {
if count := utils.CountOneBits(mask.Bitmap); count < int64(parentQuorum) {
return ctxerror.New("[VerifySeal] Not enough signature in LastCommitSignature from Block Header",
"need", parentQuorum, "got", count)
}
@ -206,7 +206,7 @@ func (e *engineImpl) VerifyHeaderWithSignature(chain engine.ChainReader, header
return errors.Wrapf(err,
"cannot calculate quorum for block %s", header.Number())
}
if count := utils.CountOneBits(mask.Bitmap); count < quorum {
if count := utils.CountOneBits(mask.Bitmap); count < int64(quorum) {
return ctxerror.New("[VerifyHeaderWithSignature] Not enough signature in commitSignature from Block Header",
"need", quorum, "got", count)
}

@ -68,7 +68,7 @@ type ConfigType struct {
beacon GroupID // the beacon group ID
group GroupID // the group ID of the shard (note: for beacon chain node, the beacon and shard group are the same)
client GroupID // the client group ID of the shard
isClient bool // whether this node is a client node, such as wallet/txgen
isClient bool // whether this node is a client node, such as wallet
isBeacon bool // whether this node is beacon node doing consensus or not
ShardID uint32 // ShardID of this node; TODO ek – reviisit when resharding
role Role // Role of the node

@ -11,12 +11,16 @@ import (
"github.com/harmony-one/harmony/accounts"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/values"
internal_common "github.com/harmony-one/harmony/internal/common"
staking "github.com/harmony-one/harmony/staking/types"
"github.com/pkg/errors"
)
var (
// ErrInvalidChainID when ChainID of signer does not match that of running node
errInvalidChainID = errors.New("invalid chain id for signer")
)
// TxHistoryArgs is struct to make GetTransactionsHistory request
type TxHistoryArgs struct {
Address string `json:"address"`
@ -179,7 +183,7 @@ func (s *PublicTransactionPoolAPI) SendRawStakingTransaction(
}
c := s.b.ChainConfig().ChainID
if tx.ChainID().Cmp(c) != 0 {
e := errors.Wrapf(values.ErrInvalidChainID, "current chain id:%s", c.String())
e := errors.Wrapf(errInvalidChainID, "current chain id:%s", c.String())
return common.Hash{}, e
}
return SubmitStakingTransaction(ctx, s.b, tx)
@ -194,7 +198,7 @@ func (s *PublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, encod
}
c := s.b.ChainConfig().ChainID
if tx.ChainID().Cmp(c) != 0 {
e := errors.Wrapf(values.ErrInvalidChainID, "current chain id:%s", c.String())
e := errors.Wrapf(errInvalidChainID, "current chain id:%s", c.String())
return common.Hash{}, e
}
return SubmitTransaction(ctx, s.b, tx)

@ -89,7 +89,7 @@ func countOneBitsInByte(by byte) int {
}
// CountOneBits counts the number of 1 bit in byte array
func CountOneBits(arr []byte) int {
func CountOneBits(arr []byte) int64 {
if arr == nil {
return 0
}
@ -100,5 +100,5 @@ func CountOneBits(arr []byte) int {
for i := range arr {
count += countOneBitsInByte(arr[i])
}
return count
return int64(count)
}

@ -36,7 +36,7 @@ var (
)
// SetLogContext used to print out loggings of node with port and ip.
// Every instance (node, txgen, etc..) needs to set this for logging.
// Every instance (node, etc..) needs to set this for logging.
func SetLogContext(_port, _ip string) {
port = _port
ip = _ip

@ -430,7 +430,7 @@ func (node *Node) startRxPipeline(
// StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() {
// start the goroutine to receive client message
// client messages are sent by clients, like txgen, wallet
node.startRxPipeline(node.clientReceiver, node.clientRxQueue, ClientRxWorkers)
@ -624,7 +624,7 @@ func (node *Node) AddBeaconPeer(p *p2p.Peer) bool {
}
// isBeacon = true if the node is beacon node
// isClient = true if the node light client(txgen,wallet)
// isClient = true if the node light client(wallet)
func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) {
chanPeer := make(chan p2p.Peer)

@ -12,13 +12,12 @@ To support such behavior, we architecture Node logic with service manager which
Each service needs to implement minimal interace behavior like Start, Stop so that the service manager can handle those operation.
```
```golang
// ServiceInterface is the collection of functions any service needs to implement.
type ServiceInterface interface {
StartService()
StopService()
}
```
### Creating a service.
@ -31,7 +30,7 @@ Since different services may have different ways to be created you may need to h
Action is the input to operate Service Manager. We can send action to action channel of service manager to start or stop a service.
```
```golang
// Action is type of service action.
type Action struct {
action ActionType
@ -49,17 +48,19 @@ Service Manager is very handy to transform a node role from validator to leader
We have enabled libp2p based gossiping using pubsub. Nodes no longer send messages to individual nodes.
All message communication is via SendMessageToGroups function.
* There would be 4 topics for sending and receiving of messages
* **GroupIDBeacon** This topic serves for consensus within the beaconchain
* **GroupIDBeaconClient** This topic serves for receipt of staking transactions by beacon chain and broadcast of blocks (by beacon leader)
* **GroupIDShard** (_under construction_) This topic serves for consensus related and pingpong messages within the shard
* **GroupIDShardClient** (_under construction_) This topic serves to receive transactions from client and send confirmed blocks back to client (like txgen). The shard leader (only) sends back the confirmed blocks.
- There would be 4 topics for sending and receiving of messages
- **GroupIDBeacon** This topic serves for consensus within the beaconchain
- **GroupIDBeaconClient** This topic serves for receipt of staking transactions by beacon chain and broadcast of blocks (by beacon leader)
- **GroupIDShard** (_under construction_) This topic serves for consensus related and pingpong messages within the shard
- **GroupIDShardClient** (_under construction_) This topic serves to receive transactions from client and send confirmed blocks back to client. The shard leader (only) sends back the confirmed blocks.
- Beacon chain nodes need to subscribe to _TWO_ topics
* Beacon chain nodes need to subscribe to _TWO_ topics
* **GroupIDBeacon**
* **GroupIDBeaconClient**.
- **GroupIDBeacon**
- **GroupIDBeaconClient**.
* Every new node other than beacon chain nodes, including txgen and wallet needs to subscribe to _THREE_ topics.
* **GroupIDBeaconClient**
* **GroupIDShard**
* **GroupIDShardClient**
- Every new node other than beacon chain nodes, wallet needs to subscribe to _THREE_ topics.
- **GroupIDBeaconClient**
- **GroupIDShard**
- **GroupIDShardClient**

@ -31,23 +31,26 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
}
if msg.Type == msg_pb.MessageType_COMMITTED {
recvMsg, err := consensus.ParsePbftMessage(msg)
recvMsg, err := consensus.ParseFBFTMessage(msg)
if err != nil {
utils.Logger().Error().Err(err).Msg("[Explorer] onCommitted unable to parse msg")
utils.Logger().Error().Err(err).
Msg("[Explorer] onCommitted unable to parse msg")
return
}
aggSig, mask, err := node.Consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0)
aggSig, mask, err := node.Consensus.ReadSignatureBitmapPayload(
recvMsg.Payload, 0,
)
if err != nil {
utils.Logger().Error().Err(err).Msg("[Explorer] readSignatureBitmapPayload failed")
utils.Logger().Error().Err(err).
Msg("[Explorer] readSignatureBitmapPayload failed")
return
}
// check has 2f+1 signatures
if count := utils.CountOneBits(mask.Bitmap); count < node.Consensus.Quorum() {
utils.Logger().Error().
Int("need", node.Consensus.Quorum()).
Int("have", count).
need := node.Consensus.Decider.QuorumThreshold()
if count := utils.CountOneBits(mask.Bitmap); count < need {
utils.Logger().Error().Int64("need", need).Int64("have", count).
Msg("[Explorer] not have enough signature")
return
}
@ -63,13 +66,13 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
return
}
block := node.Consensus.PbftLog.GetBlockByHash(recvMsg.BlockHash)
block := node.Consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash)
if block == nil {
utils.Logger().Info().
Uint64("msgBlock", recvMsg.BlockNum).
Msg("[Explorer] Haven't received the block before the committed msg")
node.Consensus.PbftLog.AddMessage(recvMsg)
node.Consensus.FBFTLog.AddMessage(recvMsg)
return
}
@ -77,7 +80,7 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
node.commitBlockForExplorer(block)
} else if msg.Type == msg_pb.MessageType_PREPARED {
recvMsg, err := consensus.ParsePbftMessage(msg)
recvMsg, err := consensus.ParseFBFTMessage(msg)
if err != nil {
utils.Logger().Error().Err(err).Msg("[Explorer] Unable to parse Prepared msg")
return
@ -86,10 +89,10 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
blockObj := &types.Block{}
err = rlp.DecodeBytes(block, blockObj)
// Add the block into Pbft log.
node.Consensus.PbftLog.AddBlock(blockObj)
// Add the block into FBFT log.
node.Consensus.FBFTLog.AddBlock(blockObj)
// Try to search for MessageType_COMMITTED message from pbft log.
msgs := node.Consensus.PbftLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_COMMITTED, blockObj.NumberU64(), blockObj.Hash())
msgs := node.Consensus.FBFTLog.GetMessagesByTypeSeqHash(msg_pb.MessageType_COMMITTED, blockObj.NumberU64(), blockObj.Hash())
// If found, then add the new block into blockchain db.
if len(msgs) > 0 {
node.AddNewBlockForExplorer(blockObj)
@ -107,7 +110,7 @@ func (node *Node) AddNewBlockForExplorer(block *types.Block) {
node.Consensus.UpdateConsensusInformation()
}
// Clean up the blocks to avoid OOM.
node.Consensus.PbftLog.DeleteBlockByNumber(block.NumberU64())
node.Consensus.FBFTLog.DeleteBlockByNumber(block.NumberU64())
// Do dump all blocks from state syncing for explorer one time
// TODO: some blocks can be dumped before state syncing finished.
// And they would be dumped again here. Please fix it.
@ -137,8 +140,8 @@ func (node *Node) commitBlockForExplorer(block *types.Block) {
curNum := block.NumberU64()
if curNum-100 > 0 {
node.Consensus.PbftLog.DeleteBlocksLessThan(curNum - 100)
node.Consensus.PbftLog.DeleteMessagesLessThan(curNum - 100)
node.Consensus.FBFTLog.DeleteBlocksLessThan(curNum - 100)
node.Consensus.FBFTLog.DeleteMessagesLessThan(curNum - 100)
}
}

@ -79,7 +79,7 @@ func (node *Node) SetupGenesisBlock(db ethdb.Database, shardID uint32, myShardSt
chainConfig = *params.PangaeaChainConfig
fallthrough // the rest is the same as testnet
default: // all other types share testnet config
// Tests account for txgen to use
// Test accounts
node.AddTestingAddresses(genesisAlloc, TestAccountNumber)
// Smart contract deployer account used to deploy initial smart contract
@ -126,7 +126,7 @@ func CreateTestBankKeys(numAddresses int) (keys []*ecdsa.PrivateKey, err error)
}
// AddTestingAddresses create the genesis block allocation that contains deterministically
// generated testing addresses with tokens. This is mostly used for generated simulated transactions in txgen.
// generated testing addresses with tokens.
func (node *Node) AddTestingAddresses(gAlloc core.GenesisAlloc, numAddress int) {
for _, testBankKey := range node.TestBankKeys {
testBankAddress := crypto.PubkeyToAddress(testBankKey.PublicKey)

@ -4,9 +4,10 @@ import (
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
@ -22,7 +23,10 @@ func TestAddNewBlock(t *testing.T) {
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus, err := consensus.New(host, 0, leader, blsKey)
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := consensus.New(
host, values.BeaconChainShardID, leader, blsKey, decider,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
@ -53,7 +57,10 @@ func TestVerifyNewBlock(t *testing.T) {
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus, err := consensus.New(host, 0, leader, blsKey)
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := consensus.New(
host, values.BeaconChainShardID, leader, blsKey, decider,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}

@ -8,19 +8,18 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
bls2 "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/drand"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core/values"
bls2 "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/drand"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
"github.com/stretchr/testify/assert"
)
var testDBFactory = &shardchain.MemDBFactory{}
@ -34,7 +33,10 @@ func TestNewNode(t *testing.T) {
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus, err := consensus.New(host, 0, leader, blsKey)
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := consensus.New(
host, values.BeaconChainShardID, leader, blsKey, decider,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
@ -198,7 +200,10 @@ func TestAddPeers(t *testing.T) {
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus, err := consensus.New(host, 0, leader, blsKey)
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := consensus.New(
host, values.BeaconChainShardID, leader, blsKey, decider,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}
@ -245,7 +250,10 @@ func TestAddBeaconPeer(t *testing.T) {
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus, err := consensus.New(host, 0, leader, blsKey)
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensus, err := consensus.New(
host, values.BeaconChainShardID, leader, blsKey, decider,
)
if err != nil {
t.Fatalf("Cannot craeate consensus: %v", err)
}

@ -100,7 +100,7 @@ func CompareBlsPublicKey(k1, k2 BlsPublicKey) int {
return bytes.Compare(k1[:], k2[:])
}
// NodeID represents node id (BLS address) and its voting power, which is set at epoch change only.
// NodeID represents node id (BLS address)
type NodeID struct {
EcdsaAddress common.Address `json:"ecdsa_address"`
BlsPublicKey BlsPublicKey `json:"bls_pubkey"`

@ -85,7 +85,7 @@ It should cover the basic function to pass, to fail, and error conditions.
* test case # : CS1
* description : beacon chain reach consensus
* test procedure : start beacon chain with 50, 100, 150, 200, 250, 300 nodes, start txgen for 300 seconds, check leader log on number of consensuses
* test procedure : start beacon chain with 50, 100, 150, 200, 250, 300 nodes, check leader log on number of consensuses
* passing criteria
* dependency
* note
@ -96,7 +96,7 @@ It should cover the basic function to pass, to fail, and error conditions.
* test case # : DR1
* description : drand generate random number
* test procedure : start beacon chain with 50, 150, 300 nodes, start txgen for 300 seconds, check leader log on the success of generating random number
* test procedure : start beacon chain with 50, 150, 300 nodes, check leader log on the success of generating random number
* passing criteria : random number genreated
* dependency
* note
@ -257,16 +257,6 @@ It should cover the basic function to pass, to fail, and error conditions.
* automated?
---
### transaction stress
* test case # : STX1
* description : txgen send transaction to shard
* test procedure : started beacon chain with 50 nodes, start txgen to send 1,000, 10,000 tx to the shard
* passing criteria
* dependency
* note
* automated?
---
### long running stress
### storage

@ -3,7 +3,7 @@ package types
import (
"math/big"
"github.com/harmony-one/harmony/core/numeric"
"github.com/harmony-one/harmony/numeric"
)
type (

@ -69,11 +69,12 @@ func (d Delegation) GetAmount() *big.Int { return d.Amount }
// String returns a human readable string representation of a Delegation.
func (d Delegation) String() string {
return fmt.Sprintf(`Delegation:
Delegator: %s
Validator: %s
Amount: %s`, d.DelegatorAddress,
d.ValidatorAddress, d.Amount)
return fmt.Sprintf(`
Delegation:
Delegator: %s
Validator: %s
Amount: %s
`, d.DelegatorAddress, d.ValidatorAddress, d.Amount)
}
// Delegations is a collection of delegations
@ -215,11 +216,12 @@ func (d *Redelegation) AddEntry(epoch *big.Int, amt *big.Int) {
// String returns a human readable string representation of a Redelegation.
func (d Redelegation) String() string {
out := fmt.Sprintf(`Redelegations between:
Delegator: %s
Source Validator: %s
Destination Validator: %s
Entries:
out := fmt.Sprintf(`
Redelegations between:
Delegator: %s
Source Validator: %s
Destination Validator: %s
Entries:
`,
d.DelegatorAddress, d.ValidatorSrcAddress, d.ValidatorDstAddress,
)

@ -1,10 +1,11 @@
package types
import (
"fmt"
"math/big"
"github.com/harmony-one/harmony/core/numeric"
"github.com/harmony-one/harmony/internal/common"
"github.com/harmony-one/harmony/numeric"
"github.com/harmony-one/harmony/shard"
"github.com/pkg/errors"
)
@ -26,15 +27,22 @@ const (
)
var (
directiveKind = [...]string{
"NewValidator", "EditValidator", "Delegate", "Redelegate", "Undelegate",
directiveNames = map[Directive]string{
DirectiveNewValidator: "NewValidator",
DirectiveEditValidator: "EditValidator",
DirectiveDelegate: "Delegate",
DirectiveRedelegate: "Redelegate",
DirectiveUndelegate: "Undelegate",
}
// ErrInvalidStakingKind given when caller gives bad staking message kind
ErrInvalidStakingKind = errors.New("bad staking kind")
)
func (d Directive) String() string {
return directiveKind[d]
if name, ok := directiveNames[d]; ok {
return name
}
return fmt.Sprintf("Directive %+v", byte(d))
}
// NewValidator - type for creating a new validator

@ -24,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/crypto/hash"
)
@ -106,12 +105,15 @@ func (s EIP155Signer) Equal(s2 Signer) bool {
return ok && eip155.chainID.Cmp(s.chainID) == 0
}
var big8 = big.NewInt(8)
var (
big8 = big.NewInt(8)
errInvalidChainID = errors.New("invalid chain id for signer")
)
// Sender returns the sender address of the given signer.
func (s EIP155Signer) Sender(tx *StakingTransaction) (common.Address, error) {
if tx.ChainID().Cmp(s.chainID) != 0 {
return common.Address{}, values.ErrInvalidChainID
return common.Address{}, errInvalidChainID
}
V := new(big.Int).Sub(tx.data.V, s.chainIDMul)
V.Sub(V, big8)

@ -34,14 +34,15 @@ type StakingTransaction struct {
from atomic.Value
}
type fulfill func() (Directive, interface{})
// StakeMsgFulfiller is signature of callback intended to produce the StakeMsg
type StakeMsgFulfiller func() (Directive, interface{})
// NewStakingTransaction produces a new staking transaction record
func NewStakingTransaction(
nonce, gasLimit uint64, gasPrice *big.Int, f fulfill,
nonce, gasLimit uint64, gasPrice *big.Int, f StakeMsgFulfiller,
) (*StakingTransaction, error) {
directive, payload := f()
// TODO(Double check that this is legitmate directive)
// TODO(Double check that this is legitmate directive, use type switch)
newStake := &StakingTransaction{data: txdata{
directive,
payload,

@ -33,7 +33,7 @@ type Validator struct {
// description for the validator
Description `json:"description" yaml:"description"`
// Is the validator active in the validating process or not
IsCurrentlyActive bool `json:"active" yaml:"active"`
Active bool `json:"active" yaml:"active"`
}
// Description - some possible IRL connections

Loading…
Cancel
Save