add minimum pbft view change structure

add timeout class for pbft view change
add README for PBFT process including view change protocol
pull/783/head
chao 6 years ago committed by chaosma
parent f9071da236
commit 994aba8b07
  1. 1
      api/service/consensus/service.go
  2. 100
      consensus/README.md
  3. 9
      consensus/config.go
  4. 83
      consensus/consensus.go
  5. 0
      consensus/consensus_service.go
  6. 0
      consensus/consensus_service_test.go
  7. 61
      consensus/view_change.go
  8. 63
      internal/utils/timer.go
  9. 31
      internal/utils/timer_test.go

@ -27,6 +27,7 @@ func (s *Service) StartService() {
utils.GetLogInstance().Info("Starting consensus service.")
s.stopChan = make(chan struct{})
s.stoppedChan = make(chan struct{})
// after implementation finished, replace WaitForNewBlock by: s.consensus.Start(s.stopChan, s.stoppedChan)
s.consensus.WaitForNewBlock(s.blockChannel, s.stopChan, s.stoppedChan, s.startChan)
s.consensus.WaitForNewRandomness()
}

@ -1 +1,101 @@
Consensus package includes the Harmony BFT consensus protocol code, which uses BLS-based multi-signature to cosign the new block. The details are in Harmony's new [consensus protocol design](https://talk.harmony.one/t/bls-based-practical-bft-consensus/131).
## Introduction to Harmony BFT with BLS signatures
Harmony BFT consensus protocol consist of normal mode and view changing mode which is same as the PBFT(practical byzantine fault tolerance) protocol. The difference is we use the BLS aggregated signature to reduce O(N^2) communications to O(N), which is more efficient and scalable to traditional PBFT. For brevity, we will still call the whole process as PBFT.
### Normal mode
To reach the consensus of the next block, there are 3 phases: announce(i.e. pre-prepare in PBFT), prepare and commit.
* Announce(leader): The leader broadcasts ANNOUNCE message along with candidate of the next block.
* Prepare(validator): The validator will validate the block sent by leader and send PREPARE message; if the block is invalid, the validator will propose view change. If the prepare timeout, the validator will also propose view change.
* Prepared(leader): The leader will collect 2f+1 PREPARE message including itself and broadcast PREPARED message with the aggregated signature
* Commit(validator): The validator will check the validity of aggregated signature (# of signatures >= 2f+1) and send COMMIT message; if the commit timeout, the validator will also propose view change.
* Committed(leader): The leader will collect 2f+1 COMMIT message including itself and broadcast COMMITTED message with the aggregated signature
* Finalize(leader and validators): Both the leader and validators will finalize the block into blockchain together with 2f+1 aggregated signatures.
### View changing mode
* ViewChange(validator): whenever a validator receives invalid block/signature from the leader, it should send VIEWCHANGE message with view v+1 together with its own prepared message(>=2f+1 aggregated prepare signatures) from previous views.
* NewView(new leader): when the new leader (uniquely determined) collect enough (2f+1) view change messages, it broadcasts the NEWVIEW message with aggregated VIEWCHANGE signatures.
* During the view changing process, if the new leader not send NEWVIEW message on time, the validator will propose ViewChange for the next view v+2 and so on...
## State Machine
The whole process of PBFT can be described as a state machine. We don't separate the roles of leader and validators, instead we use PbftState structure to describe the role and phase of a given node who is joining the consensus process. When a node receives a new message from its peer, its state will be updated. i.e. pbft_state --(upon receive new PbftMessage)--> new_pbft_state. Thus the most nature and clear way is to describe the whole process as state machine.
```
// PbftState holds the state of a node in PBFT process
type PbftState struct {
IsLeader bool
phase PbftPhase // Announce, Prepare(d), Commit(ted)
...
}
// PbftLog stores the data in PBFT process, it will be used in different phases in order to determine whether a new PbftMessage is valid or not.
type PbftLog struct {
blocks []*types.Block
messages []*PbftMessage
}
// entry point and main loop;
// in each loop, the node will receive PBFT message from peers with timeout,
// then update its state accordingly. handleMessageUpdate function handles various kinds of messages and update its state
// it will also send new PBFT message (if not null) to its peers.
// in the same loop, the node will also check whether it should send view changing message to new leader
// finally, it will periodically try to publish new block into blockchain
func (consensus *Consensus) Start(stopChan chan struct{}, stoppedChan chan struct{}) {
defer close(stoppedChan)
tick := time.NewTicker(blockDuration)
consensus.idleTimeout.Start()
for {
select {
default:
msg := consensus.recvWithTimeout(receiveTimeout)
consensus.handleMessageUpdate(msg)
if consensus.idleTimeout.CheckExpire() {
consensus.startViewChange(consensus.consensusID + 1)
}
if consensus.commitTimeout.CheckExpire() {
consensus.startViewChange(consensus.consensusID + 1)
}
if consensus.viewChangeTimeout.CheckExpire() {
if consensus.mode.Mode() == Normal {
continue
}
consensusID := consensus.mode.ConsensusID()
consensus.startViewChange(consensusID + 1)
}
case <-tick.C:
consensus.tryPublishBlock()
case <-stopChan:
return
}
}
}
```

@ -0,0 +1,9 @@
package consensus
import "time"
const (
// blockDuration is the period a node try to publish a new block if it's leader
blockDuration time.Duration = 5 * time.Second
receiveTimeout time.Duration = 5 * time.Second
)

@ -3,6 +3,7 @@ package consensus // consensus
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/bls/ffi/go/bls"
@ -15,9 +16,11 @@ import (
// Consensus is the main struct with all states and data related to consensus process.
type Consensus struct {
// The current state of the consensus
state State
phase PbftPhase
mode PbftMode
//TODO depreciate it after implement PbftPhase
state State
// Commits collected from validators.
prepareSigs map[common.Address]*bls.Sign // key is the validator's address
commitSigs map[common.Address]*bls.Sign // key is the validator's address
@ -55,6 +58,10 @@ type Consensus struct {
SelfAddress common.Address
// Consensus Id (View Id) - 4 byte
consensusID uint32
// newConsensusID is the new consensusID for new leader change
newConsensusID uint32
// Blockhash - 32 byte
blockHash [32]byte
// Block to run consensus on
@ -69,6 +76,27 @@ type Consensus struct {
// global consensus mutex
mutex sync.Mutex
// channel to receive consensus message
msgChan chan []byte
// timer to make sure leader publishes block in a timely manner; if not
// then this node will propose view change
idleTimeout utils.Timeout
// timer to make sure this node commit a block in a timely manner; if not
// this node will initialize a view change
commitTimeout utils.Timeout
// When doing view change, timer to make sure a valid view change message
// sent by new leader in a timely manner; if not this node will start
// a new view change
viewChangeTimeout utils.Timeout
// The duration of viewChangeTimeout; when a view change is initialized with v+1
// timeout will be equal to viewChangeDuration; if view change failed and start v+2
// timeout will be 2*viewChangeDuration; timeout of view change v+n is n*viewChangeDuration
viewChangeDuration time.Duration
// Validator specific fields
// Blocks received but not done with consensus yet
blocksReceived map[uint32]*BlockConsensusStatus
@ -167,6 +195,8 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe
consensus.consensusID = 0
consensus.ShardID = ShardID
consensus.msgChan = make(chan []byte)
// For validators to keep track of all blocks received but not yet committed, so as to catch up to latest consensus if lagged behind.
consensus.blocksReceived = make(map[uint32]*BlockConsensusStatus)
@ -185,3 +215,52 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe
// consensus.Log.Info("New Consensus", "IP", ip, "Port", port, "NodeID", consensus.nodeID, "priKey", consensus.priKey, "PubKey", consensus.PubKey)
return &consensus, nil
}
// Start is the entry point and main loop for consensus
func (consensus *Consensus) Start(stopChan chan struct{}, stoppedChan chan struct{}) {
defer close(stoppedChan)
tick := time.NewTicker(blockDuration)
consensus.idleTimeout.Start()
for {
select {
default:
msg := consensus.recvWithTimeout(receiveTimeout)
consensus.handleMessageUpdate(msg)
if consensus.idleTimeout.CheckExpire() {
consensus.startViewChange(consensus.consensusID + 1)
}
if consensus.commitTimeout.CheckExpire() {
consensus.startViewChange(consensus.consensusID + 1)
}
if consensus.viewChangeTimeout.CheckExpire() {
if consensus.mode.Mode() == Normal {
continue
}
consensusID := consensus.mode.ConsensusID()
consensus.startViewChange(consensusID + 1)
}
case <-tick.C:
consensus.tryPublishBlock()
case <-stopChan:
return
}
}
}
// recvWithTimeout receives message before timeout
// TODO: cm
func (consensus *Consensus) recvWithTimeout(timeoutDuration time.Duration) []byte {
// use consensus.msgChan to get message
return []byte{}
}
// handleMessageUpdate will update the consensus state according to received message
// TODO: cm
func (consensus *Consensus) handleMessageUpdate(msg []byte) {
}
// tryPublishBlock periodically check if block is finalized and then publish it
// TODO: cm
func (consensus *Consensus) tryPublishBlock() {
}

@ -0,0 +1,61 @@
package consensus
import "sync"
// PbftPhase PBFT phases: pre-prepare, prepare and commit
type PbftPhase int
// Enum for PbftPhase
const (
Annonce PbftPhase = iota
Prepare
Commit
Finish
)
// Mode determines whether a node is in normal or viewchanging mode
type Mode int
// Enum for node Mode
const (
Normal Mode = iota
ViewChanging
)
// PbftMode contains mode and consensusID of viewchanging
type PbftMode struct {
mode Mode
consensusID uint32
mux sync.Mutex
}
// Mode return the current node mode
func (pm *PbftMode) Mode() Mode {
return pm.mode
}
// SetMode set the node mode as required
func (pm *PbftMode) SetMode(m Mode) {
pm.mux.Lock()
defer pm.mux.Unlock()
pm.mode = m
}
// ConsensusID return the current viewchanging id
func (pm *PbftMode) ConsensusID() uint32 {
return pm.consensusID
}
// SetConsensusID sets the viewchanging id accordingly
func (pm *PbftMode) SetConsensusID(consensusID uint32) {
pm.mux.Lock()
defer pm.mux.Unlock()
pm.consensusID = consensusID
}
// startViewChange start a new view change
func (consensus *Consensus) startViewChange(consensusID uint32) {
consensus.mode.SetMode(ViewChanging)
consensus.mode.SetConsensusID(consensusID)
// TODO (cm): implement the actual logic
}

@ -0,0 +1,63 @@
package utils
import (
"time"
)
// TimeoutState indicates the state of Timeout class
type TimeoutState int
// Enum for different TimeoutState
const (
Active TimeoutState = iota
Inactive
Expired
)
// Timeout is the implementation of timeout
type Timeout struct {
state TimeoutState
d time.Duration
start time.Time
}
// NewTimeout creates a new timeout class
func NewTimeout(d time.Duration) *Timeout {
timeout := Timeout{state: Inactive, d: d, start: time.Now()}
return &timeout
}
// Start starts the timeout clock
func (timeout *Timeout) Start() {
timeout.state = Active
timeout.start = time.Now()
}
// Stop stops the timeout clock
func (timeout *Timeout) Stop() {
timeout.state = Inactive
timeout.start = time.Now()
}
// CheckExpire checks whether the timeout is reached/expired
func (timeout *Timeout) CheckExpire() bool {
if timeout.state == Active && time.Since(timeout.start) > timeout.d {
timeout.state = Expired
}
if timeout.state == Expired {
return true
}
return false
}
// Duration returns the duration period of timeout
func (timeout *Timeout) Duration() time.Duration {
return timeout.d
}
// IsActive checks whether timeout clock is active;
// A timeout is active means it's not stopped caused by stop
// and also not expired with time elapses longer than duration from start
func (timeout *Timeout) IsActive() bool {
return timeout.state == Active
}

@ -0,0 +1,31 @@
package utils
import (
"testing"
"time"
)
func TestNewTimeout(t *testing.T) {
timer := NewTimeout(time.Second)
if timer == nil || timer.Duration() != time.Second || timer.IsActive() {
t.Fatalf("timer initialization error")
}
}
func TestCheckExpire(t *testing.T) {
timer := NewTimeout(time.Second)
timer.Start()
time.Sleep(2 * time.Second)
if timer.CheckExpire() == false {
t.Fatalf("CheckExpire should be true")
}
timer.Start()
if timer.CheckExpire() == true {
t.Fatalf("CheckExpire should be false")
}
timer.Stop()
if timer.CheckExpire() == true {
t.Fatalf("CheckExpire should be false")
}
}
Loading…
Cancel
Save