Merge remote-tracking branch 'origin/t3'

pull/3084/head
Leo Chen 5 years ago
commit 1a6c09734a
  1. 10
      consensus/consensus_v2.go
  2. 4
      consensus/leader.go
  3. 6
      consensus/reward/schedule.go
  4. 7
      core/block_validator.go
  5. 39
      core/offchain.go
  6. 6
      core/tx_pool.go
  7. 20
      go.mod
  8. 89
      hmy/api_backend.go
  9. 4
      internal/chain/engine.go
  10. 7
      internal/configs/sharding/mainnet.go
  11. 6
      internal/params/config.go
  12. 147
      node/node.go
  13. 14
      node/node_handler.go
  14. 2
      node/node_syncing.go
  15. 63
      p2p/host.go
  16. 132
      scripts/node.sh
  17. 1
      staking/types/validator.go

@ -334,7 +334,7 @@ func (consensus *Consensus) Start(
blockChannel chan *types.Block, stopChan, stoppedChan, startChannel chan struct{},
) {
go func() {
toStart := make(chan struct{}, 1)
toStart := false
isInitialLeader := consensus.IsLeader()
if isInitialLeader {
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Waiting for consensus start")
@ -342,7 +342,7 @@ func (consensus *Consensus) Start(
// this signal is consumed by node object to create a new block and in turn trigger a new consensus on it
go func() {
<-startChannel
toStart <- struct{}{}
toStart = true
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal")
consensus.ReadySignal <- struct{}{}
}()
@ -360,15 +360,11 @@ func (consensus *Consensus) Start(
vdfInProgress := false
// Set up next block due time.
consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod)
start := false
for {
select {
case <-toStart:
start = true
case <-ticker.C:
consensus.getLogger().Debug().Msg("[ConsensusMainLoop] Ticker")
if !start && isInitialLeader {
if !toStart && isInitialLeader {
continue
}
for k, v := range consensus.consensusTimeout {

@ -280,9 +280,9 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
}
if consensus.Decider.IsAllSigsCollected() {
go func() {
go func(viewID uint64) {
consensus.commitFinishChan <- viewID
logger.Info().Msg("[OnCommit] 100% Enough commits received")
}()
}(consensus.viewID)
}
}

@ -4,6 +4,7 @@ import (
"sort"
"time"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/numeric"
)
@ -142,5 +143,10 @@ func PercentageForTimeStamp(ts int64) numeric.Dec {
j++
}
utils.Logger().Info().
Str("percent of total-supply used", bucket.share.Mul(numeric.NewDec(100)).String()).
Str("for-time", time.Unix(ts, 0).String()).
Msg("Picked Percentage for timestamp")
return bucket.share
}

@ -116,7 +116,7 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.DB, re
if root := statedb.IntermediateRoot(v.config.IsS3(header.Epoch())); header.Root() != root {
dump, _ := rlp.EncodeToBytes(header)
const msg = "invalid merkle root (remote: %x local: %x, rlp dump %s)"
return errors.Errorf(msg, header.Root(), root, hex.EncodeToString(dump))
return fmt.Errorf(msg, header.Root(), root, hex.EncodeToString(dump))
}
return nil
}
@ -125,10 +125,7 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.DB, re
// given engine. Verifying the seal may be done optionally here, or explicitly
// via the VerifySeal method.
func (v *BlockValidator) ValidateHeader(block *types.Block, seal bool) error {
if h := block.Header(); h != nil {
return v.engine.VerifyHeader(v.bc, h, true)
}
return errors.New("header field was nil")
return v.engine.VerifyHeader(v.bc, block.Header(), true)
}
// ValidateHeaders verifies a batch of blocks' headers concurrently. The method returns a quit channel

@ -17,6 +17,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/staking/apr"
"github.com/harmony-one/harmony/staking/slash"
staking "github.com/harmony-one/harmony/staking/types"
"github.com/pkg/errors"
@ -213,6 +214,44 @@ func (bc *BlockChain) CommitOffChainData(
Err(err).
Msg("[UpdateValidatorVotingPower] Failed to decode shard state")
}
// fix the possible wrong apr in the staking epoch
stakingEpoch := bc.Config().StakingEpoch
secondStakingEpoch := big.NewInt(0).Add(stakingEpoch, common.Big1)
thirdStakingEpoch := big.NewInt(0).Add(secondStakingEpoch, common.Big1)
isThirdStakingEpoch := block.Epoch().Cmp(thirdStakingEpoch) == 0
if isThirdStakingEpoch {
// we have to do it for all validators, not only currently elected
if validators, err := bc.ReadValidatorList(); err == nil {
for _, addr := range validators {
// get wrapper from the second staking epoch
if snapshot, err := bc.ReadValidatorSnapshotAtEpoch(
secondStakingEpoch, addr,
); err == nil {
if block := bc.GetBlockByNumber(
shard.Schedule.EpochLastBlock(stakingEpoch.Uint64()),
); block != nil {
if aprComputed, err := apr.ComputeForValidator(
bc, block, snapshot.Validator,
); err == nil {
stats, ok := tempValidatorStats[addr]
if !ok {
stats, err = bc.ReadValidatorStats(addr)
if err != nil {
continue
}
}
for i := range stats.APRs {
if stats.APRs[i].Epoch.Cmp(stakingEpoch) == 0 {
stats.APRs[i] = staking.APREntry{stakingEpoch, *aprComputed}
}
}
tempValidatorStats[addr] = stats
}
}
}
}
}
}
}
// Update block reward accumulator and slashes

@ -938,7 +938,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
pool.priced.Put(tx)
pool.journalTx(from, tx)
logger.Warn().
logger.Info().
Str("hash", tx.Hash().Hex()).
Interface("from", from).
Interface("to", tx.To()).
@ -964,7 +964,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
}
pool.journalTx(from, tx)
logger.Warn().
logger.Info().
Str("hash", hash.Hex()).
Interface("from", from).
Interface("to", tx.To()).
@ -1275,7 +1275,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
if pool.promoteTx(addr, tx) {
logger.Warn().Str("hash", hash.Hex()).Msg("Promoting queued transaction")
logger.Info().Str("hash", hash.Hex()).Msg("Promoting queued transaction")
promoted = append(promoted, tx)
}
}

@ -22,6 +22,7 @@ require (
github.com/golangci/golangci-lint v1.22.2
github.com/gorilla/handlers v1.4.0 // indirect
github.com/gorilla/mux v1.7.2
github.com/gorilla/websocket v1.4.2
github.com/harmony-ek/gencodec v0.0.0-20190215044613-e6740dbdd846
github.com/harmony-one/bls v0.0.6
github.com/harmony-one/taggedrlp v0.1.4
@ -34,18 +35,18 @@ require (
github.com/karalabe/hid v1.0.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/libp2p/go-addr-util v0.0.2 // indirect
github.com/libp2p/go-libp2p v0.8.2
github.com/libp2p/go-libp2p-core v0.5.3
github.com/libp2p/go-libp2p v0.9.2
github.com/libp2p/go-libp2p-core v0.5.6
github.com/libp2p/go-libp2p-crypto v0.1.0
github.com/libp2p/go-libp2p-discovery v0.3.0
github.com/libp2p/go-libp2p-discovery v0.4.0
github.com/libp2p/go-libp2p-host v0.1.0
github.com/libp2p/go-libp2p-kad-dht v0.5.0
github.com/libp2p/go-libp2p-net v0.1.0
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.2.3
github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200325112436-d3d43e32bef3
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-net v0.1.4
github.com/libp2p/go-libp2p-peerstore v0.2.4
github.com/libp2p/go-libp2p-pubsub v0.3.0
github.com/multiformats/go-multiaddr v0.2.2
github.com/multiformats/go-multiaddr-net v0.1.5
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.9.1
@ -60,12 +61,9 @@ require (
github.com/syndtr/goleveldb v1.0.1-0.20190923125748-758128399b1d
github.com/uber/jaeger-client-go v2.20.1+incompatible // indirect
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
go.uber.org/zap v1.14.1 // indirect
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37
golang.org/x/lint v0.0.0-20200302205851-738671d3881b
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/sys v0.0.0-20200413165638-669c56c373c4 // indirect
golang.org/x/tools v0.0.0-20200408032209-46bd65c8538f
google.golang.org/grpc v1.28.1
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15

@ -425,6 +425,7 @@ func (b *APIBackend) GetValidatorInformation(
wrapper.BlockReward,
wrapper.Counters,
zero,
nil,
},
}
@ -458,54 +459,62 @@ func (b *APIBackend) GetValidatorInformation(
return defaultReply, nil
}
latestAPR := numeric.ZeroDec()
l := len(stats.APRs)
if l > 0 {
latestAPR = stats.APRs[l-1].Value
}
defaultReply.Lifetime.APR = latestAPR
defaultReply.Lifetime.EpochAPRs = stats.APRs
// average apr cache keys
key := fmt.Sprintf("apr-%s-%d", addr.Hex(), now.Uint64())
prevKey := fmt.Sprintf("apr-%s-%d", addr.Hex(), now.Uint64()-1)
// key := fmt.Sprintf("apr-%s-%d", addr.Hex(), now.Uint64())
// prevKey := fmt.Sprintf("apr-%s-%d", addr.Hex(), now.Uint64()-1)
// delete entry for previous epoch
b.apiCache.Forget(prevKey)
// b.apiCache.Forget(prevKey)
// calculate last APRHistoryLength epochs for averaging APR
epochFrom := bc.Config().StakingEpoch
nowMinus := big.NewInt(0).Sub(now, big.NewInt(staking.APRHistoryLength))
if nowMinus.Cmp(epochFrom) > 0 {
epochFrom = nowMinus
}
if len(stats.APRs) > 0 && stats.APRs[0].Epoch.Cmp(epochFrom) > 0 {
epochFrom = stats.APRs[0].Epoch
}
epochToAPRs := map[int64]numeric.Dec{}
for i := 0; i < len(stats.APRs); i++ {
entry := stats.APRs[i]
epochToAPRs[entry.Epoch.Int64()] = entry.Value
}
// epochFrom := bc.Config().StakingEpoch
// nowMinus := big.NewInt(0).Sub(now, big.NewInt(staking.APRHistoryLength))
// if nowMinus.Cmp(epochFrom) > 0 {
// epochFrom = nowMinus
// }
// if len(stats.APRs) > 0 && stats.APRs[0].Epoch.Cmp(epochFrom) > 0 {
// epochFrom = stats.APRs[0].Epoch
// }
// epochToAPRs := map[int64]numeric.Dec{}
// for i := 0; i < len(stats.APRs); i++ {
// entry := stats.APRs[i]
// epochToAPRs[entry.Epoch.Int64()] = entry.Value
// }
// at this point, validator is active and has apr's for the recent 100 epochs
// compute average apr over history
if avgAPR, err := b.SingleFlightRequest(
key, func() (interface{}, error) {
total := numeric.ZeroDec()
count := 0
for i := epochFrom.Int64(); i < now.Int64(); i++ {
if apr, ok := epochToAPRs[i]; ok {
total = total.Add(apr)
}
count++
}
if count == 0 {
return nil, errors.New("no apr snapshots available")
}
return total.QuoInt64(int64(count)), nil
},
); err != nil {
// could not compute average apr from snapshot
// assign the latest apr available from stats
defaultReply.Lifetime.APR = numeric.ZeroDec()
} else {
defaultReply.Lifetime.APR = avgAPR.(numeric.Dec)
}
// if avgAPR, err := b.SingleFlightRequest(
// key, func() (interface{}, error) {
// total := numeric.ZeroDec()
// count := 0
// for i := epochFrom.Int64(); i < now.Int64(); i++ {
// if apr, ok := epochToAPRs[i]; ok {
// total = total.Add(apr)
// }
// count++
// }
// if count == 0 {
// return nil, errors.New("no apr snapshots available")
// }
// return total.QuoInt64(int64(count)), nil
// },
// ); err != nil {
// // could not compute average apr from snapshot
// // assign the latest apr available from stats
// defaultReply.Lifetime.APR = numeric.ZeroDec()
// } else {
// defaultReply.Lifetime.APR = avgAPR.(numeric.Dec)
// }
if defaultReply.CurrentlyInCommittee {
defaultReply.ComputedMetrics = stats

@ -189,8 +189,8 @@ func (e *engineImpl) VerifySeal(chain engine.ChainReader, header *block.Header)
parentHash := header.ParentHash()
parentHeader := chain.GetHeader(parentHash, header.Number().Uint64()-1)
if parentHeader == nil {
return errors.Errorf(
"[VerifySeal] no parent header found for block %x at %d", header.Hash(), header.Number().Uint64(),
return errors.New(
"[VerifySeal] no parent header found",
)
}
if chain.Config().IsStaking(parentHeader.Epoch()) {

@ -28,6 +28,7 @@ const (
mainnetV1_3Epoch = 36
mainnetV1_4Epoch = 46
mainnetV1_5Epoch = 54
mainnetV2_0Epoch = 185 // prestaking epoch
// MainNetHTTPPattern is the http pattern for mainnet.
MainNetHTTPPattern = "https://api.s%d.t.hmny.io"
@ -42,6 +43,9 @@ type mainnetSchedule struct{}
func (mainnetSchedule) InstanceForEpoch(epoch *big.Int) Instance {
switch {
case epoch.Cmp(big.NewInt(mainnetV2_0Epoch)) >= 0:
// 185 resharding epoch (for shard 0) around 14/05/2020 ~15:00 PDT
return mainnetV2_0
case epoch.Cmp(big.NewInt(mainnetV1_5Epoch)) >= 0:
// 54 resharding epoch (for shard 0) around 23/10/2019 ~10:05 PDT
return mainnetV1_5
@ -137,7 +141,7 @@ func (ms mainnetSchedule) GetShardingStructure(numShard, shardID int) []map[stri
return genShardingStructure(numShard, shardID, MainNetHTTPPattern, MainNetWSPattern)
}
var mainnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(mainnetV0_1Epoch), big.NewInt(mainnetV0_2Epoch), big.NewInt(mainnetV0_3Epoch), big.NewInt(mainnetV0_4Epoch), big.NewInt(mainnetV1Epoch), big.NewInt(mainnetV1_1Epoch), big.NewInt(mainnetV1_2Epoch), big.NewInt(mainnetV1_3Epoch), big.NewInt(mainnetV1_4Epoch), big.NewInt(mainnetV1_5Epoch)}
var mainnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(mainnetV0_1Epoch), big.NewInt(mainnetV0_2Epoch), big.NewInt(mainnetV0_3Epoch), big.NewInt(mainnetV0_4Epoch), big.NewInt(mainnetV1Epoch), big.NewInt(mainnetV1_1Epoch), big.NewInt(mainnetV1_2Epoch), big.NewInt(mainnetV1_3Epoch), big.NewInt(mainnetV1_4Epoch), big.NewInt(mainnetV1_5Epoch), big.NewInt(mainnetV2_0Epoch)}
var (
mainnetV0 = MustNewInstance(4, 150, 112, numeric.OneDec(), genesis.HarmonyAccounts, genesis.FoundationalNodeAccounts, mainnetReshardingEpoch, MainnetSchedule.BlocksPerEpoch())
@ -151,4 +155,5 @@ var (
mainnetV1_3 = MustNewInstance(4, 250, 170, numeric.OneDec(), genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV1_3, mainnetReshardingEpoch, MainnetSchedule.BlocksPerEpoch())
mainnetV1_4 = MustNewInstance(4, 250, 170, numeric.OneDec(), genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV1_4, mainnetReshardingEpoch, MainnetSchedule.BlocksPerEpoch())
mainnetV1_5 = MustNewInstance(4, 250, 170, numeric.OneDec(), genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV1_5, mainnetReshardingEpoch, MainnetSchedule.BlocksPerEpoch())
mainnetV2_0 = MustNewInstance(4, 250, 170, numeric.MustNewDecFromStr("0.68"), genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV1_5, mainnetReshardingEpoch, MainnetSchedule.BlocksPerEpoch())
)

@ -27,9 +27,9 @@ var (
MainnetChainConfig = &ChainConfig{
ChainID: MainnetChainID,
CrossTxEpoch: big.NewInt(28),
CrossLinkEpoch: EpochTBD,
StakingEpoch: EpochTBD,
PreStakingEpoch: EpochTBD,
CrossLinkEpoch: big.NewInt(186),
StakingEpoch: big.NewInt(186),
PreStakingEpoch: big.NewInt(185),
EIP155Epoch: big.NewInt(28),
S3Epoch: big.NewInt(28),
ReceiptLogEpoch: big.NewInt(101),

@ -8,7 +8,6 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@ -38,7 +37,6 @@ import (
"github.com/harmony-one/harmony/webhooks"
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"golang.org/x/sync/semaphore"
)
@ -140,6 +138,8 @@ type Node struct {
ContractDeployerKey *ecdsa.PrivateKey
ContractDeployerCurrentNonce uint64 // The nonce of the deployer contract at current block
ContractAddresses []common.Address
// Duplicated Ping Message Received
duplicatedPing sync.Map
// Channel to notify consensus service to really start consensus
startConsensus chan struct{}
// node configuration, including group ID, shard ID, etc
@ -371,134 +371,43 @@ func (node *Node) Start() error {
if len(allTopics) == 0 {
return errors.New("have no topics to listen to")
}
const (
maxMessageHandlers = 200
threshold = maxMessageHandlers / 2
lastLine = 20
throttle = 100 * time.Millisecond
emrgThrottle = 250 * time.Millisecond
)
weighted := make([]*semaphore.Weighted, len(allTopics))
const maxMessageHandlers = 2000
ctx := context.Background()
ownID := node.host.GetID()
errChan := make(chan error)
for i := range allTopics {
sub, err := allTopics[i].Topic.Subscribe()
for i, topic := range allTopics {
sub, err := topic.Subscribe()
if err != nil {
return err
}
topicNamed := allTopics[i].Name
sem := semaphore.NewWeighted(maxMessageHandlers)
weighted[i] = semaphore.NewWeighted(maxMessageHandlers)
msgChan := make(chan *libp2p_pubsub.Message)
needThrottle, releaseThrottle :=
make(chan time.Duration), make(chan struct{})
go func() {
soFar := int32(maxMessageHandlers)
sampled := utils.Logger().Sample(
zerolog.LevelSampler{
DebugSampler: &zerolog.BurstSampler{
Burst: 1,
Period: 36 * time.Second,
NextSampler: &zerolog.BasicSampler{N: 1000},
},
},
).With().Str("pubsub-topic", topicNamed).Logger()
go func(msgChan chan *libp2p_pubsub.Message, sem *semaphore.Weighted) {
for msg := range msgChan {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
msg := msg
go func() {
defer cancel()
defer atomic.AddInt32(&soFar, 1)
current := atomic.AddInt32(&soFar, -1)
using := maxMessageHandlers - current
if using > 1 {
sampled.Info().
Int32("currently-using", using).
Msg("sampling message handling")
}
if current == 0 {
utils.Logger().Debug().Msg("no available semaphores to handle p2p messages")
return
}
var cost int64 = 1
if current <= threshold {
cost = 2
if current == threshold {
go func() {
needThrottle <- throttle
}()
} else if current == lastLine {
go func() {
needThrottle <- emrgThrottle
}()
}
} else {
if current == threshold+1 {
cost = 1
go func() {
releaseThrottle <- struct{}{}
}()
}
}
if sem.TryAcquire(cost) {
defer sem.Release(cost)
payload := msg.GetData()
if len(payload) < p2pMsgPrefixSize {
cancel()
// TODO understand why this happens
return
}
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
utils.Logger().Info().
Str("topic", topicNamed).Msg("exceeded deadline")
}
errChan <- ctx.Err()
default:
node.HandleMessage(
payload[p2pMsgPrefixSize:], msg.GetFrom(),
)
}
}
}()
payload := msg.GetData()
if len(payload) < p2pMsgPrefixSize {
continue
}
if sem.TryAcquire(1) {
go func() {
node.HandleMessage(
payload[p2pMsgPrefixSize:], msg.GetFrom(),
)
sem.Release(1)
}()
} else {
utils.Logger().Warn().
Msg("could not acquire semaphore to process incoming message")
}
}
}()
go func() {
slowDown, coolDown := false, throttle
}(msgChan, weighted[i])
go func(msgChan chan *libp2p_pubsub.Message) {
for {
select {
case s := <-needThrottle:
slowDown = true
coolDown = s
utils.Logger().Info().
Dur("throttle-delay-miliseconds", s.Round(time.Millisecond)).
Msg("throttle needed on acceptance of messages")
case <-releaseThrottle:
utils.Logger().Info().Msg("p2p throttle released")
slowDown = false
default:
if slowDown {
<-time.After(coolDown)
}
}
nextMsg, err := sub.Next(context.Background())
nextMsg, err := sub.Next(ctx)
if err != nil {
errChan <- err
continue
@ -508,7 +417,7 @@ func (node *Node) Start() error {
}
msgChan <- nextMsg
}
}()
}(msgChan)
}
for err := range errChan {

@ -57,6 +57,13 @@ func (node *Node) processSkippedMsgTypeByteValue(
// HandleMessage parses the message and dispatch the actions.
func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) {
// log in-coming metrics
node.host.LogRecvMessage(content)
utils.Logger().Debug().
Int64("TotalIn", node.host.GetBandwidthTotals().TotalIn).
Float64("RateIn", node.host.GetBandwidthTotals().RateIn).
Msg("[metrics][p2p] traffic in in bytes")
msgCategory, err := proto.GetMessageCategory(content)
if err != nil {
utils.Logger().Error().
@ -313,9 +320,6 @@ func (node *Node) BroadcastCrossLink() {
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are
// running consensus on
func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
if newBlock == nil || newBlock.Header() == nil {
return errors.New("nil header or block asked to verify")
}
if err := node.Blockchain().Validator().ValidateHeader(newBlock, true); err != nil {
utils.Logger().Error().
Str("blockHash", newBlock.Hash().Hex()).
@ -468,6 +472,10 @@ func (node *Node) PostConsensusProcessing(
// Broadcast client requested missing cross shard receipts if there is any
node.BroadcastMissingCXReceipts()
// Clear metrics after one consensus cycle
node.host.ResetMetrics()
utils.Logger().Debug().Msg("[metrics][p2p] Reset after 1 consensus cycle")
// Update consensus keys at last so the change of leader status doesn't mess up normal flow
if len(newBlock.Header().ShardState()) > 0 {
node.Consensus.SetMode(node.Consensus.UpdateConsensusInformation())

@ -406,8 +406,6 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
// this is the out of sync node acts as grpc server side
case downloader_pb.DownloaderRequest_NEWBLOCK:
node.stateMutex.Lock()
defer node.stateMutex.Unlock()
if node.State != NodeNotInSync {
utils.Logger().Debug().
Str("state", node.State.String()).

@ -35,8 +35,12 @@ type Host interface {
ConnectHostPeer(Peer) error
// SendMessageToGroups sends a message to one or more multicast groups.
SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) error
AllTopics() []NamedTopic
AllTopics() []*libp2p_pubsub.Topic
C() (int, int, int)
// libp2p.metrics related
GetBandwidthTotals() libp2p_metrics.Stats
LogRecvMessage(msg []byte)
ResetMetrics()
}
// Peer is the object for a p2p peer (node)
@ -66,14 +70,9 @@ func NewHost(self *Peer, key libp2p_crypto.PrivKey) (Host, error) {
return nil, errors.Wrapf(err,
"cannot create listen multiaddr from port %#v", self.Port)
}
ctx := context.Background()
p2pHost, err := libp2p.New(ctx,
libp2p.ListenAddrs(listenAddr),
libp2p.Identity(key),
// libp2p.DisableRelay(),
libp2p.EnableNATService(),
libp2p.ForceReachabilityPublic(),
libp2p.ListenAddrs(listenAddr), libp2p.Identity(key),
)
if err != nil {
return nil, errors.Wrapf(err, "cannot initialize libp2p host")
@ -97,14 +96,17 @@ func NewHost(self *Peer, key libp2p_crypto.PrivKey) (Host, error) {
self.PeerID = p2pHost.ID()
subLogger := utils.Logger().With().Str("hostID", p2pHost.ID().Pretty()).Logger()
newMetrics := libp2p_metrics.NewBandwidthCounter()
// has to save the private key for host
h := &HostV2{
h: p2pHost,
joiner: topicJoiner{pubsub},
joined: map[string]*libp2p_pubsub.Topic{},
self: *self,
priKey: key,
logger: &subLogger,
h: p2pHost,
joiner: topicJoiner{pubsub},
joined: map[string]*libp2p_pubsub.Topic{},
self: *self,
priKey: key,
logger: &subLogger,
metrics: newMetrics,
}
if err != nil {
@ -188,7 +190,13 @@ func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte)
err = e
continue
}
// log out-going metrics
host.metrics.LogSentMessage(int64(len(msg)))
}
host.logger.Debug().
Int64("TotalOut", host.GetBandwidthTotals().TotalOut).
Float64("RateOut", host.GetBandwidthTotals().RateOut).
Msg("[metrics][p2p] traffic out in bytes")
return err
}
@ -245,6 +253,21 @@ func (host *HostV2) GetPeerCount() int {
return host.h.Peerstore().Peers().Len()
}
// GetBandwidthTotals returns total bandwidth of a node
func (host *HostV2) GetBandwidthTotals() libp2p_metrics.Stats {
return host.metrics.GetBandwidthTotals()
}
// LogRecvMessage logs received message on node
func (host *HostV2) LogRecvMessage(msg []byte) {
host.metrics.LogRecvMessage(int64(len(msg)))
}
// ResetMetrics resets metrics counters
func (host *HostV2) ResetMetrics() {
host.metrics.Reset()
}
// ConnectHostPeer connects to peer host
func (host *HostV2) ConnectHostPeer(peer Peer) error {
ctx := context.Background()
@ -267,19 +290,13 @@ func (host *HostV2) ConnectHostPeer(peer Peer) error {
return nil
}
// NamedTopic ..
type NamedTopic struct {
Name string
Topic *libp2p_pubsub.Topic
}
// AllTopics ..
func (host *HostV2) AllTopics() []NamedTopic {
func (host *HostV2) AllTopics() []*libp2p_pubsub.Topic {
host.lock.Lock()
defer host.lock.Unlock()
var topics []NamedTopic
for k, g := range host.joined {
topics = append(topics, NamedTopic{k, g})
topics := []*libp2p_pubsub.Topic{}
for _, g := range host.joined {
topics = append(topics, g)
}
return topics
}

@ -1,6 +1,6 @@
#!/usr/bin/env bash
version="v1 20200507.0"
version="v1 20200521.0"
unset -v progname
progname="${0##*/}"
@ -23,6 +23,19 @@ err() {
exit "${code}"
}
# b: beginning
# r: range
# return random number between b ~ b+r
random() {
local b=$1
local r=$2
if [ $r -le 0 ]; then
r=100
fi
local rand=$(( $(od -A n -t d -N 3 /dev/urandom | grep -oE '[0-9]+') % r ))
echo $(( b + rand ))
}
# https://www.linuxjournal.com/content/validating-ip-address-bash-script
function valid_ip()
{
@ -171,7 +184,6 @@ options:
-n port specify the public base port of the node (default: 9000)
-T nodetype specify the node type (validator, explorer; default: validator)
-i shardid specify the shard id (valid only with explorer node; default: 1)
-b download harmony_db files from shard specified by -i <shardid> (default: off)
-a dbfile specify the db file to download (default:off)
-U FOLDER specify the upgrade folder to download binaries
-P enable public rpc end point (default:off)
@ -188,7 +200,8 @@ options:
-r address start a pprof profiling server listening on the specified address
-I use statically linked Harmony binary (default: true)
-R tracefile enable p2p trace using tracefile (default: off)
-L limit broadcasting of invalid transactions (default: off)
-l limit broadcasting of invalid transactions (default: off)
-L log_level logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: $log_level)
examples:
@ -239,8 +252,8 @@ usage() {
BUCKET=pub.harmony.one
OS=$(uname -s)
unset start_clean loop run_as_root blspass do_not_download download_only network node_type shard_id download_harmony_db db_file_to_dl
unset upgrade_rel public_rpc staking_mode pub_port multi_key blsfolder blacklist verify TRACEFILE minpeers max_bls_keys_per_node broadcast_invalid_tx
unset start_clean loop run_as_root blspass do_not_download download_only network node_type shard_id broadcast_invalid_tx
unset upgrade_rel public_rpc staking_mode pub_port multi_key blsfolder blacklist verify TRACEFILE minpeers max_bls_keys_per_node log_level
start_clean=false
loop=true
run_as_root=true
@ -249,7 +262,6 @@ download_only=false
network=mainnet
node_type=validator
shard_id=-1
download_harmony_db=false
public_rpc=false
staking_mode=false
multi_key=false
@ -262,17 +274,17 @@ verify=false
minpeers=6
max_bls_keys_per_node=10
broadcast_invalid_tx=true
log_level=3
${BLSKEYFILE=}
${TRACEFILE=}
unset OPTIND OPTARG opt
OPTIND=1
while getopts :1chk:sSp:dDN:T:i:ba:U:PvVyzn:MAIB:r:Y:f:R:m:L opt
while getopts :1chk:sSp:dDN:T:i:U:PvVyzn:MAIB:r:Y:f:R:m:L:l opt
do
case "${opt}" in
'?') usage "unrecognized option -${OPTARG}";;
':') usage "missing argument for -${OPTARG}";;
b) download_harmony_db=true;;
c) start_clean=true;;
1) loop=false;;
h) print_usage; exit 0;;
@ -290,7 +302,6 @@ do
T) node_type="${OPTARG}";;
i) shard_id="${OPTARG}";;
I) static=true;;
a) db_file_to_dl="${OPTARG}";;
U) upgrade_rel="${OPTARG}";;
P) public_rpc=true;;
B) blacklist="${OPTARG}";;
@ -304,7 +315,8 @@ do
y) staking_mode=false;;
A) archival=true;;
R) TRACEFILE="${OPTARG}";;
L) broadcast_invalid_tx=false;;
l) broadcast_invalid_tx=false;;
L) log_level="${OPTARG}";;
*) err 70 "unhandled option -${OPTARG}";; # EX_SOFTWARE
esac
done
@ -476,24 +488,10 @@ download_binaries() {
verify_checksum "${outdir}" "${bin}" md5sum.txt || return $?
msg "downloaded ${bin}"
done
chmod +x "${outdir}/harmony"
chmod +x "${outdir}/harmony" "${outdir}/node.sh"
(cd "${outdir}" && exec openssl sha256 $(cut -c35- md5sum.txt)) > "${outdir}/harmony-checksums.txt"
}
check_free_disk() {
local dir
dir="${1:-.}"
local free_disk=$(df -BG $dir | tail -n 1 | awk ' { print $4 } ' | tr -d G)
# need at least 50G free disk space
local need_disk=50
if [ $free_disk -gt $need_disk ]; then
return 0
else
return 1
fi
}
_curl_check_exist() {
local url=$1
local statuscode=$(curl -I --silent --output /dev/null --write-out "%{http_code}" $url)
@ -519,67 +517,17 @@ _curl_download() {
fi
}
download_harmony_db_file() {
local shard_id
shard_id="${1}"
local file_to_dl="${2}"
local outdir=db
if ! check_free_disk; then
err 70 "do not have enough free disk space to download db tarball"
fi
url="http://${BUCKET}.s3.amazonaws.com/${FOLDER}/db/md5sum.txt"
rm -f "${outdir}/md5sum.txt"
if ! _curl_download $url "${outdir}" md5sum.txt; then
err 70 "cannot download md5sum.txt"
fi
if [ -n "${file_to_dl}" ]; then
if grep -q "${file_to_dl}" "${outdir}/md5sum.txt"; then
url="http://${BUCKET}.s3.amazonaws.com/${FOLDER}/db/${file_to_dl}"
if _curl_download $url "${outdir}" ${file_to_dl}; then
verify_checksum "${outdir}" "${file_to_dl}" md5sum.txt || return $?
msg "downloaded ${file_to_dl}, extracting ..."
tar -C "${outdir}" -xvf "${outdir}/${file_to_dl}"
else
msg "can't download ${file_to_dl}"
fi
fi
return
fi
files=$(awk '{ print $2 }' ${outdir}/md5sum.txt)
echo "[available harmony db files for shard ${shard_id}]"
grep -oE "harmony_db_${shard_id}"-.*.tar "${outdir}/md5sum.txt"
echo
for file in $files; do
if [[ $file =~ "harmony_db_${shard_id}" ]]; then
echo -n "Do you want to download ${file} (choose one only) [y/n]?"
read yesno
if [[ "$yesno" = "y" || "$yesno" = "Y" ]]; then
url="http://${BUCKET}.s3.amazonaws.com/${FOLDER}/db/$file"
if _curl_download $url "${outdir}" $file; then
verify_checksum "${outdir}" "${file}" md5sum.txt || return $?
msg "downloaded $file, extracting ..."
tar -C "${outdir}" -xvf "${outdir}/${file}"
else
msg "can't download $file"
fi
break
fi
fi
done
}
any_new_binaries() {
local outdir
${do_not_download} && return 0
outdir="${1}"
mkdir -p "${outdir}"
curl -L https://harmony.one/pubkey -o "${outdir}/harmony_pubkey.pem"
if ! grep -q "BEGIN\ PUBLIC\ KEY" "${outdir}/harmony_pubkey.pem"; then
msg "failed to downloaded harmony public signing key"
return 1
if ${verify}; then
curl -L https://harmony.one/pubkey -o "${outdir}/harmony_pubkey.pem"
if ! grep -q "BEGIN\ PUBLIC\ KEY" "${outdir}/harmony_pubkey.pem"; then
msg "failed to downloaded harmony public signing key"
return 1
fi
fi
curl -sSf http://${BUCKET}.s3.amazonaws.com/${FOLDER}/md5sum.txt -o "${outdir}/md5sum.txt.new" || return $?
if diff $outdir/md5sum.txt.new md5sum.txt
@ -602,11 +550,6 @@ if ${download_only}; then
exit 0
fi
if ${download_harmony_db}; then
download_harmony_db_file "${shard_id}" "${db_file_to_dl}" || err 70 "download harmony_db file failed"
exit 0
fi
if ${run_as_root}; then
check_root
fi
@ -653,15 +596,11 @@ fi
NODE_PORT=${pub_port:-9000}
PUB_IP=
PUSHGATEWAY_IP=
PUSHGATEWAY_PORT=
if [ "$OS" == "Linux" ]; then
if ${run_as_root}; then
setup_env
fi
# Kill existing soldier/node
fuser -k -n tcp $NODE_PORT
fi
# find my public ip address
@ -846,8 +785,9 @@ rm_bls_pass() {
{
while ${loop}
do
msg "re-downloading binaries in 5m"
sleep 300
msg "re-downloading binaries in 5~10m"
redl_sec=$( random 300 300 )
sleep $redl_sec
if any_new_binaries staging
then
msg "binaries did not change"
@ -855,8 +795,9 @@ rm_bls_pass() {
fi
while ! download_binaries staging
do
msg "staging download failed; retrying in 30s"
sleep 30
msg "staging download failed; retrying in 30~60s"
retry_sec=$( random 30 30 )
sleep $retry_sec
done
if diff staging/harmony-checksums.txt harmony-checksums.txt
then
@ -896,6 +837,7 @@ do
-min_peers="${minpeers}"
-max_bls_keys_per_node="${max_bls_keys_per_node}"
-broadcast_invalid_tx="${broadcast_invalid_tx}"
-verbosity="${log_level}"
)
args+=(
-is_archival="${archival}"
@ -952,7 +894,7 @@ do
*) ld_path_var=LD_LIBRARY_PATH;;
esac
run() {
(sleep 30 && rm_bls_pass)&
(sleep 60 && rm_bls_pass)&
env "${ld_path_var}=$(pwd)" ./harmony "${args[@]}" "${@}"
}
case "${blspass:+set}" in

@ -157,6 +157,7 @@ type AccumulatedOverLifetime struct {
BlockReward *big.Int `json:"reward-accumulated"`
Signing counters `json:"blocks"`
APR numeric.Dec `json:"apr"`
EpochAPRs []APREntry `json:"epoch-apr"`
}
func (w ValidatorWrapper) String() string {

Loading…
Cancel
Save