From 55c9386e4c5588f68376ecb5b232e492c530a9b4 Mon Sep 17 00:00:00 2001 From: Edgar Aroutiounian Date: Sun, 10 Nov 2019 21:18:15 -0800 Subject: [PATCH] [quorum] Staked quorum member (#1819) * [quorum] Factor out single vote & provide for staked quorum vote * [quorum] Pass ShardID to decider via cb * [quorum] Move ShardIDProvider higher, fix nil mistake * [quorum] ReadAllSignatures optimization * [quorum] Safer way to read over map, then to slice * [quorum] Address PR comments - naming changes --- cmd/harmony/main.go | 3 + consensus/consensus_v2.go | 10 +-- consensus/quorum/one-node-one-vote.go | 78 ++++++++++++++++++ consensus/quorum/one-node-staked-vote.go | 75 +++++++++++++++++ consensus/quorum/quorum.go | 100 +++++++++++++---------- consensus/view_change.go | 12 +-- core/blockchain.go | 4 +- internal/chain/engine.go | 4 +- node/node_explorer.go | 2 +- node/node_genesis.go | 4 +- node/node_newblock.go | 4 +- node/node_resharding.go | 2 +- node/worker/worker.go | 2 +- shard/committee/assignment.go | 14 ++-- 14 files changed, 242 insertions(+), 72 deletions(-) create mode 100644 consensus/quorum/one-node-one-vote.go create mode 100644 consensus/quorum/one-node-staked-vote.go diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index efe440d8e..2cb582bcf 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -288,6 +288,9 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { currentConsensus, err := consensus.New( myHost, nodeConfig.ShardID, p2p.Peer{}, nodeConfig.ConsensusPriKey, decider, ) + currentConsensus.Decider.SetShardIDProvider(func() (uint32, error) { + return currentConsensus.ShardID, nil + }) currentConsensus.SelfAddress = common.ParseAddr(initialAccount.Address) if err != nil { diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index cc917476c..6d68c905e 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -385,7 +385,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { } logger = logger.With(). - Int64("NumReceivedSoFar", consensus.Decider.SignatoriesCount(quorum.Prepare)). + Int64("NumReceivedSoFar", consensus.Decider.SignersCount(quorum.Prepare)). Int64("PublicKeys", consensus.Decider.ParticipantsCount()).Logger() logger.Info().Msg("[OnPrepare] Received New Prepare Signature") consensus.Decider.AddSignature(quorum.Prepare, validatorPubKey, &sign) @@ -497,7 +497,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { utils.Logger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!!") return } - prepareCount := consensus.Decider.SignatoriesCount(quorum.Prepare) + prepareCount := consensus.Decider.SignersCount(quorum.Prepare) if count := utils.CountOneBits(mask.Bitmap); count < prepareCount { utils.Logger().Debug(). Int64("Need", prepareCount). @@ -729,7 +729,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { } logger = logger.With(). - Int64("numReceivedSoFar", consensus.Decider.SignatoriesCount(quorum.Commit)). + Int64("numReceivedSoFar", consensus.Decider.SignersCount(quorum.Commit)). Logger() logger.Info().Msg("[OnCommit] Received new commit message") consensus.Decider.AddSignature(quorum.Commit, validatorPubKey, &sign) @@ -761,7 +761,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { func (consensus *Consensus) finalizeCommits() { utils.Logger().Info(). - Int64("NumCommits", consensus.Decider.SignatoriesCount(quorum.Commit)). + Int64("NumCommits", consensus.Decider.SignersCount(quorum.Commit)). Msg("[Finalizing] Finalizing Block") beforeCatchupNum := consensus.blockNum @@ -885,7 +885,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { switch consensus.Decider.Policy() { case quorum.SuperMajorityVote: - threshold := consensus.Decider.QuorumThreshold() + threshold := consensus.Decider.QuorumThreshold().Int64() if count := utils.CountOneBits(mask.Bitmap); int64(count) < threshold { utils.Logger().Warn(). Int64("need", threshold). diff --git a/consensus/quorum/one-node-one-vote.go b/consensus/quorum/one-node-one-vote.go new file mode 100644 index 000000000..23a9867ef --- /dev/null +++ b/consensus/quorum/one-node-one-vote.go @@ -0,0 +1,78 @@ +package quorum + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/harmony-one/bls/ffi/go/bls" + common2 "github.com/harmony-one/harmony/internal/common" + "github.com/harmony-one/harmony/internal/utils" + // "github.com/harmony-one/harmony/staking/effective" +) + +type uniformVoteWeight struct { + SignatureReader + DependencyInjectionWriter +} + +// Policy .. +func (v *uniformVoteWeight) Policy() Policy { + return SuperMajorityVote +} + +// func (v *uniformVoteWeight) SetShardIDProvider(p func() (uint32, error)) { +// v.p = p +// } + +// IsQuorumAchieved .. +func (v *uniformVoteWeight) IsQuorumAchieved(p Phase) bool { + r := v.SignersCount(p) >= v.QuorumThreshold().Int64() + utils.Logger().Info().Str("phase", p.String()). + Int64("signers-count", v.SignersCount(p)). + Int64("threshold", v.QuorumThreshold().Int64()). + Int64("participants", v.ParticipantsCount()). + Msg("Quorum details") + return r +} + +// QuorumThreshold .. +func (v *uniformVoteWeight) QuorumThreshold() *big.Int { + return big.NewInt(v.ParticipantsCount()*2/3 + 1) +} + +// RewardThreshold .. +func (v *uniformVoteWeight) IsRewardThresholdAchieved() bool { + return v.SignersCount(Commit) >= (v.ParticipantsCount() * 9 / 10) +} + +// func (v *uniformVoteWeight) UpdateVotingPower(effective.StakeKeeper) { +// NO-OP do not add anything here +// } + +// ToggleActive for uniform vote is a no-op, always says that voter is active +func (v *uniformVoteWeight) ToggleActive(*bls.PublicKey) bool { + // NO-OP do not add anything here + return true +} + +// Award .. +func (v *uniformVoteWeight) Award( + // Here hook is the callback which gets the amount the earner is due in just reward + // up to the hook to do side-effects like write the statedb + Pie *big.Int, earners []common2.Address, hook func(earner common.Address, due *big.Int), +) *big.Int { + payout := big.NewInt(0) + last := big.NewInt(0) + count := big.NewInt(int64(len(earners))) + + for i, account := range earners { + cur := big.NewInt(0) + cur.Mul(Pie, big.NewInt(int64(i+1))).Div(cur, count) + diff := big.NewInt(0).Sub(cur, last) + hook(common.Address(account), diff) + payout = big.NewInt(0).Add(payout, diff) + last = cur + } + + return payout +} diff --git a/consensus/quorum/one-node-staked-vote.go b/consensus/quorum/one-node-staked-vote.go new file mode 100644 index 000000000..6af84a7aa --- /dev/null +++ b/consensus/quorum/one-node-staked-vote.go @@ -0,0 +1,75 @@ +package quorum + +import ( + "math/big" + + "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/internal/common" + "github.com/harmony-one/harmony/numeric" + "github.com/harmony-one/harmony/shard" +) + +var ( + twoThirds = numeric.NewDec(2).QuoInt64(3).Int +) + +type stakedVoter struct { + isActive, isHarmonyNode bool + effective numeric.Dec +} + +type stakedVoteWeight struct { + SignatureReader + DependencyInjectionWriter + // EPOS based staking + validatorStakes map[[shard.PublicKeySizeInBytes]byte]stakedVoter + totalEffectiveStakedAmount *big.Int +} + +// Policy .. +func (v *stakedVoteWeight) Policy() Policy { + return SuperMajorityStake +} + +// We must maintain 2/3 quoroum, so whatever is 2/3 staked amount, +// we divide that out & you +// IsQuorumAchieved .. +func (v *stakedVoteWeight) IsQuorumAchieved(p Phase) bool { + // TODO Implement this logic + return true +} + +// QuorumThreshold .. +func (v *stakedVoteWeight) QuorumThreshold() *big.Int { + return new(big.Int).Mul(v.totalEffectiveStakedAmount, twoThirds) +} + +// RewardThreshold .. +func (v *stakedVoteWeight) IsRewardThresholdAchieved() bool { + // TODO Implement + return false +} + +// HACK +var ( + hSentinel = big.NewInt(0) + hEffectiveSentinel = numeric.ZeroDec() +) + +// Award .. +func (v *stakedVoteWeight) Award( + Pie *big.Int, earners []common.Address, hook func(earner common.Address, due *big.Int), +) *big.Int { + // TODO Implement + return nil +} + +// UpdateVotingPower called only at epoch change, prob need to move to CalculateShardState +// func (v *stakedVoteWeight) UpdateVotingPower(keeper effective.StakeKeeper) { +// TODO Implement +// } + +func (v *stakedVoteWeight) ToggleActive(*bls.PublicKey) bool { + // TODO Implement + return true +} diff --git a/consensus/quorum/quorum.go b/consensus/quorum/quorum.go index 6afcf4fc3..61d91cfc6 100644 --- a/consensus/quorum/quorum.go +++ b/consensus/quorum/quorum.go @@ -1,7 +1,12 @@ package quorum import ( + "fmt" + "math/big" + "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/shard" + // "github.com/harmony-one/harmony/staking/effective" ) // Phase is a phase that needs quorum to proceed @@ -16,6 +21,19 @@ const ( ViewChange ) +var phaseNames = map[Phase]string{ + Prepare: "Announce", + Commit: "Prepare", + ViewChange: "Commit", +} + +func (p Phase) String() string { + if name, ok := phaseNames[p]; ok { + return name + } + return fmt.Sprintf("Unknown Quorum Phase %+v", byte(p)) +} + // Policy is the rule we used to decide is quorum achieved type Policy byte @@ -41,7 +59,7 @@ type SignatoryTracker interface { ParticipantTracker AddSignature(p Phase, PubKey *bls.PublicKey, sig *bls.Sign) // Caller assumes concurrency protection - SignatoriesCount(Phase) int64 + SignersCount(Phase) int64 Reset([]Phase) } @@ -52,6 +70,23 @@ type SignatureReader interface { ReadSignature(p Phase, PubKey *bls.PublicKey) *bls.Sign } +// DependencyInjectionWriter .. +type DependencyInjectionWriter interface { + SetShardIDProvider(func() (uint32, error)) +} + +// Decider .. +type Decider interface { + SignatureReader + DependencyInjectionWriter + ToggleActive(*bls.PublicKey) bool + // UpdateVotingPower(keeper effective.StakeKeeper) + Policy() Policy + IsQuorumAchieved(Phase) bool + QuorumThreshold() *big.Int + IsRewardThresholdAchieved() bool +} + // These maps represent the signatories (validators), keys are BLS public keys // and values are BLS private key signed signatures type cIdentities struct { @@ -64,6 +99,14 @@ type cIdentities struct { viewID map[string]*bls.Sign } +type depInject struct { + shardIDProvider func() (uint32, error) +} + +func (d *depInject) SetShardIDProvider(p func() (uint32, error)) { + d.shardIDProvider = p +} + func (s *cIdentities) IndexOf(pubKey *bls.PublicKey) int { idx := -1 for k, v := range s.publicKeys { @@ -104,7 +147,7 @@ func (s *cIdentities) ParticipantsCount() int64 { return int64(len(s.publicKeys)) } -func (s *cIdentities) SignatoriesCount(p Phase) int64 { +func (s *cIdentities) SignersCount(p Phase) int64 { switch p { case Prepare: return int64(len(s.prepare)) @@ -164,9 +207,7 @@ func (s *cIdentities) ReadSignature(p Phase, PubKey *bls.PublicKey) *bls.Sign { } func (s *cIdentities) ReadAllSignatures(p Phase) []*bls.Sign { - sigs := []*bls.Sign{} m := map[string]*bls.Sign{} - switch p { case Prepare: m = s.prepare @@ -175,9 +216,9 @@ func (s *cIdentities) ReadAllSignatures(p Phase) []*bls.Sign { case ViewChange: m = s.viewID } - - for _, sig := range m { - sigs = append(sigs, sig) + sigs := make([]*bls.Sign, 0, len(m)) + for _, value := range m { + sigs = append(sigs, value) } return sigs } @@ -189,47 +230,22 @@ func newMapBackedSignatureReader() SignatureReader { } } -// Decider .. -type Decider interface { - SignatureReader - Policy() Policy - IsQuorumAchieved(Phase) bool - QuorumThreshold() int64 - IsRewardThresholdAchieved() bool -} - -type uniformVoteWeight struct { - SignatureReader -} - // NewDecider .. func NewDecider(p Policy) Decider { + signatureStore := newMapBackedSignatureReader() + dependencies := &depInject{} switch p { case SuperMajorityVote: - return &uniformVoteWeight{newMapBackedSignatureReader()} - // case SuperMajorityStake: + return &uniformVoteWeight{signatureStore, dependencies} + case SuperMajorityStake: + return &stakedVoteWeight{ + signatureStore, + dependencies, + map[[shard.PublicKeySizeInBytes]byte]stakedVoter{}, + big.NewInt(0), + } default: // Should not be possible return nil } } - -// Policy .. -func (v *uniformVoteWeight) Policy() Policy { - return SuperMajorityVote -} - -// IsQuorumAchieved .. -func (v *uniformVoteWeight) IsQuorumAchieved(p Phase) bool { - return v.SignatoriesCount(p) >= v.QuorumThreshold() -} - -// QuorumThreshold .. -func (v *uniformVoteWeight) QuorumThreshold() int64 { - return v.ParticipantsCount()*2/3 + 1 -} - -// RewardThreshold .. -func (v *uniformVoteWeight) IsRewardThresholdAchieved() bool { - return v.SignatoriesCount(Commit) >= (v.ParticipantsCount() * 9 / 10) -} diff --git a/consensus/view_change.go b/consensus/view_change.go index db2aa8e91..b785f3417 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -157,8 +157,8 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { if consensus.Decider.IsQuorumAchieved(quorum.ViewChange) { utils.Logger().Debug(). - Int64("have", consensus.Decider.SignatoriesCount(quorum.ViewChange)). - Int64("need", consensus.Decider.QuorumThreshold()). + Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)). + Int64("need", consensus.Decider.QuorumThreshold().Int64()). Str("validatorPubKey", recvMsg.SenderPubkey.SerializeToHexStr()). Msg("[onViewChange] Received Enough View Change Messages") return @@ -282,7 +282,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { return } // check has 2f+1 signature in m1 type message - need := consensus.Decider.QuorumThreshold() + need := consensus.Decider.QuorumThreshold().Int64() if count := utils.CountOneBits(mask.Bitmap); count < need { utils.Logger().Debug().Int64("need", need).Int64("have", count). Msg("[onViewChange] M1 Payload Not Have Enough Signature") @@ -345,8 +345,8 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { // Set the bitmap indicating that this validator signed. consensus.viewIDBitmap.SetKey(recvMsg.SenderPubkey, true) utils.Logger().Debug(). - Int64("numSigs", consensus.Decider.SignatoriesCount(quorum.ViewChange)). - Int64("needed", consensus.Decider.QuorumThreshold()). + Int64("numSigs", consensus.Decider.SignersCount(quorum.ViewChange)). + Int64("needed", consensus.Decider.QuorumThreshold().Int64()). Msg("[onViewChange]") // received enough view change messages, change state to normal consensus @@ -446,7 +446,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) { viewIDBytes := make([]byte, 8) binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID) // check total number of sigs >= 2f+1 - need := consensus.Decider.QuorumThreshold() + need := consensus.Decider.QuorumThreshold().Int64() if count := utils.CountOneBits(m3Mask.Bitmap); count < need { utils.Logger().Debug().Int64("need", need).Int64("have", count). Msg("[onNewView] Not Have Enough M3 (ViewID) Signature") diff --git a/core/blockchain.go b/core/blockchain.go index 33fb29f92..1e5f3a9cb 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1935,12 +1935,12 @@ func (bc *BlockChain) GetShardState(epoch *big.Int) (shard.State, error) { } if epoch.Cmp(big.NewInt(GenesisEpoch)) == 0 { - shardState, err = committee.WithStakingEnabled.ReadFromComputation( + shardState, err = committee.WithStakingEnabled.Compute( big.NewInt(GenesisEpoch), *bc.Config(), nil, ) } else { prevEpoch := new(big.Int).Sub(epoch, common.Big1) - shardState, err = committee.WithStakingEnabled.ReadFromChain( + shardState, err = committee.WithStakingEnabled.ReadFromDB( prevEpoch, bc, ) } diff --git a/internal/chain/engine.go b/internal/chain/engine.go index e927e6a65..922987dea 100644 --- a/internal/chain/engine.go +++ b/internal/chain/engine.go @@ -167,7 +167,7 @@ func (e *engineImpl) Finalize( func QuorumForBlock(chain engine.ChainReader, h *block.Header, reCalculate bool) (quorum int, err error) { var ss shard.State if reCalculate { - ss, _ = committee.WithStakingEnabled.ReadFromComputation(h.Epoch(), *chain.Config(), nil) + ss, _ = committee.WithStakingEnabled.Compute(h.Epoch(), *chain.Config(), nil) } else { ss, err = chain.ReadShardState(h.Epoch()) if err != nil { @@ -226,7 +226,7 @@ func GetPublicKeys(chain engine.ChainReader, header *block.Header, reCalculate b var shardState shard.State var err error if reCalculate { - shardState, _ = committee.WithStakingEnabled.ReadFromComputation(header.Epoch(), *chain.Config(), nil) + shardState, _ = committee.WithStakingEnabled.Compute(header.Epoch(), *chain.Config(), nil) } else { shardState, err = chain.ReadShardState(header.Epoch()) if err != nil { diff --git a/node/node_explorer.go b/node/node_explorer.go index 70fa2fe6e..3919c3890 100644 --- a/node/node_explorer.go +++ b/node/node_explorer.go @@ -50,7 +50,7 @@ func (node *Node) ExplorerMessageHandler(payload []byte) { } // check has 2f+1 signatures - need := node.Consensus.Decider.QuorumThreshold() + need := node.Consensus.Decider.QuorumThreshold().Int64() if count := utils.CountOneBits(mask.Bitmap); count < need { utils.Logger().Error().Int64("need", need).Int64("have", count). Msg("[Explorer] not have enough signature") diff --git a/node/node_genesis.go b/node/node_genesis.go index bc9740743..61400eb5b 100644 --- a/node/node_genesis.go +++ b/node/node_genesis.go @@ -8,10 +8,8 @@ import ( "strings" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" - blockfactory "github.com/harmony-one/harmony/block/factory" "github.com/harmony-one/harmony/common/denominations" "github.com/harmony-one/harmony/core" @@ -42,7 +40,7 @@ type genesisInitializer struct { // InitChainDB sets up a new genesis block in the database for the given shard. func (gi *genesisInitializer) InitChainDB(db ethdb.Database, shardID uint32) error { - shardState, _ := committee.WithStakingEnabled.ReadFromComputation( + shardState, _ := committee.WithStakingEnabled.Compute( big.NewInt(core.GenesisEpoch), gi.node.chainConfig, nil, ) if shardID != shard.BeaconChainShardID { diff --git a/node/node_newblock.go b/node/node_newblock.go index f6df9c1bd..515ec4212 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -127,7 +127,7 @@ func (node *Node) proposeShardStateWithoutBeaconSync(block *types.Block) shard.S if block == nil || !shard.Schedule.IsLastBlock(block.Number().Uint64()) { return nil } - shardState, _ := committee.WithStakingEnabled.ReadFromComputation( + shardState, _ := committee.WithStakingEnabled.Compute( new(big.Int).Add(block.Header().Epoch(), common.Big1), node.chainConfig, nil, ) return shardState @@ -151,7 +151,7 @@ func (node *Node) proposeBeaconShardState(block *types.Block) error { } // TODO Use ReadFromComputation prevEpoch := new(big.Int).Sub(block.Header().Epoch(), common.Big1) - shardState, err := committee.WithStakingEnabled.ReadFromChain( + shardState, err := committee.WithStakingEnabled.ReadFromDB( prevEpoch, node.Blockchain(), ) if err != nil { diff --git a/node/node_resharding.go b/node/node_resharding.go index 0c55369d9..c15e5bf04 100644 --- a/node/node_resharding.go +++ b/node/node_resharding.go @@ -57,7 +57,7 @@ func (node *Node) validateNewShardState(block *types.Block) error { // TODO ek – this may be called from regular shards, // for vetting beacon chain blocks received during block syncing. // DRand may or or may not get in the way. Test this out. - expected, err := committee.WithStakingEnabled.ReadFromChain( + expected, err := committee.WithStakingEnabled.ReadFromDB( new(big.Int).Sub(block.Header().Epoch(), common.Big1), node.Beaconchain(), ) diff --git a/node/worker/worker.go b/node/worker/worker.go index 8fc3a2054..e3cd959c9 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -368,7 +368,7 @@ func (w *Worker) ProposeShardStateWithoutBeaconSync() shard.State { if !shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) { return nil } - shardState, _ := committee.WithStakingEnabled.ReadFromComputation( + shardState, _ := committee.WithStakingEnabled.Compute( new(big.Int).Add(w.current.header.Epoch(), common.Big1), *w.config, nil, ) return shardState diff --git a/shard/committee/assignment.go b/shard/committee/assignment.go index 69afb9259..11143dd27 100644 --- a/shard/committee/assignment.go +++ b/shard/committee/assignment.go @@ -21,12 +21,12 @@ import ( // a shardID parameter const StateID = -1 -// MembershipList .. -type MembershipList interface { - ReadFromComputation( +// ValidatorList .. +type ValidatorList interface { + Compute( epoch *big.Int, config params.ChainConfig, reader StakingCandidatesReader, ) (shard.State, error) - ReadFromChain(epoch *big.Int, reader ChainReader) (shard.State, error) + ReadFromDB(epoch *big.Int, reader ChainReader) (shard.State, error) } // PublicKeys per epoch @@ -45,7 +45,7 @@ type PublicKeys interface { // Reader .. type Reader interface { PublicKeys - MembershipList + ValidatorList } // StakingCandidatesReader .. @@ -243,14 +243,14 @@ func (def partialStakingEnabled) ComputePublicKeys( return nil, nil } -func (def partialStakingEnabled) ReadFromChain( +func (def partialStakingEnabled) ReadFromDB( epoch *big.Int, reader ChainReader, ) (newSuperComm shard.State, err error) { return reader.ReadShardState(epoch) } // ReadFromComputation is single entry point for reading the State of the network -func (def partialStakingEnabled) ReadFromComputation( +func (def partialStakingEnabled) Compute( epoch *big.Int, config params.ChainConfig, stakerReader StakingCandidatesReader, ) (newSuperComm shard.State, err error) { instance := shard.Schedule.InstanceForEpoch(epoch)