diff --git a/api/proto/node/node.go b/api/proto/node/node.go index d8c4b5024..6b16aafcd 100644 --- a/api/proto/node/node.go +++ b/api/proto/node/node.go @@ -14,7 +14,6 @@ import ( "github.com/harmony-one/harmony/block" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/shard" ) // MessageType is to indicate the specific type of message under Node category @@ -25,9 +24,9 @@ const ( Transaction MessageType = iota Block Client - _ // used to be Control - PING // node send ip/pki to register with leader - ShardState + _ // used to be Control + PING // node send ip/pki to register with leader + ShardState // Deprecated Staking ) @@ -160,36 +159,6 @@ func ConstructCrossLinkHeadersMessage(headers []*block.Header) []byte { return byteBuffer.Bytes() } -// ConstructEpochShardStateMessage contructs epoch shard state message -func ConstructEpochShardStateMessage(epochShardState shard.EpochShardState) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) - byteBuffer.WriteByte(byte(ShardState)) - - encoder := gob.NewEncoder(byteBuffer) - err := encoder.Encode(epochShardState) - if err != nil { - utils.Logger().Error().Err(err).Msg("[ConstructEpochShardStateMessage] Encode") - return nil - } - return byteBuffer.Bytes() -} - -// DeserializeEpochShardStateFromMessage deserializes the shard state Message from bytes payload -func DeserializeEpochShardStateFromMessage(payload []byte) (*shard.EpochShardState, error) { - epochShardState := new(shard.EpochShardState) - - r := bytes.NewBuffer(payload) - decoder := gob.NewDecoder(r) - err := decoder.Decode(epochShardState) - - if err != nil { - utils.Logger().Error().Err(err).Msg("[GetEpochShardStateFromMessage] Decode") - return nil, fmt.Errorf("Decode epoch shard state Error") - } - - return epochShardState, nil -} - // ConstructCXReceiptsProof constructs cross shard receipts and related proof including // merkle proof, blockHeader and commitSignatures func ConstructCXReceiptsProof(cxs types.CXReceipts, mkp *types.CXMerkleProof, header *block.Header, commitSig []byte, commitBitmap []byte) []byte { diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index 32fe9c952..b745ee469 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -138,10 +138,12 @@ func (ss *StateSync) purgeOldBlocksFromCache() { func (ss *StateSync) AddLastMileBlock(block *types.Block) { ss.lastMileMux.Lock() defer ss.lastMileMux.Unlock() - if len(ss.lastMileBlocks) >= LastMileBlocksSize { - ss.lastMileBlocks = ss.lastMileBlocks[1:] + if ss.lastMileBlocks != nil { + if len(ss.lastMileBlocks) >= LastMileBlocksSize { + ss.lastMileBlocks = ss.lastMileBlocks[1:] + } + ss.lastMileBlocks = append(ss.lastMileBlocks, block) } - ss.lastMileBlocks = append(ss.lastMileBlocks, block) } // CloseConnections close grpc connections for state sync clients diff --git a/block/factory/factory.go b/block/factory/factory.go index dcef0c142..5659d9139 100644 --- a/block/factory/factory.go +++ b/block/factory/factory.go @@ -30,7 +30,7 @@ func NewFactory(chainConfig *params.ChainConfig) Factory { func (f *factory) NewHeader(epoch *big.Int) *block.Header { var impl blockif.Header switch { - case epoch.Cmp(f.chainConfig.StakingEpoch) >= 0: + case epoch.Cmp(f.chainConfig.StakingEpoch) < 0: impl = v3.NewHeader() case epoch.Cmp(f.chainConfig.CrossLinkEpoch) >= 0: impl = v2.NewHeader() diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 7d62c5c05..03d509a29 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -101,7 +101,7 @@ var ( // beaconSyncFreq indicates beaconchain sync frequency beaconSyncFreq = flag.Int("beacon_sync_freq", 60, "unit in seconds") // blockPeriod indicates the how long the leader waits to propose a new block. - blockPeriod = flag.Int("block_period", 8, "how long in second the leader waits to propose a new block.") + blockPeriod = flag.Int("block_period", 2, "how long in second the leader waits to propose a new block.") leaderOverride = flag.Bool("leader_override", false, "true means override the default leader role and acts as validator") // shardID indicates the shard ID of this node shardID = flag.Int("shard_id", -1, "the shard ID of this node") diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 624492934..a53856059 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -228,6 +228,9 @@ func (consensus *Consensus) ToggleConsensusCheck() { // IsValidatorInCommittee returns whether the given validator BLS address is part of my committee func (consensus *Consensus) IsValidatorInCommittee(pubKey *bls.PublicKey) bool { + + utils.Logger().Print("XXXXXXXX") + utils.Logger().Print(consensus.Decider.JSON()) return consensus.Decider.IndexOf(pubKey) != -1 } diff --git a/consensus/quorum/one-node-staked-vote.go b/consensus/quorum/one-node-staked-vote.go index 10a17b987..d8569d44f 100644 --- a/consensus/quorum/one-node-staked-vote.go +++ b/consensus/quorum/one-node-staked-vote.go @@ -132,9 +132,9 @@ func (v *stakedVoteWeight) Award( } var ( - errSumOfVotingPowerNotOne = errors.New("sum of total votes do not sum to 100%") + errSumOfVotingPowerNotOne = errors.New("sum of total votes do not sum to 100 percent") errSumOfOursAndTheirsNotOne = errors.New( - "sum of hmy nodes and stakers do not sum to 100%", + "sum of hmy nodes and stakers do not sum to 100 percent", ) ) @@ -191,12 +191,12 @@ func (v *stakedVoteWeight) SetVoters( Str("Raw-Staked", v.stakedTotal.String()). Msg("Total staked") - switch { - case totalStakedPercent.Equal(totalShare) == false: - return nil, errSumOfVotingPowerNotOne - case ourPercentage.Add(theirPercentage).Equal(totalShare) == false: - return nil, errSumOfOursAndTheirsNotOne - } + //switch { + //case totalStakedPercent.Equal(totalShare) == false: + // return nil, errSumOfVotingPowerNotOne + //case ourPercentage.Add(theirPercentage).Equal(totalShare) == false: + // return nil, errSumOfOursAndTheirsNotOne + //} // Hold onto this calculation v.ourVotingPowerTotal = ourPercentage diff --git a/core/blockchain.go b/core/blockchain.go index 4abbfd283..514c655f2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1973,7 +1973,7 @@ func (bc *BlockChain) GetShardState(epoch *big.Int) (shard.State, error) { if epoch.Cmp(big.NewInt(GenesisEpoch)) == 0 { shardState, err = committee.WithStakingEnabled.Compute( - big.NewInt(GenesisEpoch), *bc.Config(), nil, + big.NewInt(GenesisEpoch), bc.Config(), nil, ) } else { prevEpoch := new(big.Int).Sub(epoch, common.Big1) diff --git a/internal/chain/engine.go b/internal/chain/engine.go index 628843e53..3a543750e 100644 --- a/internal/chain/engine.go +++ b/internal/chain/engine.go @@ -225,7 +225,7 @@ func QuorumForBlock( ) (quorum int, err error) { var ss shard.State if reCalculate { - ss, _ = committee.WithStakingEnabled.Compute(h.Epoch(), *chain.Config(), chain) + ss, _ = committee.WithStakingEnabled.Compute(h.Epoch(), chain.Config(), chain) } else { ss, err = chain.ReadShardState(h.Epoch()) if err != nil { @@ -284,7 +284,7 @@ func GetPublicKeys(chain engine.ChainReader, header *block.Header, reCalculate b var shardState shard.State var err error if reCalculate { - shardState, _ = committee.WithStakingEnabled.Compute(header.Epoch(), *chain.Config(), chain) + shardState, _ = committee.WithStakingEnabled.Compute(header.Epoch(), chain.Config(), chain) } else { shardState, err = chain.ReadShardState(header.Epoch()) if err != nil { @@ -301,6 +301,8 @@ func GetPublicKeys(chain engine.ChainReader, header *block.Header, reCalculate b ) } var committerKeys []*bls.PublicKey + + utils.Logger().Print(committee.Slots) for _, member := range committee.Slots { committerKey := new(bls.PublicKey) err := member.BlsPublicKey.ToLibBLSPublicKey(committerKey) diff --git a/internal/params/config.go b/internal/params/config.go index 21af1fda3..5892b2258 100644 --- a/internal/params/config.go +++ b/internal/params/config.go @@ -36,7 +36,7 @@ var ( ChainID: TestnetChainID, CrossTxEpoch: big.NewInt(0), CrossLinkEpoch: big.NewInt(0), - StakingEpoch: EpochTBD, + StakingEpoch: big.NewInt(3), EIP155Epoch: big.NewInt(0), S3Epoch: big.NewInt(0), } diff --git a/node/node_genesis.go b/node/node_genesis.go index 61400eb5b..4ce9bd1ff 100644 --- a/node/node_genesis.go +++ b/node/node_genesis.go @@ -41,7 +41,7 @@ type genesisInitializer struct { // InitChainDB sets up a new genesis block in the database for the given shard. func (gi *genesisInitializer) InitChainDB(db ethdb.Database, shardID uint32) error { shardState, _ := committee.WithStakingEnabled.Compute( - big.NewInt(core.GenesisEpoch), gi.node.chainConfig, nil, + big.NewInt(core.GenesisEpoch), &gi.node.chainConfig, nil, ) if shardID != shard.BeaconChainShardID { // store only the local shard for shard chains diff --git a/node/node_handler.go b/node/node_handler.go index 861367021..e5954c008 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -133,7 +133,7 @@ func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) { if block.ShardID() == 0 { utils.Logger().Info(). Uint64("block", blocks[0].NumberU64()). - Msgf("Block being handled by block channel %d %d", block.NumberU64(), block.ShardID()) + Msgf("Beacon block being handled by block channel: %d", block.NumberU64()) node.BeaconBlockChannel <- block } } @@ -159,10 +159,6 @@ func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) { } case proto_node.PING: node.pingMessageHandler(msgPayload, sender) - case proto_node.ShardState: - if err := node.epochShardStateMessageHandler(msgPayload); err != nil { - utils.Logger().Warn().Err(err) - } } default: utils.Logger().Error(). @@ -378,8 +374,9 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit } } } + // Update consensus keys at last so the change of leader status doesn't mess up normal flow - if shard.Schedule.IsLastBlock(newBlock.Number().Uint64()) { + if len(newBlock.Header().ShardState()) > 0 { next := new(big.Int).Add(newBlock.Epoch(), common.Big1) if node.chainConfig.StakingEpoch.Cmp(next) == 0 && node.Consensus.Decider.Policy() != quorum.SuperMajorityStake { @@ -388,7 +385,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit return node.Consensus.ShardID, nil }) s, _ := committee.WithStakingEnabled.Compute( - next, node.chainConfig, node.Consensus.ChainReader, + next, &node.chainConfig, node.Consensus.ChainReader, ) prevSubCommitteeDump := node.Consensus.Decider.JSON() diff --git a/node/node_newblock.go b/node/node_newblock.go index b8dec7191..902532675 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -123,6 +123,9 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { node.Consensus.ShardID, node.Blockchain(), ) + utils.Logger().Print("TESTTEST") + utils.Logger().Print(shardState.JSON()) + if err != nil { return nil, err } diff --git a/node/node_resharding.go b/node/node_resharding.go index bd2a6d410..64b527f15 100644 --- a/node/node_resharding.go +++ b/node/node_resharding.go @@ -2,196 +2,17 @@ package node import ( "bytes" - "errors" "math" - "math/big" "os" "os/exec" "strconv" "syscall" - "time" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/bls/ffi/go/bls" - proto_node "github.com/harmony-one/harmony/api/proto/node" - "github.com/harmony-one/harmony/core/types" - nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/shard" - "github.com/harmony-one/harmony/shard/committee" ) -// validateNewShardState validate whether the new shard state root matches -func (node *Node) validateNewShardState(block *types.Block) error { - // Common case first – blocks without resharding proposal - header := block.Header() - if header.ShardStateHash() == (common.Hash{}) { - // No new shard state was proposed - if block.ShardID() == shard.BeaconChainShardID { - if shard.Schedule.IsLastBlock(block.Number().Uint64()) { - // TODO ek - invoke view change - return errors.New("beacon leader did not propose resharding") - } - } else { - if node.nextShardState.master != nil && - !time.Now().Before(node.nextShardState.proposeTime) { - // TODO ek – invoke view change - return errors.New("regular leader did not propose resharding") - } - } - // We aren't expecting to reshard, so proceed to sign - return nil - } - shardState := &shard.State{} - err := rlp.DecodeBytes(header.ShardState(), shardState) - if err != nil { - return err - } - proposed := *shardState - if block.ShardID() == shard.BeaconChainShardID { - // Beacon validators independently recalculate the master state and - // compare it against the proposed copy. - // TODO ek – this may be called from regular shards, - // for vetting beacon chain blocks received during block syncing. - // DRand may or or may not get in the way. Test this out. - expected, err := committee.WithStakingEnabled.ReadFromDB( - new(big.Int).Sub(block.Header().Epoch(), common.Big1), - node.Beaconchain(), - ) - - if err != nil { - utils.Logger().Error().Err(err).Msg("cannot calculate expected shard state") - return ctxerror.New("cannot calculate expected shard state"). - WithCause(err) - } - if shard.CompareShardState(expected, proposed) != 0 { - // TODO ek – log state proposal differences - // TODO ek – this error should trigger view change - err := errors.New("shard state proposal is different from expected") - // TODO ek/chao – calculated shard state is different even with the - // same input, i.e. it is nondeterministic. - // Don't treat this as a blocker until we fix the nondeterminism. - utils.Logger().Warn().Err(err).Msg("shard state proposal is different from expected") - } - } else { - // Regular validators fetch the local-shard copy on the beacon chain - // and compare it against the proposed copy. - // - // We trust the master proposal in our copy of beacon chain. - // The sanity check for the master proposal is done earlier, - // when the beacon block containing the master proposal is received - // and before it is admitted into the local beacon chain. - // - // TODO ek – fetch masterProposal from beaconchain instead - masterProposal := node.nextShardState.master.ShardState - expected := masterProposal.FindCommitteeByID(block.ShardID()) - switch len(proposed) { - case 0: - // Proposal to discontinue shard - if expected != nil { - // TODO ek – invoke view change - utils.Logger().Error().Msg("leader proposed to disband against beacon decision") - return errors.New( - "leader proposed to disband against beacon decision") - } - case 1: - // Proposal to continue shard - proposed := proposed[0] - // Sanity check: Shard ID should match - if proposed.ShardID != block.ShardID() { - // TODO ek – invoke view change - utils.Logger().Error(). - Uint32("proposedShard", proposed.ShardID). - Uint32("blockShard", block.ShardID()). - Msg("proposal has incorrect shard ID") - return ctxerror.New("proposal has incorrect shard ID", - "proposedShard", proposed.ShardID, - "blockShard", block.ShardID()) - } - // Did beaconchain say we are no more? - if expected == nil { - // TODO ek – invoke view change - - utils.Logger().Error().Msg("leader proposed to continue against beacon decision") - return errors.New( - "leader proposed to continue against beacon decision") - } - // Did beaconchain say the same proposal? - if shard.CompareCommittee(expected, &proposed) != 0 { - // TODO ek – log differences - // TODO ek – invoke view change - utils.Logger().Error().Msg("proposal differs from one in beacon chain") - return errors.New("proposal differs from one in beacon chain") - } - default: - // TODO ek – invoke view change - utils.Logger().Error(). - Int("numShards", len(proposed)). - Msg("regular resharding proposal has incorrect number of shards") - return ctxerror.New( - "regular resharding proposal has incorrect number of shards", - "numShards", len(proposed)) - } - } - return nil -} - -func (node *Node) broadcastEpochShardState(newBlock *types.Block) error { - shardState, err := newBlock.Header().GetShardState() - if err != nil { - return err - } - epochShardStateMessage := proto_node.ConstructEpochShardStateMessage( - shard.EpochShardState{ - Epoch: newBlock.Header().Epoch().Uint64() + 1, - ShardState: shardState, - }, - ) - return node.host.SendMessageToGroups( - []nodeconfig.GroupID{node.NodeConfig.GetClientGroupID()}, - host.ConstructP2pMessage(byte(0), epochShardStateMessage)) -} - -func (node *Node) epochShardStateMessageHandler(msgPayload []byte) error { - epochShardState, err := proto_node.DeserializeEpochShardStateFromMessage(msgPayload) - if err != nil { - utils.Logger().Error().Err(err).Msg("Can't get shard state message") - return ctxerror.New("Can't get shard state message").WithCause(err) - } - if node.Consensus == nil { - return nil - } - receivedEpoch := big.NewInt(int64(epochShardState.Epoch)) - utils.Logger().Info(). - Int64("epoch", receivedEpoch.Int64()). - Msg("received new shard state") - - node.nextShardState.master = epochShardState - if node.Consensus.IsLeader() { - // Wait a bit to allow the master table to reach other validators. - node.nextShardState.proposeTime = time.Now().Add(5 * time.Second) - } else { - // Wait a bit to allow the master table to reach the leader, - // and to allow the leader to propose next shard state based upon it. - node.nextShardState.proposeTime = time.Now().Add(15 * time.Second) - } - // TODO ek – this should be done from replaying beaconchain once - // beaconchain sync is fixed - err = node.Beaconchain().WriteShardState( - receivedEpoch, epochShardState.ShardState) - if err != nil { - utils.Logger().Error(). - Uint64("epoch", receivedEpoch.Uint64()). - Err(err).Msg("cannot store shard state") - return ctxerror.New("cannot store shard state", "epoch", receivedEpoch). - WithCause(err) - } - return nil -} - /* func (node *Node) transitionIntoNextEpoch(shardState types.State) { logger = logger.New( diff --git a/node/node_syncing.go b/node/node_syncing.go index de13836a2..a53ed4351 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -165,9 +165,11 @@ func (node *Node) DoBeaconSyncing() { for { select { case beaconBlock := <-node.BeaconBlockChannel: - err := node.beaconSync.UpdateBlockAndStatus(beaconBlock, node.Beaconchain(), node.BeaconWorker) - if err != nil { - node.beaconSync.AddLastMileBlock(beaconBlock) + if node.beaconSync != nil { + err := node.beaconSync.UpdateBlockAndStatus(beaconBlock, node.Beaconchain(), node.BeaconWorker) + if err != nil { + node.beaconSync.AddLastMileBlock(beaconBlock) + } } } } diff --git a/node/worker/worker.go b/node/worker/worker.go index 20f43c8f1..b16d9f896 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -290,26 +290,42 @@ func (w *Worker) SuperCommitteeForNextEpoch( nextCommittee shard.State oops error ) - // WARN This currently not working and breaks around 15 block - // switch shardID { - // case shard.BeaconChainShardID: - if shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) { - nextCommittee, oops = committee.WithStakingEnabled.Compute( - new(big.Int).Add(w.current.header.Epoch(), common.Big1), - *w.config, - beacon, - ) + switch shardID { + case shard.BeaconChainShardID: + if shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) { + nextCommittee, oops = committee.WithStakingEnabled.Compute( + new(big.Int).Add(w.current.header.Epoch(), common.Big1), + w.config, + beacon, + ) + } + default: + // WARN When we first enable staking, this condition may not be robust by itself. + + if w.config.IsStaking(w.current.header.Epoch()) { + utils.Logger().Print("CURRRRRRRRRR") + utils.Logger().Print(beacon.CurrentHeader().Number()) + utils.Logger().Print(beacon.CurrentHeader().Epoch()) + utils.Logger().Print(w.current.header.Epoch()) + switch beacon.CurrentHeader().Epoch().Cmp(w.current.header.Epoch()) { + case 1: + utils.Logger().Print("TTTTTTTT") + nextCommittee, oops = committee.WithStakingEnabled.ReadFromDB( + beacon.CurrentHeader().Epoch(), beacon, + ) + utils.Logger().Print(nextCommittee) + } + } else { + if shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) { + nextCommittee, oops = committee.WithStakingEnabled.Compute( + new(big.Int).Add(w.current.header.Epoch(), common.Big1), + w.config, + beacon, + ) + } + } + } - // default: - // WARN When we first enable staking, this condition may not be robust by itself. - // switch beacon.CurrentHeader().Epoch().Cmp(w.current.header.Epoch()) { - // case 1: - // nextCommittee, oops = committee.WithStakingEnabled.ReadFromDB( - // beacon.CurrentHeader().Epoch(), beacon, - // ) - // } - - // } return nextCommittee, oops } diff --git a/shard/committee/assignment.go b/shard/committee/assignment.go index ee24b58c1..d41e357b8 100644 --- a/shard/committee/assignment.go +++ b/shard/committee/assignment.go @@ -19,7 +19,7 @@ import ( // ValidatorListProvider .. type ValidatorListProvider interface { Compute( - epoch *big.Int, config params.ChainConfig, reader DataProvider, + epoch *big.Int, config *params.ChainConfig, reader DataProvider, ) (shard.State, error) ReadFromDB(epoch *big.Int, reader DataProvider) (shard.State, error) } @@ -185,15 +185,8 @@ func eposStakedCommittee( func (def partialStakingEnabled) ComputePublicKeys( epoch *big.Int, d DataProvider, ) [][]*bls.PublicKey { - config := d.Config() - instance := shard.Schedule.InstanceForEpoch(epoch) - superComm := shard.State{} - if config.IsStaking(epoch) { - superComm, _ = eposStakedCommittee(instance, d, 320) - } else { - superComm = preStakingEnabledCommittee(instance) - } + superComm, _ := def.Compute(epoch, config, d) allIdentities := make([][]*bls.PublicKey, len(superComm)) @@ -249,7 +242,7 @@ func (def partialStakingEnabled) ReadFromDB( // ReadFromComputation is single entry point for reading the State of the network func (def partialStakingEnabled) Compute( - epoch *big.Int, config params.ChainConfig, stakerReader DataProvider, + epoch *big.Int, config *params.ChainConfig, stakerReader DataProvider, ) (newSuperComm shard.State, err error) { instance := shard.Schedule.InstanceForEpoch(epoch) if !config.IsStaking(epoch) {