pull/1854/head
Ganesha Upadhyaya 5 years ago
commit bf0590244e
  1. 2
      consensus/consensus_v2.go
  2. 13
      consensus/quorum/one-node-one-vote.go
  3. 207
      consensus/quorum/one-node-staked-vote.go
  4. 13
      consensus/quorum/quorum.go
  5. 8
      consensus/view_change.go
  6. 21
      go.mod
  7. 64
      internal/utils/gomock_reflect_069400606/prog.go
  8. 64
      internal/utils/gomock_reflect_579506979/prog.go
  9. 2
      node/node_explorer.go
  10. 21
      node/node_handler.go
  11. 84
      p2p/host/hostv2/hostv2.go
  12. 87
      p2p/host/hostv2/hostv2_mock_for_test.go
  13. 261
      p2p/host/hostv2/hostv2_test.go
  14. 1
      scripts/list_harmony_go_files.sh
  15. 2
      shard/shard_state.go

@ -884,7 +884,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) {
switch consensus.Decider.Policy() {
case quorum.SuperMajorityVote:
threshold := consensus.Decider.QuorumThreshold().Int64()
threshold := consensus.Decider.TwoThirdsSignersCount()
if count := utils.CountOneBits(mask.Bitmap); int64(count) < threshold {
utils.Logger().Warn().
Int64("need", threshold).

@ -7,8 +7,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/numeric"
"github.com/harmony-one/harmony/shard"
// "github.com/harmony-one/harmony/staking/effective"
)
type uniformVoteWeight struct {
@ -24,7 +24,7 @@ func (v *uniformVoteWeight) Policy() Policy {
// IsQuorumAchieved ..
func (v *uniformVoteWeight) IsQuorumAchieved(p Phase) bool {
r := v.SignersCount(p) >= v.QuorumThreshold().Int64()
r := v.SignersCount(p) >= v.TwoThirdsSignersCount()
utils.Logger().Info().Str("phase", p.String()).
Int64("signers-count", v.SignersCount(p)).
Int64("threshold", v.QuorumThreshold().Int64()).
@ -34,8 +34,8 @@ func (v *uniformVoteWeight) IsQuorumAchieved(p Phase) bool {
}
// QuorumThreshold ..
func (v *uniformVoteWeight) QuorumThreshold() *big.Int {
return big.NewInt(v.ParticipantsCount()*2/3 + 1)
func (v *uniformVoteWeight) QuorumThreshold() numeric.Dec {
return numeric.NewDec(v.TwoThirdsSignersCount())
}
// RewardThreshold ..
@ -43,8 +43,11 @@ func (v *uniformVoteWeight) IsRewardThresholdAchieved() bool {
return v.SignersCount(Commit) >= (v.ParticipantsCount() * 9 / 10)
}
func (v *uniformVoteWeight) UpdateVotingPower(shard.SlotList) {
func (v *uniformVoteWeight) SetVoters(
shard.SlotList,
) (*TallyResult, error) {
// NO-OP do not add anything here
return nil, nil
}
// ToggleActive for uniform vote is a no-op, always says that voter is active

@ -10,17 +10,32 @@ import (
"github.com/harmony-one/harmony/numeric"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/staking/slash"
"github.com/pkg/errors"
)
var (
twoThirds = numeric.NewDec(2).QuoInt64(3)
hSentinel = numeric.ZeroDec()
twoThird = numeric.NewDec(2).Quo(numeric.NewDec(3))
ninetyPercent = numeric.MustNewDecFromStr("0.90")
harmonysShare = numeric.MustNewDecFromStr("0.68")
stakersShare = numeric.MustNewDecFromStr("0.32")
totalShare = numeric.MustNewDecFromStr("1.00")
)
// TODO Test the case where we have 33 nodes, 68/33 will give precision hell and it should trigger
// the 100% mismatch err.
// TallyResult is the result of when we calculate voting power,
// recall that it happens to us at epoch change
type TallyResult struct {
ourPercent numeric.Dec
theirPercent numeric.Dec
}
type stakedVoter struct {
isActive, isHarmonyNode bool
earningAccount common.Address
effective numeric.Dec
effectivePercent numeric.Dec
rawStake numeric.Dec
}
type stakedVoteWeight struct {
@ -28,8 +43,11 @@ type stakedVoteWeight struct {
DependencyInjectionWriter
DependencyInjectionReader
slash.ThresholdDecider
validatorStakes map[shard.BlsPublicKey]stakedVoter
total numeric.Dec
voters map[shard.BlsPublicKey]stakedVoter
ourVotingPowerTotal numeric.Dec
theirVotingPowerTotal numeric.Dec
stakedTotal numeric.Dec
hmySlotCount int64
}
// Policy ..
@ -39,31 +57,43 @@ func (v *stakedVoteWeight) Policy() Policy {
// IsQuorumAchieved ..
func (v *stakedVoteWeight) IsQuorumAchieved(p Phase) bool {
// TODO Implement this logic w/Chao
// soFar := numeric.ZeroDec()
t := v.QuorumThreshold()
currentTotalPower := v.computeCurrentTotalPower(p)
utils.Logger().Info().
Str("policy", v.Policy().String()).
Str("phase", p.String()).
Str("threshold", t.String()).
Str("total-power-of-signers", currentTotalPower.String()).
Msg("Attempt to reach quorum")
return currentTotalPower.GT(t)
}
func (v *stakedVoteWeight) computeCurrentTotalPower(p Phase) numeric.Dec {
w := shard.BlsPublicKey{}
members := v.Participants()
currentTotalPower := numeric.ZeroDec()
for i := range members {
w.FromLibBLSPublicKey(members[i])
// isHMY := v.validatorStakes[w].isHarmonyNode
if v.ReadSignature(p, members[i]) == nil {
// TODO TODO finish this logic
if v.ReadSignature(p, members[i]) != nil {
w.FromLibBLSPublicKey(members[i])
currentTotalPower = currentTotalPower.Add(
v.voters[w].effectivePercent,
)
}
}
return true
return currentTotalPower
}
// QuorumThreshold ..
func (v *stakedVoteWeight) QuorumThreshold() *big.Int {
return v.total.Mul(twoThirds).Ceil().RoundInt()
func (v *stakedVoteWeight) QuorumThreshold() numeric.Dec {
return twoThird
}
// RewardThreshold ..
func (v *stakedVoteWeight) IsRewardThresholdAchieved() bool {
// TODO Implement
return true
return v.computeCurrentTotalPower(Commit).GTE(ninetyPercent)
}
// Award ..
@ -73,13 +103,13 @@ func (v *stakedVoteWeight) Award(
payout := big.NewInt(0)
last := big.NewInt(0)
count := big.NewInt(int64(len(earners)))
proportional := map[common.Address]numeric.Dec{}
// proportional := map[common.Address]numeric.Dec{}
for _, details := range v.validatorStakes {
if details.isHarmonyNode == false {
proportional[details.earningAccount] = details.effective.QuoTruncate(
v.total,
)
for _, voter := range v.voters {
if voter.isHarmonyNode == false {
// proportional[details.earningAccount] = details.effective.QuoTruncate(
// v.stakedTotal,
// )
}
}
// TODO Finish implementing this logic w/Chao
@ -101,38 +131,87 @@ func (v *stakedVoteWeight) Award(
return payout
}
func (v *stakedVoteWeight) UpdateVotingPower(staked shard.SlotList) {
s, _ := v.ShardIDProvider()()
var (
errSumOfVotingPowerNotOne = errors.New("sum of total votes do not sum to 100%")
errSumOfOursAndTheirsNotOne = errors.New(
"sum of hmy nodes and stakers do not sum to 100%",
)
)
v.validatorStakes = map[shard.BlsPublicKey]stakedVoter{}
func (v *stakedVoteWeight) SetVoters(
staked shard.SlotList,
) (*TallyResult, error) {
s, _ := v.ShardIDProvider()()
v.voters = map[shard.BlsPublicKey]stakedVoter{}
v.Reset([]Phase{Prepare, Commit, ViewChange})
v.hmySlotCount = 0
v.stakedTotal = numeric.ZeroDec()
for i := range staked {
if staked[i].StakeWithDelegationApplied != nil {
v.validatorStakes[staked[i].BlsPublicKey] = stakedVoter{
true, false, staked[i].EcdsaAddress, *staked[i].StakeWithDelegationApplied,
}
v.total = v.total.Add(*staked[i].StakeWithDelegationApplied)
if staked[i].StakeWithDelegationApplied == nil {
v.hmySlotCount++
} else {
v.validatorStakes[staked[i].BlsPublicKey] = stakedVoter{
true, true, staked[i].EcdsaAddress, hSentinel,
}
v.stakedTotal = v.stakedTotal.Add(*staked[i].StakeWithDelegationApplied)
}
}
ourCount := numeric.NewDec(v.hmySlotCount)
ourPercentage := numeric.ZeroDec()
theirPercentage := numeric.ZeroDec()
totalStakedPercent := numeric.ZeroDec()
for i := range staked {
member := stakedVoter{
isActive: true,
isHarmonyNode: true,
earningAccount: staked[i].EcdsaAddress,
effectivePercent: numeric.ZeroDec(),
}
// Real Staker
if staked[i].StakeWithDelegationApplied != nil {
member.isHarmonyNode = false
member.effectivePercent = staked[i].StakeWithDelegationApplied.
Quo(v.stakedTotal).
Mul(stakersShare)
theirPercentage = theirPercentage.Add(member.effectivePercent)
} else { // Our node
member.effectivePercent = harmonysShare.Quo(ourCount)
ourPercentage = ourPercentage.Add(member.effectivePercent)
}
totalStakedPercent = totalStakedPercent.Add(member.effectivePercent)
v.voters[staked[i].BlsPublicKey] = member
}
utils.Logger().Info().
Str("our-percentage", ourPercentage.String()).
Str("their-percentage", theirPercentage.String()).
Uint32("on-shard", s).
Str("Staked", v.total.String()).
Str("Raw-Staked", v.stakedTotal.String()).
Msg("Total staked")
switch {
case totalStakedPercent.Equal(totalShare) == false:
return nil, errSumOfVotingPowerNotOne
case ourPercentage.Add(theirPercentage).Equal(totalShare) == false:
return nil, errSumOfOursAndTheirsNotOne
}
// Hold onto this calculation
v.ourVotingPowerTotal = ourPercentage
v.theirVotingPowerTotal = theirPercentage
return &TallyResult{ourPercentage, theirPercentage}, nil
}
func (v *stakedVoteWeight) ToggleActive(k *bls.PublicKey) bool {
w := shard.BlsPublicKey{}
w.FromLibBLSPublicKey(k)
g := v.validatorStakes[w]
g := v.voters[w]
g.isActive = !g.isActive
v.validatorStakes[w] = g
return v.validatorStakes[w].isActive
v.voters[w] = g
return v.voters[w].isActive
}
func (v *stakedVoteWeight) ShouldSlash(key shard.BlsPublicKey) bool {
@ -148,30 +227,48 @@ func (v *stakedVoteWeight) ShouldSlash(key shard.BlsPublicKey) bool {
func (v *stakedVoteWeight) JSON() string {
s, _ := v.ShardIDProvider()()
type u struct {
IsHarmony bool `json:"is-harmony-slot"`
Identity string `json:"bls-public-key"`
VotingPower string `json:"voting-power-%"`
RawStake string `json:"raw-stake,omitempty"`
}
type t struct {
Policy string `json"policy"`
ShardID uint32 `json:"shard-id"`
Count int `json:"count"`
Participants []string `json:"committee-members"`
TotalStaked string `json:"total-staked"`
Policy string `json"policy"`
ShardID uint32 `json:"shard-id"`
Count int `json:"count"`
Participants []u `json:"committee-members"`
HmyVotingPower string `json:"hmy-voting-power"`
StakedVotingPower string `json:"staked-voting-power"`
TotalStaked string `json:"total-raw-staked"`
}
members := v.DumpParticipants()
parts := []string{}
for i := range members {
k := bls.PublicKey{}
k.DeserializeHexStr(members[i])
w := shard.BlsPublicKey{}
w.FromLibBLSPublicKey(&k)
staker := v.validatorStakes[w]
if staker.isHarmonyNode {
parts = append(parts, members[i])
} else {
parts = append(parts, members[i]+"-"+staker.effective.String())
parts := make([]u, len(v.voters))
i := 0
for identity, voter := range v.voters {
member := u{
voter.isHarmonyNode,
identity.Hex(),
voter.effectivePercent.String(),
"",
}
if !voter.isHarmonyNode {
member.RawStake = voter.rawStake.String()
}
parts[i] = member
i++
}
b1, _ := json.Marshal(t{
v.Policy().String(), s, len(members), parts, v.total.String(),
v.Policy().String(),
s,
len(v.voters),
parts,
v.ourVotingPowerTotal.String(),
v.theirVotingPowerTotal.String(),
v.stakedTotal.String(),
})
return string(b1)
}

@ -2,7 +2,6 @@ package quorum
import (
"fmt"
"math/big"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/numeric"
@ -83,6 +82,7 @@ type SignatureReader interface {
SignatoryTracker
ReadAllSignatures(Phase) []*bls.Sign
ReadSignature(p Phase, PubKey *bls.PublicKey) *bls.Sign
TwoThirdsSignersCount() int64
}
// DependencyInjectionWriter ..
@ -107,10 +107,10 @@ type Decider interface {
slash.Slasher
WithJSONDump
ToggleActive(*bls.PublicKey) bool
UpdateVotingPower(shard.SlotList)
SetVoters(shard.SlotList) (*TallyResult, error)
Policy() Policy
IsQuorumAchieved(Phase) bool
QuorumThreshold() *big.Int
QuorumThreshold() numeric.Dec
IsRewardThresholdAchieved() bool
}
@ -222,6 +222,10 @@ func (s *cIdentities) Reset(ps []Phase) {
}
}
func (s *cIdentities) TwoThirdsSignersCount() int64 {
return s.ParticipantsCount()*2/3 + 1
}
func (s *cIdentities) ReadSignature(p Phase, PubKey *bls.PublicKey) *bls.Sign {
m := map[string]*bls.Sign{}
hex := PubKey.SerializeToHexStr()
@ -299,6 +303,9 @@ func NewDecider(p Policy) Decider {
c.SignatureReader.(slash.ThresholdDecider),
map[shard.BlsPublicKey]stakedVoter{},
numeric.ZeroDec(),
numeric.ZeroDec(),
numeric.ZeroDec(),
0,
}
default:
// Should not be possible

@ -158,7 +158,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
if consensus.Decider.IsQuorumAchieved(quorum.ViewChange) {
utils.Logger().Debug().
Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)).
Int64("need", consensus.Decider.QuorumThreshold().Int64()).
Int64("need", consensus.Decider.TwoThirdsSignersCount()).
Str("validatorPubKey", recvMsg.SenderPubkey.SerializeToHexStr()).
Msg("[onViewChange] Received Enough View Change Messages")
return
@ -282,7 +282,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
return
}
// check has 2f+1 signature in m1 type message
need := consensus.Decider.QuorumThreshold().Int64()
need := consensus.Decider.TwoThirdsSignersCount()
if count := utils.CountOneBits(mask.Bitmap); count < need {
utils.Logger().Debug().Int64("need", need).Int64("have", count).
Msg("[onViewChange] M1 Payload Not Have Enough Signature")
@ -346,7 +346,7 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) {
consensus.viewIDBitmap.SetKey(recvMsg.SenderPubkey, true)
utils.Logger().Debug().
Int64("numSigs", consensus.Decider.SignersCount(quorum.ViewChange)).
Int64("needed", consensus.Decider.QuorumThreshold().Int64()).
Int64("needed", consensus.Decider.TwoThirdsSignersCount()).
Msg("[onViewChange]")
// received enough view change messages, change state to normal consensus
@ -446,7 +446,7 @@ func (consensus *Consensus) onNewView(msg *msg_pb.Message) {
viewIDBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(viewIDBytes, recvMsg.ViewID)
// check total number of sigs >= 2f+1
need := consensus.Decider.QuorumThreshold().Int64()
need := consensus.Decider.TwoThirdsSignersCount()
if count := utils.CountOneBits(m3Mask.Bitmap); count < need {
utils.Logger().Debug().Int64("need", need).Int64("have", count).
Msg("[onNewView] Not Have Enough M3 (ViewID) Signature")

@ -32,17 +32,19 @@ require (
github.com/karalabe/hid v1.0.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/libp2p/go-libp2p v0.3.1
github.com/libp2p/go-libp2p-core v0.2.2
github.com/libp2p/go-libp2p-core v0.2.4
github.com/libp2p/go-libp2p-crypto v0.1.0
github.com/libp2p/go-libp2p-discovery v0.1.0
github.com/libp2p/go-libp2p-discovery v0.2.0
github.com/libp2p/go-libp2p-host v0.1.0
github.com/libp2p/go-libp2p-kad-dht v0.2.0
github.com/libp2p/go-libp2p-net v0.1.0
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.1.3
github.com/libp2p/go-libp2p-pubsub v0.1.1
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-net v0.0.1
github.com/libp2p/go-libp2p-pubsub v0.2.3
github.com/libp2p/go-ws-transport v0.1.1 // indirect
github.com/multiformats/go-multiaddr v0.1.1
github.com/multiformats/go-multiaddr-dns v0.1.1 // indirect
github.com/multiformats/go-multiaddr-net v0.1.0
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.8.1
@ -57,11 +59,14 @@ require (
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.3.0
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/uber/jaeger-client-go v2.20.1+incompatible // indirect
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443
golang.org/x/lint v0.0.0-20190409202823-959b441ac422
go.uber.org/atomic v1.5.1 // indirect
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
golang.org/x/lint v0.0.0-20190930215403-16217165b5de
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 // indirect
golang.org/x/tools v0.0.0-20190924052046-3ac2a5bbd98a
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c
google.golang.org/appengine v1.4.0 // indirect
google.golang.org/grpc v1.22.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127

@ -1,64 +0,0 @@
package main
import (
"encoding/gob"
"flag"
"fmt"
"os"
"path"
"reflect"
"github.com/golang/mock/mockgen/model"
pkg_ "github.com/ethereum/go-ethereum/log"
)
var output = flag.String("output", "", "The output file name, or empty to use stdout.")
func main() {
flag.Parse()
its := []struct {
sym string
typ reflect.Type
}{
{"Logger", reflect.TypeOf((*pkg_.Logger)(nil)).Elem()},
}
pkg := &model.Package{
// NOTE: This behaves contrary to documented behaviour if the
// package name is not the final component of the import path.
// The reflect package doesn't expose the package name, though.
Name: path.Base("github.com/ethereum/go-ethereum/log"),
}
for _, it := range its {
intf, err := model.InterfaceFromInterfaceType(it.typ)
if err != nil {
fmt.Fprintf(os.Stderr, "Reflection: %v\n", err)
os.Exit(1)
}
intf.Name = it.sym
pkg.Interfaces = append(pkg.Interfaces, intf)
}
outfile := os.Stdout
if len(*output) != 0 {
var err error
outfile, err = os.Create(*output)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to open output file %q", *output)
}
defer func() {
if err := outfile.Close(); err != nil {
fmt.Fprintf(os.Stderr, "failed to close output file %q", *output)
os.Exit(1)
}
}()
}
if err := gob.NewEncoder(outfile).Encode(pkg); err != nil {
fmt.Fprintf(os.Stderr, "gob encode: %v\n", err)
os.Exit(1)
}
}

@ -1,64 +0,0 @@
package main
import (
"encoding/gob"
"flag"
"fmt"
"os"
"path"
"reflect"
"github.com/golang/mock/mockgen/model"
pkg_ "github.com/ethereum/go-ethereum/log"
)
var output = flag.String("output", "", "The output file name, or empty to use stdout.")
func main() {
flag.Parse()
its := []struct {
sym string
typ reflect.Type
}{
{"Handler", reflect.TypeOf((*pkg_.Handler)(nil)).Elem()},
}
pkg := &model.Package{
// NOTE: This behaves contrary to documented behaviour if the
// package name is not the final component of the import path.
// The reflect package doesn't expose the package name, though.
Name: path.Base("github.com/ethereum/go-ethereum/log"),
}
for _, it := range its {
intf, err := model.InterfaceFromInterfaceType(it.typ)
if err != nil {
fmt.Fprintf(os.Stderr, "Reflection: %v\n", err)
os.Exit(1)
}
intf.Name = it.sym
pkg.Interfaces = append(pkg.Interfaces, intf)
}
outfile := os.Stdout
if len(*output) != 0 {
var err error
outfile, err = os.Create(*output)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to open output file %q", *output)
}
defer func() {
if err := outfile.Close(); err != nil {
fmt.Fprintf(os.Stderr, "failed to close output file %q", *output)
os.Exit(1)
}
}()
}
if err := gob.NewEncoder(outfile).Encode(pkg); err != nil {
fmt.Fprintf(os.Stderr, "gob encode: %v\n", err)
os.Exit(1)
}
}

@ -50,7 +50,7 @@ func (node *Node) ExplorerMessageHandler(payload []byte) {
}
// check has 2f+1 signatures
need := node.Consensus.Decider.QuorumThreshold().Int64()
need := node.Consensus.Decider.TwoThirdsSignersCount()
if count := utils.CountOneBits(mask.Bitmap); count < need {
utils.Logger().Error().Int64("need", need).Int64("have", count).
Msg("[Explorer] not have enough signature")

@ -390,9 +390,26 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBit
s, _ := committee.WithStakingEnabled.Compute(
next, node.chainConfig, node.Consensus.ChainReader,
)
node.Consensus.Decider.UpdateVotingPower(
prevSubCommitteeDump := node.Consensus.Decider.JSON()
if _, err := node.Consensus.Decider.SetVoters(
s.FindCommitteeByID(node.Consensus.ShardID).Slots,
)
); err != nil {
utils.Logger().Error().
Err(err).
Uint32("shard", node.Consensus.ShardID).
Msg("Error when updating voting power")
return
}
utils.Logger().Info().
Uint64("block-number", newBlock.Number().Uint64()).
Uint64("epoch", newBlock.Epoch().Uint64()).
Uint32("shard-id", node.Consensus.ShardID).
RawJSON("prev-subcommittee", []byte(prevSubCommitteeDump)).
RawJSON("current-subcommittee", []byte(node.Consensus.Decider.JSON())).
Msg("changing committee")
}
// TODO Need to refactor UpdateConsensusInformation so can fold the following logic
// into UCI - todo because UCI mutates state & called in overloaded contexts

@ -1,12 +1,13 @@
package hostv2
//go:generate mockgen -source hostv2.go -destination=mock/hostv2_mock.go
//go:generate mockgen -source=hostv2.go -package=hostv2 -destination=hostv2_mock_for_test.go
import (
"context"
"fmt"
"sync"
"github.com/pkg/errors"
"github.com/rs/zerolog"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
@ -33,16 +34,45 @@ const (
//numOutgoing = 16
)
// pubsub captures the pubsub interface we expect from libp2p.
type pubsub interface {
Publish(topic string, data []byte) error
Subscribe(topic string, opts ...libp2p_pubsub.SubOpt) (*libp2p_pubsub.Subscription, error)
// topicHandle is a pubsub topic handle.
type topicHandle interface {
Publish(ctx context.Context, data []byte) error
Subscribe() (subscription, error)
}
type topicHandleImpl struct {
t *libp2p_pubsub.Topic
}
func (th topicHandleImpl) Publish(ctx context.Context, data []byte) error {
return th.t.Publish(ctx, data)
}
func (th topicHandleImpl) Subscribe() (subscription, error) {
return th.t.Subscribe()
}
type topicJoiner interface {
JoinTopic(topic string) (topicHandle, error)
}
type topicJoinerImpl struct {
pubsub *libp2p_pubsub.PubSub
}
func (tj topicJoinerImpl) JoinTopic(topic string) (topicHandle, error) {
th, err := tj.pubsub.Join(topic)
if err != nil {
return nil, err
}
return topicHandleImpl{th}, nil
}
// HostV2 is the version 2 p2p host
type HostV2 struct {
h libp2p_host.Host
pubsub pubsub
joiner topicJoiner
joined map[string]topicHandle
self p2p.Peer
priKey libp2p_crypto.PrivKey
lock sync.Mutex
@ -54,16 +84,36 @@ type HostV2 struct {
logger *zerolog.Logger
}
func (host *HostV2) getTopic(topic string) (topicHandle, error) {
host.lock.Lock()
defer host.lock.Unlock()
if t, ok := host.joined[topic]; ok {
return t, nil
} else if t, err := host.joiner.JoinTopic(topic); err != nil {
return nil, errors.Wrapf(err, "cannot join pubsub topic %x", topic)
} else {
host.joined[topic] = t
return t, nil
}
}
// SendMessageToGroups sends a message to one or more multicast groups.
func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) error {
var error error
// It returns a nil error if and only if it has succeeded to schedule the given
// message for sending.
func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) (err error) {
for _, group := range groups {
err := host.pubsub.Publish(string(group), msg)
if err != nil {
error = err
t, e := host.getTopic(string(group))
if e != nil {
err = e
continue
}
e = t.Publish(context.Background(), msg)
if e != nil {
err = e
continue
}
}
return error
return err
}
// subscription captures the subscription interface we expect from libp2p.
@ -104,10 +154,14 @@ func (r *GroupReceiverImpl) Receive(ctx context.Context) (
func (host *HostV2) GroupReceiver(group nodeconfig.GroupID) (
receiver p2p.GroupReceiver, err error,
) {
sub, err := host.pubsub.Subscribe(string(group))
t, err := host.getTopic(string(group))
if err != nil {
return nil, err
}
sub, err := t.Subscribe()
if err != nil {
return nil, errors.Wrapf(err, "cannot subscribe to topic %x", group)
}
return &GroupReceiverImpl{sub: sub}, nil
}
@ -171,7 +225,6 @@ func New(self *p2p.Peer, priKey libp2p_crypto.PrivKey) *HostV2 {
)
catchError(err)
pubsub, err := libp2p_pubsub.NewGossipSub(ctx, p2pHost)
// pubsub, err := libp2p_pubsub.NewFloodSub(ctx, p2pHost)
catchError(err)
self.PeerID = p2pHost.ID()
@ -180,7 +233,8 @@ func New(self *p2p.Peer, priKey libp2p_crypto.PrivKey) *HostV2 {
// has to save the private key for host
h := &HostV2{
h: p2pHost,
pubsub: pubsub,
joiner: topicJoinerImpl{pubsub},
joined: map[string]topicHandle{},
self: *self,
priKey: priKey,
logger: &subLogger,

@ -1,8 +1,8 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: hostv2.go
// Package mock_hostv2 is a generated GoMock package.
package mock_hostv2
// Package hostv2 is a generated GoMock package.
package hostv2
import (
context "context"
@ -11,61 +11,94 @@ import (
reflect "reflect"
)
// Mockpubsub is a mock of pubsub interface
type Mockpubsub struct {
// MocktopicHandle is a mock of topicHandle interface
type MocktopicHandle struct {
ctrl *gomock.Controller
recorder *MockpubsubMockRecorder
recorder *MocktopicHandleMockRecorder
}
// MockpubsubMockRecorder is the mock recorder for Mockpubsub
type MockpubsubMockRecorder struct {
mock *Mockpubsub
// MocktopicHandleMockRecorder is the mock recorder for MocktopicHandle
type MocktopicHandleMockRecorder struct {
mock *MocktopicHandle
}
// NewMockpubsub creates a new mock instance
func NewMockpubsub(ctrl *gomock.Controller) *Mockpubsub {
mock := &Mockpubsub{ctrl: ctrl}
mock.recorder = &MockpubsubMockRecorder{mock}
// NewMocktopicHandle creates a new mock instance
func NewMocktopicHandle(ctrl *gomock.Controller) *MocktopicHandle {
mock := &MocktopicHandle{ctrl: ctrl}
mock.recorder = &MocktopicHandleMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *Mockpubsub) EXPECT() *MockpubsubMockRecorder {
func (m *MocktopicHandle) EXPECT() *MocktopicHandleMockRecorder {
return m.recorder
}
// Publish mocks base method
func (m *Mockpubsub) Publish(topic string, data []byte) error {
func (m *MocktopicHandle) Publish(ctx context.Context, data []byte) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Publish", topic, data)
ret := m.ctrl.Call(m, "Publish", ctx, data)
ret0, _ := ret[0].(error)
return ret0
}
// Publish indicates an expected call of Publish
func (mr *MockpubsubMockRecorder) Publish(topic, data interface{}) *gomock.Call {
func (mr *MocktopicHandleMockRecorder) Publish(ctx, data interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*Mockpubsub)(nil).Publish), topic, data)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MocktopicHandle)(nil).Publish), ctx, data)
}
// Subscribe mocks base method
func (m *Mockpubsub) Subscribe(topic string, opts ...go_libp2p_pubsub.SubOpt) (*go_libp2p_pubsub.Subscription, error) {
func (m *MocktopicHandle) Subscribe() (subscription, error) {
m.ctrl.T.Helper()
varargs := []interface{}{topic}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Subscribe", varargs...)
ret0, _ := ret[0].(*go_libp2p_pubsub.Subscription)
ret := m.ctrl.Call(m, "Subscribe")
ret0, _ := ret[0].(subscription)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Subscribe indicates an expected call of Subscribe
func (mr *MockpubsubMockRecorder) Subscribe(topic interface{}, opts ...interface{}) *gomock.Call {
func (mr *MocktopicHandleMockRecorder) Subscribe() *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{topic}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*Mockpubsub)(nil).Subscribe), varargs...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MocktopicHandle)(nil).Subscribe))
}
// MocktopicJoiner is a mock of topicJoiner interface
type MocktopicJoiner struct {
ctrl *gomock.Controller
recorder *MocktopicJoinerMockRecorder
}
// MocktopicJoinerMockRecorder is the mock recorder for MocktopicJoiner
type MocktopicJoinerMockRecorder struct {
mock *MocktopicJoiner
}
// NewMocktopicJoiner creates a new mock instance
func NewMocktopicJoiner(ctrl *gomock.Controller) *MocktopicJoiner {
mock := &MocktopicJoiner{ctrl: ctrl}
mock.recorder = &MocktopicJoinerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MocktopicJoiner) EXPECT() *MocktopicJoinerMockRecorder {
return m.recorder
}
// JoinTopic mocks base method
func (m *MocktopicJoiner) JoinTopic(topic string) (topicHandle, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "JoinTopic", topic)
ret0, _ := ret[0].(topicHandle)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// JoinTopic indicates an expected call of JoinTopic
func (mr *MocktopicJoinerMockRecorder) JoinTopic(topic interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JoinTopic", reflect.TypeOf((*MocktopicJoiner)(nil).JoinTopic), topic)
}
// Mocksubscription is a mock of subscription interface

@ -7,43 +7,87 @@ import (
"testing"
"github.com/golang/mock/gomock"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2p_pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
mock "github.com/harmony-one/harmony/p2p/host/hostv2/mock"
)
func TestHostV2_SendMessageToGroups(t *testing.T) {
t.Run("Basic", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
groups := []nodeconfig.GroupID{"ABC", "DEF"}
okTopic := NewMocktopicHandle(mc)
newTopic := NewMocktopicHandle(mc)
groups := []nodeconfig.GroupID{"OK", "New"}
data := []byte{1, 2, 3}
pubsub := mock.NewMockpubsub(mc)
joined := map[string]topicHandle{"OK": okTopic}
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: joined}
gomock.InOrder(
pubsub.EXPECT().Publish("ABC", data),
pubsub.EXPECT().Publish("DEF", data),
// okTopic is already in joined map, JoinTopic shouldn't be called
joiner.EXPECT().JoinTopic("OK").Times(0),
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
// newTopic is not in joined map, JoinTopic should be called
joiner.EXPECT().JoinTopic("New").Return(newTopic, nil),
newTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
)
host := &HostV2{pubsub: pubsub}
if err := host.SendMessageToGroups(groups, data); err != nil {
err := host.SendMessageToGroups(groups, data)
if err != nil {
t.Errorf("expected no error; got %v", err)
}
})
t.Run("Error", func(t *testing.T) {
t.Run("JoinError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
okTopic := NewMocktopicHandle(mc)
groups := []nodeconfig.GroupID{"Error", "OK"}
data := []byte{1, 2, 3}
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
// Make first join return an error
joiner.EXPECT().JoinTopic("Error").Return(nil, errors.New("join error")),
// Subsequent topics should still be processed after an error
joiner.EXPECT().JoinTopic("OK").Return(okTopic, nil),
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
)
err := host.SendMessageToGroups(groups, data)
if err == nil {
t.Error("expected an error; got nil")
}
})
t.Run("PublishError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
groups := []nodeconfig.GroupID{"ABC", "DEF"}
okTopic := NewMocktopicHandle(mc)
erringTopic := NewMocktopicHandle(mc)
groups := []nodeconfig.GroupID{"Error", "OK"}
data := []byte{1, 2, 3}
pubsub := mock.NewMockpubsub(mc)
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
pubsub.EXPECT().Publish("ABC", data).Return(errors.New("FIAL")),
pubsub.EXPECT().Publish("DEF", data), // Should not early-return
// Make first publish return an error
joiner.EXPECT().JoinTopic("Error").Return(erringTopic, nil),
erringTopic.EXPECT().Publish(context.TODO(), data).Return(errors.New("publish error")),
// Subsequent topics should still be processed after an error
joiner.EXPECT().JoinTopic("OK").Return(okTopic, nil),
okTopic.EXPECT().Publish(context.TODO(), data).Return(nil),
)
host := &HostV2{pubsub: pubsub}
if err := host.SendMessageToGroups(groups, data); err == nil {
t.Error("expected an error but got none")
t.Error("expected an error; got nil")
}
})
}
@ -51,10 +95,14 @@ func TestHostV2_SendMessageToGroups(t *testing.T) {
func TestGroupReceiver_Close(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
sub := mock.NewMocksubscription(mc)
sub := NewMocksubscription(mc)
sub.EXPECT().Cancel()
receiver := GroupReceiverImpl{sub: sub}
if err := receiver.Close(); err != nil {
err := receiver.Close()
if err != nil {
t.Errorf("expected no error but got %v", err)
}
}
@ -65,46 +113,71 @@ func pubsubMessage(from libp2p_peer.ID, data []byte) *libp2p_pubsub.Message {
}
func TestGroupReceiver_Receive(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
sub := mock.NewMocksubscription(mc)
ctx := context.Background()
gomock.InOrder(
sub.EXPECT().Next(ctx).Return(pubsubMessage("ABC", []byte{1, 2, 3}), nil),
sub.EXPECT().Next(ctx).Return(pubsubMessage("DEF", []byte{4, 5, 6}), nil),
sub.EXPECT().Next(ctx).Return(nil, errors.New("FIAL")),
)
receiver := GroupReceiverImpl{sub: sub}
verify := func(sender libp2p_peer.ID, msg []byte, shouldError bool) {
t.Run("OK", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
ctx := context.Background()
sub := NewMocksubscription(mc)
receiver := GroupReceiverImpl{sub: sub}
wantSender := libp2p_peer.ID("OK")
wantMsg := []byte{1, 2, 3}
sub.EXPECT().Next(ctx).Return(pubsubMessage(wantSender, wantMsg), nil)
gotMsg, gotSender, err := receiver.Receive(ctx)
if (err != nil) != shouldError {
if shouldError {
t.Error("expected an error but got none")
} else {
t.Errorf("expected no error but got %v", err)
}
if err != nil {
t.Errorf("expected no error; got %v", err)
}
if gotSender != sender {
t.Errorf("expected sender %v but got %v", sender, gotSender)
if gotSender != wantSender {
t.Errorf("expected sender %v; got %v", wantSender, gotSender)
}
if !reflect.DeepEqual(gotMsg, msg) {
t.Errorf("expected message %v but got %v", msg, gotMsg)
if !reflect.DeepEqual(gotMsg, wantMsg) {
t.Errorf("expected message %v; got %v", wantMsg, gotMsg)
}
}
verify("ABC", []byte{1, 2, 3}, false)
verify("DEF", []byte{4, 5, 6}, false)
verify("", nil, true)
})
t.Run("Error", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
ctx := context.Background()
sub := NewMocksubscription(mc)
receiver := GroupReceiverImpl{sub: sub}
sub.EXPECT().Next(ctx).Return(nil, errors.New("receive error"))
msg, sender, err := receiver.Receive(ctx)
if err == nil {
t.Error("expected an error; got nil")
}
if sender != "" {
t.Errorf("expected empty sender; got %v", sender)
}
if len(msg) > 0 {
t.Errorf("expected empty message; got %v", msg)
}
})
}
func TestHostV2_GroupReceiver(t *testing.T) {
t.Run("Basic", func(t *testing.T) {
t.Run("New", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
sub := &libp2p_pubsub.Subscription{}
pubsub := mock.NewMockpubsub(mc)
pubsub.EXPECT().Subscribe("ABC").Return(sub, nil)
host := &HostV2{pubsub: pubsub}
topic := NewMocktopicHandle(mc)
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
joiner.EXPECT().JoinTopic("ABC").Return(topic, nil),
topic.EXPECT().Subscribe().Return(sub, nil),
)
gotReceiver, err := host.GroupReceiver("ABC")
if r, ok := gotReceiver.(*GroupReceiverImpl); !ok {
t.Errorf("expected a hostv2 GroupReceiverImpl; got %v", gotReceiver)
} else if r.sub != sub {
@ -114,13 +187,39 @@ func TestHostV2_GroupReceiver(t *testing.T) {
t.Errorf("expected no error; got %v", err)
}
})
t.Run("Error", func(t *testing.T) {
t.Run("JoinError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
joiner.EXPECT().JoinTopic("ABC").Return(nil, errors.New("join error"))
gotReceiver, err := host.GroupReceiver("ABC")
if gotReceiver != nil {
t.Errorf("expected a nil hostv2 GroupReceiverImpl; got %v", gotReceiver)
}
if err == nil {
t.Error("expected an error; got none")
}
})
t.Run("SubscribeError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
pubsub := mock.NewMockpubsub(mc)
pubsub.EXPECT().Subscribe("ABC").Return(nil, errors.New("FIAL"))
host := &HostV2{pubsub: pubsub}
topic := NewMocktopicHandle(mc)
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
gomock.InOrder(
joiner.EXPECT().JoinTopic("ABC").Return(topic, nil),
topic.EXPECT().Subscribe().Return(nil, errors.New("subscription error")),
)
gotReceiver, err := host.GroupReceiver("ABC")
if gotReceiver != nil {
t.Errorf("expected a nil hostv2 GroupReceiverImpl; got %v", gotReceiver)
}
@ -136,3 +235,65 @@ func TestHostV2_GroupReceiver(t *testing.T) {
}
})
}
func TestHostV2_getTopic(t *testing.T) {
t.Run("NewOK", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
want := NewMocktopicHandle(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
joiner.EXPECT().JoinTopic("ABC").Return(want, nil)
got, err := host.getTopic("ABC")
if err != nil {
t.Errorf("want nil error; got %v", err)
}
if got != want {
t.Errorf("want topic handle %v; got %v", want, got)
}
if _, ok := host.joined["ABC"]; !ok {
t.Error("topic not found in joined map")
}
})
t.Run("NewError", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{}}
joiner.EXPECT().JoinTopic("ABC").Return(nil, errors.New("OMG"))
got, err := host.getTopic("ABC")
if err == nil {
t.Error("want non-nil error; got nil")
}
if got != nil {
t.Errorf("want nil handle; got %v", got)
}
})
t.Run("Existing", func(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()
joiner := NewMocktopicJoiner(mc)
want := NewMocktopicHandle(mc)
host := &HostV2{joiner: joiner, joined: map[string]topicHandle{"ABC": want}}
joiner.EXPECT().JoinTopic("ABC").Times(0)
got, err := host.getTopic("ABC")
if err != nil {
t.Errorf("want nil error; got %v", err)
}
if got != want {
t.Errorf("want topic handle %v; got %v", want, got)
}
})
}

@ -6,4 +6,5 @@ exec git ls-files '*.go' | grep -v \
-e '/host_mock\.go' \
-e '/mock/[^/]*\.go' \
-e '/mock_[^/]*/[^/]*\.go' \
-e '_mock_for_test\.go' \
-e '/gen_[^/]*\.go'

@ -38,7 +38,7 @@ type BlsPublicKey [PublicKeySizeInBytes]byte
type Slot struct {
EcdsaAddress common.Address `json:"ecdsa-address"`
BlsPublicKey BlsPublicKey `json:"bls-pubkey"`
// nil means our node, 0 means not active, >= 0 means staked node
// nil means our node, 0 means not active, > 0 means staked node
StakeWithDelegationApplied *numeric.Dec `json:"staked-validator" rlp:"nil"`
}

Loading…
Cancel
Save