Merge branch 'master' of github.com:harmony-one/harmony into rj_branch

pull/105/head
Rongjian Lan 6 years ago
commit 947c882f26
  1. 1
      benchmark.go
  2. 20
      consensus/consensus.go
  3. 20
      consensus/consensus_leader.go
  4. 7
      go_executable_build.sh
  5. 63
      node/node.go
  6. 16
      node/node_handler.go
  7. 3
      p2p/peer.go
  8. 4
      syncing/downloader/server.go

@ -230,5 +230,6 @@ func main() {
} }
} }
go currentNode.SupportSyncing()
currentNode.StartServer(*port) currentNode.StartServer(*port)
} }

@ -48,6 +48,7 @@ type Consensus struct {
leader p2p.Peer leader p2p.Peer
// Public keys of the committee including leader and validators // Public keys of the committee including leader and validators
PublicKeys []kyber.Point PublicKeys []kyber.Point
pubKeyLock sync.Mutex
// private/public keys of current node // private/public keys of current node
priKey kyber.Scalar priKey kyber.Scalar
@ -87,7 +88,7 @@ type Consensus struct {
Log log.Logger Log log.Logger
uniqueIdInstance *utils.UniqueValidatorId uniqueIDInstance *utils.UniqueValidatorId
} }
// BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far // BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far
@ -172,7 +173,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) *
} }
consensus.Log = log.New() consensus.Log = log.New()
consensus.uniqueIdInstance = utils.GetUniqueValidatorIdInstance() consensus.uniqueIDInstance = utils.GetUniqueValidatorIdInstance()
return &consensus return &consensus
} }
@ -241,7 +242,7 @@ func (consensus *Consensus) AddPeers(peers []p2p.Peer) int {
_, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer)) _, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer))
if !ok { if !ok {
if peer.ValidatorID == -1 { if peer.ValidatorID == -1 {
peer.ValidatorID = int(consensus.uniqueIdInstance.GetUniqueId()) peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueId())
} }
consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer)
consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey) consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey)
@ -277,9 +278,18 @@ func (consensus *Consensus) DebugPrintValidators() {
consensus.Log.Debug("validator:", "IP", p.Ip, "Port", p.Port, "VID", p.ValidatorID, "Key", str2) consensus.Log.Debug("validator:", "IP", p.Ip, "Port", p.Port, "VID", p.ValidatorID, "Key", str2)
count++ count++
return true return true
} else {
return false
} }
return false
}) })
consensus.Log.Debug("Validators", "#", count) consensus.Log.Debug("Validators", "#", count)
} }
// UpdatePublicKeys updates the PublicKeys variable, protected by a mutex
func (consensus *Consensus) UpdatePublicKeys(pubKeys []kyber.Point) int {
consensus.pubKeyLock.Lock()
// consensus.PublicKeys = make([]kyber.Point, len(pubKeys))
consensus.PublicKeys = append(pubKeys[:0:0], pubKeys...)
consensus.pubKeyLock.Unlock()
return len(consensus.PublicKeys)
}

@ -22,6 +22,10 @@ import (
proto_consensus "github.com/harmony-one/harmony/proto/consensus" proto_consensus "github.com/harmony-one/harmony/proto/consensus"
) )
const (
waitForEnoughValidators = 300
)
var ( var (
startTime time.Time startTime time.Time
) )
@ -29,13 +33,12 @@ var (
// WaitForNewBlock waits for the next new block to run consensus on // WaitForNewBlock waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) { func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) {
consensus.Log.Debug("Waiting for block", "consensus", consensus) consensus.Log.Debug("Waiting for block", "consensus", consensus)
backoff := p2p.NewExpBackoff(500*time.Millisecond, 30*time.Second, 2.0)
for { // keep waiting for new blocks for { // keep waiting for new blocks
newBlock := <-blockChannel newBlock := <-blockChannel
if !consensus.HasEnoughValidators() { if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys)) consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
backoff.Sleep() time.Sleep(waitForEnoughValidators * time.Millisecond)
} }
// TODO: think about potential race condition // TODO: think about potential race condition
@ -50,12 +53,18 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
} }
} }
// WaitForNewBlock waits for the next new block to run consensus on // WaitForNewBlockAccount waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Block) { func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Block) {
consensus.Log.Debug("Waiting for block", "consensus", consensus) consensus.Log.Debug("Waiting for block", "consensus", consensus)
for { // keep waiting for new blocks for { // keep waiting for new blocks
newBlock := <-blockChannel newBlock := <-blockChannel
// TODO: think about potential race condition // TODO: think about potential race condition
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
}
startTime = time.Now() startTime = time.Now()
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime) consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime)
for consensus.state == Finished { for consensus.state == Finished {
@ -63,6 +72,7 @@ func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Bloc
data, err := rlp.EncodeToBytes(newBlock) data, err := rlp.EncodeToBytes(newBlock)
if err == nil { if err == nil {
consensus.Log.Debug("Sample tx", "tx", newBlock.Transactions()[0]) consensus.Log.Debug("Sample tx", "tx", newBlock.Transactions()[0])
consensus.ResetState()
consensus.startConsensus(&blockchain.Block{Hash: newBlock.Hash(), AccountBlock: data}) consensus.startConsensus(&blockchain.Block{Hash: newBlock.Hash(), AccountBlock: data})
} else { } else {
consensus.Log.Error("Failed encoding the block with RLP") consensus.Log.Error("Failed encoding the block with RLP")
@ -498,6 +508,10 @@ func (consensus *Consensus) reportMetrics(block blockchain.Block) {
profiler.LogMetrics(metrics) profiler.LogMetrics(metrics)
} }
// HasEnoughValidators checks the number of publicKeys to determine
// if the shard has enough validators
// FIXME (HAR-82): we need epoch support or a better way to determine
// when to initiate the consensus
func (consensus *Consensus) HasEnoughValidators() bool { func (consensus *Consensus) HasEnoughValidators() bool {
if len(consensus.PublicKeys) < consensus.MinPeers { if len(consensus.PublicKeys) < consensus.MinPeers {
return false return false

@ -8,6 +8,7 @@ BUCKET=unique-bucket-bin
GOOS=linux GOOS=linux
GOARCH=amd64 GOARCH=amd64
FOLDER=/${WHOAMI:-$USER} FOLDER=/${WHOAMI:-$USER}
RACE=
if [ "$(uname -s)" == "Darwin" ]; then if [ "$(uname -s)" == "Darwin" ]; then
MD5='md5 -r' MD5='md5 -r'
@ -29,6 +30,7 @@ OPTIONS:
-o os set build OS (default: $GOOS, windows is supported) -o os set build OS (default: $GOOS, windows is supported)
-b bucket set the upload bucket name (default: $BUCKET) -b bucket set the upload bucket name (default: $BUCKET)
-f folder set the upload folder name in the bucket (default: $FOLDER) -f folder set the upload folder name in the bucket (default: $FOLDER)
-r enable -race build option (default: $RACE)
ACTION: ACTION:
build build binaries only (default action) build build binaries only (default action)
@ -57,7 +59,7 @@ function build_only
BUILTBY=${USER}@ BUILTBY=${USER}@
for bin in "${!SRC[@]}"; do for bin in "${!SRC[@]}"; do
env GOOS=$GOOS GOARCH=$GOARCH go build -ldflags="-X main.version=v${VERSION} -X main.commit=${COMMIT} -X main.builtAt=${BUILTAT} -X main.builtBy=${BUILTBY}" -o $BINDIR/$bin ${SRC[$bin]} env GOOS=$GOOS GOARCH=$GOARCH go build -ldflags="-X main.version=v${VERSION} -X main.commit=${COMMIT} -X main.builtAt=${BUILTAT} -X main.builtBy=${BUILTBY}" -o $BINDIR/$bin $RACE ${SRC[$bin]}
if [ "$(uname -s)" == "Linux" ]; then if [ "$(uname -s)" == "Linux" ]; then
$BINDIR/$bin -version $BINDIR/$bin -version
fi fi
@ -84,7 +86,7 @@ function upload
} }
################################ MAIN FUNCTION ############################## ################################ MAIN FUNCTION ##############################
while getopts "hp:a:o:b:f:" option; do while getopts "hp:a:o:b:f:r" option; do
case $option in case $option in
h) usage ;; h) usage ;;
p) PROFILE=$OPTARG ;; p) PROFILE=$OPTARG ;;
@ -92,6 +94,7 @@ while getopts "hp:a:o:b:f:" option; do
o) GOOS=$OPTARG ;; o) GOOS=$OPTARG ;;
b) BUCKET=$OPTARG/ ;; b) BUCKET=$OPTARG/ ;;
f) FOLDER=$OPTARG ;; f) FOLDER=$OPTARG ;;
r) RACE=-race ;;
esac esac
done done

@ -8,6 +8,8 @@ import (
"math/big" "math/big"
"math/rand" "math/rand"
"net" "net"
"os"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -31,17 +33,26 @@ import (
downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto" downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto"
) )
type NodeState byte // State is a state of a node.
type State byte
// All constants except the NodeLeader below are for validators only.
const ( const (
NodeInit NodeState = iota // Node just started, before contacting BeaconChain NodeInit State = iota // Node just started, before contacting BeaconChain
NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard
NodeJoinedShard // Node joined Shard, ready for consensus NodeJoinedShard // Node joined Shard, ready for consensus
NodeOffline // Node is offline NodeOffline // Node is offline
NodeReadyForConsensus // Node is ready to do consensus NodeReadyForConsensus // Node is ready to do consensus
NodeDoingConsensus // Node is already doing consensus NodeDoingConsensus // Node is already doing consensus
NodeLeader // Node is the leader of some shard.
) )
const (
// TimeToSleepForSyncing is the time waiting for node transformed into NodeDoingConsensus
TimeToSleepForSyncing = time.Second * 30
)
// NetworkNode ...
type NetworkNode struct { type NetworkNode struct {
SelfPeer p2p.Peer SelfPeer p2p.Peer
IDCPeer p2p.Peer IDCPeer p2p.Peer
@ -64,14 +75,13 @@ type Node struct {
crossTxToReturnMutex sync.Mutex crossTxToReturnMutex sync.Mutex
ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept ClientPeer *p2p.Peer // The peer for the benchmark tx generator client, used for leaders to return proof-of-accept
Client *client.Client // The presence of a client object means this node will also act as a client Client *client.Client // The presence of a client object means this node will also act as a client
IsWaiting bool SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
IDCPeer p2p.Peer IDCPeer p2p.Peer
SyncNode bool // TODO(minhdoan): Remove it later. SyncNode bool // TODO(minhdoan): Remove it later.
chain *core.BlockChain // Account Model chain *core.BlockChain // Account Model
Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
State NodeState // State of the Node State State // State of the Node
// Account Model // Account Model
pendingTransactionsAccount types.Transactions // TODO: replace with txPool pendingTransactionsAccount types.Transactions // TODO: replace with txPool
@ -150,6 +160,7 @@ func (node *Node) StartServer(port string) {
node.listenOnPort(port) node.listenOnPort(port)
} }
// SetLog sets log for Node.
func (node *Node) SetLog() *Node { func (node *Node) SetLog() *Node {
node.log = log.New() node.log = log.New()
return node return node
@ -205,7 +216,7 @@ func (node *Node) countNumTransactionsInBlockchainAccount() int {
return count return count
} }
//ConnectIdentityChain connects to identity chain //ConnectBeaconChain connects to identity chain
func (node *Node) ConnectBeaconChain() { func (node *Node) ConnectBeaconChain() {
Nnode := &NetworkNode{SelfPeer: node.SelfPeer, IDCPeer: node.IDCPeer} Nnode := &NetworkNode{SelfPeer: node.SelfPeer, IDCPeer: node.IDCPeer}
msg := node.SerializeNode(Nnode) msg := node.SerializeNode(Nnode)
@ -298,12 +309,16 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
} }
// Logger // Logger
node.log = log.New() node.log = log.New()
node.State = NodeInit if consensus.IsLeader {
node.State = NodeLeader
} else {
node.State = NodeInit
}
return &node return &node
} }
// Add neighbors nodes // AddPeers adds neighbors nodes
func (node *Node) AddPeers(peers []p2p.Peer) int { func (node *Node) AddPeers(peers []p2p.Peer) int {
count := 0 count := 0
for _, p := range peers { for _, p := range peers {
@ -332,14 +347,28 @@ func (node *Node) JoinShard(leader p2p.Peer) {
buffer := ping.ConstructPingMessage() buffer := ping.ConstructPingMessage()
p2p.SendMessage(leader, buffer) p2p.SendMessage(leader, buffer)
node.log.Debug("Sent ping message")
} }
} }
// StartDownloaderServer starts downloader server. // SupportSyncing keeps sleeping until it's doing consensus or it's a leader.
func (node *Node) StartDownloaderServer() { func (node *Node) SupportSyncing() {
node.InitSyncingServer()
node.StartSyncingServer()
}
// InitSyncingServer starts downloader server.
func (node *Node) InitSyncingServer() {
node.downloaderServer = downloader.NewServer(node) node.downloaderServer = downloader.NewServer(node)
// node.downloaderServer.Start(node.) }
// StartSyncingServer starts syncing server.
func (node *Node) StartSyncingServer() {
if port, err := strconv.Atoi(node.SelfPeer.Port); err == nil {
node.downloaderServer.Start(node.SelfPeer.Ip, fmt.Sprintf("%d", port-1000))
} else {
node.log.Error("Wrong port format provided")
os.Exit(1)
}
} }
// CalculateResponse implements DownloadInterface on Node object. // CalculateResponse implements DownloadInterface on Node object.

@ -372,7 +372,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
} }
} }
// WaitForConsensusReady ... // WaitForConsensusReadyAccount ...
func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) { func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) {
node.log.Debug("Waiting for Consensus ready", "node", node) node.log.Debug("Waiting for Consensus ready", "node", node)
@ -466,7 +466,7 @@ func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool {
return node.UtxoPool.VerifyTransactions(newBlock.Transactions) return node.UtxoPool.VerifyTransactions(newBlock.Transactions)
} }
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on // VerifyNewBlockAccount is called by consensus participants to verify the block (account model) they are running consensus on
func (node *Node) VerifyNewBlockAccount(newBlock *types.Block) bool { func (node *Node) VerifyNewBlockAccount(newBlock *types.Block) bool {
err := node.Chain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) err := node.Chain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey))
if err != nil { if err != nil {
@ -604,6 +604,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
return -1 return -1
} }
// node.log.Info("Pong", "Msg", pong) // node.log.Info("Pong", "Msg", pong)
// TODO (lc) state syncing, and wait for all public keys
node.State = NodeJoinedShard node.State = NodeJoinedShard
peers := make([]p2p.Peer, 0) peers := make([]p2p.Peer, 0)
@ -621,15 +622,16 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
continue continue
} }
peers = append(peers, *peer) peers = append(peers, *peer)
} }
count := node.AddPeers(peers) if len(peers) > 0 {
node.AddPeers(peers)
}
// Reset Validator PublicKeys every time we receive PONG message from Leader // Reset Validator PublicKeys every time we receive PONG message from Leader
// The PublicKeys has to be idential across the shard on every node // The PublicKeys has to be idential across the shard on every node
// TODO (lc): we need to handle RemovePeer situation // TODO (lc): we need to handle RemovePeer situation
node.Consensus.PublicKeys = make([]kyber.Point, 0) publicKeys := make([]kyber.Point, 0)
// Create the the PubKey from the []byte sent from leader // Create the the PubKey from the []byte sent from leader
for _, k := range pong.PubKeys { for _, k := range pong.PubKeys {
@ -639,8 +641,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
node.log.Error("UnmarshalBinary Failed PubKeys", "error", err) node.log.Error("UnmarshalBinary Failed PubKeys", "error", err)
continue continue
} }
node.Consensus.PublicKeys = append(node.Consensus.PublicKeys, key) publicKeys = append(publicKeys, key)
} }
return count return node.Consensus.UpdatePublicKeys(publicKeys)
} }

@ -135,6 +135,9 @@ func send(ip, port string, message []byte) {
for trial := 0; trial < 10; trial++ { for trial := 0; trial < 10; trial++ {
err := sendWithSocketClient(ip, port, message) err := sendWithSocketClient(ip, port, message)
if err == nil { if err == nil {
if trial > 0 {
log.Warn("retry sendWithSocketClient", "rety", trial)
}
return return
} }
log.Info("sleeping before trying to send again", log.Info("sleeping before trying to send again",

@ -11,6 +11,10 @@ import (
pb "github.com/harmony-one/harmony/syncing/downloader/proto" pb "github.com/harmony-one/harmony/syncing/downloader/proto"
) )
const (
DefaultDownloadPort = "6666"
)
// Server is the Server struct for downloader package. // Server is the Server struct for downloader package.
type Server struct { type Server struct {
downloadInterface DownloadInterface downloadInterface DownloadInterface

Loading…
Cancel
Save