Merge pull request #433 from harmony-one/rj_branch

Make drand as a service and make message passing work
pull/439/head
Rongjian Lan 6 years ago committed by GitHub
commit aded2838ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      api/proto/common.go
  2. 3
      api/service/manager.go
  3. 59
      api/service/randgen/service.go
  4. 33
      api/service/randomness/service.go
  5. 8
      cmd/harmony.go
  6. 4
      consensus/consensus_leader.go
  7. 46
      drand/drand.go
  8. 23
      drand/drand_leader.go
  9. 7
      drand/drand_leader_msg.go
  10. 4
      drand/drand_leader_msg_test.go
  11. 2
      drand/drand_test.go
  12. 2
      drand/drand_validator.go
  13. 7
      drand/drand_validator_msg.go
  14. 4
      drand/drand_validator_msg_test.go
  15. 17
      node/node.go
  16. 18
      node/node_handler.go
  17. 4
      node/node_test.go

@ -76,6 +76,14 @@ func GetConsensusMessagePayload(message []byte) ([]byte, error) {
return message[MessageCategoryBytes:], nil
}
// GetDRandMessagePayload gets the randomness message payload from the p2p message content
func GetDRandMessagePayload(message []byte) ([]byte, error) {
if len(message) < MessageCategoryBytes {
return []byte{}, errors.New("failed to get message payload: no data available")
}
return message[MessageCategoryBytes:], nil
}
// ConstructConsensusMessage creates a message with the payload and returns as byte array.
func ConstructConsensusMessage(payload []byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(Consensus)})

@ -25,6 +25,7 @@ const (
ClientSupport
SupportExplorer
Consensus
Randomness
BlockProposal
NetworkInfo
PeerDiscovery
@ -43,6 +44,8 @@ func (t Type) String() string {
return "SupportExplorer"
case Consensus:
return "Consensus"
case Randomness:
return "Randomness"
case BlockProposal:
return "BlockProposal"
case NetworkInfo:

@ -1,59 +0,0 @@
package randgen
import (
"github.com/harmony-one/harmony/internal/utils"
)
// Service is the random generation service.
type Service struct {
stopChan chan struct{}
stoppedChan chan struct{}
}
// New returns random generation service.
func New() *Service {
return &Service{}
}
// StartService starts random generation service.
func (s *Service) StartService() {
s.stopChan = make(chan struct{})
s.stoppedChan = make(chan struct{})
s.Init()
s.Run(s.stopChan, s.stoppedChan)
}
// Init initializes random generation.
func (s *Service) Init() {
}
// Run runs random generation.
func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) {
go func() {
defer close(stoppedChan)
for {
select {
default:
utils.GetLogInstance().Info("Running random generation")
// Write some logic here.
s.DoRandomGeneration()
case <-stopChan:
return
}
}
}()
}
// DoRandomGeneration does random generation.
func (s *Service) DoRandomGeneration() {
}
// StopService stops random generation service.
func (s *Service) StopService() {
utils.GetLogInstance().Info("Stopping random generation service.")
s.stopChan <- struct{}{}
<-s.stoppedChan
utils.GetLogInstance().Info("Random generation stopped.")
}

@ -0,0 +1,33 @@
package randomness
import (
"github.com/harmony-one/harmony/drand"
"github.com/harmony-one/harmony/internal/utils"
)
// Service is the randomness generation service.
type Service struct {
stopChan chan struct{}
stoppedChan chan struct{}
DRand *drand.DRand
}
// New returns randomness generation service.
func New(dRand *drand.DRand) *Service {
return &Service{DRand: dRand}
}
// StartService starts randomness generation service.
func (s *Service) StartService() {
s.stopChan = make(chan struct{})
s.stoppedChan = make(chan struct{})
s.DRand.WaitForEpochBlock(s.DRand.ConfirmedBlockChannel, s.stopChan, s.stoppedChan)
}
// StopService stops randomness generation service.
func (s *Service) StopService() {
utils.GetLogInstance().Info("Stopping random generation service.")
s.stopChan <- struct{}{}
<-s.stoppedChan
utils.GetLogInstance().Info("Random generation stopped.")
}

@ -9,6 +9,8 @@ import (
"runtime"
"time"
"github.com/harmony-one/harmony/drand"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
peerstore "github.com/libp2p/go-libp2p-peerstore"
@ -251,6 +253,7 @@ func main() {
} else {
currentNode.Role = node.BeaconValidator
}
} else {
if role == "leader" {
currentNode.Role = node.ShardLeader
@ -259,6 +262,11 @@ func main() {
}
}
// Add randomness protocol
// TODO: enable drand only for beacon chain
dRand := drand.New(host, shardID, peers, leader, currentNode.ConfirmedBlockChannel)
currentNode.DRand = dRand
// If there is a client configured in the node list.
if clientPeer != nil {
currentNode.ClientPeer = clientPeer

@ -136,7 +136,7 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag
}
if len(prepareSigs) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
utils.GetLogInstance().Debug("Received additional new prepare message", "validatorID", validatorID)
utils.GetLogInstance().Debug("Received additional prepare message", "validatorID", validatorID)
return
}
@ -244,7 +244,6 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message
copy(blockObj.Header().CommitSignature[:], consensus.aggregatedCommitSig.Serialize()[:])
copy(blockObj.Header().CommitBitmap[:], consensus.commitBitmap.Bitmap)
consensus.OnConsensusDone(&blockObj)
consensus.state = targetState
select {
@ -262,6 +261,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message
consensus.ResetState()
consensus.consensusID++
consensus.OnConsensusDone(&blockObj)
utils.GetLogInstance().Debug("HOORAY!!! CONSENSUS REACHED!!!", "consensusID", consensus.consensusID, "numOfSignatures", len(commitSigs))
// TODO: remove this temporary delay

@ -7,6 +7,8 @@ import (
"strconv"
"sync"
"github.com/harmony-one/harmony/core/types"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
drand_proto "github.com/harmony-one/harmony/api/drand"
@ -17,10 +19,11 @@ import (
// DRand is the main struct which contains state for the distributed randomness protocol.
type DRand struct {
vrfs *map[uint32][]byte
bitmap *bls_cosi.Mask
pRand *[32]byte
rand *[32]byte
vrfs *map[uint32][]byte
bitmap *bls_cosi.Mask
pRand *[32]byte
rand *[32]byte
ConfirmedBlockChannel chan *types.Block // Channel for confirmed blocks
// map of nodeID to validator Peer object
// FIXME: should use PubKey of p2p.Peer as the hashkey
@ -31,6 +34,7 @@ type DRand struct {
// Public keys of the committee including leader and validators
PublicKeys []*bls.PublicKey
pubKeyLock sync.Mutex
// private/public keys of current node
priKey *bls.SecretKey
@ -53,10 +57,14 @@ type DRand struct {
}
// New creates a new dRand object
func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *DRand {
func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer, confirmedBlockChannel chan *types.Block) *DRand {
dRand := DRand{}
dRand.host = host
if confirmedBlockChannel != nil {
dRand.ConfirmedBlockChannel = confirmedBlockChannel
}
selfPeer := host.GetSelfPeer()
if leader.Port == selfPeer.Port && leader.IP == selfPeer.IP {
dRand.IsLeader = true
@ -107,6 +115,24 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *DRan
return &dRand
}
// AddPeers adds new peers into the validator map of the consensus
// and add the public keys
func (dRand *DRand) AddPeers(peers []*p2p.Peer) int {
count := 0
for _, peer := range peers {
_, ok := dRand.validators.Load(utils.GetUniqueIDFromPeer(*peer))
if !ok {
dRand.validators.Store(utils.GetUniqueIDFromPeer(*peer), *peer)
dRand.pubKeyLock.Lock()
dRand.PublicKeys = append(dRand.PublicKeys, peer.PubKey)
dRand.pubKeyLock.Unlock()
}
count++
}
return count
}
// Sign on the drand message signature field.
func (dRand *DRand) signDRandMessage(message *drand_proto.Message) error {
message.Signature = nil
@ -192,3 +218,13 @@ func (dRand *DRand) getValidatorPeerByID(validatorID uint32) *p2p.Peer {
}
return &value
}
// ResetState resets the state of the randomness protocol
func (dRand *DRand) ResetState() {
dRand.vrfs = &map[uint32][]byte{}
bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.leader.PubKey)
dRand.bitmap = bitmap
dRand.pRand = nil
dRand.rand = nil
}

@ -15,10 +15,8 @@ func (dRand *DRand) WaitForEpochBlock(blockChannel chan *types.Block, stopChan c
for {
select {
default:
// keep waiting for new blocks
// keep waiting for epoch block
newBlock := <-blockChannel
// TODO: think about potential race condition
dRand.init(newBlock)
case <-stopChan:
return
@ -28,6 +26,8 @@ func (dRand *DRand) WaitForEpochBlock(blockChannel chan *types.Block, stopChan c
}
func (dRand *DRand) init(epochBlock *types.Block) {
utils.GetLogInstance().Debug("INITING DRAND")
dRand.ResetState()
// Copy over block hash and block header data
blockHash := epochBlock.Hash()
copy(dRand.blockHash[:], blockHash[:])
@ -66,8 +66,16 @@ func (dRand *DRand) processCommitMessage(message drand_proto.Message) {
return
}
validatorID := message.SenderId
validatorPeer := dRand.getValidatorPeerByID(validatorID)
vrfs := dRand.vrfs
if len((*vrfs)) >= ((len(dRand.PublicKeys))/3 + 1) {
utils.GetLogInstance().Debug("Received additional randomness commit message", "validatorID", validatorID)
return
}
// Verify message signature
err := verifyMessageSig(dRand.leader.PubKey, message)
err := verifyMessageSig(validatorPeer.PubKey, message)
if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err)
return
@ -79,15 +87,14 @@ func (dRand *DRand) processCommitMessage(message drand_proto.Message) {
_ = proof
// TODO: check the validity of the vrf commit
validatorID := message.SenderId
validatorPeer := dRand.getValidatorPeerByID(validatorID)
vrfs := dRand.vrfs
utils.GetLogInstance().Debug("Received new prepare signature", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys))
utils.GetLogInstance().Debug("Received new commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys))
(*vrfs)[validatorID] = message.Payload
dRand.bitmap.SetKey(validatorPeer.PubKey, true) // Set the bitmap indicating that this validator signed.
if len((*vrfs)) >= ((len(dRand.PublicKeys))/3 + 1) {
// Construct pRand and initiate consensus on it
utils.GetLogInstance().Debug("Received enough randomness commit", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys))
// TODO: communicate the pRand to consensus
}
}

@ -7,13 +7,14 @@ import (
)
// Constructs the init message
func (drand *DRand) constructInitMessage() []byte {
func (dRand *DRand) constructInitMessage() []byte {
message := drand_proto.Message{}
message.Type = drand_proto.MessageType_INIT
message.SenderId = dRand.nodeID
message.BlockHash = drand.blockHash[:]
message.BlockHash = dRand.blockHash[:]
// Don't need the payload in init message
marshaledMessage, err := drand.signAndMarshalDRandMessage(&message)
marshaledMessage, err := dRand.signAndMarshalDRandMessage(&message)
if err != nil {
utils.GetLogInstance().Error("Failed to sign and marshal the init message", "error", err)
}

@ -16,11 +16,11 @@ func TestConstructInitMessage(test *testing.T) {
if err != nil {
test.Fatalf("newhost failure: %v", err)
}
dRand := New(host, "0", []p2p.Peer{leader, validator}, leader)
dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil)
dRand.blockHash = [32]byte{}
msg := dRand.constructInitMessage()
if len(msg) != 87 {
if len(msg) != 93 {
test.Errorf("Init message is not constructed in the correct size: %d", len(msg))
}
}

@ -16,7 +16,7 @@ func TestNew(test *testing.T) {
if err != nil {
test.Fatalf("newhost failure: %v", err)
}
dRand := New(host, "0", []p2p.Peer{leader, validator}, leader)
dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil)
if !dRand.IsLeader {
test.Error("dRand should belong to a leader")

@ -17,7 +17,7 @@ func (dRand *DRand) ProcessMessageValidator(payload []byte) {
}
switch message.Type {
case drand_proto.MessageType_COMMIT:
case drand_proto.MessageType_INIT:
dRand.processInitMessage(message)
default:
utils.GetLogInstance().Error("Unexpected message type", "msgType", message.Type, "dRand", dRand)

@ -7,14 +7,15 @@ import (
)
// Constructs the init message
func (drand *DRand) constructCommitMessage(vrf [32]byte, proof []byte) []byte {
func (dRand *DRand) constructCommitMessage(vrf [32]byte, proof []byte) []byte {
message := drand_proto.Message{}
message.Type = drand_proto.MessageType_COMMIT
message.SenderId = dRand.nodeID
message.BlockHash = drand.blockHash[:]
message.BlockHash = dRand.blockHash[:]
message.Payload = append(vrf[:], proof...)
marshaledMessage, err := drand.signAndMarshalDRandMessage(&message)
marshaledMessage, err := dRand.signAndMarshalDRandMessage(&message)
if err != nil {
utils.GetLogInstance().Error("Failed to sign and marshal the commit message", "error", err)
}

@ -16,11 +16,11 @@ func TestConstructCommitMessage(test *testing.T) {
if err != nil {
test.Fatalf("newhost failure: %v", err)
}
dRand := New(host, "0", []p2p.Peer{leader, validator}, leader)
dRand := New(host, "0", []p2p.Peer{leader, validator}, leader, nil)
dRand.blockHash = [32]byte{}
msg := dRand.constructCommitMessage([32]byte{}, []byte{})
if len(msg) != 121 {
if len(msg) != 127 {
test.Errorf("Commit message is not constructed in the correct size: %d", len(msg))
}
}

@ -12,6 +12,8 @@ import (
"sync"
"time"
"github.com/harmony-one/harmony/drand"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
@ -28,6 +30,7 @@ import (
"github.com/harmony-one/harmony/api/service/discovery"
"github.com/harmony-one/harmony/api/service/explorer"
"github.com/harmony-one/harmony/api/service/networkinfo"
randomness_service "github.com/harmony-one/harmony/api/service/randomness"
"github.com/harmony-one/harmony/api/service/staking"
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/api/service/syncing/downloader"
@ -116,10 +119,12 @@ type syncConfig struct {
// Node represents a protocol-participating node in the network
type Node struct {
Consensus *bft.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits)
BlockChannel chan *types.Block // The channel to receive new blocks from Node
BlockChannel chan *types.Block // The channel to send newly proposed blocks
ConfirmedBlockChannel chan *types.Block // The channel to send confirmed blocks
pendingTransactions types.Transactions // All the transactions received but not yet processed for Consensus
transactionInConsensus []*types.Transaction // The transactions selected into the new block and under Consensus process
pendingTxMutex sync.Mutex
DRand *drand.DRand // The instance for distributed randomness protocol
blockchain *core.BlockChain // The blockchain for the shard where this node belongs
db *ethdb.LDBDatabase // LevelDB to store blockchain.
@ -267,6 +272,7 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
chain, _ := core.NewBlockChain(database, nil, gspec.Config, node.Consensus, vm.Config{}, nil)
node.blockchain = chain
node.BlockChannel = make(chan *types.Block)
node.ConfirmedBlockChannel = make(chan *types.Block)
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain)
node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey), node.Consensus.ShardID)
node.AddFaucetContractToPendingTransactions()
@ -406,6 +412,8 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int {
if count > 0 {
node.Consensus.AddPeers(peers)
// TODO: make peers into a context object shared by consensus and drand
node.DRand.AddPeers(peers)
}
return count
}
@ -673,6 +681,8 @@ func (node *Node) setupForShardLeader() {
node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady))
// Register client support service.
node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port))
// Register randomness service
node.serviceManager.RegisterService(service_manager.Randomness, randomness_service.New(node.DRand))
}
func (node *Node) setupForShardValidator() {
@ -699,6 +709,9 @@ func (node *Node) setupForBeaconLeader() {
node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady))
// Register client support service.
node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port))
// Register randomness service
node.serviceManager.RegisterService(service_manager.Randomness, randomness_service.New(node.DRand))
}
func (node *Node) setupForBeaconValidator() {
@ -715,6 +728,8 @@ func (node *Node) setupForBeaconValidator() {
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, "0", chanPeer))
// Register randomness service
node.serviceManager.RegisterService(service_manager.Randomness, randomness_service.New(node.DRand))
}
func (node *Node) setupForNewNode() {

@ -117,6 +117,17 @@ func (node *Node) messageHandler(content []byte) {
// TODO(minhdoan): add logic to check if the current blockchain is not sync with other consensus
// we should switch to other state rather than DoingConsensus.
}
case proto.DRand:
msgPayload, _ := proto.GetDRandMessagePayload(content)
if node.DRand != nil {
if node.DRand.IsLeader {
utils.GetLogInstance().Info("NET: DRand Leader received message:", "messageCategory", msgCategory, "messageType", msgType)
node.DRand.ProcessMessageLeader(msgPayload)
} else {
utils.GetLogInstance().Info("NET: DRand Validator received message:", "messageCategory", msgCategory, "messageType", msgType)
node.DRand.ProcessMessageValidator(msgPayload)
}
}
case proto.Node:
actionType := proto_node.MessageType(msgType)
switch actionType {
@ -248,6 +259,13 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
node.BroadcastNewBlock(newBlock)
}
node.AddNewBlock(newBlock)
// TODO: enable drand only for beacon chain
if node.DRand != nil {
go func() {
node.ConfirmedBlockChannel <- newBlock
}()
}
}
// AddNewBlock is usedd to add new block into the blockchain.

@ -7,6 +7,8 @@ import (
"testing"
"time"
"github.com/harmony-one/harmony/drand"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
@ -97,8 +99,10 @@ func TestAddPeers(t *testing.T) {
t.Fatalf("newhost failure: %v", err)
}
consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader)
dRand := drand.New(host, "0", []p2p.Peer{leader, validator}, leader, nil)
node := New(host, consensus, nil)
node.DRand = dRand
r1 := node.AddPeers(peers1)
e1 := 2
if r1 != e1 {

Loading…
Cancel
Save