Make shard chain follow beacon chain's epoch after stakingEpoch

pull/1870/head
Rongjian Lan 5 years ago
parent 9070421462
commit e79ba5fe88
  1. 6
      block/factory/factory.go
  2. 21
      consensus/consensus_service.go
  3. 1
      consensus/consensus_v2.go
  4. 34
      core/blockchain.go
  5. 51
      internal/params/config.go
  6. 25
      node/node.go
  7. 7
      node/node_handler.go
  8. 18
      node/node_newblock.go
  9. 51
      node/worker/worker.go
  10. 60
      shard/committee/assignment.go

@ -30,11 +30,11 @@ func NewFactory(chainConfig *params.ChainConfig) Factory {
func (f *factory) NewHeader(epoch *big.Int) *block.Header {
var impl blockif.Header
switch {
case epoch.Cmp(f.chainConfig.StakingEpoch) >= 0:
case f.chainConfig.IsPreStaking(epoch):
impl = v3.NewHeader()
case epoch.Cmp(f.chainConfig.CrossLinkEpoch) >= 0:
case f.chainConfig.IsCrossLink(epoch):
impl = v2.NewHeader()
case epoch.Cmp(f.chainConfig.CrossTxEpoch) >= 0:
case f.chainConfig.IsCrossTx(epoch):
impl = v1.NewHeader()
default:
impl = v0.NewHeader()

@ -465,7 +465,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.Decider.SetShardIDProvider(func() (uint32, error) {
return consensus.ShardID, nil
})
s, err := committee.WithStakingEnabled.Compute(
s, err := committee.WithStakingEnabled.ReadFromDB(
next, consensus.ChainReader,
)
@ -473,12 +473,10 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
utils.Logger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("Error when computing committee with staking")
Msg("Error when reading staking committee")
return Syncing
}
utils.Logger().Print("XXXXXXXX")
utils.Logger().Print(s.FindCommitteeByID(consensus.ShardID).Slots)
if _, err := consensus.Decider.SetVoters(
s.FindCommitteeByID(consensus.ShardID).Slots,
); err != nil {
@ -504,7 +502,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
epoch := header.Epoch()
// TODO: change GetCommitteePublicKeys to read from DB
curShardState, err := committee.WithStakingEnabled.Compute(
curShardState, err := committee.WithStakingEnabled.ReadFromDB(
epoch, consensus.ChainReader,
)
if err != nil {
@ -514,9 +512,11 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
Msg("Error retrieving current shard state")
return Syncing
}
curCommittee := curShardState.FindCommitteeByID(header.ShardID())
curPubKeys := committee.WithStakingEnabled.GetCommitteePublicKeys(
curShardState,
)[int(header.ShardID())]
curCommittee,
)
consensus.numPrevPubKeys = len(curPubKeys)
consensus.getLogger().Info().Msg("[UpdateConsensusInformation] Updating.....")
if shard.Schedule.IsLastBlock(header.Number().Uint64()) {
@ -525,7 +525,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.getLogger().Info().Uint64("headerNum", header.Number().Uint64()).
Msg("[UpdateConsensusInformation] Epoch updated for next epoch")
nextShardState, err := committee.WithStakingEnabled.Compute(
nextShardState, err := committee.WithStakingEnabled.ReadFromDB(
new(big.Int).Add(epoch, common.Big1), consensus.ChainReader,
)
if err != nil {
@ -536,9 +536,10 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
return Syncing
}
nextCommittee := nextShardState.FindCommitteeByID(header.ShardID())
pubKeys = committee.WithStakingEnabled.GetCommitteePublicKeys(
nextShardState,
)[int(header.ShardID())]
nextCommittee,
)
} else {
consensus.SetEpochNum(epoch.Uint64())
pubKeys = curPubKeys

@ -830,6 +830,7 @@ func (consensus *Consensus) finalizeCommits() {
utils.Logger().Info().
Uint64("blockNum", block.NumberU64()).
Uint64("epochNum", block.Epoch().Uint64()).
Uint64("ViewId", block.Header().ViewID().Uint64()).
Str("blockHash", block.Hash().String()).
Int("index", consensus.Decider.IndexOf(consensus.PubKey)).

@ -28,6 +28,7 @@ import (
"time"
"github.com/harmony-one/harmony/crypto/bls"
lru "github.com/hashicorp/golang-lru"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
@ -48,9 +49,7 @@ import (
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/shard/committee"
staking "github.com/harmony-one/harmony/staking/types"
lru "github.com/hashicorp/golang-lru"
)
var (
@ -1961,37 +1960,6 @@ func (bc *BlockChain) GetVrfByNumber(number uint64) []byte {
return header.Vrf()
}
// GetShardState returns the shard state for the given epoch,
// creating one if needed.
// TODO: [STAKING]
func (bc *BlockChain) GetShardState(epoch *big.Int) (shard.State, error) {
shardState, err := bc.ReadShardState(epoch)
if err == nil { // TODO ek – distinguish ErrNotFound
return shardState, err
}
if epoch.Cmp(big.NewInt(GenesisEpoch)) == 0 {
shardState, err = committee.WithStakingEnabled.Compute(
big.NewInt(GenesisEpoch), nil,
)
} else {
prevEpoch := new(big.Int).Sub(epoch, common.Big1)
shardState, err = committee.WithStakingEnabled.ReadFromDB(
prevEpoch, bc,
)
}
if err != nil {
return nil, err
}
err = bc.WriteShardState(epoch, shardState)
if err != nil {
return nil, err
}
utils.Logger().Debug().Str("epoch", epoch.String()).Msg("saved new shard state")
return shardState, nil
}
// ChainDb returns the database
func (bc *BlockChain) ChainDb() ethdb.Database { return bc.db }

@ -23,33 +23,36 @@ var EpochTBD = big.NewInt(10000000)
var (
// MainnetChainConfig is the chain parameters to run a node on the main network.
MainnetChainConfig = &ChainConfig{
ChainID: MainnetChainID,
CrossTxEpoch: big.NewInt(28),
CrossLinkEpoch: EpochTBD,
StakingEpoch: EpochTBD,
EIP155Epoch: big.NewInt(28),
S3Epoch: big.NewInt(28),
ChainID: MainnetChainID,
CrossTxEpoch: big.NewInt(28),
CrossLinkEpoch: EpochTBD,
StakingEpoch: EpochTBD,
PreStakingEpoch: EpochTBD,
EIP155Epoch: big.NewInt(28),
S3Epoch: big.NewInt(28),
}
// TestnetChainConfig contains the chain parameters to run a node on the harmony test network.
TestnetChainConfig = &ChainConfig{
ChainID: TestnetChainID,
CrossTxEpoch: big.NewInt(0),
CrossLinkEpoch: big.NewInt(0),
StakingEpoch: big.NewInt(3),
EIP155Epoch: big.NewInt(0),
S3Epoch: big.NewInt(0),
ChainID: TestnetChainID,
CrossTxEpoch: big.NewInt(0),
CrossLinkEpoch: big.NewInt(2),
StakingEpoch: big.NewInt(3),
PreStakingEpoch: big.NewInt(0),
EIP155Epoch: big.NewInt(0),
S3Epoch: big.NewInt(0),
}
// PangaeaChainConfig contains the chain parameters for the Pangaea network.
// All features except for CrossLink are enabled at launch.
PangaeaChainConfig = &ChainConfig{
ChainID: PangaeaChainID,
CrossTxEpoch: big.NewInt(0),
CrossLinkEpoch: EpochTBD,
StakingEpoch: EpochTBD,
EIP155Epoch: big.NewInt(0),
S3Epoch: big.NewInt(0),
ChainID: PangaeaChainID,
CrossTxEpoch: big.NewInt(0),
CrossLinkEpoch: EpochTBD,
StakingEpoch: EpochTBD,
PreStakingEpoch: EpochTBD,
EIP155Epoch: big.NewInt(0),
S3Epoch: big.NewInt(0),
}
// AllProtocolChanges ...
@ -60,6 +63,7 @@ var (
big.NewInt(0), // CrossTxEpoch
big.NewInt(0), // CrossLinkEpoch
big.NewInt(0), // StakingEpoch
big.NewInt(0), // PreStakingEpoch
big.NewInt(0), // EIP155Epoch
big.NewInt(0), // S3Epoch
}
@ -72,6 +76,7 @@ var (
big.NewInt(0), // CrossTxEpoch
big.NewInt(0), // CrossLinkEpoch
big.NewInt(0), // StakingEpoch
big.NewInt(0), // PreStakingEpoch
big.NewInt(0), // EIP155Epoch
big.NewInt(0), // S3Epoch
}
@ -108,9 +113,12 @@ type ChainConfig struct {
// cross-shard links.
CrossLinkEpoch *big.Int `json:"crossLinkEpoch,omitempty"`
// StakingEpoch is the epoch we allow staking transactions
// StakingEpoch is the epoch when shard assign takes staking into account
StakingEpoch *big.Int `json:"stakingEpoch,omitempty"`
// PreStakingEpoch is the epoch we allow staking transactions
PreStakingEpoch *big.Int `json:"preStakingEpoch,omitempty"`
EIP155Epoch *big.Int `json:"eip155Epoch,omitempty"` // EIP155 hard fork epoch (include EIP158 too)
S3Epoch *big.Int `json:"s3Epoch,omitempty"` // S3 epoch is the first epoch containing S3 mainnet and all ethereum update up to Constantinople
}
@ -151,6 +159,11 @@ func (c *ChainConfig) IsStaking(epoch *big.Int) bool {
return isForked(c.StakingEpoch, epoch)
}
// IsPreStaking determines whether staking transactions are allowed
func (c *ChainConfig) IsPreStaking(epoch *big.Int) bool {
return isForked(c.PreStakingEpoch, epoch)
}
// IsCrossLink returns whether epoch is either equal to the CrossLink fork epoch or greater.
func (c *ChainConfig) IsCrossLink(epoch *big.Int) bool {
return isForked(c.CrossLinkEpoch, epoch)

@ -278,17 +278,20 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) {
// Add new staking transactions to the pending staking transaction list.
func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTransactions) {
txPoolLimit := 1000 // TODO: incorporate staking txn into TxPool
node.pendingStakingTxMutex.Lock()
for _, tx := range newStakingTxs {
if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok {
node.pendingStakingTransactions[tx.Hash()] = tx
}
if len(node.pendingStakingTransactions) > txPoolLimit {
break
if node.Blockchain().Config().IsPreStaking(node.Worker.GetNewEpoch()) {
node.pendingStakingTxMutex.Lock()
for _, tx := range newStakingTxs {
if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok {
node.pendingStakingTransactions[tx.Hash()] = tx
}
if len(node.pendingStakingTransactions) > txPoolLimit {
break
}
}
utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingStakingTransactions)).Msg("Got more staking transactions")
node.pendingStakingTxMutex.Unlock()
}
utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingStakingTransactions)).Msg("Got more staking transactions")
node.pendingStakingTxMutex.Unlock()
}
// AddPendingStakingTransaction staking transactions
@ -490,8 +493,8 @@ func (node *Node) InitConsensusWithValidators() (err error) {
epoch, node.Consensus.ChainReader,
)
pubKeys := committee.WithStakingEnabled.GetCommitteePublicKeys(
shardState,
)[int(shardID)]
shardState.FindCommitteeByID(shardID),
)
if len(pubKeys) == 0 {
utils.Logger().Error().
Uint32("shardID", shardID).

@ -342,7 +342,12 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit
node.BroadcastCXReceipts(newBlock, commitSigAndBitmap)
} else {
utils.Logger().Info().
Uint64("BlockNum", newBlock.NumberU64()).
Uint64("blockNum", newBlock.NumberU64()).
Uint64("epochNum", newBlock.Epoch().Uint64()).
Uint64("ViewId", newBlock.Header().ViewID().Uint64()).
Str("blockHash", newBlock.Hash().String()).
Int("numTxns", len(newBlock.Transactions())).
Int("numStakingTxns", len(newBlock.StakingTransactions())).
Msg("BINGO !!! Reached Consensus")
// 15% of the validator also need to do broadcasting
rand.Seed(time.Now().UTC().UnixNano())

@ -78,6 +78,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan ch
func (node *Node) proposeNewBlock() (*types.Block, error) {
// Update worker's current header and state data in preparation to propose/process new transactions
coinbase := node.Consensus.SelfAddress
node.Worker.UpdateCurrent(coinbase)
// Prepare transactions including staking transactions
pending, err := node.TxPool.Pending()
@ -88,14 +89,16 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
// TODO: integrate staking transaction into tx pool
pendingStakingTransactions := types2.StakingTransactions{}
node.pendingStakingTxMutex.Lock()
for _, tx := range node.pendingStakingTransactions {
pendingStakingTransactions = append(pendingStakingTransactions, tx)
// Only process staking transactions after pre-staking epoch happened.
if node.Blockchain().Config().IsPreStaking(node.Worker.GetNewEpoch()) {
node.pendingStakingTxMutex.Lock()
for _, tx := range node.pendingStakingTransactions {
pendingStakingTransactions = append(pendingStakingTransactions, tx)
}
node.pendingStakingTransactions = make(map[common.Hash]*types2.StakingTransaction)
node.pendingStakingTxMutex.Unlock()
}
node.pendingStakingTransactions = make(map[common.Hash]*types2.StakingTransaction)
node.pendingStakingTxMutex.Unlock()
node.Worker.UpdateCurrent(coinbase)
if err := node.Worker.CommitTransactions(pending, pendingStakingTransactions, coinbase); err != nil {
utils.Logger().Error().Err(err).Msg("cannot commit transactions")
return nil, err
@ -119,8 +122,9 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
}
// Prepare shard state
// NOTE: this will potentially override shard chain's epoch to beacon chain's epoch during staking migration period.
shardState, err := node.Worker.SuperCommitteeForNextEpoch(
node.Consensus.ShardID, node.Blockchain(),
node.Consensus.ShardID, node.Beaconchain(),
)
if err != nil {

@ -288,38 +288,65 @@ func (w *Worker) SuperCommitteeForNextEpoch(
) (shard.State, error) {
var (
nextCommittee shard.State
oops error
err error
)
switch shardID {
case shard.BeaconChainShardID:
if shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) {
nextCommittee, oops = committee.WithStakingEnabled.Compute(
nextCommittee, err = committee.WithStakingEnabled.Compute(
new(big.Int).Add(w.current.header.Epoch(), common.Big1),
beacon,
)
}
default:
// WARN When we first enable staking, this condition may not be robust by itself.
// TODO: needs to make sure beacon chain sync works.
beaconEpoch := beacon.CurrentHeader().Epoch()
if w.config.IsStaking(w.current.header.Epoch()) {
switch beacon.CurrentHeader().Epoch().Cmp(w.current.header.Epoch()) {
switch beaconEpoch.Cmp(w.current.header.Epoch()) {
case 1:
nextCommittee, oops = committee.WithStakingEnabled.ReadFromDB(
beacon.CurrentHeader().Epoch(), beacon,
// If beacon chain is bigger than shard chain in epoch, it means I should catch up with beacon chain now
nextCommittee, err = committee.WithStakingEnabled.ReadFromDB(
beaconEpoch, beacon,
)
blockEpoch := big.NewInt(0).Set(beaconEpoch).Sub(beaconEpoch, big.NewInt(1)) // Set this block's epoch to be beaconEpoch - 1, so the next block will have beaconEpoch
utils.Logger().Debug().
Uint64("blockNum", w.current.header.Number().Uint64()).
Uint64("myPrevEpoch", w.current.header.Epoch().Uint64()).
Uint64("myNewEpoch", blockEpoch.Uint64()).
Msg("Propose new epoch as beacon chain's epoch")
w.current.header.SetEpoch(blockEpoch)
case 0:
// If it's same epoch, no need to propose new shard state (new epoch change)
case -1:
// If beacon chain is behind, shard chain should wait for the beacon chain by not changing epochs.
}
} else {
if shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) {
nextCommittee, oops = committee.WithStakingEnabled.Compute(
new(big.Int).Add(w.current.header.Epoch(), common.Big1),
beacon,
if w.config.IsStaking(beaconEpoch) {
// If beacon is already in staking, but I am not, I should just catch up with beacon chain's epoch
nextCommittee, err = committee.WithStakingEnabled.ReadFromDB(
beaconEpoch, beacon,
)
blockEpoch := big.NewInt(0).Set(beaconEpoch).Sub(beaconEpoch, big.NewInt(1)) // Set this block's epoch to be beaconEpoch - 1, so the next block will have beaconEpoch
utils.Logger().Debug().
Uint64("blockNum", w.current.header.Number().Uint64()).
Uint64("myPrevEpoch", w.current.header.Epoch().Uint64()).
Uint64("myNewEpoch", blockEpoch.Uint64()).
Msg("Propose one-time catch up with beacon chain's epoch")
w.current.header.SetEpoch(blockEpoch)
} else {
// If both beacon and I are not in staking, do pre-staking committee calculation
if shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) {
nextCommittee, err = committee.WithStakingEnabled.Compute(
new(big.Int).Add(w.current.header.Epoch(), common.Big1),
w.chain,
)
}
}
}
}
return nextCommittee, oops
return nextCommittee, err
}
// FinalizeNewBlock generate a new block for the next consensus round.

@ -8,7 +8,6 @@ import (
"github.com/harmony-one/harmony/block"
common2 "github.com/harmony-one/harmony/internal/common"
shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
@ -22,19 +21,11 @@ type ValidatorListProvider interface {
epoch *big.Int, reader DataProvider,
) (shard.State, error)
ReadFromDB(epoch *big.Int, reader DataProvider) (shard.State, error)
}
// PublicKeysProvider per epoch
type PublicKeysProvider interface {
GetCommitteePublicKeys(superComm shard.State) [][]*bls.PublicKey
ReadPublicKeysFromDB(
hash common.Hash, reader DataProvider,
) ([]*bls.PublicKey, error)
GetCommitteePublicKeys(committee *shard.Committee) []*bls.PublicKey
}
// Reader is committee.Reader and it is the API that committee membership assignment needs
type Reader interface {
PublicKeysProvider
ValidatorListProvider
}
@ -187,54 +178,19 @@ func eposStakedCommittee(
return superComm, nil
}
// GetCommitteePublicKeys produces publicKeys of entire supercommittee per epoch
func (def partialStakingEnabled) GetCommitteePublicKeys(superComm shard.State) [][]*bls.PublicKey {
allIdentities := make([][]*bls.PublicKey, len(superComm))
// GetCommitteePublicKeys returns the public keys of a shard
func (def partialStakingEnabled) GetCommitteePublicKeys(committee *shard.Committee) []*bls.PublicKey {
allIdentities := make([]*bls.PublicKey, len(committee.Slots))
for i := range superComm {
allIdentities[i] = make([]*bls.PublicKey, len(superComm[i].Slots))
for j := range superComm[i].Slots {
identity := &bls.PublicKey{}
superComm[i].Slots[j].BlsPublicKey.ToLibBLSPublicKey(identity)
allIdentities[i][j] = identity
}
for i := range committee.Slots {
identity := &bls.PublicKey{}
committee.Slots[i].BlsPublicKey.ToLibBLSPublicKey(identity)
allIdentities[i] = identity
}
return allIdentities
}
func (def partialStakingEnabled) ReadPublicKeysFromDB(
h common.Hash, reader DataProvider,
) ([]*bls.PublicKey, error) {
header := reader.GetHeaderByHash(h)
shardID := header.ShardID()
superCommittee, err := reader.ReadShardState(header.Epoch())
if err != nil {
return nil, err
}
subCommittee := superCommittee.FindCommitteeByID(shardID)
if subCommittee == nil {
return nil, ctxerror.New("cannot find shard in the shard state",
"blockNumber", header.Number(),
"shardID", header.ShardID(),
)
}
committerKeys := []*bls.PublicKey{}
for i := range subCommittee.Slots {
committerKey := new(bls.PublicKey)
err := subCommittee.Slots[i].BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
return nil, ctxerror.New("cannot convert BLS public key",
"blsPublicKey", subCommittee.Slots[i].BlsPublicKey).WithCause(err)
}
committerKeys = append(committerKeys, committerKey)
}
return committerKeys, nil
return nil, nil
}
func (def partialStakingEnabled) ReadFromDB(
epoch *big.Int, reader DataProvider,
) (newSuperComm shard.State, err error) {

Loading…
Cancel
Save