Resolve conflict

pull/112/head
Rongjian Lan 6 years ago
commit 5917b971b0
  1. 12
      consensus/consensus.go
  2. 19
      consensus/consensus_leader.go
  3. 3
      consensus/consensus_leader_msg.go
  4. 7
      consensus/consensus_validator.go
  5. 12
      node/node.go
  6. 25
      node/node_handler.go
  7. 26
      node/node_test.go
  8. 2
      p2p/peer.go

@ -179,6 +179,7 @@ func New(selfPeer p2p.Peer, ShardID string, peers []p2p.Peer, leader p2p.Peer) *
consensus.Log = log.New()
consensus.uniqueIDInstance = utils.GetUniqueValidatorIDInstance()
// consensus.Log.Info("New Consensus", "IP", ip, "Port", port, "NodeID", consensus.nodeID, "priKey", consensus.priKey, "pubKey", consensus.pubKey)
return &consensus
}
@ -239,16 +240,16 @@ func (consensus *Consensus) String() string {
// AddPeers will add new peers into the validator map of the consensus
// and add the public keys
func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
func (consensus *Consensus) AddPeers(peers []*p2p.Peer) int {
count := 0
for _, peer := range peers {
_, ok := consensus.validators.Load(utils.GetUniqueIDFromPeer(peer))
_, ok := consensus.validators.Load(utils.GetUniqueIDFromPeer(*peer))
if !ok {
if peer.ValidatorID == -1 {
peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueID())
}
consensus.validators.Store(utils.GetUniqueIDFromPeer(peer), peer)
consensus.validators.Store(utils.GetUniqueIDFromPeer(*peer), *peer)
consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey)
}
count++
@ -401,3 +402,8 @@ func (consensus *Consensus) Prepare(chain ChainReader, header *types.Header) err
func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header *types.Header) {
// TODO: implement mining rewards
}
// GetNodeID returns the nodeID
func (consensus *Consensus) GetNodeID() uint16 {
return consensus.nodeID
}

@ -6,6 +6,7 @@ import (
"encoding/gob"
"encoding/hex"
"errors"
"strconv"
"time"
"github.com/ethereum/go-ethereum/rlp"
@ -39,11 +40,12 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
continue
}
// TODO: think about potential race condition
startTime = time.Now()
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime)
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime, "publicKeys", len(consensus.PublicKeys))
for consensus.state == Finished {
// time.Sleep(500 * time.Millisecond)
consensus.ResetState()
@ -63,10 +65,11 @@ func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Bloc
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
continue
}
startTime = time.Now()
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime)
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime, "publicKeys", len(consensus.PublicKeys))
for consensus.state == Finished {
// time.Sleep(500 * time.Millisecond)
data, err := rlp.EncodeToBytes(newBlock)
@ -222,13 +225,13 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta
point := crypto.Ed25519Curve.Point()
point.UnmarshalBinary(commitment)
(*commitments)[validatorID] = point
consensus.Log.Debug("Received new commit message", "num", len(*commitments), "validatorID", validatorID)
consensus.Log.Debug("Received new commit message", "num", len(*commitments), "validatorID", validatorID, "PublicKeys", len(consensus.PublicKeys))
// Set the bitmap indicate this validate signed. TODO: figure out how to resolve the inconsistency of validators from commit and response messages
bitmap.SetKey(value.PubKey, true)
}
if !shouldProcess {
consensus.Log.Debug("Received new commit message", "validatorID", validatorID)
consensus.Log.Debug("Received additional new commit message", "validatorID", validatorID)
return
}
@ -278,7 +281,7 @@ func (consensus *Consensus) responseByLeader(challenge kyber.Scalar, firstRound
consensus.finalBitmap.SetKey(consensus.pubKey, true)
}
} else {
log.Warn("Failed to generate response", "err", err)
log.Warn("leader failed to generate response", "err", err)
}
}
@ -362,18 +365,18 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S
responseScalar.UnmarshalBinary(response)
err := consensus.verifyResponse(commitments, responseScalar, validatorID)
if err != nil {
consensus.Log.Warn("Failed to verify the response", "error", err)
consensus.Log.Warn("leader failed to verify the response", "error", err, "VID", strconv.Itoa(int(validatorID)))
shouldProcess = false
} else {
(*responses)[validatorID] = responseScalar
consensus.Log.Debug("Received new response message", "num", len(*responses), "validatorID", validatorID)
consensus.Log.Debug("Received new response message", "num", len(*responses), "validatorID", strconv.Itoa(int(validatorID)))
// Set the bitmap indicate this validate signed. TODO: figure out how to resolve the inconsistency of validators from commit and response messages
bitmap.SetKey(value.PubKey, true)
}
}
if !shouldProcess {
consensus.Log.Debug("Received new response message", "validatorID", validatorID)
consensus.Log.Debug("Received new response message", "validatorID", strconv.Itoa(int(validatorID)))
return
}

@ -35,6 +35,7 @@ func (consensus *Consensus) constructAnnounceMessage() []byte {
signature := consensus.signMessage(buffer.Bytes())
buffer.Write(signature)
consensus.Log.Info("New Announce", "NodeID", consensus.nodeID, "bitmap", consensus.bitmap)
return proto_consensus.ConstructConsensusMessage(proto_consensus.Announce, buffer.Bytes())
}
@ -85,6 +86,7 @@ func (consensus *Consensus) constructChallengeMessage(msgTypeToSend proto_consen
signature := consensus.signMessage(buffer.Bytes())
buffer.Write(signature)
consensus.Log.Info("New Challenge", "NodeID", consensus.nodeID, "bitmap", consensus.bitmap)
return proto_consensus.ConstructConsensusMessage(msgTypeToSend, buffer.Bytes()), challengeScalar, aggCommitment
}
@ -115,6 +117,7 @@ func (consensus *Consensus) constructCollectiveSigMessage(collectiveSig [64]byte
signature := consensus.signMessage(buffer.Bytes())
buffer.Write(signature)
consensus.Log.Info("New CollectiveSig", "NodeID", consensus.nodeID, "bitmap", consensus.bitmap)
return proto_consensus.ConstructConsensusMessage(proto_consensus.CollectiveSig, buffer.Bytes())
}

@ -43,7 +43,7 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
// Processes the announce message sent from the leader
func (consensus *Consensus) processAnnounceMessage(payload []byte) {
consensus.Log.Info("Received Announce Message", "Size", len(payload))
consensus.Log.Info("Received Announce Message", "Size", len(payload), "nodeID", consensus.nodeID)
//#### Read payload data
offset := 0
// 4 byte consensus id
@ -239,9 +239,10 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState
response, err := crypto.Response(crypto.Ed25519Curve, consensus.priKey, consensus.secret[consensusID], receivedChallenge)
if err != nil {
log.Warn("Failed to generate response", "err", err)
log.Warn("validator failed to generate response", "err", err, "priKey", consensus.priKey, "nodeID", consensus.nodeID, "secret", consensus.secret[consensusID])
return
}
msgTypeToSend := proto_consensus.Response
if targetState == FinalResponseDone {
msgTypeToSend = proto_consensus.FinalResponse
@ -340,7 +341,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) {
// Verify collective signature
err := crypto.Verify(crypto.Ed25519Curve, consensus.PublicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.PublicKeys)/3)+1))
if err != nil {
consensus.Log.Warn("Failed to verify the collective sig message", "consensusID", consensusID, "err", err)
consensus.Log.Warn("Failed to verify the collective sig message", "consensusID", consensusID, "err", err, "bitmap", bitmap, "NodeID", consensus.nodeID, "#PK", len(consensus.PublicKeys))
return
}

@ -50,6 +50,8 @@ const (
const (
// TimeToSleepForSyncing is the time waiting for node transformed into NodeDoingConsensus
TimeToSleepForSyncing = time.Second * 30
waitBeforeJoinShard = time.Second * 3
timeOutToJoinShard = time.Minute * 10
)
// NetworkNode ...
@ -325,14 +327,18 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node
}
// AddPeers adds neighbors nodes
func (node *Node) AddPeers(peers []p2p.Peer) int {
func (node *Node) AddPeers(peers []*p2p.Peer) int {
count := 0
for _, p := range peers {
key := fmt.Sprintf("%v", p.PubKey)
_, ok := node.Neighbors.Load(key)
if !ok {
node.Neighbors.Store(key, p)
node.Neighbors.Store(key, *p)
count++
continue
}
if node.SelfPeer.ValidatorID == -1 && p.IP == node.SelfPeer.IP && p.Port == node.SelfPeer.Port {
node.SelfPeer.ValidatorID = p.ValidatorID
}
}
@ -345,7 +351,7 @@ func (node *Node) AddPeers(peers []p2p.Peer) int {
// JoinShard helps a new node to join a shard.
func (node *Node) JoinShard(leader p2p.Peer) {
// try to join the shard, with 10 minutes time-out
backoff := p2p.NewExpBackoff(1*time.Second, 10*time.Minute, 2)
backoff := p2p.NewExpBackoff(waitBeforeJoinShard, timeOutToJoinShard, 2)
for node.State == NodeWaitToJoin {
backoff.Sleep()

@ -522,17 +522,26 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
return -1
}
// Add to Node's peer list
node.AddPeers([]p2p.Peer{*peer})
// Add to Node's peer list anyway
node.AddPeers([]*p2p.Peer{peer})
// Send a Pong message back
peers := node.Consensus.GetValidatorPeers()
pong := proto_node.NewPongMessage(peers, node.Consensus.PublicKeys)
buffer := pong.ConstructPongMessage()
for _, p := range peers {
p2p.SendMessage(p, buffer)
}
// Send a Pong message directly to the sender
// This is necessary because the sender will need to get a ValidatorID
// Just broadcast won't work, some validators won't receive the latest
// PublicKeys as we rely on a valid ValidatorID to do broadcast.
// This is very buggy, but we will move to libp2p, hope the problem will
// be resolved then.
// However, I disable it for now as we are sending redundant PONG messages
// to all validators. This may not be needed. But it maybe add back.
// p2p.SendMessage(*peer, buffer)
// Broadcast the message to all validators, as publicKeys is updated
// FIXME: HAR-89 use a separate nodefind/neighbor message
p2p.BroadcastMessageFromLeader(peers, buffer)
return len(peers)
}
@ -547,7 +556,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
// TODO (lc) state syncing, and wait for all public keys
node.State = NodeJoinedShard
peers := make([]p2p.Peer, 0)
peers := make([]*p2p.Peer, 0)
for _, p := range pong.Peers {
peer := new(p2p.Peer)
@ -561,7 +570,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
node.log.Error("UnmarshalBinary Failed", "error", err)
continue
}
peers = append(peers, *peer)
peers = append(peers, peer)
}
if len(peers) > 0 {

@ -64,15 +64,15 @@ func TestAddPeers(test *testing.T) {
priKey2 := crypto.Ed25519Curve.Scalar().SetInt64(int64(999))
pubKey2 := pki.GetPublicKeyFromScalar(priKey2)
peers1 := []p2p.Peer{
{
peers1 := []*p2p.Peer{
&p2p.Peer{
IP: "127.0.0.1",
Port: "8888",
PubKey: pubKey1,
Ready: true,
ValidatorID: 1,
},
{
&p2p.Peer{
IP: "127.0.0.1",
Port: "9999",
PubKey: pubKey2,
@ -151,13 +151,13 @@ func exitServer() {
os.Exit(0)
}
// func TestPingPongHandler(test *testing.T) {
// leader := p2p.Peer{IP: "127.0.0.1", Port: "8881"}
// // validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"}
// consensus := consensus.New("127.0.0.1", "8881", "0", []p2p.Peer{leader}, leader)
// node := New(consensus, nil)
// // go sendPingMessage(leader)
// go sendPongMessage(leader)
// go exitServer()
// node.StartServer("8881")
// }
func TestPingPongHandler(test *testing.T) {
leader := p2p.Peer{IP: "127.0.0.1", Port: "8881"}
// validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"}
consensus := consensus.New(leader, "0", []p2p.Peer{leader}, leader)
node := New(consensus, nil, leader)
//go sendPingMessage(leader)
go sendPongMessage(leader)
go exitServer()
node.StartServer("8881")
}

@ -80,14 +80,12 @@ func BroadcastMessageFromLeader(peers []Peer, msg []byte) {
// TODO(minhdoan): Enable back for multicast.
peers = SelectMyPeers(peers, 1, MaxBroadCast)
BroadcastMessage(peers, msg)
log.Info("Done sending from leader")
}
// BroadcastMessageFromValidator sends the message to a list of peers from a validator.
func BroadcastMessageFromValidator(selfPeer Peer, peers []Peer, msg []byte) {
peers = SelectMyPeers(peers, selfPeer.ValidatorID*MaxBroadCast+1, (selfPeer.ValidatorID+1)*MaxBroadCast)
BroadcastMessage(peers, msg)
log.Info("Done sending from validator")
}
// ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]

Loading…
Cancel
Save