diff --git a/benchmark.go b/benchmark.go index af17637fa..5b698c651 100644 --- a/benchmark.go +++ b/benchmark.go @@ -230,5 +230,6 @@ func main() { } } + go currentNode.SupportSyncing() currentNode.StartServer(*port) } diff --git a/consensus/consensus.go b/consensus/consensus.go index 067f64428..1c8fd8cd1 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -48,6 +48,7 @@ type Consensus struct { leader p2p.Peer // Public keys of the committee including leader and validators PublicKeys []kyber.Point + pubKeyLock sync.Mutex // private/public keys of current node priKey kyber.Scalar @@ -87,7 +88,7 @@ type Consensus struct { Log log.Logger - uniqueIdInstance *utils.UniqueValidatorId + uniqueIDInstance *utils.UniqueValidatorId } // BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far @@ -172,7 +173,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * } consensus.Log = log.New() - consensus.uniqueIdInstance = utils.GetUniqueValidatorIdInstance() + consensus.uniqueIDInstance = utils.GetUniqueValidatorIdInstance() return &consensus } @@ -241,7 +242,7 @@ func (consensus *Consensus) AddPeers(peers []p2p.Peer) int { _, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer)) if !ok { if peer.ValidatorID == -1 { - peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId()) + peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueId()) } consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey) @@ -277,9 +278,18 @@ func (consensus *Consensus) DebugPrintValidators() { consensus.Log.Debug("validator:", "IP", p.Ip, "Port", p.Port, "VID", p.ValidatorID, "Key", str2) count++ return true - } else { - return false } + return false }) consensus.Log.Debug("Validators", "#", count) } + +// UpdatePublicKeys updates the PublicKeys variable, protected by a mutex +func (consensus *Consensus) UpdatePublicKeys(pubKeys []kyber.Point) int { + consensus.pubKeyLock.Lock() + // consensus.PublicKeys = make([]kyber.Point, len(pubKeys)) + consensus.PublicKeys = append(pubKeys[:0:0], pubKeys...) + consensus.pubKeyLock.Unlock() + + return len(consensus.PublicKeys) +} diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 5c009f079..67e5f1268 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -22,6 +22,10 @@ import ( proto_consensus "github.com/harmony-one/harmony/proto/consensus" ) +const ( + waitForEnoughValidators = 300 +) + var ( startTime time.Time ) @@ -29,13 +33,12 @@ var ( // WaitForNewBlock waits for the next new block to run consensus on func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) { consensus.Log.Debug("Waiting for block", "consensus", consensus) - backoff := p2p.NewExpBackoff(500*time.Millisecond, 30*time.Second, 2.0) for { // keep waiting for new blocks newBlock := <-blockChannel if !consensus.HasEnoughValidators() { consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys)) - backoff.Sleep() + time.Sleep(waitForEnoughValidators * time.Millisecond) } // TODO: think about potential race condition @@ -50,12 +53,18 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) } } -// WaitForNewBlock waits for the next new block to run consensus on +// WaitForNewBlockAccount waits for the next new block to run consensus on func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Block) { consensus.Log.Debug("Waiting for block", "consensus", consensus) for { // keep waiting for new blocks newBlock := <-blockChannel // TODO: think about potential race condition + + if !consensus.HasEnoughValidators() { + consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys)) + time.Sleep(waitForEnoughValidators * time.Millisecond) + } + startTime = time.Now() consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime) for consensus.state == Finished { @@ -63,6 +72,7 @@ func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Bloc data, err := rlp.EncodeToBytes(newBlock) if err == nil { consensus.Log.Debug("Sample tx", "tx", newBlock.Transactions()[0]) + consensus.ResetState() consensus.startConsensus(&blockchain.Block{Hash: newBlock.Hash(), AccountBlock: data}) } else { consensus.Log.Error("Failed encoding the block with RLP") @@ -498,6 +508,10 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) { profiler.LogMetrics(metrics) } +// HasEnoughValidators checks the number of publicKeys to determine +// if the shard has enough validators +// FIXME (HAR-82): we need epoch support or a better way to determine +// when to initiate the consensus func (consensus *Consensus) HasEnoughValidators() bool { if len(consensus.PublicKeys) < consensus.MinPeers { return false diff --git a/go_executable_build.sh b/go_executable_build.sh index 61fa7a14a..a6a3fa482 100755 --- a/go_executable_build.sh +++ b/go_executable_build.sh @@ -8,6 +8,7 @@ BUCKET=unique-bucket-bin GOOS=linux GOARCH=amd64 FOLDER=/${WHOAMI:-$USER} +RACE= if [ "$(uname -s)" == "Darwin" ]; then MD5='md5 -r' @@ -29,6 +30,7 @@ OPTIONS: -o os set build OS (default: $GOOS, windows is supported) -b bucket set the upload bucket name (default: $BUCKET) -f folder set the upload folder name in the bucket (default: $FOLDER) + -r enable -race build option (default: $RACE) ACTION: build build binaries only (default action) @@ -57,7 +59,7 @@ function build_only BUILTBY=${USER}@ for bin in "${!SRC[@]}"; do - env GOOS=$GOOS GOARCH=$GOARCH go build -ldflags="-X main.version=v${VERSION} -X main.commit=${COMMIT} -X main.builtAt=${BUILTAT} -X main.builtBy=${BUILTBY}" -o $BINDIR/$bin ${SRC[$bin]} + env GOOS=$GOOS GOARCH=$GOARCH go build -ldflags="-X main.version=v${VERSION} -X main.commit=${COMMIT} -X main.builtAt=${BUILTAT} -X main.builtBy=${BUILTBY}" -o $BINDIR/$bin $RACE ${SRC[$bin]} if [ "$(uname -s)" == "Linux" ]; then $BINDIR/$bin -version fi @@ -84,7 +86,7 @@ function upload } ################################ MAIN FUNCTION ############################## -while getopts "hp:a:o:b:f:" option; do +while getopts "hp:a:o:b:f:r" option; do case $option in h) usage ;; p) PROFILE=$OPTARG ;; @@ -92,6 +94,7 @@ while getopts "hp:a:o:b:f:" option; do o) GOOS=$OPTARG ;; b) BUCKET=$OPTARG/ ;; f) FOLDER=$OPTARG ;; + r) RACE=-race ;; esac done diff --git a/node/node.go b/node/node.go index fd256a319..3bc6cc0dd 100644 --- a/node/node.go +++ b/node/node.go @@ -8,6 +8,8 @@ import ( "math/big" "math/rand" "net" + "os" + "strconv" "strings" "sync" "time" @@ -31,17 +33,26 @@ import ( downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto" ) -type NodeState byte +// State is a state of a node. +type State byte +// All constants except the NodeLeader below are for validators only. const ( - NodeInit NodeState = iota // Node just started, before contacting BeaconChain - NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard - NodeJoinedShard // Node joined Shard, ready for consensus - NodeOffline // Node is offline - NodeReadyForConsensus // Node is ready to do consensus - NodeDoingConsensus // Node is already doing consensus + NodeInit State = iota // Node just started, before contacting BeaconChain + NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard + NodeJoinedShard // Node joined Shard, ready for consensus + NodeOffline // Node is offline + NodeReadyForConsensus // Node is ready to do consensus + NodeDoingConsensus // Node is already doing consensus + NodeLeader // Node is the leader of some shard. ) +const ( + // TimeToSleepForSyncing is the time waiting for node transformed into NodeDoingConsensus + TimeToSleepForSyncing = time.Second * 30 +) + +// NetworkNode ... type NetworkNode struct { SelfPeer p2p.Peer IDCPeer p2p.Peer @@ -64,14 +75,13 @@ type Node struct { crossTxToReturnMutex sync.Mutex ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept Client *client.Client // The presence of a client object means this node will also act as a client - IsWaiting bool - SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. + SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. IDCPeer p2p.Peer SyncNode bool // TODO(minhdoan): Remove it later. chain *core.BlockChain // Account Model Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer - State NodeState // State of the Node + State State // State of the Node // Account Model pendingTransactionsAccount types.Transactions // TODO: replace with txPool @@ -150,6 +160,7 @@ func (node *Node) StartServer(port string) { node.listenOnPort(port) } +// SetLog sets log for Node. func (node *Node) SetLog() *Node { node.log = log.New() return node @@ -205,7 +216,7 @@ func (node *Node) countNumTransactionsInBlockchainAccount() int { return count } -//ConnectIdentityChain connects to identity chain +//ConnectBeaconChain connects to identity chain func (node *Node) ConnectBeaconChain() { Nnode := &NetworkNode{SelfPeer: node.SelfPeer, IDCPeer: node.IDCPeer} msg := node.SerializeNode(Nnode) @@ -298,12 +309,16 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { } // Logger node.log = log.New() - node.State = NodeInit + if consensus.IsLeader { + node.State = NodeLeader + } else { + node.State = NodeInit + } return &node } -// Add neighbors nodes +// AddPeers adds neighbors nodes func (node *Node) AddPeers(peers []p2p.Peer) int { count := 0 for _, p := range peers { @@ -332,14 +347,28 @@ func (node *Node) JoinShard(leader p2p.Peer) { buffer := ping.ConstructPingMessage() p2p.SendMessage(leader, buffer) - node.log.Debug("Sent ping message") } } -// StartDownloaderServer starts downloader server. -func (node *Node) StartDownloaderServer() { +// SupportSyncing keeps sleeping until it's doing consensus or it's a leader. +func (node *Node) SupportSyncing() { + node.InitSyncingServer() + node.StartSyncingServer() +} + +// InitSyncingServer starts downloader server. +func (node *Node) InitSyncingServer() { node.downloaderServer = downloader.NewServer(node) - // node.downloaderServer.Start(node.) +} + +// StartSyncingServer starts syncing server. +func (node *Node) StartSyncingServer() { + if port, err := strconv.Atoi(node.SelfPeer.Port); err == nil { + node.downloaderServer.Start(node.SelfPeer.Ip, fmt.Sprintf("%d", port-1000)) + } else { + node.log.Error("Wrong port format provided") + os.Exit(1) + } } // CalculateResponse implements DownloadInterface on Node object. diff --git a/node/node_handler.go b/node/node_handler.go index 164ceec14..47eb7cca9 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -372,7 +372,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) { } } -// WaitForConsensusReady ... +// WaitForConsensusReadyAccount ... func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) { node.log.Debug("Waiting for Consensus ready", "node", node) @@ -466,7 +466,7 @@ func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool { return node.UtxoPool.VerifyTransactions(newBlock.Transactions) } -// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on +// VerifyNewBlockAccount is called by consensus participants to verify the block (account model) they are running consensus on func (node *Node) VerifyNewBlockAccount(newBlock *types.Block) bool { err := node.Chain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) if err != nil { @@ -604,6 +604,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { return -1 } // node.log.Info("Pong", "Msg", pong) + // TODO (lc) state syncing, and wait for all public keys node.State = NodeJoinedShard peers := make([]p2p.Peer, 0) @@ -621,15 +622,16 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { continue } peers = append(peers, *peer) - } - count := node.AddPeers(peers) + if len(peers) > 0 { + node.AddPeers(peers) + } // Reset Validator PublicKeys every time we receive PONG message from Leader // The PublicKeys has to be idential across the shard on every node // TODO (lc): we need to handle RemovePeer situation - node.Consensus.PublicKeys = make([]kyber.Point, 0) + publicKeys := make([]kyber.Point, 0) // Create the the PubKey from the []byte sent from leader for _, k := range pong.PubKeys { @@ -639,8 +641,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { node.log.Error("UnmarshalBinary Failed PubKeys", "error", err) continue } - node.Consensus.PublicKeys = append(node.Consensus.PublicKeys, key) + publicKeys = append(publicKeys, key) } - return count + return node.Consensus.UpdatePublicKeys(publicKeys) } diff --git a/p2p/peer.go b/p2p/peer.go index 78b59a805..78fd6e1ef 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -135,6 +135,9 @@ func send(ip, port string, message []byte) { for trial := 0; trial < 10; trial++ { err := sendWithSocketClient(ip, port, message) if err == nil { + if trial > 0 { + log.Warn("retry sendWithSocketClient", "rety", trial) + } return } log.Info("sleeping before trying to send again", diff --git a/syncing/downloader/server.go b/syncing/downloader/server.go index 753e6572e..dd28ba882 100644 --- a/syncing/downloader/server.go +++ b/syncing/downloader/server.go @@ -11,6 +11,10 @@ import ( pb "github.com/harmony-one/harmony/syncing/downloader/proto" ) +const ( + DefaultDownloadPort = "6666" +) + // Server is the Server struct for downloader package. type Server struct { downloadInterface DownloadInterface