[quorum] Staked quorum member (#1819)

* [quorum] Factor out single vote & provide for staked quorum vote

* [quorum] Pass ShardID to decider via cb

* [quorum] Move ShardIDProvider higher, fix nil mistake

* [quorum] ReadAllSignatures optimization

* [quorum] Safer way to read over map, then to slice

* [quorum] Address PR comments - naming changes
pull/1821/head
Edgar Aroutiounian 5 years ago committed by GitHub
parent e6a4fbea4f
commit 55c9386e4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      cmd/harmony/main.go
  2. 10
      consensus/consensus_v2.go
  3. 78
      consensus/quorum/one-node-one-vote.go
  4. 75
      consensus/quorum/one-node-staked-vote.go
  5. 100
      consensus/quorum/quorum.go
  6. 12
      consensus/view_change.go
  7. 4
      core/blockchain.go
  8. 4
      internal/chain/engine.go
  9. 2
      node/node_explorer.go
  10. 4
      node/node_genesis.go
  11. 4
      node/node_newblock.go
  12. 2
      node/node_resharding.go
  13. 2
      node/worker/worker.go
  14. 14
      shard/committee/assignment.go

@ -288,6 +288,9 @@ func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
currentConsensus, err := consensus.New(
myHost, nodeConfig.ShardID, p2p.Peer{}, nodeConfig.ConsensusPriKey, decider,
)
currentConsensus.Decider.SetShardIDProvider(func() (uint32, error) {
return currentConsensus.ShardID, nil
})
currentConsensus.SelfAddress = common.ParseAddr(initialAccount.Address)
if err != nil {

@ -385,7 +385,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
}
logger = logger.With().
Int64("NumReceivedSoFar", consensus.Decider.SignatoriesCount(quorum.Prepare)).
Int64("NumReceivedSoFar", consensus.Decider.SignersCount(quorum.Prepare)).
Int64("PublicKeys", consensus.Decider.ParticipantsCount()).Logger()
logger.Info().Msg("[OnPrepare] Received New Prepare Signature")
consensus.Decider.AddSignature(quorum.Prepare, validatorPubKey, &sign)
@ -497,7 +497,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
utils.Logger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!!")
return
}
prepareCount := consensus.Decider.SignatoriesCount(quorum.Prepare)
prepareCount := consensus.Decider.SignersCount(quorum.Prepare)
if count := utils.CountOneBits(mask.Bitmap); count < prepareCount {
utils.Logger().Debug().
Int64("Need", prepareCount).
@ -729,7 +729,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
}
logger = logger.With().
Int64("numReceivedSoFar", consensus.Decider.SignatoriesCount(quorum.Commit)).
Int64("numReceivedSoFar", consensus.Decider.SignersCount(quorum.Commit)).
Logger()
logger.Info().Msg("[OnCommit] Received new commit message")
consensus.Decider.AddSignature(quorum.Commit, validatorPubKey, &sign)
@ -761,7 +761,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
func (consensus *Consensus) finalizeCommits() {
utils.Logger().Info().
Int64("NumCommits", consensus.Decider.SignatoriesCount(quorum.Commit)).
Int64("NumCommits", consensus.Decider.SignersCount(quorum.Commit)).
Msg("[Finalizing] Finalizing Block")
beforeCatchupNum := consensus.blockNum
@ -885,7 +885,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
switch consensus.Decider.Policy() {
case quorum.SuperMajorityVote:
threshold := consensus.Decider.QuorumThreshold()
threshold := consensus.Decider.QuorumThreshold().Int64()
if count := utils.CountOneBits(mask.Bitmap); int64(count) < threshold {
utils.Logger().Warn().
Int64("need", threshold).

@ -0,0 +1,78 @@
package quorum
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/bls/ffi/go/bls"
common2 "github.com/harmony-one/harmony/internal/common"
"github.com/harmony-one/harmony/internal/utils"
// "github.com/harmony-one/harmony/staking/effective"
)
type uniformVoteWeight struct {
SignatureReader
DependencyInjectionWriter
}
// Policy ..
func (v *uniformVoteWeight) Policy() Policy {
return SuperMajorityVote
}
// func (v *uniformVoteWeight) SetShardIDProvider(p func() (uint32, error)) {
// v.p = p
// }
// IsQuorumAchieved ..
func (v *uniformVoteWeight) IsQuorumAchieved(p Phase) bool {
r := v.SignersCount(p) >= v.QuorumThreshold().Int64()
utils.Logger().Info().Str("phase", p.String()).
Int64("signers-count", v.SignersCount(p)).
Int64("threshold", v.QuorumThreshold().Int64()).
Int64("participants", v.ParticipantsCount()).
Msg("Quorum details")
return r
}
// QuorumThreshold ..
func (v *uniformVoteWeight) QuorumThreshold() *big.Int {
return big.NewInt(v.ParticipantsCount()*2/3 + 1)
}
// RewardThreshold ..
func (v *uniformVoteWeight) IsRewardThresholdAchieved() bool {
return v.SignersCount(Commit) >= (v.ParticipantsCount() * 9 / 10)
}
// func (v *uniformVoteWeight) UpdateVotingPower(effective.StakeKeeper) {
// NO-OP do not add anything here
// }
// ToggleActive for uniform vote is a no-op, always says that voter is active
func (v *uniformVoteWeight) ToggleActive(*bls.PublicKey) bool {
// NO-OP do not add anything here
return true
}
// Award ..
func (v *uniformVoteWeight) Award(
// Here hook is the callback which gets the amount the earner is due in just reward
// up to the hook to do side-effects like write the statedb
Pie *big.Int, earners []common2.Address, hook func(earner common.Address, due *big.Int),
) *big.Int {
payout := big.NewInt(0)
last := big.NewInt(0)
count := big.NewInt(int64(len(earners)))
for i, account := range earners {
cur := big.NewInt(0)
cur.Mul(Pie, big.NewInt(int64(i+1))).Div(cur, count)
diff := big.NewInt(0).Sub(cur, last)
hook(common.Address(account), diff)
payout = big.NewInt(0).Add(payout, diff)
last = cur
}
return payout
}

@ -0,0 +1,75 @@
package quorum
import (
"math/big"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/internal/common"
"github.com/harmony-one/harmony/numeric"
"github.com/harmony-one/harmony/shard"
)
var (
twoThirds = numeric.NewDec(2).QuoInt64(3).Int
)
type stakedVoter struct {
isActive, isHarmonyNode bool
effective numeric.Dec
}
type stakedVoteWeight struct {
SignatureReader
DependencyInjectionWriter
// EPOS based staking
validatorStakes map[[shard.PublicKeySizeInBytes]byte]stakedVoter
totalEffectiveStakedAmount *big.Int
}
// Policy ..
func (v *stakedVoteWeight) Policy() Policy {
return SuperMajorityStake
}
// We must maintain 2/3 quoroum, so whatever is 2/3 staked amount,
// we divide that out & you
// IsQuorumAchieved ..
func (v *stakedVoteWeight) IsQuorumAchieved(p Phase) bool {
// TODO Implement this logic
return true
}
// QuorumThreshold ..
func (v *stakedVoteWeight) QuorumThreshold() *big.Int {
return new(big.Int).Mul(v.totalEffectiveStakedAmount, twoThirds)
}
// RewardThreshold ..
func (v *stakedVoteWeight) IsRewardThresholdAchieved() bool {
// TODO Implement
return false
}
// HACK
var (
hSentinel = big.NewInt(0)
hEffectiveSentinel = numeric.ZeroDec()
)
// Award ..
func (v *stakedVoteWeight) Award(
Pie *big.Int, earners []common.Address, hook func(earner common.Address, due *big.Int),
) *big.Int {
// TODO Implement
return nil
}
// UpdateVotingPower called only at epoch change, prob need to move to CalculateShardState
// func (v *stakedVoteWeight) UpdateVotingPower(keeper effective.StakeKeeper) {
// TODO Implement
// }
func (v *stakedVoteWeight) ToggleActive(*bls.PublicKey) bool {
// TODO Implement
return true
}

@ -1,7 +1,12 @@
package quorum
import (
"fmt"
"math/big"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/shard"
// "github.com/harmony-one/harmony/staking/effective"
)
// Phase is a phase that needs quorum to proceed
@ -16,6 +21,19 @@ const (
ViewChange
)
var phaseNames = map[Phase]string{
Prepare: "Announce",
Commit: "Prepare",
ViewChange: "Commit",
}
func (p Phase) String() string {
if name, ok := phaseNames[p]; ok {
return name
}
return fmt.Sprintf("Unknown Quorum Phase %+v", byte(p))
}
// Policy is the rule we used to decide is quorum achieved
type Policy byte
@ -41,7 +59,7 @@ type SignatoryTracker interface {
ParticipantTracker
AddSignature(p Phase, PubKey *bls.PublicKey, sig *bls.Sign)
// Caller assumes concurrency protection
SignatoriesCount(Phase) int64
SignersCount(Phase) int64
Reset([]Phase)
}
@ -52,6 +70,23 @@ type SignatureReader interface {
ReadSignature(p Phase, PubKey *bls.PublicKey) *bls.Sign
}
// DependencyInjectionWriter ..
type DependencyInjectionWriter interface {
SetShardIDProvider(func() (uint32, error))
}
// Decider ..
type Decider interface {
SignatureReader
DependencyInjectionWriter
ToggleActive(*bls.PublicKey) bool
// UpdateVotingPower(keeper effective.StakeKeeper)
Policy() Policy
IsQuorumAchieved(Phase) bool
QuorumThreshold() *big.Int
IsRewardThresholdAchieved() bool
}
// These maps represent the signatories (validators), keys are BLS public keys
// and values are BLS private key signed signatures
type cIdentities struct {
@ -64,6 +99,14 @@ type cIdentities struct {
viewID map[string]*bls.Sign
}
type depInject struct {
shardIDProvider func() (uint32, error)
}
func (d *depInject) SetShardIDProvider(p func() (uint32, error)) {
d.shardIDProvider = p
}
func (s *cIdentities) IndexOf(pubKey *bls.PublicKey) int {
idx := -1
for k, v := range s.publicKeys {
@ -104,7 +147,7 @@ func (s *cIdentities) ParticipantsCount() int64 {
return int64(len(s.publicKeys))
}
func (s *cIdentities) SignatoriesCount(p Phase) int64 {
func (s *cIdentities) SignersCount(p Phase) int64 {
switch p {
case Prepare:
return int64(len(s.prepare))
@ -164,9 +207,7 @@ func (s *cIdentities) ReadSignature(p Phase, PubKey *bls.PublicKey) *bls.Sign {
}
func (s *cIdentities) ReadAllSignatures(p Phase) []*bls.Sign {
sigs := []*bls.Sign{}
m := map[string]*bls.Sign{}
switch p {
case Prepare:
m = s.prepare
@ -175,9 +216,9 @@ func (s *cIdentities) ReadAllSignatures(p Phase) []*bls.Sign {
case ViewChange:
m = s.viewID
}
for _, sig := range m {
sigs = append(sigs, sig)
sigs := make([]*bls.Sign, 0, len(m))
for _, value := range m {
sigs = append(sigs, value)
}
return sigs
}
@ -189,47 +230,22 @@ func newMapBackedSignatureReader() SignatureReader {
}
}
// Decider ..
type Decider interface {
SignatureReader
Policy() Policy
IsQuorumAchieved(Phase) bool
QuorumThreshold() int64
IsRewardThresholdAchieved() bool
}
type uniformVoteWeight struct {
SignatureReader
}
// NewDecider ..
func NewDecider(p Policy) Decider {
signatureStore := newMapBackedSignatureReader()
dependencies := &depInject{}
switch p {
case SuperMajorityVote:
return &uniformVoteWeight{newMapBackedSignatureReader()}
// case SuperMajorityStake:
return &uniformVoteWeight{signatureStore, dependencies}
case SuperMajorityStake:
return &stakedVoteWeight{
signatureStore,
dependencies,
map[[shard.PublicKeySizeInBytes]byte]stakedVoter{},
big.NewInt(0),
}
default:
// Should not be possible
return nil
}
}
// Policy ..
func (v *uniformVoteWeight) Policy() Policy {
return SuperMajorityVote
}
// IsQuorumAchieved ..
func (v *uniformVoteWeight) IsQuorumAchieved(p Phase) bool {
return v.SignatoriesCount(p) >= v.QuorumThreshold()
}
// QuorumThreshold ..
func (v *uniformVoteWeight) QuorumThreshold() int64 {
return v.ParticipantsCount()*2/3 + 1
}
// RewardThreshold ..
func (v *uniformVoteWeight) IsRewardThresholdAchieved() bool {
return v.SignatoriesCount(Commit) >= (v.ParticipantsCount() * 9 / 10)
}

@ -157,8 +157,8 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
if consensus.Decider.IsQuorumAchieved(quorum.ViewChange) {
utils.Logger().Debug().
Int64("have", consensus.Decider.SignatoriesCount(quorum.ViewChange)).
Int64("need", consensus.Decider.QuorumThreshold()).
Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)).
Int64("need", consensus.Decider.QuorumThreshold().Int64()).
Str("validatorPubKey", recvMsg.SenderPubkey.SerializeToHexStr()).
Msg("[onViewChange] Received Enough View Change Messages")
return
@ -282,7 +282,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
return
}
// check has 2f+1 signature in m1 type message
need := consensus.Decider.QuorumThreshold()
need := consensus.Decider.QuorumThreshold().Int64()
if count := utils.CountOneBits(mask.Bitmap); count < need {
utils.Logger().Debug().Int64("need", need).Int64("have", count).
Msg("[onViewChange] M1 Payload Not Have Enough Signature")
@ -345,8 +345,8 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
// Set the bitmap indicating that this validator signed.
consensus.viewIDBitmap.SetKey(recvMsg.SenderPubkey, true)
utils.Logger().Debug().
Int64("numSigs", consensus.Decider.SignatoriesCount(quorum.ViewChange)).
Int64("needed", consensus.Decider.QuorumThreshold()).
Int64("numSigs", consensus.Decider.SignersCount(quorum.ViewChange)).
Int64("needed", consensus.Decider.QuorumThreshold().Int64()).
Msg("[onViewChange]")
// received enough view change messages, change state to normal consensus
@ -446,7 +446,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
// check total number of sigs >= 2f+1
need := consensus.Decider.QuorumThreshold()
need := consensus.Decider.QuorumThreshold().Int64()
if count := utils.CountOneBits(m3Mask.Bitmap); count < need {
utils.Logger().Debug().Int64("need", need).Int64("have", count).
Msg("[onNewView] Not Have Enough M3 (ViewID) Signature")

@ -1935,12 +1935,12 @@ func (bc *BlockChain) GetShardState(epoch *big.Int) (shard.State, error) {
}
if epoch.Cmp(big.NewInt(GenesisEpoch)) == 0 {
shardState, err = committee.WithStakingEnabled.ReadFromComputation(
shardState, err = committee.WithStakingEnabled.Compute(
big.NewInt(GenesisEpoch), *bc.Config(), nil,
)
} else {
prevEpoch := new(big.Int).Sub(epoch, common.Big1)
shardState, err = committee.WithStakingEnabled.ReadFromChain(
shardState, err = committee.WithStakingEnabled.ReadFromDB(
prevEpoch, bc,
)
}

@ -167,7 +167,7 @@ func (e *engineImpl) Finalize(
func QuorumForBlock(chain engine.ChainReader, h *block.Header, reCalculate bool) (quorum int, err error) {
var ss shard.State
if reCalculate {
ss, _ = committee.WithStakingEnabled.ReadFromComputation(h.Epoch(), *chain.Config(), nil)
ss, _ = committee.WithStakingEnabled.Compute(h.Epoch(), *chain.Config(), nil)
} else {
ss, err = chain.ReadShardState(h.Epoch())
if err != nil {
@ -226,7 +226,7 @@ func GetPublicKeys(chain engine.ChainReader, header *block.Header, reCalculate b
var shardState shard.State
var err error
if reCalculate {
shardState, _ = committee.WithStakingEnabled.ReadFromComputation(header.Epoch(), *chain.Config(), nil)
shardState, _ = committee.WithStakingEnabled.Compute(header.Epoch(), *chain.Config(), nil)
} else {
shardState, err = chain.ReadShardState(header.Epoch())
if err != nil {

@ -50,7 +50,7 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
}
// check has 2f+1 signatures
need := node.Consensus.Decider.QuorumThreshold()
need := node.Consensus.Decider.QuorumThreshold().Int64()
if count := utils.CountOneBits(mask.Bitmap); count < need {
utils.Logger().Error().Int64("need", need).Int64("have", count).
Msg("[Explorer] not have enough signature")

@ -8,10 +8,8 @@ import (
"strings"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
blockfactory "github.com/harmony-one/harmony/block/factory"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/core"
@ -42,7 +40,7 @@ type genesisInitializer struct {
// InitChainDB sets up a new genesis block in the database for the given shard.
func (gi *genesisInitializer) InitChainDB(db ethdb.Database, shardID uint32) error {
shardState, _ := committee.WithStakingEnabled.ReadFromComputation(
shardState, _ := committee.WithStakingEnabled.Compute(
big.NewInt(core.GenesisEpoch), gi.node.chainConfig, nil,
)
if shardID != shard.BeaconChainShardID {

@ -127,7 +127,7 @@ func (node *Node) proposeShardStateWithoutBeaconSync(block *types.Block) shard.S
if block == nil || !shard.Schedule.IsLastBlock(block.Number().Uint64()) {
return nil
}
shardState, _ := committee.WithStakingEnabled.ReadFromComputation(
shardState, _ := committee.WithStakingEnabled.Compute(
new(big.Int).Add(block.Header().Epoch(), common.Big1), node.chainConfig, nil,
)
return shardState
@ -151,7 +151,7 @@ func (node *Node) proposeBeaconShardState(block *types.Block) error {
}
// TODO Use ReadFromComputation
prevEpoch := new(big.Int).Sub(block.Header().Epoch(), common.Big1)
shardState, err := committee.WithStakingEnabled.ReadFromChain(
shardState, err := committee.WithStakingEnabled.ReadFromDB(
prevEpoch, node.Blockchain(),
)
if err != nil {

@ -57,7 +57,7 @@ func (node *Node) validateNewShardState(block *types.Block) error {
// TODO ek – this may be called from regular shards,
// for vetting beacon chain blocks received during block syncing.
// DRand may or or may not get in the way. Test this out.
expected, err := committee.WithStakingEnabled.ReadFromChain(
expected, err := committee.WithStakingEnabled.ReadFromDB(
new(big.Int).Sub(block.Header().Epoch(), common.Big1),
node.Beaconchain(),
)

@ -368,7 +368,7 @@ func (w *Worker) ProposeShardStateWithoutBeaconSync() shard.State {
if !shard.Schedule.IsLastBlock(w.current.header.Number().Uint64()) {
return nil
}
shardState, _ := committee.WithStakingEnabled.ReadFromComputation(
shardState, _ := committee.WithStakingEnabled.Compute(
new(big.Int).Add(w.current.header.Epoch(), common.Big1), *w.config, nil,
)
return shardState

@ -21,12 +21,12 @@ import (
// a shardID parameter
const StateID = -1
// MembershipList ..
type MembershipList interface {
ReadFromComputation(
// ValidatorList ..
type ValidatorList interface {
Compute(
epoch *big.Int, config params.ChainConfig, reader StakingCandidatesReader,
) (shard.State, error)
ReadFromChain(epoch *big.Int, reader ChainReader) (shard.State, error)
ReadFromDB(epoch *big.Int, reader ChainReader) (shard.State, error)
}
// PublicKeys per epoch
@ -45,7 +45,7 @@ type PublicKeys interface {
// Reader ..
type Reader interface {
PublicKeys
MembershipList
ValidatorList
}
// StakingCandidatesReader ..
@ -243,14 +243,14 @@ func (def partialStakingEnabled) ComputePublicKeys(
return nil, nil
}
func (def partialStakingEnabled) ReadFromChain(
func (def partialStakingEnabled) ReadFromDB(
epoch *big.Int, reader ChainReader,
) (newSuperComm shard.State, err error) {
return reader.ReadShardState(epoch)
}
// ReadFromComputation is single entry point for reading the State of the network
func (def partialStakingEnabled) ReadFromComputation(
func (def partialStakingEnabled) Compute(
epoch *big.Int, config params.ChainConfig, stakerReader StakingCandidatesReader,
) (newSuperComm shard.State, err error) {
instance := shard.Schedule.InstanceForEpoch(epoch)

Loading…
Cancel
Save