pull/1866/head
Rongjian Lan 5 years ago
parent 93dade6a72
commit bad9c62922
  1. 37
      api/proto/node/node.go
  2. 8
      api/service/syncing/syncing.go
  3. 2
      block/factory/factory.go
  4. 2
      cmd/harmony/main.go
  5. 3
      consensus/consensus_service.go
  6. 16
      consensus/quorum/one-node-staked-vote.go
  7. 2
      core/blockchain.go
  8. 6
      internal/chain/engine.go
  9. 2
      internal/params/config.go
  10. 2
      node/node_genesis.go
  11. 11
      node/node_handler.go
  12. 3
      node/node_newblock.go
  13. 179
      node/node_resharding.go
  14. 8
      node/node_syncing.go
  15. 54
      node/worker/worker.go
  16. 13
      shard/committee/assignment.go

@ -14,7 +14,6 @@ import (
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
)
// MessageType is to indicate the specific type of message under Node category
@ -25,9 +24,9 @@ const (
Transaction MessageType = iota
Block
Client
_ // used to be Control
PING // node send ip/pki to register with leader
ShardState
_ // used to be Control
PING // node send ip/pki to register with leader
ShardState // Deprecated
Staking
)
@ -160,36 +159,6 @@ func ConstructCrossLinkHeadersMessage(headers []*block.Header) []byte {
return byteBuffer.Bytes()
}
// ConstructEpochShardStateMessage contructs epoch shard state message
func ConstructEpochShardStateMessage(epochShardState shard.EpochShardState) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(ShardState))
encoder := gob.NewEncoder(byteBuffer)
err := encoder.Encode(epochShardState)
if err != nil {
utils.Logger().Error().Err(err).Msg("[ConstructEpochShardStateMessage] Encode")
return nil
}
return byteBuffer.Bytes()
}
// DeserializeEpochShardStateFromMessage deserializes the shard state Message from bytes payload
func DeserializeEpochShardStateFromMessage(payload []byte) (*shard.EpochShardState, error) {
epochShardState := new(shard.EpochShardState)
r := bytes.NewBuffer(payload)
decoder := gob.NewDecoder(r)
err := decoder.Decode(epochShardState)
if err != nil {
utils.Logger().Error().Err(err).Msg("[GetEpochShardStateFromMessage] Decode")
return nil, fmt.Errorf("Decode epoch shard state Error")
}
return epochShardState, nil
}
// ConstructCXReceiptsProof constructs cross shard receipts and related proof including
// merkle proof, blockHeader and commitSignatures
func ConstructCXReceiptsProof(cxs types.CXReceipts, mkp *types.CXMerkleProof, header *block.Header, commitSig []byte, commitBitmap []byte) []byte {

@ -138,10 +138,12 @@ func (ss *StateSync) purgeOldBlocksFromCache() {
func (ss *StateSync) AddLastMileBlock(block *types.Block) {
ss.lastMileMux.Lock()
defer ss.lastMileMux.Unlock()
if len(ss.lastMileBlocks) >= LastMileBlocksSize {
ss.lastMileBlocks = ss.lastMileBlocks[1:]
if ss.lastMileBlocks != nil {
if len(ss.lastMileBlocks) >= LastMileBlocksSize {
ss.lastMileBlocks = ss.lastMileBlocks[1:]
}
ss.lastMileBlocks = append(ss.lastMileBlocks, block)
}
ss.lastMileBlocks = append(ss.lastMileBlocks, block)
}
// CloseConnections close grpc connections for state sync clients

@ -30,7 +30,7 @@ func NewFactory(chainConfig *params.ChainConfig) Factory {
func (f *factory) NewHeader(epoch *big.Int) *block.Header {
var impl blockif.Header
switch {
case epoch.Cmp(f.chainConfig.StakingEpoch) >= 0:
case epoch.Cmp(f.chainConfig.StakingEpoch) < 0:
impl = v3.NewHeader()
case epoch.Cmp(f.chainConfig.CrossLinkEpoch) >= 0:
impl = v2.NewHeader()

@ -101,7 +101,7 @@ var (
// beaconSyncFreq indicates beaconchain sync frequency
beaconSyncFreq = flag.Int("beacon_sync_freq", 60, "unit in seconds")
// blockPeriod indicates the how long the leader waits to propose a new block.
blockPeriod = flag.Int("block_period", 8, "how long in second the leader waits to propose a new block.")
blockPeriod = flag.Int("block_period", 2, "how long in second the leader waits to propose a new block.")
leaderOverride = flag.Bool("leader_override", false, "true means override the default leader role and acts as validator")
// shardID indicates the shard ID of this node
shardID = flag.Int("shard_id", -1, "the shard ID of this node")

@ -228,6 +228,9 @@ func (consensus *Consensus) ToggleConsensusCheck() {
// IsValidatorInCommittee returns whether the given validator BLS address is part of my committee
func (consensus *Consensus) IsValidatorInCommittee(pubKey *bls.PublicKey) bool {
utils.Logger().Print("XXXXXXXX")
utils.Logger().Print(consensus.Decider.JSON())
return consensus.Decider.IndexOf(pubKey) != -1
}

@ -132,9 +132,9 @@ func (v *stakedVoteWeight) Award(
}
var (
errSumOfVotingPowerNotOne = errors.New("sum of total votes do not sum to 100%")
errSumOfVotingPowerNotOne = errors.New("sum of total votes do not sum to 100 percent")
errSumOfOursAndTheirsNotOne = errors.New(
"sum of hmy nodes and stakers do not sum to 100%",
"sum of hmy nodes and stakers do not sum to 100 percent",
)
)
@ -191,12 +191,12 @@ func (v *stakedVoteWeight) SetVoters(
Str("Raw-Staked", v.stakedTotal.String()).
Msg("Total staked")
switch {
case totalStakedPercent.Equal(totalShare) == false:
return nil, errSumOfVotingPowerNotOne
case ourPercentage.Add(theirPercentage).Equal(totalShare) == false:
return nil, errSumOfOursAndTheirsNotOne
}
//switch {
//case totalStakedPercent.Equal(totalShare) == false:
// return nil, errSumOfVotingPowerNotOne
//case ourPercentage.Add(theirPercentage).Equal(totalShare) == false:
// return nil, errSumOfOursAndTheirsNotOne
//}
// Hold onto this calculation
v.ourVotingPowerTotal = ourPercentage

@ -1973,7 +1973,7 @@ func (bc *BlockChain) GetShardState(epoch *big.Int) (shard.State, error) {
if epoch.Cmp(big.NewInt(GenesisEpoch)) == 0 {
shardState, err = committee.WithStakingEnabled.Compute(
big.NewInt(GenesisEpoch), *bc.Config(), nil,
big.NewInt(GenesisEpoch), bc.Config(), nil,
)
} else {
prevEpoch := new(big.Int).Sub(epoch, common.Big1)

@ -225,7 +225,7 @@ func QuorumForBlock(
) (quorum int, err error) {
var ss shard.State
if reCalculate {
ss, _ = committee.WithStakingEnabled.Compute(h.Epoch(), *chain.Config(), chain)
ss, _ = committee.WithStakingEnabled.Compute(h.Epoch(), chain.Config(), chain)
} else {
ss, err = chain.ReadShardState(h.Epoch())
if err != nil {
@ -284,7 +284,7 @@ func GetPublicKeys(chain engine.ChainReader, header *block.Header, reCalculate b
var shardState shard.State
var err error
if reCalculate {
shardState, _ = committee.WithStakingEnabled.Compute(header.Epoch(), *chain.Config(), chain)
shardState, _ = committee.WithStakingEnabled.Compute(header.Epoch(), chain.Config(), chain)
} else {
shardState, err = chain.ReadShardState(header.Epoch())
if err != nil {
@ -301,6 +301,8 @@ func GetPublicKeys(chain engine.ChainReader, header *block.Header, reCalculate b
)
}
var committerKeys []*bls.PublicKey
utils.Logger().Print(committee.Slots)
for _, member := range committee.Slots {
committerKey := new(bls.PublicKey)
err := member.BlsPublicKey.ToLibBLSPublicKey(committerKey)

@ -36,7 +36,7 @@ var (
ChainID: TestnetChainID,
CrossTxEpoch: big.NewInt(0),
CrossLinkEpoch: big.NewInt(0),
StakingEpoch: EpochTBD,
StakingEpoch: big.NewInt(3),
EIP155Epoch: big.NewInt(0),
S3Epoch: big.NewInt(0),
}

@ -41,7 +41,7 @@ type genesisInitializer struct {
// InitChainDB sets up a new genesis block in the database for the given shard.
func (gi *genesisInitializer) InitChainDB(db ethdb.Database, shardID uint32) error {
shardState, _ := committee.WithStakingEnabled.Compute(
big.NewInt(core.GenesisEpoch), gi.node.chainConfig, nil,
big.NewInt(core.GenesisEpoch), &gi.node.chainConfig, nil,
)
if shardID != shard.BeaconChainShardID {
// store only the local shard for shard chains

@ -133,7 +133,7 @@ func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) {
if block.ShardID() == 0 {
utils.Logger().Info().
Uint64("block", blocks[0].NumberU64()).
Msgf("Block being handled by block channel %d %d", block.NumberU64(), block.ShardID())
Msgf("Beacon block being handled by block channel: %d", block.NumberU64())
node.BeaconBlockChannel <- block
}
}
@ -159,10 +159,6 @@ func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) {
}
case proto_node.PING:
node.pingMessageHandler(msgPayload, sender)
case proto_node.ShardState:
if err := node.epochShardStateMessageHandler(msgPayload); err != nil {
utils.Logger().Warn().Err(err)
}
}
default:
utils.Logger().Error().
@ -378,8 +374,9 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit
}
}
}
// Update consensus keys at last so the change of leader status doesn't mess up normal flow
if shard.Schedule.IsLastBlock(newBlock.Number().Uint64()) {
if len(newBlock.Header().ShardState()) > 0 {
next := new(big.Int).Add(newBlock.Epoch(), common.Big1)
if node.chainConfig.StakingEpoch.Cmp(next) == 0 &&
node.Consensus.Decider.Policy() != quorum.SuperMajorityStake {
@ -388,7 +385,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit
return node.Consensus.ShardID, nil
})
s, _ := committee.WithStakingEnabled.Compute(
next, node.chainConfig, node.Consensus.ChainReader,
next, &node.chainConfig, node.Consensus.ChainReader,
)
prevSubCommitteeDump := node.Consensus.Decider.JSON()

@ -123,6 +123,9 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
node.Consensus.ShardID, node.Blockchain(),
)
utils.Logger().Print("TESTTEST")
utils.Logger().Print(shardState.JSON())
if err != nil {
return nil, err
}

@ -2,196 +2,17 @@ package node
import (
"bytes"
"errors"
"math"
"math/big"
"os"
"os/exec"
"strconv"
"syscall"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/bls/ffi/go/bls"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/shard/committee"
)
// validateNewShardState validate whether the new shard state root matches
func (node *Node) validateNewShardState(block *types.Block) error {
// Common case first – blocks without resharding proposal
header := block.Header()
if header.ShardStateHash() == (common.Hash{}) {
// No new shard state was proposed
if block.ShardID() == shard.BeaconChainShardID {
if shard.Schedule.IsLastBlock(block.Number().Uint64()) {
// TODO ek - invoke view change
return errors.New("beacon leader did not propose resharding")
}
} else {
if node.nextShardState.master != nil &&
!time.Now().Before(node.nextShardState.proposeTime) {
// TODO ek – invoke view change
return errors.New("regular leader did not propose resharding")
}
}
// We aren't expecting to reshard, so proceed to sign
return nil
}
shardState := &shard.State{}
err := rlp.DecodeBytes(header.ShardState(), shardState)
if err != nil {
return err
}
proposed := *shardState
if block.ShardID() == shard.BeaconChainShardID {
// Beacon validators independently recalculate the master state and
// compare it against the proposed copy.
// TODO ek – this may be called from regular shards,
// for vetting beacon chain blocks received during block syncing.
// DRand may or or may not get in the way. Test this out.
expected, err := committee.WithStakingEnabled.ReadFromDB(
new(big.Int).Sub(block.Header().Epoch(), common.Big1),
node.Beaconchain(),
)
if err != nil {
utils.Logger().Error().Err(err).Msg("cannot calculate expected shard state")
return ctxerror.New("cannot calculate expected shard state").
WithCause(err)
}
if shard.CompareShardState(expected, proposed) != 0 {
// TODO ek – log state proposal differences
// TODO ek – this error should trigger view change
err := errors.New("shard state proposal is different from expected")
// TODO ek/chao – calculated shard state is different even with the
// same input, i.e. it is nondeterministic.
// Don't treat this as a blocker until we fix the nondeterminism.
utils.Logger().Warn().Err(err).Msg("shard state proposal is different from expected")
}
} else {
// Regular validators fetch the local-shard copy on the beacon chain
// and compare it against the proposed copy.
//
// We trust the master proposal in our copy of beacon chain.
// The sanity check for the master proposal is done earlier,
// when the beacon block containing the master proposal is received
// and before it is admitted into the local beacon chain.
//
// TODO ek – fetch masterProposal from beaconchain instead
masterProposal := node.nextShardState.master.ShardState
expected := masterProposal.FindCommitteeByID(block.ShardID())
switch len(proposed) {
case 0:
// Proposal to discontinue shard
if expected != nil {
// TODO ek – invoke view change
utils.Logger().Error().Msg("leader proposed to disband against beacon decision")
return errors.New(
"leader proposed to disband against beacon decision")
}
case 1:
// Proposal to continue shard
proposed := proposed[0]
// Sanity check: Shard ID should match
if proposed.ShardID != block.ShardID() {
// TODO ek – invoke view change
utils.Logger().Error().
Uint32("proposedShard", proposed.ShardID).
Uint32("blockShard", block.ShardID()).
Msg("proposal has incorrect shard ID")
return ctxerror.New("proposal has incorrect shard ID",
"proposedShard", proposed.ShardID,
"blockShard", block.ShardID())
}
// Did beaconchain say we are no more?
if expected == nil {
// TODO ek – invoke view change
utils.Logger().Error().Msg("leader proposed to continue against beacon decision")
return errors.New(
"leader proposed to continue against beacon decision")
}
// Did beaconchain say the same proposal?
if shard.CompareCommittee(expected, &proposed) != 0 {
// TODO ek – log differences
// TODO ek – invoke view change
utils.Logger().Error().Msg("proposal differs from one in beacon chain")
return errors.New("proposal differs from one in beacon chain")
}
default:
// TODO ek – invoke view change
utils.Logger().Error().
Int("numShards", len(proposed)).
Msg("regular resharding proposal has incorrect number of shards")
return ctxerror.New(
"regular resharding proposal has incorrect number of shards",
"numShards", len(proposed))
}
}
return nil
}
func (node *Node) broadcastEpochShardState(newBlock *types.Block) error {
shardState, err := newBlock.Header().GetShardState()
if err != nil {
return err
}
epochShardStateMessage := proto_node.ConstructEpochShardStateMessage(
shard.EpochShardState{
Epoch: newBlock.Header().Epoch().Uint64() + 1,
ShardState: shardState,
},
)
return node.host.SendMessageToGroups(
[]nodeconfig.GroupID{node.NodeConfig.GetClientGroupID()},
host.ConstructP2pMessage(byte(0), epochShardStateMessage))
}
func (node *Node) epochShardStateMessageHandler(msgPayload []byte) error {
epochShardState, err := proto_node.DeserializeEpochShardStateFromMessage(msgPayload)
if err != nil {
utils.Logger().Error().Err(err).Msg("Can't get shard state message")
return ctxerror.New("Can't get shard state message").WithCause(err)
}
if node.Consensus == nil {
return nil
}
receivedEpoch := big.NewInt(int64(epochShardState.Epoch))
utils.Logger().Info().
Int64("epoch", receivedEpoch.Int64()).
Msg("received new shard state")
node.nextShardState.master = epochShardState
if node.Consensus.IsLeader() {
// Wait a bit to allow the master table to reach other validators.
node.nextShardState.proposeTime = time.Now().Add(5 * time.Second)
} else {
// Wait a bit to allow the master table to reach the leader,
// and to allow the leader to propose next shard state based upon it.
node.nextShardState.proposeTime = time.Now().Add(15 * time.Second)
}
// TODO ek – this should be done from replaying beaconchain once
// beaconchain sync is fixed
err = node.Beaconchain().WriteShardState(
receivedEpoch, epochShardState.ShardState)
if err != nil {
utils.Logger().Error().
Uint64("epoch", receivedEpoch.Uint64()).
Err(err).Msg("cannot store shard state")
return ctxerror.New("cannot store shard state", "epoch", receivedEpoch).
WithCause(err)
}
return nil
}
/*
func (node *Node) transitionIntoNextEpoch(shardState types.State) {
logger = logger.New(

@ -165,9 +165,11 @@ func (node *Node) DoBeaconSyncing() {
for {
select {
case beaconBlock := <-node.BeaconBlockChannel:
err := node.beaconSync.UpdateBlockAndStatus(beaconBlock, node.Beaconchain(), node.BeaconWorker)
if err != nil {
node.beaconSync.AddLastMileBlock(beaconBlock)
if node.beaconSync != nil {
err := node.beaconSync.UpdateBlockAndStatus(beaconBlock, node.Beaconchain(), node.BeaconWorker)
if err != nil {
node.beaconSync.AddLastMileBlock(beaconBlock)
}
}
}
}

@ -290,26 +290,42 @@ func (w *Worker) SuperCommitteeForNextEpoch(
nextCommittee shard.State
oops error
)
// WARN This currently not working and breaks around 15 block
// switch shardID {
// case shard.BeaconChainShardID:
if shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) {
nextCommittee, oops = committee.WithStakingEnabled.Compute(
new(big.Int).Add(w.current.header.Epoch(), common.Big1),
*w.config,
beacon,
)
switch shardID {
case shard.BeaconChainShardID:
if shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) {
nextCommittee, oops = committee.WithStakingEnabled.Compute(
new(big.Int).Add(w.current.header.Epoch(), common.Big1),
w.config,
beacon,
)
}
default:
// WARN When we first enable staking, this condition may not be robust by itself.
if w.config.IsStaking(w.current.header.Epoch()) {
utils.Logger().Print("CURRRRRRRRRR")
utils.Logger().Print(beacon.CurrentHeader().Number())
utils.Logger().Print(beacon.CurrentHeader().Epoch())
utils.Logger().Print(w.current.header.Epoch())
switch beacon.CurrentHeader().Epoch().Cmp(w.current.header.Epoch()) {
case 1:
utils.Logger().Print("TTTTTTTT")
nextCommittee, oops = committee.WithStakingEnabled.ReadFromDB(
beacon.CurrentHeader().Epoch(), beacon,
)
utils.Logger().Print(nextCommittee)
}
} else {
if shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) {
nextCommittee, oops = committee.WithStakingEnabled.Compute(
new(big.Int).Add(w.current.header.Epoch(), common.Big1),
w.config,
beacon,
)
}
}
}
// default:
// WARN When we first enable staking, this condition may not be robust by itself.
// switch beacon.CurrentHeader().Epoch().Cmp(w.current.header.Epoch()) {
// case 1:
// nextCommittee, oops = committee.WithStakingEnabled.ReadFromDB(
// beacon.CurrentHeader().Epoch(), beacon,
// )
// }
// }
return nextCommittee, oops
}

@ -19,7 +19,7 @@ import (
// ValidatorListProvider ..
type ValidatorListProvider interface {
Compute(
epoch *big.Int, config params.ChainConfig, reader DataProvider,
epoch *big.Int, config *params.ChainConfig, reader DataProvider,
) (shard.State, error)
ReadFromDB(epoch *big.Int, reader DataProvider) (shard.State, error)
}
@ -185,15 +185,8 @@ func eposStakedCommittee(
func (def partialStakingEnabled) ComputePublicKeys(
epoch *big.Int, d DataProvider,
) [][]*bls.PublicKey {
config := d.Config()
instance := shard.Schedule.InstanceForEpoch(epoch)
superComm := shard.State{}
if config.IsStaking(epoch) {
superComm, _ = eposStakedCommittee(instance, d, 320)
} else {
superComm = preStakingEnabledCommittee(instance)
}
superComm, _ := def.Compute(epoch, config, d)
allIdentities := make([][]*bls.PublicKey, len(superComm))
@ -249,7 +242,7 @@ func (def partialStakingEnabled) ReadFromDB(
// ReadFromComputation is single entry point for reading the State of the network
func (def partialStakingEnabled) Compute(
epoch *big.Int, config params.ChainConfig, stakerReader DataProvider,
epoch *big.Int, config *params.ChainConfig, stakerReader DataProvider,
) (newSuperComm shard.State, err error) {
instance := shard.Schedule.InstanceForEpoch(epoch)
if !config.IsStaking(epoch) {

Loading…
Cancel
Save