HAR-65: support node removal after node is offline

In the p2p library, I added a channel notification mechanism for when it
fails to send a message after retries. The failed p2p.Peer is reported to
the caller via the channel. The leader then knows which peer is offline
and adds it to an offline list.

Before another consensus is started, the leader will check the offline
list and remove those nodes from the validator list. The leader then has
to broadcast the Pong message to all validators to sync the public key
list.

Signed-off-by: Leo Chen <leo@harmony.one>
pull/143/head
Leo Chen 6 years ago
parent 4db7858749
commit 53a5c3c856
  1. 2
      beaconchain/libs/beaconchain.go
  2. 2
      client/client.go
  3. 81
      consensus/consensus.go
  4. 16
      consensus/consensus_leader.go
  5. 33
      consensus/consensus_test.go
  6. 2
      newnode/newnode.go
  7. 20
      node/node.go
  8. 14
      node/node_handler.go
  9. 4
      node/p2p.go
  10. 10
      p2p/host/hostv1/hostv1.go
  11. 37
      p2p/host/message.go

@ -63,7 +63,7 @@ func (bc *BeaconChain) AcceptConnections(b []byte) {
response := bcconn.ResponseRandomNumber{NumberOfShards: bc.NumberOfShards, NumberOfNodesAdded: bc.NumberOfNodesAdded, Leaders: bc.Leaders}
msg := bcconn.SerializeRandomInfo(response)
msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Acknowledge, msg)
host.SendMessage(bc.host, Node.Self, msgToSend)
host.SendMessage(bc.host, Node.Self, msgToSend, nil)
}
//StartServer a server and process the request by a handler.

@ -124,7 +124,7 @@ func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.F
func (client *Client) sendCrossShardTxUnlockMessage(txsToSend []*blockchain.Transaction) {
for shardID, txs := range BuildOutputShardTransactionMap(txsToSend) {
host.SendMessage(client.host, (*client.Leaders)[shardID], node.ConstructUnlockToCommitOrAbortMessage(txs))
host.SendMessage(client.host, (*client.Leaders)[shardID], node.ConstructUnlockToCommitOrAbortMessage(txs), nil)
}
}

@ -3,6 +3,7 @@ package consensus // consensus
import (
"fmt"
"reflect"
"strconv"
"sync"
@ -21,6 +22,8 @@ import (
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/utils"
proto_node "github.com/harmony-one/harmony/proto/node"
)
// Consensus data containing all info related to one round of consensus process
@ -99,6 +102,12 @@ type Consensus struct {
// The p2p host used to send/receive p2p messages
host host.Host
// Signal channel for lost validators
OfflinePeers chan p2p.Peer
// List of offline Peers
OfflinePeerList []p2p.Peer
}
// BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far
@ -185,6 +194,9 @@ func New(host host.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Con
consensus.Log = log.New()
consensus.uniqueIDInstance = utils.GetUniqueValidatorIDInstance()
consensus.OfflinePeers = make(chan p2p.Peer)
go consensus.RemovePeersHandler()
// consensus.Log.Info("New Consensus", "IP", ip, "Port", port, "NodeID", consensus.nodeID, "priKey", consensus.priKey, "pubKey", consensus.pubKey)
return &consensus
}
@ -230,6 +242,9 @@ func (consensus *Consensus) ResetState() {
consensus.aggregatedCommitment = nil
consensus.aggregatedFinalCommitment = nil
consensus.secret = map[uint32]kyber.Scalar{}
// Clear the OfflinePeersList again
consensus.OfflinePeerList = make([]p2p.Peer, 0)
}
// Returns a string representation of this consensus
@ -256,18 +271,76 @@ func (consensus *Consensus) AddPeers(peers []*p2p.Peer) int {
peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueID())
}
consensus.validators.Store(utils.GetUniqueIDFromPeer(*peer), *peer)
consensus.pubKeyLock.Lock()
consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey)
consensus.pubKeyLock.Unlock()
}
count++
}
return count
}
// RemovePeers removes the given peers from the validator list and PublicKeys.
// It will be called when leader/node lost connection to peers.
//
// A validator entry is matched by IP and Port; a public key is matched by
// deep equality with peer.PubKey. When at least one public key was removed,
// the filtered key list is installed through UpdatePublicKeys and a Pong
// message is broadcast to the remaining validators so that their public key
// lists stay in sync with the leader's.
//
// It returns the number of public keys that were removed.
func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
	// early return as most of the cases no peers to remove
	if len(peers) == 0 {
		return 0
	}

	removedKeys := 0
	// Filter a private copy so the live PublicKeys slice is never modified
	// in place; it is only swapped through UpdatePublicKeys below.
	newList := append(consensus.PublicKeys[:0:0], consensus.PublicKeys...)

	for _, peer := range peers {
		consensus.validators.Range(func(k, v interface{}) bool {
			// We are using peer.IP and peer.Port to identify the unique peer
			// FIXME (lc): use a generic way to identify a peer
			if p, ok := v.(p2p.Peer); ok && p.IP == peer.IP && p.Port == peer.Port {
				consensus.validators.Delete(k)
			}
			// Always keep scanning: an entry of an unexpected type must not
			// hide a matching validator stored after it.
			return true
		})

		// Rebuild the key list without this peer's key. Deleting from a slice
		// while ranging over it would skip the element following each removal.
		kept := newList[:0]
		for _, pk := range newList {
			if reflect.DeepEqual(peer.PubKey, pk) {
				removedKeys++
				continue
			}
			kept = append(kept, pk)
		}
		newList = kept
	}

	if removedKeys > 0 {
		consensus.UpdatePublicKeys(newList)

		// Send out Pong messages to everyone in the shard to keep the publickeys in sync
		// Or the shard won't be able to reach consensus if public keys are mismatch
		validators := consensus.GetValidatorPeers()
		pong := proto_node.NewPongMessage(validators, consensus.PublicKeys)
		buffer := pong.ConstructPongMessage()

		host.BroadcastMessageFromLeader(consensus.host, validators, buffer, consensus.OfflinePeers)
	}

	return removedKeys
}
// RemovePeersHandler loops forever receiving peers from the OfflinePeers
// channel and appending each received peer to OfflinePeerList.
// It never returns, so it is meant to be started as a goroutine.
func (consensus *Consensus) RemovePeersHandler() {
	for {
		lost := <-consensus.OfflinePeers
		consensus.OfflinePeerList = append(consensus.OfflinePeerList, lost)
	}
}
// DebugPrintPublicKeys print all the PublicKeys in string format in Consensus
@ -420,5 +493,5 @@ func (consensus *Consensus) GetNodeID() uint16 {
// SendMessage sends message thru p2p host to peer.
func (consensus *Consensus) SendMessage(peer p2p.Peer, message []byte) {
host.SendMessage(consensus.host, peer, message)
host.SendMessage(consensus.host, peer, message, nil)
}

@ -38,6 +38,11 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
for { // keep waiting for new blocks
newBlock := <-blockChannel
c := consensus.RemovePeers(consensus.OfflinePeerList)
if c > 0 {
consensus.Log.Debug("WaitForNewBlock", "removed peers", c)
}
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
@ -63,6 +68,11 @@ func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Bloc
newBlock := <-blockChannel
// TODO: think about potential race condition
c := consensus.RemovePeers(consensus.OfflinePeerList)
if c > 0 {
consensus.Log.Debug("WaitForNewBlock", "removed peers", c)
}
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
@ -134,7 +144,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
consensus.Log.Debug("Stop encoding block")
msgToSend := consensus.constructAnnounceMessage()
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
// Set state to AnnounceDone
consensus.state = AnnounceDone
consensus.commitByLeader(true)
@ -261,7 +271,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta
consensus.responseByLeader(challengeScalar, targetState == ChallengeDone)
// Broadcast challenge message
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
// Set state to targetState (ChallengeDone or FinalChallengeDone)
consensus.state = targetState
@ -418,7 +428,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S
// Start the second round of Cosi
msgToSend := consensus.constructCollectiveSigMessage(collectiveSig, bitmap)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
consensus.commitByLeader(false)
} else {
consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(*responses))

@ -5,6 +5,7 @@ import (
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
"github.com/harmony-one/harmony/utils"
)
func TestNew(test *testing.T) {
@ -24,7 +25,39 @@ func TestNew(test *testing.T) {
test.Error("Consensus ReadySignal should be initialized")
}
if consensus.OfflinePeers == nil {
test.Error("Consensus OfflinePeers should be initialized")
}
if consensus.leader != leader {
test.Error("Consensus Leader is set to wrong Peer")
}
}
// TestRemovePeers builds a consensus with four validators and checks that
// removing two of them reports a non-zero number of removed peers.
func TestRemovePeers(t *testing.T) {
	_, pk1 := utils.GenKey("1", "1")
	_, pk2 := utils.GenKey("2", "2")
	_, pk3 := utils.GenKey("3", "3")
	_, pk4 := utils.GenKey("4", "4")
	_, pk5 := utils.GenKey("5", "5")

	p1 := p2p.Peer{IP: "1", Port: "1", PubKey: pk1}
	p2 := p2p.Peer{IP: "2", Port: "2", PubKey: pk2}
	p3 := p2p.Peer{IP: "3", Port: "3", PubKey: pk3}
	p4 := p2p.Peer{IP: "4", Port: "4", PubKey: pk4}

	peers := []p2p.Peer{p1, p2, p3, p4}
	peerRemove := []p2p.Peer{p1, p2}

	leader := p2p.Peer{IP: "127.0.0.1", Port: "9000", PubKey: pk5}
	host := p2pimpl.NewHost(leader)
	consensus := New(host, "0", peers, leader)

	// consensus.DebugPrintPublicKeys()
	f := consensus.RemovePeers(peerRemove)
	if f == 0 {
		// RemovePeers returns a count, not a boolean; report it as such.
		t.Errorf("consensus.RemovePeers removed 0 peers, expected at least one of %d to be removed", len(peerRemove))
		consensus.DebugPrintPublicKeys()
	}
}

@ -89,7 +89,7 @@ checkLoop:
gotShardInfo = true
break checkLoop
} else {
host.SendMessage(node.host, BCPeer, msgToSend)
host.SendMessage(node.host, BCPeer, msgToSend, nil)
}
}
}

@ -48,6 +48,26 @@ const (
NodeLeader // Node is the leader of some shard.
)
// String returns the name of the node state constant, or "Unknown" when the
// value does not match any of the states handled below.
func (state State) String() string {
	name := "Unknown"
	switch state {
	case NodeInit:
		name = "NodeInit"
	case NodeWaitToJoin:
		name = "NodeWaitToJoin"
	case NodeJoinedShard:
		name = "NodeJoinedShard"
	case NodeOffline:
		name = "NodeOffline"
	case NodeReadyForConsensus:
		name = "NodeReadyForConsensus"
	case NodeDoingConsensus:
		name = "NodeDoingConsensus"
	case NodeLeader:
		name = "NodeLeader"
	}
	return name
}
// Constants related to doing syncing.
const (
NotDoingSyncing uint32 = iota

@ -566,7 +566,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
// Broadcast the message to all validators, as publicKeys is updated
// FIXME: HAR-89 use a separate nodefind/neighbor message
host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer)
host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers)
return len(peers)
}
@ -578,6 +578,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
return -1
}
// node.log.Debug("pongMessageHandler", "pong", pong, "nodeID", node.Consensus.GetNodeID())
peers := make([]*p2p.Peer, 0)
for _, p := range pong.Peers {
@ -615,9 +617,13 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
publicKeys = append(publicKeys, key)
}
node.State = NodeJoinedShard
// Notify JoinShard to stop sending Ping messages
node.StopPing <- struct{}{}
if node.State == NodeWaitToJoin {
node.State = NodeJoinedShard
// Notify JoinShard to stop sending Ping messages
if node.StopPing != nil {
node.StopPing <- struct{}{}
}
}
return node.Consensus.UpdatePublicKeys(publicKeys)
}

@ -7,12 +7,12 @@ import (
// SendMessage sends data to ip, port
func (node *Node) SendMessage(p p2p.Peer, data []byte) {
host.SendMessage(node.host, p, data)
host.SendMessage(node.host, p, data, nil)
}
// BroadcastMessage broadcasts message to peers
func (node *Node) BroadcastMessage(peers []p2p.Peer, data []byte) {
host.BroadcastMessage(node.host, peers, data)
host.BroadcastMessage(node.host, peers, data, nil)
}
// GetHost returns the p2p host

@ -1,6 +1,7 @@
package hostv1
import (
"fmt"
"io"
"net"
"time"
@ -71,18 +72,17 @@ func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) {
func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) {
addr := net.JoinHostPort(peer.IP, peer.Port)
conn, err := net.Dial("tcp", addr)
// log.Debug("Dial from local to remote", "localID", net.JoinHostPort(host.self.IP, host.self.Port), "local", conn.LocalAddr(), "remote", addr)
if err != nil {
log.Warn("Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err)
return
log.Warn("HostV1 SendMessage Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err)
return fmt.Errorf("Dail Failed")
}
defer conn.Close()
nw, err := conn.Write(message)
if err != nil {
log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err)
return
return fmt.Errorf("Write Failed")
}
if nw < len(message) {
log.Warn("Write() returned short count",
@ -91,7 +91,7 @@ func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) {
}
// No ack (reply) message from the receiver for now.
return
return nil
}
// Close closes the host

@ -13,14 +13,14 @@ import (
// SendMessage is to connect a socket given a port and send the given message.
// TODO(minhdoan, rj): need to check if a peer is reachable or not.
func SendMessage(host Host, p p2p.Peer, message []byte) {
func SendMessage(host Host, p p2p.Peer, message []byte, lostPeer chan p2p.Peer) {
// Construct normal p2p message
content := ConstructP2pMessage(byte(0), message)
go send(host, p, content)
go send(host, p, content, lostPeer)
}
// BroadcastMessage sends the message to a list of peers
func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) {
func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) {
if len(peers) == 0 {
return
}
@ -32,7 +32,7 @@ func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) {
start := time.Now()
for _, peer := range peers {
peerCopy := peer
go send(h, peerCopy, content)
go send(h, peerCopy, content, lostPeer)
}
log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds())
@ -43,6 +43,13 @@ func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) {
}
}
// BroadcastMessageFromLeader narrows peers with SelectMyPeers(peers, 1, MaxBroadCast)
// and broadcasts msg to that selection. lostPeer is handed to BroadcastMessage
// unchanged.
func BroadcastMessageFromLeader(h Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) {
	// TODO(minhdoan): Enable back for multicast.
	selected := SelectMyPeers(peers, 1, MaxBroadCast)
	BroadcastMessage(h, selected, msg, lostPeer)
}
// ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]
func ConstructP2pMessage(msgType byte, content []byte) []byte {
@ -58,19 +65,10 @@ func ConstructP2pMessage(msgType byte, content []byte) []byte {
return byteBuffer.Bytes()
}
// BroadcastMessageFromLeader sends the message to a list of peers from a leader.
func BroadcastMessageFromLeader(h Host, peers []p2p.Peer, msg []byte) {
// TODO(minhdoan): Enable back for multicast.
peers = SelectMyPeers(peers, 1, MaxBroadCast)
BroadcastMessage(h, peers, msg)
log.Info("Done sending from leader")
}
// BroadcastMessageFromValidator sends the message to a list of peers from a validator.
func BroadcastMessageFromValidator(h Host, selfPeer p2p.Peer, peers []p2p.Peer, msg []byte) {
peers = SelectMyPeers(peers, selfPeer.ValidatorID*MaxBroadCast+1, (selfPeer.ValidatorID+1)*MaxBroadCast)
BroadcastMessage(h, peers, msg)
log.Info("Done sending from validator")
BroadcastMessage(h, peers, msg, nil)
}
// MaxBroadCast is the maximum number of neighbors to broadcast
@ -89,14 +87,14 @@ func SelectMyPeers(peers []p2p.Peer, min int, max int) []p2p.Peer {
}
// Send a message to another node with given port.
func send(h Host, peer p2p.Peer, message []byte) {
func send(h Host, peer p2p.Peer, message []byte, lostPeer chan p2p.Peer) {
// Add attack code here.
//attack.GetInstance().Run()
backoff := p2p.NewExpBackoff(250*time.Millisecond, 10*time.Second, 2)
backoff := p2p.NewExpBackoff(150*time.Millisecond, 5*time.Second, 2)
for trial := 0; trial < 10; trial++ {
var err error
h.SendMessage(peer, message)
err = h.SendMessage(peer, message)
if err == nil {
if trial > 0 {
log.Warn("retry send", "rety", trial)
@ -108,6 +106,11 @@ func send(h Host, peer p2p.Peer, message []byte) {
backoff.Sleep()
}
log.Error("gave up sending a message", "addr", net.JoinHostPort(peer.IP, peer.Port))
if lostPeer != nil {
// Notify lostPeer channel
lostPeer <- peer
}
}
// DialWithSocketClient joins host port and establishes connection

Loading…
Cancel
Save