Merge pull request #143 from harmony-one/HAR-65_support_node_removal_after_offline

HAR-65: support node removal after offline
Commit 7ba2041ca5 by Leo Chen, committed via GitHub 6 years ago
  1. beaconchain/libs/beaconchain.go (2 lines changed)
  2. benchmark.go (2 lines changed)
  3. client/client.go (2 lines changed)
  4. consensus/consensus.go (69 lines changed)
  5. consensus/consensus_leader.go (16 lines changed)
  6. consensus/consensus_test.go (29 lines changed)
  7. deploy.sh (19 lines changed)
  8. newnode/newnode.go (2 lines changed)
  9. node/node.go (37 lines changed)
  10. node/node_handler.go (14 lines changed)
  11. node/p2p.go (4 lines changed)
  12. p2p/host/hostv1/hostv1.go (10 lines changed)
  13. p2p/host/message.go (37 lines changed)

@ -63,7 +63,7 @@ func (bc *BeaconChain) AcceptConnections(b []byte) {
response := bcconn.ResponseRandomNumber{NumberOfShards: bc.NumberOfShards, NumberOfNodesAdded: bc.NumberOfNodesAdded, Leaders: bc.Leaders}
msg := bcconn.SerializeRandomInfo(response)
msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Acknowledge, msg)
host.SendMessage(bc.host, Node.Self, msgToSend)
host.SendMessage(bc.host, Node.Self, msgToSend, nil)
}
// StartServer starts a server and processes requests with a handler.

@ -180,6 +180,8 @@ func main() {
attack.GetInstance().SetLogger(consensus.Log)
// Current node.
currentNode := node.New(host, consensus, ldb)
currentNode.Consensus.OfflinePeers = currentNode.OfflinePeers
// If there is a client configured in the node list.
if clientPeer != nil {
currentNode.ClientPeer = clientPeer

@ -124,7 +124,7 @@ func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.F
func (client *Client) sendCrossShardTxUnlockMessage(txsToSend []*blockchain.Transaction) {
for shardID, txs := range BuildOutputShardTransactionMap(txsToSend) {
host.SendMessage(client.host, (*client.Leaders)[shardID], node.ConstructUnlockToCommitOrAbortMessage(txs))
host.SendMessage(client.host, (*client.Leaders)[shardID], node.ConstructUnlockToCommitOrAbortMessage(txs), nil)
}
}

@ -3,6 +3,7 @@ package consensus // consensus
import (
"fmt"
"reflect"
"strconv"
"sync"
@ -21,6 +22,8 @@ import (
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/utils"
proto_node "github.com/harmony-one/harmony/proto/node"
)
// Consensus data containing all info related to one round of consensus process
@ -99,6 +102,12 @@ type Consensus struct {
// The p2p host used to send/receive p2p messages
host host.Host
// Signal channel for lost validators
OfflinePeers chan p2p.Peer
// List of offline Peers
OfflinePeerList []p2p.Peer
}
// BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far
@ -185,6 +194,8 @@ func New(host host.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Con
consensus.Log = log.New()
consensus.uniqueIDInstance = utils.GetUniqueValidatorIDInstance()
consensus.OfflinePeerList = make([]p2p.Peer, 0)
// consensus.Log.Info("New Consensus", "IP", ip, "Port", port, "NodeID", consensus.nodeID, "priKey", consensus.priKey, "pubKey", consensus.pubKey)
return &consensus
}
@ -230,6 +241,9 @@ func (consensus *Consensus) ResetState() {
consensus.aggregatedCommitment = nil
consensus.aggregatedFinalCommitment = nil
consensus.secret = map[uint32]kyber.Scalar{}
// Clear the OfflinePeerList for the next round of consensus
consensus.OfflinePeerList = make([]p2p.Peer, 0)
}
// Returns a string representation of this consensus
@ -256,18 +270,65 @@ func (consensus *Consensus) AddPeers(peers []*p2p.Peer) int {
peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueID())
}
consensus.validators.Store(utils.GetUniqueIDFromPeer(*peer), *peer)
consensus.pubKeyLock.Lock()
consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey)
consensus.pubKeyLock.Unlock()
}
count++
}
return count
}
// RemovePeers will remove the peers from the validator list and PublicKeys
// RemovePeers will remove the peer from the validator list and PublicKeys
// It is called when the leader/node loses connection to peers
func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
// TODO (lc) we need to have a corresponding RemovePeers function
return 0
// Early return: in most cases there are no peers to remove
if len(peers) == 0 {
return 0
}
count := 0
count2 := 0
newList := append(consensus.PublicKeys[:0:0], consensus.PublicKeys...)
for _, peer := range peers {
consensus.validators.Range(func(k, v interface{}) bool {
if p, ok := v.(p2p.Peer); ok {
// We are using peer.IP and peer.Port to identify the unique peer
// FIXME (lc): use a generic way to identify a peer
if p.IP == peer.IP && p.Port == peer.Port {
consensus.validators.Delete(k)
count++
}
return true
}
return false
})
for i, pp := range newList {
// If the peer's public key is found in the list, drop it
if reflect.DeepEqual(peer.PubKey, pp) {
// consensus.Log.Debug("RemovePeers", "i", i, "pp", pp, "peer.PubKey", peer.PubKey)
newList = append(newList[:i], newList[i+1:]...)
count2++
}
}
}
if count2 > 0 {
consensus.UpdatePublicKeys(newList)
// Send out Pong messages to everyone in the shard to keep the public keys in sync;
// otherwise the shard can't reach consensus if the public keys mismatch
validators := consensus.GetValidatorPeers()
pong := proto_node.NewPongMessage(validators, consensus.PublicKeys)
buffer := pong.ConstructPongMessage()
host.BroadcastMessageFromLeader(consensus.host, validators, buffer, consensus.OfflinePeers)
}
return count2
}
// DebugPrintPublicKeys print all the PublicKeys in string format in Consensus
@ -420,5 +481,5 @@ func (consensus *Consensus) GetNodeID() uint16 {
// SendMessage sends message thru p2p host to peer.
func (consensus *Consensus) SendMessage(peer p2p.Peer, message []byte) {
host.SendMessage(consensus.host, peer, message)
host.SendMessage(consensus.host, peer, message, nil)
}

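One detail worth calling out in RemovePeers above: append(consensus.PublicKeys[:0:0], consensus.PublicKeys...) copies the key list onto a fresh backing array, so deleting entries from newList cannot disturb the slice still held in consensus.PublicKeys until UpdatePublicKeys swaps it in. A minimal, self-contained illustration of that copy idiom (src and dst are placeholder names, not code from this PR):

```go
package main

import "fmt"

func main() {
	src := []int{1, 2, 3}
	// The zero-length, zero-capacity reslice forces append to allocate a new
	// backing array, so dst is an independent copy of src.
	dst := append(src[:0:0], src...)
	dst[0] = 99
	fmt.Println(src) // [1 2 3], unchanged
	fmt.Println(dst) // [99 2 3]
}
```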
@ -38,6 +38,11 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
for { // keep waiting for new blocks
newBlock := <-blockChannel
c := consensus.RemovePeers(consensus.OfflinePeerList)
if c > 0 {
consensus.Log.Debug("WaitForNewBlock", "removed peers", c)
}
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
@ -63,6 +68,11 @@ func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Bloc
newBlock := <-blockChannel
// TODO: think about potential race condition
c := consensus.RemovePeers(consensus.OfflinePeerList)
if c > 0 {
consensus.Log.Debug("WaitForNewBlock", "removed peers", c)
}
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
@ -134,7 +144,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
consensus.Log.Debug("Stop encoding block")
msgToSend := consensus.constructAnnounceMessage()
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
// Set state to AnnounceDone
consensus.state = AnnounceDone
consensus.commitByLeader(true)
@ -261,7 +271,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta
consensus.responseByLeader(challengeScalar, targetState == ChallengeDone)
// Broadcast challenge message
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
// Set state to targetState (ChallengeDone or FinalChallengeDone)
consensus.state = targetState
@ -418,7 +428,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S
// Start the second round of Cosi
msgToSend := consensus.constructCollectiveSigMessage(collectiveSig, bitmap)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
consensus.commitByLeader(false)
} else {
consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(*responses))

@ -5,6 +5,7 @@ import (
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
"github.com/harmony-one/harmony/utils"
)
func TestNew(test *testing.T) {
@ -28,3 +29,31 @@ func TestNew(test *testing.T) {
test.Error("Consensus Leader is set to wrong Peer")
}
}
func TestRemovePeers(t *testing.T) {
_, pk1 := utils.GenKey("1", "1")
_, pk2 := utils.GenKey("2", "2")
_, pk3 := utils.GenKey("3", "3")
_, pk4 := utils.GenKey("4", "4")
_, pk5 := utils.GenKey("5", "5")
p1 := p2p.Peer{IP: "1", Port: "1", PubKey: pk1}
p2 := p2p.Peer{IP: "2", Port: "2", PubKey: pk2}
p3 := p2p.Peer{IP: "3", Port: "3", PubKey: pk3}
p4 := p2p.Peer{IP: "4", Port: "4", PubKey: pk4}
peers := []p2p.Peer{p1, p2, p3, p4}
peerRemove := []p2p.Peer{p1, p2}
leader := p2p.Peer{IP: "127.0.0.1", Port: "9000", PubKey: pk5}
host := p2pimpl.NewHost(leader)
consensus := New(host, "0", peers, leader)
// consensus.DebugPrintPublicKeys()
f := consensus.RemovePeers(peerRemove)
if f == 0 {
t.Errorf("consensus.RemovePeers return false")
consensus.DebugPrintPublicKeys()
}
}

@ -10,6 +10,17 @@ function cleanup() {
done
}
function killnode() {
local port=$1
if [ -n "port" ]; then
pid=$(/bin/ps -fu $USER | grep "benchmark" | grep "$port" | awk '{print $2}')
echo "killing node with port: $port"
kill -9 $pid 2> /dev/null
echo "node with port: $port is killed"
fi
}
trap cleanup SIGINT SIGTERM
function usage {
@ -25,6 +36,7 @@ USAGE: $ME [OPTIONS] config_file_name
-D duration txgen run duration (default: $DURATION)
-m min_peers minimal number of peers to start consensus (default: $MIN)
-s shards number of shards (default: $SHARDS)
-k nodeport kill the node with specified port number (default: $KILLPORT)
This script will build all the binaries and start benchmark and txgen based on the configuration file.
@ -43,8 +55,9 @@ TXGEN=true
DURATION=90
MIN=5
SHARDS=2
KILLPORT=9004
while getopts "hpdtD:m:s:" option; do
while getopts "hpdtD:m:s:k:" option; do
case $option in
h) usage ;;
p) PEER='-peer_discovery' ;;
@ -53,6 +66,7 @@ while getopts "hpdtD:m:s:" option; do
D) DURATION=$OPTARG ;;
m) MIN=$OPTARG ;;
s) SHARDS=$OPTARG ;;
k) KILLPORT=$OPTARG ;;
esac
done
@ -103,6 +117,9 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
fi
done < $config
# Emulate node offline
(sleep 45; killnode $KILLPORT) &
echo "launching txgen ..."
if [ "$TXGEN" == "true" ]; then
if [ -z "$PEER" ]; then

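With the new -k option, a run such as ./deploy.sh -k 9005 <config_file> (the config file name is whatever you normally pass; 9005 is just an example port) launches the cluster as before, and the backgrounded (sleep 45; killnode $KILLPORT) & then kills the node on that port roughly 45 seconds in, which exercises the offline-removal path end to end.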
@ -89,7 +89,7 @@ checkLoop:
gotShardInfo = true
break checkLoop
} else {
host.SendMessage(node.host, BCPeer, msgToSend)
host.SendMessage(node.host, BCPeer, msgToSend, nil)
}
}
}

@ -49,6 +49,26 @@ const (
NodeLeader // Node is the leader of some shard.
)
func (state State) String() string {
switch state {
case NodeInit:
return "NodeInit"
case NodeWaitToJoin:
return "NodeWaitToJoin"
case NodeJoinedShard:
return "NodeJoinedShard"
case NodeOffline:
return "NodeOffline"
case NodeReadyForConsensus:
return "NodeReadyForConsensus"
case NodeDoingConsensus:
return "NodeDoingConsensus"
case NodeLeader:
return "NodeLeader"
}
return "Unknown"
}
// Constants related to doing syncing.
const (
NotDoingSyncing uint32 = iota
@ -112,6 +132,9 @@ type Node struct {
// Channel to stop sending ping message
StopPing chan struct{}
// Signal channel for lost validators
OfflinePeers chan p2p.Peer
}
// Add new crossTx and proofs to the list of crossTx that needs to be sent back to client
@ -324,6 +347,9 @@ func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
node.syncingState = NotDoingSyncing
node.StopPing = make(chan struct{})
node.OfflinePeers = make(chan p2p.Peer)
go node.RemovePeersHandler()
return &node
}
@ -460,3 +486,14 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*
}
return response, nil
}
// RemovePeersHandler is a goroutine to wait on the OfflinePeers channel
// and remove the peers from validator list
func (node *Node) RemovePeersHandler() {
for {
select {
case p := <-node.OfflinePeers:
node.Consensus.OfflinePeerList = append(node.Consensus.OfflinePeerList, p)
}
}
}

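Taken together with the p2p changes further down, the flow added in node.go is: send() pushes a peer onto the lostPeer channel once it gives up, RemovePeersHandler drains node.OfflinePeers into Consensus.OfflinePeerList, and the leader prunes that list at the start of the next block via RemovePeers. A minimal, self-contained sketch of that plumbing, using a placeholder peer struct instead of p2p.Peer and plain variables instead of the Node/Consensus fields (the range/close shape is a simplification of the for/select loop above):

```go
package main

import "fmt"

type peer struct{ IP, Port string }

func main() {
	offlinePeers := make(chan peer) // stands in for node.OfflinePeers / consensus.OfflinePeers
	var offlinePeerList []peer      // stands in for consensus.OfflinePeerList
	done := make(chan struct{})

	// RemovePeersHandler: wait on the channel and queue lost peers for removal.
	go func() {
		for p := range offlinePeers {
			offlinePeerList = append(offlinePeerList, p)
		}
		close(done)
	}()

	// send() exhausted its retries for this peer and reports it as lost.
	offlinePeers <- peer{IP: "127.0.0.1", Port: "9004"}

	close(offlinePeers)
	<-done
	fmt.Println("queued for removal at the next block:", offlinePeerList)
}
```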
@ -566,7 +566,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
// Broadcast the message to all validators, as publicKeys is updated
// FIXME: HAR-89 use a separate nodefind/neighbor message
host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer)
host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers)
return len(peers)
}
@ -578,6 +578,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
return -1
}
// node.log.Debug("pongMessageHandler", "pong", pong, "nodeID", node.Consensus.GetNodeID())
peers := make([]*p2p.Peer, 0)
for _, p := range pong.Peers {
@ -615,9 +617,13 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
publicKeys = append(publicKeys, key)
}
node.State = NodeJoinedShard
// Notify JoinShard to stop sending Ping messages
node.StopPing <- struct{}{}
if node.State == NodeWaitToJoin {
node.State = NodeJoinedShard
// Notify JoinShard to stop sending Ping messages
if node.StopPing != nil {
node.StopPing <- struct{}{}
}
}
return node.Consensus.UpdatePublicKeys(publicKeys)
}

@ -7,12 +7,12 @@ import (
// SendMessage sends data to ip, port
func (node *Node) SendMessage(p p2p.Peer, data []byte) {
host.SendMessage(node.host, p, data)
host.SendMessage(node.host, p, data, nil)
}
// BroadcastMessage broadcasts message to peers
func (node *Node) BroadcastMessage(peers []p2p.Peer, data []byte) {
host.BroadcastMessage(node.host, peers, data)
host.BroadcastMessage(node.host, peers, data, nil)
}
// GetHost returns the p2p host

@ -1,6 +1,7 @@
package hostv1
import (
"fmt"
"io"
"net"
"time"
@ -71,18 +72,17 @@ func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) {
func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) {
addr := net.JoinHostPort(peer.IP, peer.Port)
conn, err := net.Dial("tcp", addr)
// log.Debug("Dial from local to remote", "localID", net.JoinHostPort(host.self.IP, host.self.Port), "local", conn.LocalAddr(), "remote", addr)
if err != nil {
log.Warn("Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err)
return
log.Warn("HostV1 SendMessage Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err)
return fmt.Errorf("Dail Failed")
}
defer conn.Close()
nw, err := conn.Write(message)
if err != nil {
log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err)
return
return fmt.Errorf("Write Failed")
}
if nw < len(message) {
log.Warn("Write() returned short count",
@ -91,7 +91,7 @@ func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) {
}
// No ack (reply) message from the receiver for now.
return
return nil
}
// Close closes the host

@ -13,14 +13,14 @@ import (
// SendMessage is to connect a socket given a port and send the given message.
// TODO(minhdoan, rj): need to check if a peer is reachable or not.
func SendMessage(host Host, p p2p.Peer, message []byte) {
func SendMessage(host Host, p p2p.Peer, message []byte, lostPeer chan p2p.Peer) {
// Construct normal p2p message
content := ConstructP2pMessage(byte(0), message)
go send(host, p, content)
go send(host, p, content, lostPeer)
}
// BroadcastMessage sends the message to a list of peers
func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) {
func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) {
if len(peers) == 0 {
return
}
@ -32,7 +32,7 @@ func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) {
start := time.Now()
for _, peer := range peers {
peerCopy := peer
go send(h, peerCopy, content)
go send(h, peerCopy, content, lostPeer)
}
log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds())
@ -43,6 +43,13 @@ func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) {
}
}
// BroadcastMessageFromLeader sends the message to a list of peers from a leader.
func BroadcastMessageFromLeader(h Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) {
// TODO(minhdoan): Enable back for multicast.
peers = SelectMyPeers(peers, 1, MaxBroadCast)
BroadcastMessage(h, peers, msg, lostPeer)
}
// ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]
func ConstructP2pMessage(msgType byte, content []byte) []byte {
@ -58,19 +65,10 @@ func ConstructP2pMessage(msgType byte, content []byte) []byte {
return byteBuffer.Bytes()
}
// BroadcastMessageFromLeader sends the message to a list of peers from a leader.
func BroadcastMessageFromLeader(h Host, peers []p2p.Peer, msg []byte) {
// TODO(minhdoan): Enable back for multicast.
peers = SelectMyPeers(peers, 1, MaxBroadCast)
BroadcastMessage(h, peers, msg)
log.Info("Done sending from leader")
}
// BroadcastMessageFromValidator sends the message to a list of peers from a validator.
func BroadcastMessageFromValidator(h Host, selfPeer p2p.Peer, peers []p2p.Peer, msg []byte) {
peers = SelectMyPeers(peers, selfPeer.ValidatorID*MaxBroadCast+1, (selfPeer.ValidatorID+1)*MaxBroadCast)
BroadcastMessage(h, peers, msg)
log.Info("Done sending from validator")
BroadcastMessage(h, peers, msg, nil)
}
// MaxBroadCast is the maximum number of neighbors to broadcast
@ -89,14 +87,14 @@ func SelectMyPeers(peers []p2p.Peer, min int, max int) []p2p.Peer {
}
// Send a message to another node with given port.
func send(h Host, peer p2p.Peer, message []byte) {
func send(h Host, peer p2p.Peer, message []byte, lostPeer chan p2p.Peer) {
// Add attack code here.
//attack.GetInstance().Run()
backoff := p2p.NewExpBackoff(250*time.Millisecond, 10*time.Second, 2)
backoff := p2p.NewExpBackoff(150*time.Millisecond, 5*time.Second, 2)
for trial := 0; trial < 10; trial++ {
var err error
h.SendMessage(peer, message)
err = h.SendMessage(peer, message)
if err == nil {
if trial > 0 {
log.Warn("retry send", "rety", trial)
@ -108,6 +106,11 @@ func send(h Host, peer p2p.Peer, message []byte) {
backoff.Sleep()
}
log.Error("gave up sending a message", "addr", net.JoinHostPort(peer.IP, peer.Port))
if lostPeer != nil {
// Notify lostPeer channel
lostPeer <- peer
}
}
// DialWithSocketClient joins host port and establishes connection

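The reworked send() above now checks the error from h.SendMessage, retries with a quicker backoff (150ms initial, 5s cap, factor 2, ten attempts), and only reports the peer on lostPeer after every attempt has failed. A compact, runnable sketch of that retry-then-report shape, with trySend and report as placeholders for the host send and the channel notification (this is not the actual ExpBackoff type):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sendWithRetry retries trySend up to 10 times with exponential backoff
// (150ms initial delay, doubling, capped at 5s) and calls report only if
// every attempt fails.
func sendWithRetry(trySend func() error, report func()) {
	delay := 150 * time.Millisecond
	for trial := 0; trial < 10; trial++ {
		if trySend() == nil {
			return // delivered; no offline notification
		}
		time.Sleep(delay)
		delay *= 2
		if delay > 5*time.Second {
			delay = 5 * time.Second
		}
	}
	report() // all retries failed; the peer is treated as offline
}

func main() {
	attempts := 0
	sendWithRetry(
		func() error {
			attempts++
			if attempts < 3 {
				return errors.New("connection refused")
			}
			return nil
		},
		func() { fmt.Println("peer reported as offline") },
	)
	fmt.Printf("delivered after %d attempts\n", attempts)
}
```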