Fix staking epoch logic

pull/1870/head
Rongjian Lan 5 years ago
parent c258ab167f
commit d9900b44df
  1. 2
      cmd/client/txgen/main.go
  2. 11
      cmd/harmony/main.go
  3. 71
      consensus/consensus_service.go
  4. 1
      core/state_transition.go
  5. 12
      internal/utils/utils.go
  6. 11
      node/node_newblock.go
  7. 23
      node/worker/worker.go

@ -219,7 +219,7 @@ syncLoop:
Msg("Error when adding new block")
}
stateMutex.Lock()
if err := txGen.Worker.UpdateCurrent(block.Coinbase()); err != nil {
if err := txGen.Worker.UpdateCurrent(); err != nil {
utils.Logger().Warn().Err(err).Msg("(*Worker).UpdateCurrent failed")
}
stateMutex.Unlock()

@ -195,6 +195,7 @@ func setupInitialAccount() (isLeader bool) {
pubKey := setupConsensusKey(nodeconfig.GetDefaultConfig())
reshardingEpoch := genesisShardingConfig.ReshardingEpoch()
// TODO: after staking, what if the FN validator uses the old bls pub keys?
if reshardingEpoch != nil && len(reshardingEpoch) > 0 {
for _, epoch := range reshardingEpoch {
config := shard.Schedule.InstanceForEpoch(epoch)
@ -208,12 +209,14 @@ func setupInitialAccount() (isLeader bool) {
}
if initialAccount == nil {
fmt.Fprintf(os.Stderr, "ERROR cannot find your BLS key in the genesis/FN tables: %s\n", pubKey.SerializeToHexStr())
os.Exit(100)
initialAccount.ShardID = uint32(*shardID)
initialAccount.BlsPublicKey = pubKey.SerializeToHexStr()
blsAddressBytes := pubKey.GetAddress()
initialAccount.Address = hex.EncodeToString(blsAddressBytes[:])
} else {
fmt.Printf("My Genesis Account: %v\n", *initialAccount)
}
fmt.Printf("My Genesis Account: %v\n", *initialAccount)
return isLeader
}

@ -428,15 +428,29 @@ func (consensus *Consensus) getLeaderPubKeyFromCoinbase(header *block.Header) (*
)
}
committerKey := new(bls.PublicKey)
isStaking := consensus.ChainReader.Config().IsStaking(header.Epoch())
for _, member := range committee.Slots {
if member.EcdsaAddress == header.Coinbase() {
err := member.BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
return nil, ctxerror.New("cannot convert BLS public key",
"blsPublicKey", member.BlsPublicKey,
"coinbaseAddr", header.Coinbase()).WithCause(err)
if isStaking {
// After staking the coinbase address will be the address of bls public key
if utils.GetAddressFromBlsPubKeyBytes(member.BlsPublicKey[:]) == header.Coinbase() {
err := member.BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
return nil, ctxerror.New("cannot convert BLS public key",
"blsPublicKey", member.BlsPublicKey,
"coinbaseAddr", header.Coinbase()).WithCause(err)
}
return committerKey, nil
}
} else {
if member.EcdsaAddress == header.Coinbase() {
err := member.BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
return nil, ctxerror.New("cannot convert BLS public key",
"blsPublicKey", member.BlsPublicKey,
"coinbaseAddr", header.Coinbase()).WithCause(err)
}
return committerKey, nil
}
return committerKey, nil
}
}
return nil, ctxerror.New("cannot find corresponding BLS Public Key", "coinbaseAddr", header.Coinbase())
@ -455,9 +469,10 @@ func (consensus *Consensus) getLeaderPubKeyFromCoinbase(header *block.Header) (*
func (consensus *Consensus) UpdateConsensusInformation() Mode {
curHeader := consensus.ChainReader.CurrentHeader()
next := new(big.Int).Add(curHeader.Epoch(), common.Big1)
if consensus.ChainReader.Config().IsStaking(next) &&
consensus.Decider.Policy() != quorum.SuperMajorityStake {
curEpoch := curHeader.Epoch()
nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1)
if (consensus.ChainReader.Config().IsStaking(nextEpoch) && len(curHeader.ShardState()) > 0 && !consensus.ChainReader.Config().IsStaking(curEpoch)) ||
(consensus.ChainReader.Config().IsStaking(curEpoch) && consensus.Decider.Policy() != quorum.SuperMajorityStake) {
prevSubCommitteeDump := consensus.Decider.JSON()
@ -466,7 +481,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
return consensus.ShardID, nil
})
s, err := committee.WithStakingEnabled.ReadFromDB(
next, consensus.ChainReader,
nextEpoch, consensus.ChainReader,
)
if err != nil {
@ -489,7 +504,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
utils.Logger().Info().
Uint64("block-number", curHeader.Number().Uint64()).
Uint64("epoch", curHeader.Epoch().Uint64()).
Uint64("curEpoch", curHeader.Epoch().Uint64()).
Uint32("shard-id", consensus.ShardID).
RawJSON("prev-subcommittee", []byte(prevSubCommitteeDump)).
RawJSON("current-subcommittee", []byte(consensus.Decider.JSON())).
@ -498,12 +513,10 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
pubKeys := []*bls.PublicKey{}
hasError := false
header := consensus.ChainReader.CurrentHeader()
epoch := header.Epoch()
// TODO: change GetCommitteePublicKeys to read from DB
curShardState, err := committee.WithStakingEnabled.ReadFromDB(
epoch, consensus.ChainReader,
curEpoch, consensus.ChainReader,
)
if err != nil {
utils.Logger().Error().
@ -513,35 +526,35 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
return Syncing
}
curCommittee := curShardState.FindCommitteeByID(header.ShardID())
curCommittee := curShardState.FindCommitteeByID(curHeader.ShardID())
curPubKeys := committee.WithStakingEnabled.GetCommitteePublicKeys(
curCommittee,
)
consensus.numPrevPubKeys = len(curPubKeys)
consensus.getLogger().Info().Msg("[UpdateConsensusInformation] Updating.....")
if shard.Schedule.IsLastBlock(header.Number().Uint64()) {
// increase epoch by one if it's the last block
consensus.SetEpochNum(epoch.Uint64() + 1)
consensus.getLogger().Info().Uint64("headerNum", header.Number().Uint64()).
Msg("[UpdateConsensusInformation] Epoch updated for next epoch")
if len(curHeader.ShardState()) > 0 {
// increase curEpoch by one if it's the last block
consensus.SetEpochNum(curEpoch.Uint64() + 1)
consensus.getLogger().Info().Uint64("headerNum", curHeader.Number().Uint64()).
Msg("[UpdateConsensusInformation] Epoch updated for nextEpoch curEpoch")
nextShardState, err := committee.WithStakingEnabled.ReadFromDB(
new(big.Int).Add(epoch, common.Big1), consensus.ChainReader,
nextEpoch, consensus.ChainReader,
)
if err != nil {
utils.Logger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("Error retrieving next shard state")
Msg("Error retrieving nextEpoch shard state")
return Syncing
}
nextCommittee := nextShardState.FindCommitteeByID(header.ShardID())
nextCommittee := nextShardState.FindCommitteeByID(curHeader.ShardID())
pubKeys = committee.WithStakingEnabled.GetCommitteePublicKeys(
nextCommittee,
)
} else {
consensus.SetEpochNum(epoch.Uint64())
consensus.SetEpochNum(curEpoch.Uint64())
pubKeys = curPubKeys
}
@ -558,10 +571,10 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode {
Msg("[UpdateConsensusInformation] Successfully updated public keys")
consensus.UpdatePublicKeys(pubKeys)
// take care of possible leader change during the epoch
if !shard.Schedule.IsLastBlock(header.Number().Uint64()) &&
header.Number().Uint64() != 0 {
leaderPubKey, err := consensus.getLeaderPubKeyFromCoinbase(header)
// take care of possible leader change during the curEpoch
if !shard.Schedule.IsLastBlock(curHeader.Number().Uint64()) &&
curHeader.Number().Uint64() != 0 {
leaderPubKey, err := consensus.getLeaderPubKeyFromCoinbase(curHeader)
if err != nil || leaderPubKey == nil {
consensus.getLogger().Debug().Err(err).
Msg("[SYNC] Unable to get leaderPubKey from coinbase")

@ -243,6 +243,7 @@ func (st *StateTransition) TransitionDb() (ret []byte, usedGas uint64, failed bo
}
}
st.refundGas()
// TODO: need to move the gas fee to the general block rewards
st.state.AddBalance(st.evm.Coinbase, new(big.Int).Mul(new(big.Int).SetUint64(st.gasUsed()), st.gasPrice))
return ret, st.gasUsed(), vmerr != nil, err

@ -86,6 +86,18 @@ func GetAddressFromBlsPubKey(pubKey *bls.PublicKey) common.Address {
return addr
}
// GetAddressFromBlsPubKeyBytes return the address object from bls pub key.
func GetAddressFromBlsPubKeyBytes(pubKeyBytes []byte) common.Address {
pubKey := &bls.PublicKey{}
err := pubKey.Deserialize(pubKeyBytes[:])
addr := common.Address{}
if err != nil {
addrBytes := pubKey.GetAddress()
addr.SetBytes(addrBytes[:])
}
return addr
}
// GenKeyP2P generates a pair of RSA keys used in libp2p host
func GenKeyP2P(ip, port string) (p2p_crypto.PrivKey, p2p_crypto.PubKey, error) {
r := mrand.New(mrand.NewSource(int64(GetUniqueIDFromIPPort(ip, port))))

@ -78,7 +78,16 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan ch
func (node *Node) proposeNewBlock() (*types.Block, error) {
// Update worker's current header and state data in preparation to propose/process new transactions
coinbase := node.Consensus.SelfAddress
node.Worker.UpdateCurrent(coinbase)
// After staking, all coinbase will be the address of bls pub key
if node.Blockchain().Config().IsStaking(node.Worker.GetNewEpoch()) {
addr := common.Address{}
blsPubKeyBytes := node.Consensus.PubKey.GetAddress()
addr.SetBytes(blsPubKeyBytes[:])
coinbase = addr
}
node.Worker.UpdateCurrent()
// Prepare transactions including staking transactions
pending, err := node.TxPool.Pending()

@ -215,7 +215,7 @@ func (w *Worker) CommitReceipts(receiptsList []*types.CXReceiptsProof) error {
}
// UpdateCurrent updates the current environment with the current state and header.
func (w *Worker) UpdateCurrent(coinbase common.Address) error {
func (w *Worker) UpdateCurrent() error {
parent := w.chain.CurrentBlock()
num := parent.Number()
timestamp := time.Now().Unix()
@ -227,7 +227,6 @@ func (w *Worker) UpdateCurrent(coinbase common.Address) error {
GasLimit(core.CalcGasLimit(parent, w.gasFloor, w.gasCeil)).
Time(big.NewInt(timestamp)).
ShardID(w.chain.ShardID()).
Coinbase(coinbase).
Header()
return w.makeCurrent(parent, header)
}
@ -301,7 +300,9 @@ func (w *Worker) SuperCommitteeForNextEpoch(
default:
// TODO: needs to make sure beacon chain sync works.
beaconEpoch := beacon.CurrentHeader().Epoch()
if w.config.IsStaking(w.current.header.Epoch()) {
nextEpoch := new(big.Int).Add(w.current.header.Epoch(), common.Big1)
if w.config.IsStaking(nextEpoch) {
// If next epoch is staking epoch, I should wait and listen for beacon chain for epoch changes
switch beaconEpoch.Cmp(w.current.header.Epoch()) {
case 1:
// If beacon chain is bigger than shard chain in epoch, it means I should catch up with beacon chain now
@ -315,31 +316,30 @@ func (w *Worker) SuperCommitteeForNextEpoch(
utils.Logger().Debug().
Uint64("blockNum", w.current.header.Number().Uint64()).
Uint64("myPrevEpoch", w.current.header.Epoch().Uint64()).
Uint64("myNewEpoch", blockEpoch.Uint64()).
Uint64("myCurEpoch", blockEpoch.Uint64()).
Msg("Propose new epoch as beacon chain's epoch")
w.current.header.SetEpoch(blockEpoch)
case 0:
// If it's same epoch, no need to propose new shard state (new epoch change)
case -1:
// If beacon chain is behind, shard chain should wait for the beacon chain by not changing epochs.
}
} else {
beaconEpochSubOne := big.NewInt(0).Set(beaconEpoch).Sub(beaconEpoch, big.NewInt(1))
if w.config.IsStaking(beaconEpochSubOne) {
// If I am not in staking epoch yet and beacon chain already proposed a staking-based shard state,
// which means beacon chain's epoch is greater than stakingEpoch, I should just catch up with beacon chain's epoch
if w.config.IsStaking(beaconEpoch) {
// If I am not even in the last epoch before staking epoch and beacon chain is already in staking epoch,
// I should just catch up with beacon chain's epoch
nextCommittee, err = committee.WithStakingEnabled.ReadFromDB(
beaconEpoch, beacon,
)
blockEpoch := big.NewInt(0).Set(beaconEpoch).Sub(beaconEpoch, big.NewInt(1))
utils.Logger().Debug().
Uint64("blockNum", w.current.header.Number().Uint64()).
Uint64("myPrevEpoch", w.current.header.Epoch().Uint64()).
Uint64("myNewEpoch", beaconEpochSubOne.Uint64()).
Uint64("myCurEpoch", blockEpoch.Uint64()).
Msg("Propose one-time catch up with beacon chain's epoch")
// Set this block's epoch to be beaconEpoch - 1, so the next block will have beaconEpoch
w.current.header.SetEpoch(beaconEpochSubOne)
w.current.header.SetEpoch(blockEpoch)
} else {
// If I are not in staking nor has beacon chain proposed a staking-based shard state,
// do pre-staking committee calculation
@ -397,6 +397,7 @@ func (w *Worker) FinalizeNewBlock(sig []byte, signers []byte, viewID uint64, coi
s := w.current.state.Copy()
copyHeader := types.CopyHeader(w.current.header)
// TODO: feed coinbase into here so the proposer gets extra rewards.
block, err := w.engine.Finalize(w.chain, copyHeader, s, w.current.txs, w.current.receipts, w.current.outcxs, w.current.incxs, w.current.stakingTxs)
if err != nil {
return nil, ctxerror.New("cannot finalize block").WithCause(err)

Loading…
Cancel
Save