Merge pull request #681 from harmony-one/rj_branch

Separate functionality and genesis block between beacon and shard chain
pull/692/head
Rongjian Lan 6 years ago committed by GitHub
commit 25dbb3fa58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      api/service/syncing/downloader/server.go
  2. 14
      cmd/harmony/main.go
  3. 49
      consensus/consensus_leader.go
  4. 39
      node/node.go
  5. 10
      node/node_genesis.go
  6. 78
      node/node_handler.go
  7. 7
      node/node_newblock.go
  8. 8
      node/node_syncing.go
  9. 2
      node/service_setup.go
  10. 4
      test/deploy.sh

@ -17,6 +17,7 @@ const (
// Server is the Server struct for downloader package.
type Server struct {
downloadInterface DownloadInterface
GrpcServer *grpc.Server
}
// Query returns the feature at the given point.
@ -39,6 +40,8 @@ func (s *Server) Start(ip, port string) (*grpc.Server, error) {
grpcServer := grpc.NewServer(opts...)
pb.RegisterDownloaderServer(grpcServer, s)
go grpcServer.Serve(lis)
s.GrpcServer = grpcServer
return grpcServer, nil
}

@ -167,7 +167,7 @@ func createGlobalConfig() *nodeconfig.ConfigType {
if nodeConfig.MainDB, err = InitLDBDatabase(*ip, *port, *freshDB, false); err != nil {
panic(err)
}
if !*isGenesis {
if shardID != 0 {
if nodeConfig.BeaconDB, err = InitLDBDatabase(*ip, *port, *freshDB, true); err != nil {
panic(err)
}
@ -238,8 +238,6 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) (*consensus.Consen
currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID)))
}
} else {
currentNode.AddBeaconChainDatabase(nodeConfig.BeaconDB)
if *isNewNode {
currentNode.NodeConfig.SetRole(nodeconfig.NewNode)
// TODO: fix the roles as it's unknown before resharding.
@ -262,6 +260,12 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) (*consensus.Consen
currentNode.Consensus.RegisterRndChannel(dRand.RndChannel)
currentNode.DRand = dRand
if consensus.ShardID != 0 {
currentNode.AddBeaconChainDatabase(nodeConfig.BeaconDB)
}
// This needs to be executed after consensus and drand are setup
currentNode.InitGenesisShardState()
// Assign closure functions to the consensus object
consensus.BlockVerifier = currentNode.VerifyNewBlock
consensus.OnConsensusDone = currentNode.PostConsensusProcessing
@ -293,6 +297,10 @@ func main() {
if consensus.IsLeader {
go currentNode.SendPongMessage()
}
// TODO: enable beacon chain sync
//if consensus.ShardID != 0 {
// go currentNode.SupportBeaconSyncing()
//}
go currentNode.SupportSyncing()
utils.GetLogInstance().Info("New Harmony Node ====", "Role", currentNode.NodeConfig.Role(), "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty()))
currentNode.ServiceManagerSetup()

@ -55,28 +55,31 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stop
utils.GetLogInstance().Debug("WaitForNewBlock", "removed peers", c)
}
if core.IsEpochBlock(newBlock) {
// Receive pRnd from DRG protocol
utils.GetLogInstance().Debug("[DRG] Waiting for pRnd")
pRndAndBitmap := <-consensus.PRndChannel
utils.GetLogInstance().Debug("[DRG] Got pRnd", "pRnd", pRndAndBitmap)
pRnd := [32]byte{}
copy(pRnd[:], pRndAndBitmap[:32])
bitmap := pRndAndBitmap[32:]
vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.ConsensusPubKey)
vrfBitmap.SetMask(bitmap)
// TODO: check validity of pRnd
newBlock.AddRandPreimage(pRnd)
}
rnd, blockHash, err := consensus.GetNextRnd()
if err == nil {
// Verify the randomness
_ = blockHash
utils.GetLogInstance().Info("Adding randomness into new block", "rnd", rnd)
newBlock.AddRandSeed(rnd)
} else {
utils.GetLogInstance().Info("Failed to get randomness", "error", err)
if consensus.ShardID == 0 {
if core.IsEpochBlock(newBlock) { // Only beacon chain do randomness generation
// Receive pRnd from DRG protocol
utils.GetLogInstance().Debug("[DRG] Waiting for pRnd")
pRndAndBitmap := <-consensus.PRndChannel
utils.GetLogInstance().Debug("[DRG] Got pRnd", "pRnd", pRndAndBitmap)
pRnd := [32]byte{}
copy(pRnd[:], pRndAndBitmap[:32])
bitmap := pRndAndBitmap[32:]
vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.ConsensusPubKey)
vrfBitmap.SetMask(bitmap)
// TODO: check validity of pRnd
newBlock.AddRandPreimage(pRnd)
}
rnd, blockHash, err := consensus.GetNextRnd()
if err == nil {
// Verify the randomness
_ = blockHash
utils.GetLogInstance().Info("Adding randomness into new block", "rnd", rnd)
newBlock.AddRandSeed(rnd)
} else {
utils.GetLogInstance().Info("Failed to get randomness", "error", err)
}
}
startTime = time.Now()
utils.GetLogInstance().Debug("STARTING CONSENSUS", "numTxs", len(newBlock.Transactions()), "consensus", consensus, "startTime", startTime, "publicKeys", len(consensus.PublicKeys))
@ -312,7 +315,7 @@ func (consensus *Consensus) processCommitMessage(message *msg_pb.Message) {
select {
case consensus.VerifiedNewBlock <- &blockObj:
default:
utils.GetLogInstance().Info("[SYNC] consensus verified block send to chan failed", "blockHash", blockObj.Hash())
utils.GetLogInstance().Info("[SYNC] Failed to send consensus verified block for state sync", "blockHash", blockObj.Hash())
}
consensus.reportMetrics(blockObj)

@ -243,13 +243,13 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is
database := db
if database == nil {
database = ethdb.NewMemDatabase()
chain, err = node.GenesisBlockSetup(database, false)
chain, err = node.GenesisBlockSetup(database, consensusObj.ShardID, false)
isFirstTime = true
} else {
chain, err = node.InitBlockChainFromDB(db, node.Consensus, isArchival)
isFirstTime = false
if err != nil || chain == nil || chain.CurrentBlock().NumberU64() <= 0 {
chain, err = node.GenesisBlockSetup(database, isArchival)
chain, err = node.GenesisBlockSetup(database, consensusObj.ShardID, isArchival)
isFirstTime = true
}
}
@ -267,18 +267,20 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is
node.Consensus.VerifiedNewBlock = make(chan *types.Block)
if isFirstTime {
// Setup one time smart contracts
node.AddFaucetContractToPendingTransactions()
node.CurrentStakes = make(map[common.Address]*structs.StakeInfo)
node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked
// TODO(minhdoan): Think of a better approach to deploy smart contract.
// This is temporary for demo purpose.
node.AddLotteryContract()
} else {
node.AddContractKeyAndAddress()
if node.Consensus.ShardID == 0 {
// Contracts only exist in beacon chain
if isFirstTime {
// Setup one time smart contracts
node.AddFaucetContractToPendingTransactions()
node.CurrentStakes = make(map[common.Address]*structs.StakeInfo)
node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked
// TODO(minhdoan): Think of a better approach to deploy smart contract.
// This is temporary for demo purpose.
node.AddLotteryContract()
} else {
node.AddContractKeyAndAddress()
}
}
}
node.ContractCaller = contracts.NewContractCaller(&db, node.blockchain, params.TestChainConfig)
@ -315,7 +317,14 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is
// InitGenesisShardState initialize genesis shard state and update committee pub keys for consensus and drand
func (node *Node) InitGenesisShardState() {
// Store the genesis shard state into db.
shardState := node.blockchain.StoreNewShardState(node.blockchain.CurrentBlock(), nil)
shardState := types.ShardState{}
if node.Consensus != nil {
if node.Consensus.ShardID == 0 {
shardState = node.blockchain.StoreNewShardState(node.blockchain.CurrentBlock(), nil)
} else {
shardState = node.beaconChain.StoreNewShardState(node.beaconChain.CurrentBlock(), nil)
}
}
// Update validator public keys
for _, shard := range shardState {
if shard.ShardID == node.Consensus.ShardID {
@ -428,7 +437,7 @@ func (node *Node) AddBeaconChainDatabase(db ethdb.Database) {
database = ethdb.NewMemDatabase()
}
// TODO (chao) currently we use the same genesis block as normal shard
chain, err := node.GenesisBlockSetup(database, false)
chain, err := node.GenesisBlockSetup(database, 0, false)
if err != nil {
utils.GetLogInstance().Error("Error when doing genesis setup")
os.Exit(1)

@ -26,7 +26,7 @@ const (
)
// GenesisBlockSetup setups a genesis blockchain.
func (node *Node) GenesisBlockSetup(db ethdb.Database, isArchival bool) (*core.BlockChain, error) {
func (node *Node) GenesisBlockSetup(db ethdb.Database, shardID uint32, isArchival bool) (*core.BlockChain, error) {
// Initialize genesis block and blockchain
// Tests account for txgen to use
genesisAlloc := node.CreateGenesisAllocWithTestingAddresses(FakeAddressNumber)
@ -39,11 +39,13 @@ func (node *Node) GenesisBlockSetup(db ethdb.Database, isArchival bool) (*core.B
genesisAlloc[contractDeployerAddress] = core.GenesisAccount{Balance: contractDeployerFunds}
node.ContractDeployerKey = contractDeployerKey
// Accounts used by validator/nodes to stake and participate in the network.
AddNodeAddressesToGenesisAlloc(genesisAlloc)
if shardID == 0 {
// Accounts used by validator/nodes to stake and participate in the network.
AddNodeAddressesToGenesisAlloc(genesisAlloc)
}
chainConfig := params.TestChainConfig
chainConfig.ChainID = big.NewInt(int64(node.Consensus.ShardID)) // Use ChainID as piggybacked ShardID
chainConfig.ChainID = big.NewInt(int64(shardID)) // Use ChainID as piggybacked ShardID
gspec := core.Genesis{
Config: chainConfig,
Alloc: genesisAlloc,

@ -288,52 +288,54 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
node.AddNewBlock(newBlock)
// TODO: enable drand only for beacon chain
// ConfirmedBlockChannel which is listened by drand leader who will initiate DRG if its a epoch block (first block of a epoch)
if node.DRand != nil {
go func() {
node.ConfirmedBlockChannel <- newBlock
}()
}
if node.Consensus.ShardID == 0 {
// ConfirmedBlockChannel which is listened by drand leader who will initiate DRG if its a epoch block (first block of a epoch)
if node.DRand != nil {
go func() {
node.ConfirmedBlockChannel <- newBlock
}()
}
utils.GetLogInstance().Info("Updating staking list")
node.UpdateStakingList(node.QueryStakeInfo())
// node.printStakingList()
if core.IsEpochBlock(newBlock) {
shardState := node.blockchain.StoreNewShardState(newBlock, &node.CurrentStakes)
if shardState != nil {
myShard := uint32(math.MaxUint32)
isLeader := false
myBlsPubKey := node.Consensus.PubKey.Serialize()
for _, shard := range shardState {
for _, nodeID := range shard.NodeList {
if bytes.Compare(nodeID.BlsPublicKey[:], myBlsPubKey) == 0 {
myShard = shard.ShardID
isLeader = shard.Leader == nodeID
utils.GetLogInstance().Info("Updating staking list")
node.UpdateStakingList(node.QueryStakeInfo())
// node.printStakingList()
if core.IsEpochBlock(newBlock) {
shardState := node.blockchain.StoreNewShardState(newBlock, &node.CurrentStakes)
if shardState != nil {
myShard := uint32(math.MaxUint32)
isLeader := false
myBlsPubKey := node.Consensus.PubKey.Serialize()
for _, shard := range shardState {
for _, nodeID := range shard.NodeList {
if bytes.Compare(nodeID.BlsPublicKey[:], myBlsPubKey) == 0 {
myShard = shard.ShardID
isLeader = shard.Leader == nodeID
}
}
}
}
if myShard != uint32(math.MaxUint32) {
aboutLeader := ""
if node.Consensus.IsLeader {
aboutLeader = "I am not leader anymore"
if isLeader {
aboutLeader = "I am still leader"
if myShard != uint32(math.MaxUint32) {
aboutLeader := ""
if node.Consensus.IsLeader {
aboutLeader = "I am not leader anymore"
if isLeader {
aboutLeader = "I am still leader"
}
} else {
aboutLeader = "I am still validator"
if isLeader {
aboutLeader = "I become the leader"
}
}
} else {
aboutLeader = "I am still validator"
if isLeader {
aboutLeader = "I become the leader"
if node.blockchain.ShardID() == myShard {
utils.GetLogInstance().Info(fmt.Sprintf("[Resharded][epoch:%d] I stay at shard %d, %s", core.GetEpochFromBlockNumber(newBlock.NumberU64()), myShard, aboutLeader), "BlsPubKey", hex.EncodeToString(myBlsPubKey))
} else {
utils.GetLogInstance().Info(fmt.Sprintf("[Resharded][epoch:%d] I got resharded to shard %d from shard %d, %s", core.GetEpochFromBlockNumber(newBlock.NumberU64()), myShard, node.blockchain.ShardID(), aboutLeader), "BlsPubKey", hex.EncodeToString(myBlsPubKey))
}
}
if node.blockchain.ShardID() == myShard {
utils.GetLogInstance().Info(fmt.Sprintf("[Resharded][epoch:%d] I stay at shard %d, %s", core.GetEpochFromBlockNumber(newBlock.NumberU64()), myShard, aboutLeader), "BlsPubKey", hex.EncodeToString(myBlsPubKey))
} else {
utils.GetLogInstance().Info(fmt.Sprintf("[Resharded][epoch:%d] I got resharded to shard %d from shard %d, %s", core.GetEpochFromBlockNumber(newBlock.NumberU64()), myShard, node.blockchain.ShardID(), aboutLeader), "BlsPubKey", hex.EncodeToString(myBlsPubKey))
utils.GetLogInstance().Info(fmt.Sprintf("[Resharded][epoch:%d] Somehow I got kicked out", core.GetEpochFromBlockNumber(newBlock.NumberU64())), "BlsPubKey", hex.EncodeToString(myBlsPubKey))
}
} else {
utils.GetLogInstance().Info(fmt.Sprintf("[Resharded][epoch:%d] Somehow I got kicked out", core.GetEpochFromBlockNumber(newBlock.NumberU64())), "BlsPubKey", hex.EncodeToString(myBlsPubKey))
}
}
}

@ -60,9 +60,10 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}, stopChan chan
if err != nil {
utils.GetLogInstance().Debug("Failed commiting new block", "Error", err)
} else {
// add new shard state if it's epoch block
// TODO(minhdoan): only happens for beaconchain
node.addNewShardStateHash(block)
if node.Consensus.ShardID == 0 {
// add new shard state if it's epoch block
node.addNewShardStateHash(block)
}
newBlock = block
utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", block.NumberU64(), "numTxs", block.Transactions().Len())
break

@ -111,13 +111,17 @@ func (node *Node) SupportSyncing() {
// InitSyncingServer starts downloader server.
func (node *Node) InitSyncingServer() {
node.downloaderServer = downloader.NewServer(node)
if node.downloaderServer == nil {
node.downloaderServer = downloader.NewServer(node)
}
}
// StartSyncingServer starts syncing server.
func (node *Node) StartSyncingServer() {
utils.GetLogInstance().Info("support_sycning: StartSyncingServer")
node.downloaderServer.Start(node.SelfPeer.IP, syncing.GetSyncingPort(node.SelfPeer.Port))
if node.downloaderServer.GrpcServer == nil {
node.downloaderServer.Start(node.SelfPeer.IP, syncing.GetSyncingPort(node.SelfPeer.Port))
}
}
// SendNewBlockToUnsync send latest verified block to unsync, registered nodes

@ -33,8 +33,6 @@ func (node *Node) setupForShardLeader() {
node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady))
// Register client support service.
node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port))
// Register randomness service
node.serviceManager.RegisterService(service.Randomness, randomness.New(node.DRand))
}
func (node *Node) setupForShardValidator() {

@ -70,7 +70,7 @@ EOU
DB=
TXGEN=true
DURATION=90
DURATION=60
MIN=5
SHARDS=2
KILLPORT=9004
@ -99,7 +99,7 @@ if [ -z "$config" ]; then
fi
if [ "$SYNC" == "true" ]; then
DURATION=300
DURATION=120
fi
# Kill nodes if any

Loading…
Cancel
Save