Revert "Run messaging handlings in parallel" (#66)

* Revert "Run messaging handlings in parallel (#57)"

This reverts commit b1a0900563.

* fix lint issues

---------

Co-authored-by: stana-ethernal <stana.miric@ethernal.tech>
pull/68/head
Victor Castell 2 years ago committed by GitHub
parent b1a0900563
commit ec72c80bc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      core/byzantine_test.go
  2. 209
      core/ibft.go
  3. 68
      core/ibft_test.go
  4. 69
      core/state.go
  5. 18
      messages/helpers.go
  6. 30
      messages/proto/helper.go

@ -13,16 +13,12 @@ import (
func TestByzantineBehaviour(t *testing.T) {
t.Parallel()
var (
numNodes uint64 = 6
)
//nolint:dupl
t.Run("malicious hash in proposal", func(t *testing.T) {
t.Parallel()
cluster := newCluster(
numNodes,
6,
func(c *cluster) {
for _, node := range c.nodes {
currentNode := node
@ -55,7 +51,7 @@ func TestByzantineBehaviour(t *testing.T) {
t.Parallel()
cluster := newCluster(
numNodes,
6,
func(c *cluster) {
for _, node := range c.nodes {
currentNode := node
@ -89,7 +85,7 @@ func TestByzantineBehaviour(t *testing.T) {
t.Parallel()
cluster := newCluster(
numNodes,
6,
func(c *cluster) {
for _, node := range c.nodes {
currentNode := node
@ -124,7 +120,7 @@ func TestByzantineBehaviour(t *testing.T) {
t.Parallel()
cluster := newCluster(
numNodes,
6,
func(c *cluster) {
for _, node := range c.nodes {
currentNode := node
@ -158,7 +154,7 @@ func TestByzantineBehaviour(t *testing.T) {
t.Parallel()
cluster := newCluster(
numNodes,
6,
func(c *cluster) {
for _, node := range c.nodes {
currentNode := node
@ -193,7 +189,7 @@ func TestByzantineBehaviour(t *testing.T) {
t.Parallel()
cluster := newCluster(
numNodes,
6,
func(c *cluster) {
for _, node := range c.nodes {
currentNode := node
@ -228,7 +224,7 @@ func TestByzantineBehaviour(t *testing.T) {
t.Parallel()
cluster := newCluster(
numNodes,
6,
func(c *cluster) {
for _, node := range c.nodes {
currentNode := node
@ -263,7 +259,7 @@ func TestByzantineBehaviour(t *testing.T) {
t.Parallel()
cluster := newCluster(
numNodes,
6,
func(c *cluster) {
for _, node := range c.nodes {
currentNode := node

@ -3,6 +3,7 @@ package core
import (
"bytes"
"context"
"errors"
"math"
"sync"
"time"
@ -49,6 +50,10 @@ const (
roundFactorBase = float64(2)
)
var (
errTimeoutExpired = errors.New("round timeout expired")
)
// IBFT represents a single instance of the IBFT state machine
type IBFT struct {
// log is the logger instance
@ -119,7 +124,7 @@ func NewIBFT(
},
seals: make([]*messages.CommittedSeal, 0),
roundStarted: false,
commitSent: false,
name: newRound,
},
baseRoundTimeout: round0Timeout,
}
@ -390,7 +395,7 @@ func (i *IBFT) startRound(ctx context.Context) {
i.log.Debug("pre-prepare message multicasted")
}
i.runReceptions(ctx)
i.runStates(ctx)
}
// waitForRCC waits for valid RCC for the specified height and round
@ -516,37 +521,38 @@ func (i *IBFT) proposalMatchesCertificate(
return true
}
// runReceptions spawn processes to handle message for the round
func (i *IBFT) runReceptions(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
i.runPrePrepare(ctx)
}()
// runStates is the main loop which performs state transitions
func (i *IBFT) runStates(ctx context.Context) {
var timeout error
go func() {
defer wg.Done()
i.runPrepare(ctx)
}()
go func() {
defer wg.Done()
for {
switch i.state.getStateName() {
case newRound:
timeout = i.runNewRound(ctx)
case prepare:
timeout = i.runPrepare(ctx)
case commit:
timeout = i.runCommit(ctx)
case fin:
i.runFin()
// Block inserted without any errors,
// sequence is complete
i.signalRoundDone(ctx)
i.runCommit(ctx)
}()
return
}
wg.Wait()
if timeout != nil {
// Timeout received
return
}
}
}
// runPrePrepare starts reception of PREPREPARE message
func (i *IBFT) runPrePrepare(ctx context.Context) {
i.log.Debug("enter: reception of PREPREPARE message")
defer i.log.Debug("exit: reception of PREPREPARE message")
// runNewRound runs the New Round IBFT state
func (i *IBFT) runNewRound(ctx context.Context) error {
i.log.Debug("enter: new round state")
defer i.log.Debug("exit: new round state")
var (
// Grab the current view
@ -569,25 +575,28 @@ func (i *IBFT) runPrePrepare(ctx context.Context) {
defer i.messages.Unsubscribe(sub.ID)
for {
// SubscriptionDetails conditions have been met,
// grab the proposal messages
proposalMessage := i.handlePrePrepare(view)
if proposalMessage != nil {
select {
case <-ctx.Done():
// Stop signal received, exit
return errTimeoutExpired
case <-sub.SubCh:
// SubscriptionDetails conditions have been met,
// grab the proposal messages
proposalMessage := i.handlePrePrepare(view)
if proposalMessage == nil {
continue
}
// Multicast the PREPARE message
i.acceptProposal(proposalMessage)
i.state.setProposalMessage(proposalMessage)
i.sendPrepareMessage(view)
i.log.Debug("prepare message multicasted")
return
}
// Move to the prepare state
i.state.changeState(prepare)
select {
case <-ctx.Done():
// Stop signal received, exit
return
case <-sub.SubCh:
continue
return nil
}
}
}
@ -750,11 +759,6 @@ func (i *IBFT) validateProposal(msg *proto.Message, view *proto.View) bool {
// handlePrePrepare parses the received proposal and performs
// a transition to PREPARE state, if the proposal is valid
func (i *IBFT) handlePrePrepare(view *proto.View) *proto.Message {
// exit if node has received valid proposal
if i.state.getProposalMessage() != nil {
return nil
}
isValidPrePrepare := func(message *proto.Message) bool {
if view.Round == 0 {
// proposal must be for round 0
@ -777,10 +781,10 @@ func (i *IBFT) handlePrePrepare(view *proto.View) *proto.Message {
return msgs[0]
}
// runPrepare starts reception of PREPARE messages
func (i *IBFT) runPrepare(ctx context.Context) {
i.log.Debug("enter: reception of PREPARE messages")
defer i.log.Debug("exit: reception of PREPARE messages")
// runPrepare runs the Prepare IBFT state
func (i *IBFT) runPrepare(ctx context.Context) error {
i.log.Debug("enter: prepare state")
defer i.log.Debug("exit: prepare state")
var (
// Grab the current view
@ -801,46 +805,24 @@ func (i *IBFT) runPrepare(ctx context.Context) {
defer i.messages.Unsubscribe(sub.ID)
for {
prepareMessages := i.handlePrepare(view)
if prepareMessages != nil {
i.state.finalizePrepare(
&proto.PreparedCertificate{
ProposalMessage: i.state.getProposalMessage(),
PrepareMessages: prepareMessages,
},
i.state.getProposal(),
)
i.state.setCommitSent(true)
// Multicast the COMMIT message
i.sendCommitMessage(view)
i.log.Debug("commit message multicasted")
return
}
// quorum of valid prepare messages not received, retry
select {
case <-ctx.Done():
// Stop signal received, exit
return
return errTimeoutExpired
case <-sub.SubCh:
continue
if !i.handlePrepare(view) {
// quorum of valid prepare messages not received, retry
continue
}
return nil
}
}
}
// handlePrepare parses available prepare messages and performs
// a transition to COMMIT state, if quorum was reached
func (i *IBFT) handlePrepare(view *proto.View) []*proto.Message {
// exit if node has not received a proposal for round yet
// or node has sent commit message already
if i.state.getProposalMessage() == nil || i.state.getCommitSent() {
return nil
}
func (i *IBFT) handlePrepare(view *proto.View) bool {
isValidPrepare := func(message *proto.Message) bool {
// Verify that the proposal hash is valid
return i.backend.IsValidProposalHash(
@ -857,16 +839,29 @@ func (i *IBFT) handlePrepare(view *proto.View) []*proto.Message {
if !i.backend.HasQuorum(view.Height, prepareMessages, proto.MessageType_PREPARE) {
// quorum not reached, keep polling
return nil
return false
}
return prepareMessages
// Multicast the COMMIT message
i.sendCommitMessage(view)
i.log.Debug("commit message multicasted")
i.state.finalizePrepare(
&proto.PreparedCertificate{
ProposalMessage: i.state.getProposalMessage(),
PrepareMessages: prepareMessages,
},
i.state.getProposal(),
)
return true
}
// runCommit starts reception of COMMIT messages
func (i *IBFT) runCommit(ctx context.Context) {
i.log.Debug("enter: reception of COMMIT message")
defer i.log.Debug("exit: reception of COMMIT message")
// runCommit runs the Commit IBFT state
func (i *IBFT) runCommit(ctx context.Context) error {
i.log.Debug("enter: commit state")
defer i.log.Debug("exit: commit state")
var (
// Grab the current view
@ -887,19 +882,17 @@ func (i *IBFT) runCommit(ctx context.Context) {
defer i.messages.Unsubscribe(sub.ID)
for {
if i.handleCommit(view) {
i.signalRoundDone(ctx)
return
}
// quorum not reached, retry
select {
case <-ctx.Done():
// Stop signal received, exit
return
return errTimeoutExpired
case <-sub.SubCh:
continue
if !i.handleCommit(view) {
// quorum not reached, retry
continue
}
return nil
}
}
}
@ -907,10 +900,6 @@ func (i *IBFT) runCommit(ctx context.Context) {
// handleCommit parses available commit messages and performs
// a transition to FIN state, if quorum was reached
func (i *IBFT) handleCommit(view *proto.View) bool {
if i.state.getProposalMessage() == nil {
return false
}
isValidCommit := func(message *proto.Message) bool {
var (
proposalHash = messages.ExtractCommitHash(message)
@ -942,6 +931,17 @@ func (i *IBFT) handleCommit(view *proto.View) bool {
// Set the committed seals
i.state.setCommittedSeals(commitSeals)
// Move to the fin state
i.state.changeState(fin)
return true
}
// runFin runs the fin state (block insertion)
func (i *IBFT) runFin() {
i.log.Debug("enter: fin state")
defer i.log.Debug("exit: fin state")
// Insert the block to the node's underlying
// blockchain layer
i.backend.InsertProposal(
@ -954,11 +954,9 @@ func (i *IBFT) handleCommit(view *proto.View) bool {
// Remove stale messages
i.messages.PruneByHeight(i.state.getHeight())
return true
}
// moveToNewRound changes round and resets state
// moveToNewRound moves the state to the new round
func (i *IBFT) moveToNewRound(round uint64) {
i.state.setView(&proto.View{
Height: i.state.getHeight(),
@ -967,7 +965,7 @@ func (i *IBFT) moveToNewRound(round uint64) {
i.state.setRoundStarted(false)
i.state.setProposalMessage(nil)
i.state.setCommitSent(false)
i.state.changeState(newRound)
}
func (i *IBFT) buildProposal(ctx context.Context, view *proto.View) *proto.Message {
@ -1058,10 +1056,11 @@ func (i *IBFT) buildProposal(ctx context.Context, view *proto.View) *proto.Messa
)
}
// acceptProposal accepts the proposal and saves it into state
// acceptProposal accepts the proposal and moves the state
func (i *IBFT) acceptProposal(proposalMessage *proto.Message) {
// accept newly proposed block
// accept newly proposed block and move to PREPARE state
i.state.setProposalMessage(proposalMessage)
i.state.changeState(prepare)
}
// AddMessage adds a new message to the IBFT message system

@ -288,6 +288,9 @@ func TestRunNewRound_Proposer(t *testing.T) {
i.wg.Wait()
// Make sure the node is in prepare state
assert.Equal(t, prepare, i.state.name)
// Make sure the accepted proposal is the one proposed to other nodes
assert.Equal(t, multicastedProposal, i.state.proposalMessage)
@ -366,7 +369,6 @@ func TestRunNewRound_Proposer(t *testing.T) {
}
},
}
//nolint:dupl
messages = mockMessages{
subscribeFn: func(_ messages.SubscriptionDetails) *messages.Subscription {
return &messages.Subscription{
@ -382,10 +384,6 @@ func TestRunNewRound_Proposer(t *testing.T) {
messageType proto.MessageType,
isValid func(message *proto.Message) bool,
) []*proto.Message {
if messageType != proto.MessageType_ROUND_CHANGE {
return nil
}
return filterMessages(
roundChangeMessages,
isValid,
@ -418,6 +416,9 @@ func TestRunNewRound_Proposer(t *testing.T) {
i.wg.Wait()
// Make sure the node changed the state to prepare
assert.Equal(t, prepare, i.state.name)
// Make sure the multicasted proposal is the accepted proposal
assert.Equal(t, multicastedPreprepare, i.state.proposalMessage)
@ -539,7 +540,6 @@ func TestRunNewRound_Proposer(t *testing.T) {
}
},
}
//nolint:dupl
messages = mockMessages{
subscribeFn: func(_ messages.SubscriptionDetails) *messages.Subscription {
return &messages.Subscription{
@ -555,10 +555,6 @@ func TestRunNewRound_Proposer(t *testing.T) {
messageType proto.MessageType,
isValid func(message *proto.Message) bool,
) []*proto.Message {
if messageType != proto.MessageType_ROUND_CHANGE {
return nil
}
return filterMessages(
roundChangeMessages,
isValid,
@ -591,6 +587,9 @@ func TestRunNewRound_Proposer(t *testing.T) {
i.wg.Wait()
// Make sure the node changed the state to prepare
assert.Equal(t, prepare, i.state.name)
// Make sure the multicasted proposal is the accepted proposal
assert.Equal(t, multicastedPreprepare, i.state.proposalMessage)
@ -693,6 +692,9 @@ func TestRunNewRound_Validator_Zero(t *testing.T) {
i.wg.Wait()
// Make sure the node moves to prepare state
assert.Equal(t, prepare, i.state.name)
// Make sure the accepted proposal is the one that was sent out
assert.Equal(t, correctRoundMessage.proposal, i.state.getProposal())
@ -858,6 +860,9 @@ func TestRunNewRound_Validator_NonZero(t *testing.T) {
i.wg.Wait()
// Make sure the node moves to prepare state
assert.Equal(t, prepare, i.state.name)
// Make sure the accepted proposal is the one that was sent out
assert.Equal(t, correctRoundMessage.proposal, i.state.getProposal())
@ -942,6 +947,7 @@ func TestRunPrepare(t *testing.T) {
)
i := NewIBFT(log, backend, transport)
i.state.name = prepare
i.state.roundStarted = true
i.state.proposalMessage = &proto.Message{
Payload: &proto.Message_PreprepareData{
@ -961,8 +967,8 @@ func TestRunPrepare(t *testing.T) {
i.wg.Wait()
// Make sure the node sent commit message
assert.True(t, i.state.commitSent)
// Make sure the node moves to the commit state
assert.Equal(t, commit, i.state.name)
// Make sure the proposal didn't change
assert.Equal(t, correctRoundMessage.proposal, i.state.getProposal())
@ -1055,7 +1061,7 @@ func TestRunCommit(t *testing.T) {
},
}
i.state.roundStarted = true
i.state.commitSent = true
i.state.name = commit
ctx, cancelFn := context.WithCancel(context.Background())
@ -1083,6 +1089,9 @@ func TestRunCommit(t *testing.T) {
i.wg.Wait()
// Make sure the node changed the state to fin
assert.Equal(t, fin, i.state.name)
// Make sure the inserted proposal was the one present
assert.Equal(t, insertedProposal, correctRoundMessage.proposal.RawProposal)
@ -1286,6 +1295,9 @@ func TestIBFT_MoveToNewRound(t *testing.T) {
// Make sure the proposal is not present
assert.Nil(t, i.state.getProposal())
// Make sure the state is correct
assert.Equal(t, newRound, i.state.name)
})
}
@ -2454,6 +2466,30 @@ func TestIBFT_WatchForFutureRCC(t *testing.T) {
assert.Equal(t, rccRound, receivedRound)
}
// TestState_String makes sure the string representation
// of states is correct
func TestState_String(t *testing.T) {
t.Parallel()
stringMap := map[stateType]string{
newRound: "new round",
prepare: "prepare",
commit: "commit",
fin: "fin",
}
stateTypes := []stateType{
newRound,
prepare,
commit,
fin,
}
for _, stateT := range stateTypes {
assert.Equal(t, stringMap[stateT], stateT.String())
}
}
// TestIBFT_RunSequence_NewProposal verifies that the
// state changes correctly when receiving a higher proposal event
func TestIBFT_RunSequence_NewProposal(t *testing.T) {
@ -2510,6 +2546,9 @@ func TestIBFT_RunSequence_NewProposal(t *testing.T) {
// Make sure the round has been started
assert.True(t, i.state.roundStarted)
// Make sure the state is the prepare state
assert.Equal(t, prepare, i.state.name)
}
// TestIBFT_RunSequence_FutureRCC verifies that the
@ -2556,6 +2595,9 @@ func TestIBFT_RunSequence_FutureRCC(t *testing.T) {
// Make sure the new round has been started
assert.True(t, i.state.roundStarted)
// Make sure the state is the new round state
assert.Equal(t, newRound, i.state.name)
}
// TestIBFT_ExtendRoundTimer makes sure the round timeout

@ -5,10 +5,32 @@ import (
"github.com/0xPolygon/go-ibft/messages"
"github.com/0xPolygon/go-ibft/messages/proto"
)
type stateType uint8
protoBuf "google.golang.org/protobuf/proto"
const (
newRound stateType = iota
prepare
commit
fin
)
func (s stateType) String() string {
switch s {
case newRound:
return "new round"
case prepare:
return "prepare"
case commit:
return "commit"
case fin:
return "fin"
}
return ""
}
type state struct {
sync.RWMutex
@ -31,8 +53,7 @@ type state struct {
// flags for different states
roundStarted bool
// commitSent for current round
commitSent bool
name stateType
}
func (s *state) getView() *proto.View {
@ -51,7 +72,7 @@ func (s *state) clear(height uint64) {
s.seals = nil
s.roundStarted = false
s.commitSent = false
s.name = newRound
s.proposalMessage = nil
s.latestPC = nil
s.latestPreparedProposal = nil
@ -94,9 +115,7 @@ func (s *state) setProposalMessage(proposalMessage *proto.Message) {
s.Lock()
defer s.Unlock()
proposalMsg, _ := protoBuf.Clone(proposalMessage).(*proto.Message)
s.proposalMessage = proposalMsg
s.proposalMessage = proposalMessage
}
func (s *state) getRound() uint64 {
@ -126,6 +145,7 @@ func (s *state) getProposal() *proto.Proposal {
func (s *state) getRawDataFromProposal() []byte {
proposal := s.getProposal()
if proposal != nil {
return proposal.RawProposal
}
@ -140,25 +160,25 @@ func (s *state) getCommittedSeals() []*messages.CommittedSeal {
return s.seals
}
func (s *state) setRoundStarted(started bool) {
s.Lock()
defer s.Unlock()
func (s *state) getStateName() stateType {
s.RLock()
defer s.RUnlock()
s.roundStarted = started
return s.name
}
func (s *state) getCommitSent() bool {
s.RLock()
defer s.RUnlock()
func (s *state) changeState(name stateType) {
s.Lock()
defer s.Unlock()
return s.commitSent
s.name = name
}
func (s *state) setCommitSent(sent bool) {
func (s *state) setRoundStarted(started bool) {
s.Lock()
defer s.Unlock()
s.commitSent = sent
s.roundStarted = started
}
func (s *state) setView(view *proto.View) {
@ -172,11 +192,7 @@ func (s *state) setCommittedSeals(seals []*messages.CommittedSeal) {
s.Lock()
defer s.Unlock()
s.seals = s.seals[:0]
for _, seal := range seals {
s.seals = append(s.seals, seal.Copy())
}
s.seals = seals
}
func (s *state) newRound() {
@ -184,6 +200,8 @@ func (s *state) newRound() {
defer s.Unlock()
if !s.roundStarted {
// Round is not yet started, kick the round off
s.name = newRound
s.roundStarted = true
}
}
@ -195,6 +213,9 @@ func (s *state) finalizePrepare(
s.Lock()
defer s.Unlock()
s.latestPC = certificate.Copy()
s.latestPreparedProposal = latestPPB.Copy()
s.latestPC = certificate
s.latestPreparedProposal = latestPPB
// Move to the commit state
s.name = commit
}

@ -18,20 +18,6 @@ type CommittedSeal struct {
Signature []byte
}
// Copy is a helper method for deep copy of CommittedSeal
func (cs *CommittedSeal) Copy() *CommittedSeal {
signer := make([]byte, len(cs.Signer))
signature := make([]byte, len(cs.Signature))
copy(signer, cs.Signer)
copy(signature, cs.Signature)
return &CommittedSeal{
Signer: signer,
Signature: signature,
}
}
// ExtractCommittedSeals extracts the committed seals from the passed in messages
func ExtractCommittedSeals(commitMessages []*proto.Message) ([]*CommittedSeal, error) {
committedSeals := make([]*CommittedSeal, 0)
@ -50,10 +36,6 @@ func ExtractCommittedSeals(commitMessages []*proto.Message) ([]*CommittedSeal, e
// ExtractCommittedSeal extracts the committed seal from the passed in message
func ExtractCommittedSeal(commitMessage *proto.Message) *CommittedSeal {
if commitMessage.Type != proto.MessageType_COMMIT {
return nil
}
commitData, _ := commitMessage.Payload.(*proto.Message_CommitData)
return &CommittedSeal{

@ -15,33 +15,3 @@ func (m *Message) PayloadNoSig() ([]byte, error) {
return raw, nil
}
// Copy is a helper method for deep copy of Proposal
func (p *Proposal) Copy() *Proposal {
rawProposal := make([]byte, len(p.RawProposal))
copy(rawProposal, p.RawProposal)
return &Proposal{
RawProposal: rawProposal,
Round: p.Round,
}
}
// Copy is a helper method for deep copy of PreparedCertificate
func (pc *PreparedCertificate) Copy() *PreparedCertificate {
proposal, _ := proto.Clone(pc.ProposalMessage).(*Message)
preparedMessages := make([]*Message, len(pc.PrepareMessages))
for idx, pm := range pc.PrepareMessages {
prepareMsg, _ := proto.Clone(pm).(*Message)
preparedMessages[idx] = prepareMsg
}
return &PreparedCertificate{
ProposalMessage: proposal,
PrepareMessages: preparedMessages,
}
}

Loading…
Cancel
Save