From f8aa5a218e4ef2757b2155fa034b807663ebee66 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 12 Feb 2019 11:42:11 -0800 Subject: [PATCH 1/4] Add randomness service to beacon leader --- api/service/manager.go | 3 +++ .../{randgen => randomness}/service.go | 22 ++++++++++--------- cmd/harmony.go | 6 +++++ node/node.go | 6 +++++ 4 files changed, 27 insertions(+), 10 deletions(-) rename api/service/{randgen => randomness}/service.go (64%) diff --git a/api/service/manager.go b/api/service/manager.go index 1080a7598..04d3e993d 100644 --- a/api/service/manager.go +++ b/api/service/manager.go @@ -25,6 +25,7 @@ const ( ClientSupport SupportExplorer Consensus + Randomness BlockProposal NetworkInfo PeerDiscovery @@ -43,6 +44,8 @@ func (t Type) String() string { return "SupportExplorer" case Consensus: return "Consensus" + case Randomness: + return "Randomness" case BlockProposal: return "BlockProposal" case NetworkInfo: diff --git a/api/service/randgen/service.go b/api/service/randomness/service.go similarity index 64% rename from api/service/randgen/service.go rename to api/service/randomness/service.go index 95e17e6bc..a56c7cb0a 100644 --- a/api/service/randgen/service.go +++ b/api/service/randomness/service.go @@ -1,21 +1,23 @@ -package randgen +package randomness import ( + "github.com/harmony-one/harmony/drand" "github.com/harmony-one/harmony/internal/utils" ) -// Service is the random generation service. +// Service is the randomness generation service. type Service struct { stopChan chan struct{} stoppedChan chan struct{} + DRand *drand.DRand } -// New returns random generation service. -func New() *Service { - return &Service{} +// New returns randomness generation service. +func New(dRand *drand.DRand) *Service { + return &Service{DRand: dRand} } -// StartService starts random generation service. +// StartService starts randomness generation service. func (s *Service) StartService() { s.stopChan = make(chan struct{}) s.stoppedChan = make(chan struct{}) @@ -24,11 +26,11 @@ func (s *Service) StartService() { s.Run(s.stopChan, s.stoppedChan) } -// Init initializes random generation. +// Init initializes randomness generation. func (s *Service) Init() { } -// Run runs random generation. +// Run runs randomness generation. func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) { go func() { defer close(stoppedChan) @@ -45,12 +47,12 @@ func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) { }() } -// DoRandomGeneration does random generation. +// DoRandomGeneration does rarandomnessndom generation. func (s *Service) DoRandomGeneration() { } -// StopService stops random generation service. +// StopService stops randomness generation service. func (s *Service) StopService() { utils.GetLogInstance().Info("Stopping random generation service.") s.stopChan <- struct{}{} diff --git a/cmd/harmony.go b/cmd/harmony.go index 4024a25c4..87815e5de 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -9,6 +9,8 @@ import ( "runtime" "time" + "github.com/harmony-one/harmony/drand" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/harmony-one/harmony/consensus" @@ -244,6 +246,10 @@ func main() { } else { currentNode.Role = node.BeaconValidator } + + // Add randomness protocol + dRand := drand.New(host, shardID, peers, leader) + currentNode.DRand = dRand } else { if role == "leader" { currentNode.Role = node.ShardLeader diff --git a/node/node.go b/node/node.go index 918895be6..ab0efb9df 100644 --- a/node/node.go +++ b/node/node.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/harmony-one/harmony/drand" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" @@ -28,6 +30,7 @@ import ( "github.com/harmony-one/harmony/api/service/discovery" "github.com/harmony-one/harmony/api/service/explorer" "github.com/harmony-one/harmony/api/service/networkinfo" + randomness_service "github.com/harmony-one/harmony/api/service/randomness" "github.com/harmony-one/harmony/api/service/staking" "github.com/harmony-one/harmony/api/service/syncing" "github.com/harmony-one/harmony/api/service/syncing/downloader" @@ -120,6 +123,7 @@ type Node struct { pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus transactionInConsensus []*types.Transaction // The transactions selected into the new block and under Consensus process pendingTxMutex sync.Mutex + DRand *drand.DRand // The instance for distributed randomness protocol blockchain *core.BlockChain // The blockchain for the shard where this node belongs db *ethdb.LDBDatabase // LevelDB to store blockchain. @@ -620,6 +624,8 @@ func (node *Node) setupForBeaconLeader() { node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady)) // Register client support service. node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.SelfPeer.IP, node.SelfPeer.Port)) + // Register randomness service + node.serviceManager.RegisterService(service_manager.Randomness, randomness_service.New(node.DRand)) } func (node *Node) setupForBeaconValidator() { From e4917f4a7971caebe6fd32d1baa7b6e6b85b7ca4 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 12 Feb 2019 11:43:49 -0800 Subject: [PATCH 2/4] Add randomness service to beacon validator --- node/node.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/node/node.go b/node/node.go index ab0efb9df..29a8beb37 100644 --- a/node/node.go +++ b/node/node.go @@ -642,6 +642,8 @@ func (node *Node) setupForBeaconValidator() { node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil)) // Register networkinfo service. "0" is the beacon shard ID node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, "0", chanPeer)) + // Register randomness service + node.serviceManager.RegisterService(service_manager.Randomness, randomness_service.New(node.DRand)) } func (node *Node) setupForNewNode() { From 8142b11aa7ee1bf53f7f82294c91e621c6577ba8 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 12 Feb 2019 13:53:32 -0800 Subject: [PATCH 3/4] Add channel for confirmed blocks and add that between node and drand --- api/service/randomness/service.go | 5 +++++ cmd/harmony.go | 2 +- drand/drand.go | 17 ++++++++++++----- drand/drand_leader_msg_test.go | 2 +- drand/drand_test.go | 2 +- drand/drand_validator_msg_test.go | 2 +- node/node.go | 4 +++- node/node_handler.go | 6 ++++++ 8 files changed, 30 insertions(+), 10 deletions(-) diff --git a/api/service/randomness/service.go b/api/service/randomness/service.go index a56c7cb0a..1e3177e23 100644 --- a/api/service/randomness/service.go +++ b/api/service/randomness/service.go @@ -28,6 +28,11 @@ func (s *Service) StartService() { // Init initializes randomness generation. func (s *Service) Init() { + for { + newBlock := <-s.DRand.ConfirmedBlockChannel + _ = newBlock + // TODO: process newBlock + } } // Run runs randomness generation. diff --git a/cmd/harmony.go b/cmd/harmony.go index 87815e5de..12851d1e1 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -248,7 +248,7 @@ func main() { } // Add randomness protocol - dRand := drand.New(host, shardID, peers, leader) + dRand := drand.New(host, shardID, peers, leader, currentNode.ConfirmedBlockChannel) currentNode.DRand = dRand } else { if role == "leader" { diff --git a/drand/drand.go b/drand/drand.go index ab878a705..3599d643d 100644 --- a/drand/drand.go +++ b/drand/drand.go @@ -7,6 +7,8 @@ import ( "strconv" "sync" + "github.com/harmony-one/harmony/core/types" + protobuf "github.com/golang/protobuf/proto" "github.com/harmony-one/bls/ffi/go/bls" drand_proto "github.com/harmony-one/harmony/api/drand" @@ -17,10 +19,11 @@ import ( // DRand is the main struct which contains state for the distributed randomness protocol. type DRand struct { - vrfs *map[uint32][]byte - bitmap *bls_cosi.Mask - pRand *[32]byte - rand *[32]byte + vrfs *map[uint32][]byte + bitmap *bls_cosi.Mask + pRand *[32]byte + rand *[32]byte + ConfirmedBlockChannel chan *types.Block // Channel for confirmed blocks // map of nodeID to validator Peer object // FIXME: should use PubKey of p2p.Peer as the hashkey @@ -53,10 +56,14 @@ type DRand struct { } // New creates a new dRand object -func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *DRand { +func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confirmedBlockChannel chan *types.Block) *DRand { dRand := DRand{} dRand.host = host + if confirmedBlockChannel != nil { + dRand.ConfirmedBlockChannel = confirmedBlockChannel + } + selfPeer := host.GetSelfPeer() if leader.Port == selfPeer.Port && leader.IP == selfPeer.IP { dRand.IsLeader = true diff --git a/drand/drand_leader_msg_test.go b/drand/drand_leader_msg_test.go index 93b7256d5..b57c83940 100644 --- a/drand/drand_leader_msg_test.go +++ b/drand/drand_leader_msg_test.go @@ -16,7 +16,7 @@ func TestConstructInitMessage(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, "0", []p2p.Peer{leader, validator}, leader) + dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil) dRand.blockHash = [32]byte{} msg := dRand.constructInitMessage() diff --git a/drand/drand_test.go b/drand/drand_test.go index b089221a9..24f59fe34 100644 --- a/drand/drand_test.go +++ b/drand/drand_test.go @@ -16,7 +16,7 @@ func TestNew(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, "0", []p2p.Peer{leader, validator}, leader) + dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil) if !dRand.IsLeader { test.Error("dRand should belong to a leader") diff --git a/drand/drand_validator_msg_test.go b/drand/drand_validator_msg_test.go index aa5e86e8f..659394498 100644 --- a/drand/drand_validator_msg_test.go +++ b/drand/drand_validator_msg_test.go @@ -16,7 +16,7 @@ func TestConstructCommitMessage(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, "0", []p2p.Peer{leader, validator}, leader) + dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil) dRand.blockHash = [32]byte{} msg := dRand.constructCommitMessage([32]byte{}, []byte{}) diff --git a/node/node.go b/node/node.go index 29a8beb37..b5c093fcc 100644 --- a/node/node.go +++ b/node/node.go @@ -119,7 +119,8 @@ type syncConfig struct { // Node represents a protocol-participating node in the network type Node struct { Consensus *bft.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits) - BlockChannel chan *types.Block // The channel to receive new blocks from Node + BlockChannel chan *types.Block // The channel to send newly proposed blocks + ConfirmedBlockChannel chan *types.Block // The channel to send confirmed blocks pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus transactionInConsensus []*types.Transaction // The transactions selected into the new block and under Consensus process pendingTxMutex sync.Mutex @@ -266,6 +267,7 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { chain, _ := core.NewBlockChain(database, nil, gspec.Config, node.Consensus, vm.Config{}, nil) node.blockchain = chain node.BlockChannel = make(chan *types.Block) + node.ConfirmedBlockChannel = make(chan *types.Block) node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain) node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey), node.Consensus.ShardID) node.AddFaucetContractToPendingTransactions() diff --git a/node/node_handler.go b/node/node_handler.go index 1137de8e2..eca508d0c 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -241,6 +241,12 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) { } node.AddNewBlock(newBlock) + + if node.Role == BeaconLeader && node.DRand != nil { + go func() { + node.ConfirmedBlockChannel <- newBlock + }() + } } // AddNewBlock is usedd to add new block into the blockchain. From ffd06145ec146c6bf22e3c2bd184c0eca292ab98 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 12 Feb 2019 17:12:56 -0800 Subject: [PATCH 4/4] Add message passing between drand leader/validator --- api/proto/common.go | 8 ++++++++ api/service/randomness/service.go | 31 +------------------------------ cmd/harmony.go | 8 +++++--- consensus/consensus_leader.go | 4 ++-- drand/drand.go | 29 +++++++++++++++++++++++++++++ drand/drand_leader.go | 23 +++++++++++++++-------- drand/drand_leader_msg.go | 7 ++++--- drand/drand_leader_msg_test.go | 2 +- drand/drand_validator.go | 2 +- drand/drand_validator_msg.go | 7 ++++--- drand/drand_validator_msg_test.go | 2 +- node/node.go | 4 ++++ node/node_handler.go | 14 +++++++++++++- node/node_test.go | 4 ++++ 14 files changed, 92 insertions(+), 53 deletions(-) diff --git a/api/proto/common.go b/api/proto/common.go index b0cb59be4..88d2ae83b 100644 --- a/api/proto/common.go +++ b/api/proto/common.go @@ -76,6 +76,14 @@ func GetConsensusMessagePayload(message []byte) ([]byte, error) { return message[MessageCategoryBytes:], nil } +// GetDRandMessagePayload gets the randomness message payload from the p2p message content +func GetDRandMessagePayload(message []byte) ([]byte, error) { + if len(message) < MessageCategoryBytes { + return []byte{}, errors.New("failed to get message payload: no data available") + } + return message[MessageCategoryBytes:], nil +} + // ConstructConsensusMessage creates a message with the payload and returns as byte array. func ConstructConsensusMessage(payload []byte) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(Consensus)}) diff --git a/api/service/randomness/service.go b/api/service/randomness/service.go index 710dd166b..1a4a0bafb 100644 --- a/api/service/randomness/service.go +++ b/api/service/randomness/service.go @@ -21,36 +21,7 @@ func New(dRand *drand.DRand) *Service { func (s *Service) StartService() { s.stopChan = make(chan struct{}) s.stoppedChan = make(chan struct{}) - - s.Init() - s.Run(s.stopChan, s.stoppedChan) -} - -// Init initializes randomness generation. -func (s *Service) Init() { -} - -// Run runs randomness generation. -func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) { - utils.GetLogInstance().Info("Running random generation") - go func() { - defer close(stoppedChan) - for { - select { - case newBlock := <-s.DRand.ConfirmedBlockChannel: - _ = newBlock - utils.GetLogInstance().Debug("[RAND] Received New Block") - s.DoRandomGeneration() - case <-stopChan: - return - } - } - }() -} - -// DoRandomGeneration does rarandomnessndom generation. -func (s *Service) DoRandomGeneration() { - + s.DRand.WaitForEpochBlock(s.DRand.ConfirmedBlockChannel, s.stopChan, s.stoppedChan) } // StopService stops randomness generation service. diff --git a/cmd/harmony.go b/cmd/harmony.go index 1bd07f1f5..5d463553c 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -254,9 +254,6 @@ func main() { currentNode.Role = node.BeaconValidator } - // Add randomness protocol - dRand := drand.New(host, shardID, peers, leader, currentNode.ConfirmedBlockChannel) - currentNode.DRand = dRand } else { if role == "leader" { currentNode.Role = node.ShardLeader @@ -265,6 +262,11 @@ func main() { } } + // Add randomness protocol + // TODO: enable drand only for beacon chain + dRand := drand.New(host, shardID, peers, leader, currentNode.ConfirmedBlockChannel) + currentNode.DRand = dRand + // If there is a client configured in the node list. if clientPeer != nil { currentNode.ClientPeer = clientPeer diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index fbaf84861..e1a70a82d 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -136,7 +136,7 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag } if len(prepareSigs) >= ((len(consensus.PublicKeys)*2)/3 + 1) { - utils.GetLogInstance().Debug("Received additional new prepare message", "validatorID", validatorID) + utils.GetLogInstance().Debug("Received additional prepare message", "validatorID", validatorID) return } @@ -244,7 +244,6 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message copy(blockObj.Header().CommitSignature[:], consensus.aggregatedCommitSig.Serialize()[:]) copy(blockObj.Header().CommitBitmap[:], consensus.commitBitmap.Bitmap) - consensus.OnConsensusDone(&blockObj) consensus.state = targetState select { @@ -262,6 +261,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message consensus.ResetState() consensus.consensusID++ + consensus.OnConsensusDone(&blockObj) utils.GetLogInstance().Debug("HOORAY!!! CONSENSUS REACHED!!!", "consensusID", consensus.consensusID, "numOfSignatures", len(commitSigs)) // TODO: remove this temporary delay diff --git a/drand/drand.go b/drand/drand.go index 3599d643d..85c4fbd88 100644 --- a/drand/drand.go +++ b/drand/drand.go @@ -34,6 +34,7 @@ type DRand struct { // Public keys of the committee including leader and validators PublicKeys []*bls.PublicKey + pubKeyLock sync.Mutex // private/public keys of current node priKey *bls.SecretKey @@ -114,6 +115,24 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confi return &dRand } +// AddPeers adds new peers into the validator map of the consensus +// and add the public keys +func (dRand *DRand) AddPeers(peers []*p2p.Peer) int { + count := 0 + + for _, peer := range peers { + _, ok := dRand.validators.Load(utils.GetUniqueIDFromPeer(*peer)) + if !ok { + dRand.validators.Store(utils.GetUniqueIDFromPeer(*peer), *peer) + dRand.pubKeyLock.Lock() + dRand.PublicKeys = append(dRand.PublicKeys, peer.PubKey) + dRand.pubKeyLock.Unlock() + } + count++ + } + return count +} + // Sign on the drand message signature field. func (dRand *DRand) signDRandMessage(message *drand_proto.Message) error { message.Signature = nil @@ -199,3 +218,13 @@ func (dRand *DRand) getValidatorPeerByID(validatorID uint32) *p2p.Peer { } return &value } + +// ResetState resets the state of the randomness protocol +func (dRand *DRand) ResetState() { + dRand.vrfs = &map[uint32][]byte{} + + bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.leader.PubKey) + dRand.bitmap = bitmap + dRand.pRand = nil + dRand.rand = nil +} diff --git a/drand/drand_leader.go b/drand/drand_leader.go index 561309775..31ce31bbf 100644 --- a/drand/drand_leader.go +++ b/drand/drand_leader.go @@ -15,10 +15,8 @@ func (dRand *DRand) WaitForEpochBlock(blockChannel chan *types.Block, stopChan c for { select { default: - // keep waiting for new blocks + // keep waiting for epoch block newBlock := <-blockChannel - // TODO: think about potential race condition - dRand.init(newBlock) case <-stopChan: return @@ -28,6 +26,8 @@ func (dRand *DRand) WaitForEpochBlock(blockChannel chan *types.Block, stopChan c } func (dRand *DRand) init(epochBlock *types.Block) { + utils.GetLogInstance().Debug("INITING DRAND") + dRand.ResetState() // Copy over block hash and block header data blockHash := epochBlock.Hash() copy(dRand.blockHash[:], blockHash[:]) @@ -66,8 +66,16 @@ func (dRand *DRand) processCommitMessage(message drand_proto.Message) { return } + validatorID := message.SenderId + validatorPeer := dRand.getValidatorPeerByID(validatorID) + vrfs := dRand.vrfs + if len((*vrfs)) >= ((len(dRand.PublicKeys))/3 + 1) { + utils.GetLogInstance().Debug("Received additional randomness commit message", "validatorID", validatorID) + return + } + // Verify message signature - err := verifyMessageSig(dRand.leader.PubKey, message) + err := verifyMessageSig(validatorPeer.PubKey, message) if err != nil { utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) return @@ -79,15 +87,14 @@ func (dRand *DRand) processCommitMessage(message drand_proto.Message) { _ = proof // TODO: check the validity of the vrf commit - validatorID := message.SenderId - validatorPeer := dRand.getValidatorPeerByID(validatorID) - vrfs := dRand.vrfs - utils.GetLogInstance().Debug("Received new prepare signature", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys)) + utils.GetLogInstance().Debug("Received new commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys)) (*vrfs)[validatorID] = message.Payload dRand.bitmap.SetKey(validatorPeer.PubKey, true) // Set the bitmap indicating that this validator signed. if len((*vrfs)) >= ((len(dRand.PublicKeys))/3 + 1) { // Construct pRand and initiate consensus on it + utils.GetLogInstance().Debug("Received enough randomness commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys)) + // TODO: communicate the pRand to consensus } } diff --git a/drand/drand_leader_msg.go b/drand/drand_leader_msg.go index 48564fadc..44cefc822 100644 --- a/drand/drand_leader_msg.go +++ b/drand/drand_leader_msg.go @@ -7,13 +7,14 @@ import ( ) // Constructs the init message -func (drand *DRand) constructInitMessage() []byte { +func (dRand *DRand) constructInitMessage() []byte { message := drand_proto.Message{} message.Type = drand_proto.MessageType_INIT + message.SenderId = dRand.nodeID - message.BlockHash = drand.blockHash[:] + message.BlockHash = dRand.blockHash[:] // Don't need the payload in init message - marshaledMessage, err := drand.signAndMarshalDRandMessage(&message) + marshaledMessage, err := dRand.signAndMarshalDRandMessage(&message) if err != nil { utils.GetLogInstance().Error("Failed to sign and marshal the init message", "error", err) } diff --git a/drand/drand_leader_msg_test.go b/drand/drand_leader_msg_test.go index b57c83940..390b0e6c8 100644 --- a/drand/drand_leader_msg_test.go +++ b/drand/drand_leader_msg_test.go @@ -20,7 +20,7 @@ func TestConstructInitMessage(test *testing.T) { dRand.blockHash = [32]byte{} msg := dRand.constructInitMessage() - if len(msg) != 87 { + if len(msg) != 93 { test.Errorf("Init message is not constructed in the correct size: %d", len(msg)) } } diff --git a/drand/drand_validator.go b/drand/drand_validator.go index 151becd01..116533f69 100644 --- a/drand/drand_validator.go +++ b/drand/drand_validator.go @@ -17,7 +17,7 @@ func (dRand *DRand) ProcessMessageValidator(payload []byte) { } switch message.Type { - case drand_proto.MessageType_COMMIT: + case drand_proto.MessageType_INIT: dRand.processInitMessage(message) default: utils.GetLogInstance().Error("Unexpected message type", "msgType", message.Type, "dRand", dRand) diff --git a/drand/drand_validator_msg.go b/drand/drand_validator_msg.go index d4fca412f..6c3d25d1e 100644 --- a/drand/drand_validator_msg.go +++ b/drand/drand_validator_msg.go @@ -7,14 +7,15 @@ import ( ) // Constructs the init message -func (drand *DRand) constructCommitMessage(vrf [32]byte, proof []byte) []byte { +func (dRand *DRand) constructCommitMessage(vrf [32]byte, proof []byte) []byte { message := drand_proto.Message{} message.Type = drand_proto.MessageType_COMMIT + message.SenderId = dRand.nodeID - message.BlockHash = drand.blockHash[:] + message.BlockHash = dRand.blockHash[:] message.Payload = append(vrf[:], proof...) - marshaledMessage, err := drand.signAndMarshalDRandMessage(&message) + marshaledMessage, err := dRand.signAndMarshalDRandMessage(&message) if err != nil { utils.GetLogInstance().Error("Failed to sign and marshal the commit message", "error", err) } diff --git a/drand/drand_validator_msg_test.go b/drand/drand_validator_msg_test.go index 659394498..14a6917be 100644 --- a/drand/drand_validator_msg_test.go +++ b/drand/drand_validator_msg_test.go @@ -20,7 +20,7 @@ func TestConstructCommitMessage(test *testing.T) { dRand.blockHash = [32]byte{} msg := dRand.constructCommitMessage([32]byte{}, []byte{}) - if len(msg) != 121 { + if len(msg) != 127 { test.Errorf("Commit message is not constructed in the correct size: %d", len(msg)) } } diff --git a/node/node.go b/node/node.go index 8f2d81d46..b9f95b409 100644 --- a/node/node.go +++ b/node/node.go @@ -412,6 +412,8 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int { if count > 0 { node.Consensus.AddPeers(peers) + // TODO: make peers into a context object shared by consensus and drand + node.DRand.AddPeers(peers) } return count } @@ -679,6 +681,8 @@ func (node *Node) setupForShardLeader() { node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady)) // Register client support service. node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) + // Register randomness service + node.serviceManager.RegisterService(service_manager.Randomness, randomness_service.New(node.DRand)) } func (node *Node) setupForShardValidator() { diff --git a/node/node_handler.go b/node/node_handler.go index da73c579a..3c8da7610 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -117,6 +117,17 @@ func (node *Node) messageHandler(content []byte) { // TODO(minhdoan): add logic to check if the current blockchain is not sync with other consensus // we should switch to other state rather than DoingConsensus. } + case proto.DRand: + msgPayload, _ := proto.GetDRandMessagePayload(content) + if node.DRand != nil { + if node.DRand.IsLeader { + utils.GetLogInstance().Info("NET: DRand Leader received message:", "messageCategory", msgCategory, "messageType", msgType) + node.DRand.ProcessMessageLeader(msgPayload) + } else { + utils.GetLogInstance().Info("NET: DRand Validator received message:", "messageCategory", msgCategory, "messageType", msgType) + node.DRand.ProcessMessageValidator(msgPayload) + } + } case proto.Node: actionType := proto_node.MessageType(msgType) switch actionType { @@ -249,7 +260,8 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) { } node.AddNewBlock(newBlock) - if node.Role == BeaconLeader && node.DRand != nil { + // TODO: enable drand only for beacon chain + if node.DRand != nil { go func() { node.ConfirmedBlockChannel <- newBlock }() diff --git a/node/node_test.go b/node/node_test.go index d92484127..d08348e1a 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/harmony-one/harmony/drand" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" @@ -97,8 +99,10 @@ func TestAddPeers(t *testing.T) { t.Fatalf("newhost failure: %v", err) } consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) + dRand := drand.New(host, "0", []p2p.Peer{leader, validator}, leader, nil) node := New(host, consensus, nil) + node.DRand = dRand r1 := node.AddPeers(peers1) e1 := 2 if r1 != e1 {