From 2b256322194cf09392d59ebf8b5fb2cfa65066c9 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Wed, 13 May 2020 12:18:51 -0700 Subject: [PATCH 01/35] Revert "[node] Throttle acceptance of incoming messages (#2970)" This reverts commit c2f089f6181904225faab33f8a647f8f18ccdb98. --- consensus/consensus_v2.go | 10 +-- consensus/leader.go | 20 +++-- consensus/reward/schedule.go | 6 ++ core/block_validator.go | 7 +- go.mod | 4 +- node/node.go | 147 +++++++---------------------------- node/node_handler.go | 14 +++- node/node_syncing.go | 2 - p2p/host.go | 63 +++++++++------ 9 files changed, 104 insertions(+), 169 deletions(-) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index e3dd98b4c..63833393e 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -334,7 +334,7 @@ func (consensus *Consensus) Start( blockChannel chan *types.Block, stopChan, stoppedChan, startChannel chan struct{}, ) { go func() { - toStart := make(chan struct{}, 1) + toStart := false isInitialLeader := consensus.IsLeader() if isInitialLeader { consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Waiting for consensus start") @@ -342,7 +342,7 @@ func (consensus *Consensus) Start( // this signal is consumed by node object to create a new block and in turn trigger a new consensus on it go func() { <-startChannel - toStart <- struct{}{} + toStart = true consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") consensus.ReadySignal <- struct{}{} }() @@ -360,15 +360,11 @@ func (consensus *Consensus) Start( vdfInProgress := false // Set up next block due time. consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) - start := false - for { select { - case <-toStart: - start = true case <-ticker.C: consensus.getLogger().Debug().Msg("[ConsensusMainLoop] Ticker") - if !start && isInitialLeader { + if !toStart && isInitialLeader { continue } for k, v := range consensus.consensusTimeout { diff --git a/consensus/leader.go b/consensus/leader.go index cdaa74537..0c002df84 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -262,24 +262,28 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { return } - viewID := consensus.viewID - quorumIsMet := consensus.Decider.IsQuorumAchieved(quorum.Commit) if !quorumWasMet && quorumIsMet { logger.Info().Msg("[OnCommit] 2/3 Enough commits received") - next := consensus.NextBlockDue - time.AfterFunc(2*time.Second, func() { - <-time.After(time.Until(next)) + go func(viewID uint64) { + consensus.getLogger().Debug().Msg("[OnCommit] Starting Grace Period") + // Always wait for 2 seconds as minimum grace period + time.Sleep(2 * time.Second) + if n := time.Now(); n.Before(consensus.NextBlockDue) { + // Sleep to wait for the full block time + time.Sleep(consensus.NextBlockDue.Sub(n)) + } + logger.Debug().Msg("[OnCommit] Commit Grace Period Ended") consensus.commitFinishChan <- viewID - }) + }(consensus.viewID) consensus.msgSender.StopRetry(msg_pb.MessageType_PREPARED) } if consensus.Decider.IsAllSigsCollected() { - go func() { + go func(viewID uint64) { consensus.commitFinishChan <- viewID logger.Info().Msg("[OnCommit] 100% Enough commits received") - }() + }(consensus.viewID) } } diff --git a/consensus/reward/schedule.go b/consensus/reward/schedule.go index 3d824a573..c136695f8 100644 --- a/consensus/reward/schedule.go +++ b/consensus/reward/schedule.go @@ -4,6 +4,7 @@ import ( "sort" "time" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/numeric" ) 
@@ -142,5 +143,10 @@ func PercentageForTimeStamp(ts int64) numeric.Dec { j++ } + utils.Logger().Info(). + Str("percent of total-supply used", bucket.share.Mul(numeric.NewDec(100)).String()). + Str("for-time", time.Unix(ts, 0).String()). + Msg("Picked Percentage for timestamp") + return bucket.share } diff --git a/core/block_validator.go b/core/block_validator.go index f05a623d5..91957b0fb 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -116,7 +116,7 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.DB, re if root := statedb.IntermediateRoot(v.config.IsS3(header.Epoch())); header.Root() != root { dump, _ := rlp.EncodeToBytes(header) const msg = "invalid merkle root (remote: %x local: %x, rlp dump %s)" - return errors.Errorf(msg, header.Root(), root, hex.EncodeToString(dump)) + return fmt.Errorf(msg, header.Root(), root, hex.EncodeToString(dump)) } return nil } @@ -125,10 +125,7 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.DB, re // given engine. Verifying the seal may be done optionally here, or explicitly // via the VerifySeal method. func (v *BlockValidator) ValidateHeader(block *types.Block, seal bool) error { - if h := block.Header(); h != nil { - return v.engine.VerifyHeader(v.bc, h, true) - } - return errors.New("header field was nil") + return v.engine.VerifyHeader(v.bc, block.Header(), true) } // ValidateHeaders verifies a batch of blocks' headers concurrently. The method returns a quit channel diff --git a/go.mod b/go.mod index d416cca6f..313f5d875 100644 --- a/go.mod +++ b/go.mod @@ -34,8 +34,8 @@ require ( github.com/karalabe/hid v1.0.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/libp2p/go-addr-util v0.0.2 // indirect - github.com/libp2p/go-libp2p v0.8.2 - github.com/libp2p/go-libp2p-core v0.5.3 + github.com/libp2p/go-libp2p v0.7.4 + github.com/libp2p/go-libp2p-core v0.5.1 github.com/libp2p/go-libp2p-crypto v0.1.0 github.com/libp2p/go-libp2p-discovery v0.3.0 github.com/libp2p/go-libp2p-host v0.1.0 diff --git a/node/node.go b/node/node.go index fd03733d5..bf5325f5d 100644 --- a/node/node.go +++ b/node/node.go @@ -8,7 +8,6 @@ import ( "os" "strings" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -38,7 +37,6 @@ import ( "github.com/harmony-one/harmony/webhooks" libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/pkg/errors" - "github.com/rs/zerolog" "golang.org/x/sync/semaphore" ) @@ -140,6 +138,8 @@ type Node struct { ContractDeployerKey *ecdsa.PrivateKey ContractDeployerCurrentNonce uint64 // The nonce of the deployer contract at current block ContractAddresses []common.Address + // Duplicated Ping Message Received + duplicatedPing sync.Map // Channel to notify consensus service to really start consensus startConsensus chan struct{} // node configuration, including group ID, shard ID, etc @@ -356,134 +356,43 @@ func (node *Node) Start() error { if len(allTopics) == 0 { return errors.New("have no topics to listen to") } - - const ( - maxMessageHandlers = 200 - threshold = maxMessageHandlers / 2 - lastLine = 20 - throttle = 100 * time.Millisecond - emrgThrottle = 250 * time.Millisecond - ) - + weighted := make([]*semaphore.Weighted, len(allTopics)) + const maxMessageHandlers = 200 + ctx := context.Background() ownID := node.host.GetID() errChan := make(chan error) - for i := range allTopics { - sub, err := allTopics[i].Topic.Subscribe() + for i, topic := range allTopics { + sub, err := topic.Subscribe() if err != nil { return err } - 
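// A reading aid for this revert hunk: stripped of the interleaved +/- noise, the logic being
// restored is a bounded-concurrency dispatcher, sketched below as a self-contained program.
// A weighted semaphore of fixed capacity guards each topic's handler pool; a message is only
// handled in its own goroutine if a slot can be acquired without blocking, otherwise it is
// dropped with a warning. This is an illustrative sketch, not Harmony code: handleBounded,
// handle and the sample messages are invented names, and the real loop consumes libp2p
// pubsub messages and strips the p2p message prefix before calling node.HandleMessage.

package main

import (
	"fmt"
	"log"
	"sync"

	"golang.org/x/sync/semaphore"
)

const maxMessageHandlers = 200 // the fixed capacity this revert reinstates

// handleBounded drains msgs, running handle concurrently but never with more than
// maxMessageHandlers handlers in flight; excess messages are dropped, not queued.
func handleBounded(msgs <-chan []byte, handle func([]byte)) {
	sem := semaphore.NewWeighted(maxMessageHandlers)
	var wg sync.WaitGroup
	for m := range msgs {
		m := m
		if sem.TryAcquire(1) { // non-blocking: drop instead of building a backlog
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer sem.Release(1)
				handle(m)
			}()
		} else {
			log.Println("could not acquire semaphore to process incoming message")
		}
	}
	wg.Wait()
}

func main() {
	msgs := make(chan []byte, 3)
	msgs <- []byte("a")
	msgs <- []byte("b")
	msgs <- []byte("c")
	close(msgs)
	handleBounded(msgs, func(m []byte) { fmt.Printf("handled %s\n", m) })
}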
topicNamed := allTopics[i].Name - sem := semaphore.NewWeighted(maxMessageHandlers) + weighted[i] = semaphore.NewWeighted(maxMessageHandlers) msgChan := make(chan *libp2p_pubsub.Message) - needThrottle, releaseThrottle := - make(chan time.Duration), make(chan struct{}) - - go func() { - soFar := int32(maxMessageHandlers) - - sampled := utils.Logger().Sample( - zerolog.LevelSampler{ - DebugSampler: &zerolog.BurstSampler{ - Burst: 1, - Period: 36 * time.Second, - NextSampler: &zerolog.BasicSampler{N: 1000}, - }, - }, - ).With().Str("pubsub-topic", topicNamed).Logger() + go func(msgChan chan *libp2p_pubsub.Message, sem *semaphore.Weighted) { for msg := range msgChan { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - msg := msg - - go func() { - defer cancel() - defer atomic.AddInt32(&soFar, 1) - - current := atomic.AddInt32(&soFar, -1) - using := maxMessageHandlers - current - - if using > 1 { - sampled.Info(). - Int32("currently-using", using). - Msg("sampling message handling") - } - - if current == 0 { - utils.Logger().Debug().Msg("no available semaphores to handle p2p messages") - return - } - - var cost int64 = 1 - - if current <= threshold { - cost = 2 - if current == threshold { - go func() { - needThrottle <- throttle - }() - } else if current == lastLine { - go func() { - needThrottle <- emrgThrottle - }() - } - } else { - if current == threshold+1 { - cost = 1 - go func() { - releaseThrottle <- struct{}{} - }() - } - } - - if sem.TryAcquire(cost) { - defer sem.Release(cost) - payload := msg.GetData() - if len(payload) < p2pMsgPrefixSize { - cancel() - // TODO understand why this happens - return - } - select { - case <-ctx.Done(): - if errors.Is(ctx.Err(), context.DeadlineExceeded) { - utils.Logger().Info(). - Str("topic", topicNamed).Msg("exceeded deadline") - } - errChan <- ctx.Err() - default: - node.HandleMessage( - payload[p2pMsgPrefixSize:], msg.GetFrom(), - ) - } - } - }() - - } - }() - - go func() { - slowDown, coolDown := false, throttle - - for { - - select { - case s := <-needThrottle: - slowDown = true - coolDown = s + payload := msg.GetData() + if len(payload) < p2pMsgPrefixSize { + continue + } + if sem.TryAcquire(1) { + go func() { + node.HandleMessage( + payload[p2pMsgPrefixSize:], msg.GetFrom(), + ) + sem.Release(1) + }() + } else { utils.Logger().Info(). - Dur("throttle-delay-miliseconds", s.Round(time.Millisecond)). - Msg("throttle needed on acceptance of messages") - case <-releaseThrottle: - utils.Logger().Info().Msg("p2p throttle released") - slowDown = false - default: - if slowDown { - <-time.After(coolDown) - } + Msg("could not acquire semaphore to process incoming message") } + } + }(msgChan, weighted[i]) - nextMsg, err := sub.Next(context.Background()) + go func(msgChan chan *libp2p_pubsub.Message) { + for { + nextMsg, err := sub.Next(ctx) if err != nil { errChan <- err continue @@ -493,7 +402,7 @@ func (node *Node) Start() error { } msgChan <- nextMsg } - }() + }(msgChan) } for err := range errChan { diff --git a/node/node_handler.go b/node/node_handler.go index 125c94693..c3fda4e2f 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -57,6 +57,13 @@ func (node *Node) processSkippedMsgTypeByteValue( // HandleMessage parses the message and dispatch the actions. func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) { + // log in-coming metrics + node.host.LogRecvMessage(content) + utils.Logger().Info(). + Int64("TotalIn", node.host.GetBandwidthTotals().TotalIn). 
+ Float64("RateIn", node.host.GetBandwidthTotals().RateIn). + Msg("[metrics][p2p] traffic in in bytes") + msgCategory, err := proto.GetMessageCategory(content) if err != nil { utils.Logger().Error(). @@ -288,9 +295,6 @@ func (node *Node) BroadcastCrossLink(newBlock *types.Block) { // VerifyNewBlock is called by consensus participants to verify the block (account model) they are // running consensus on func (node *Node) VerifyNewBlock(newBlock *types.Block) error { - if newBlock == nil || newBlock.Header() == nil { - return errors.New("nil header or block asked to verify") - } if err := node.Blockchain().Validator().ValidateHeader(newBlock, true); err != nil { utils.Logger().Error(). Str("blockHash", newBlock.Hash().Hex()). @@ -447,6 +451,10 @@ func (node *Node) PostConsensusProcessing( // Broadcast client requested missing cross shard receipts if there is any node.BroadcastMissingCXReceipts() + // Clear metrics after one consensus cycle + node.host.ResetMetrics() + utils.Logger().Info().Msg("[metrics][p2p] Reset after 1 consensus cycle") + // Update consensus keys at last so the change of leader status doesn't mess up normal flow if len(newBlock.Header().ShardState()) > 0 { node.Consensus.SetMode(node.Consensus.UpdateConsensusInformation()) diff --git a/node/node_syncing.go b/node/node_syncing.go index fa74789d4..641e27b11 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -410,8 +410,6 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in // this is the out of sync node acts as grpc server side case downloader_pb.DownloaderRequest_NEWBLOCK: - node.stateMutex.Lock() - defer node.stateMutex.Unlock() if node.State != NodeNotInSync { utils.Logger().Debug(). Str("state", node.State.String()). diff --git a/p2p/host.go b/p2p/host.go index fd91bcd5b..3e77d4303 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -35,8 +35,12 @@ type Host interface { ConnectHostPeer(Peer) error // SendMessageToGroups sends a message to one or more multicast groups. 
SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) error - AllTopics() []NamedTopic + AllTopics() []*libp2p_pubsub.Topic C() (int, int, int) + // libp2p.metrics related + GetBandwidthTotals() libp2p_metrics.Stats + LogRecvMessage(msg []byte) + ResetMetrics() } // Peer is the object for a p2p peer (node) @@ -66,14 +70,9 @@ func NewHost(self *Peer, key libp2p_crypto.PrivKey) (Host, error) { return nil, errors.Wrapf(err, "cannot create listen multiaddr from port %#v", self.Port) } - ctx := context.Background() p2pHost, err := libp2p.New(ctx, - libp2p.ListenAddrs(listenAddr), - libp2p.Identity(key), - // libp2p.DisableRelay(), - libp2p.EnableNATService(), - libp2p.ForceReachabilityPublic(), + libp2p.ListenAddrs(listenAddr), libp2p.Identity(key), ) if err != nil { return nil, errors.Wrapf(err, "cannot initialize libp2p host") @@ -97,14 +96,17 @@ func NewHost(self *Peer, key libp2p_crypto.PrivKey) (Host, error) { self.PeerID = p2pHost.ID() subLogger := utils.Logger().With().Str("hostID", p2pHost.ID().Pretty()).Logger() + newMetrics := libp2p_metrics.NewBandwidthCounter() + // has to save the private key for host h := &HostV2{ - h: p2pHost, - joiner: topicJoiner{pubsub}, - joined: map[string]*libp2p_pubsub.Topic{}, - self: *self, - priKey: key, - logger: &subLogger, + h: p2pHost, + joiner: topicJoiner{pubsub}, + joined: map[string]*libp2p_pubsub.Topic{}, + self: *self, + priKey: key, + logger: &subLogger, + metrics: newMetrics, } if err != nil { @@ -188,7 +190,13 @@ func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) err = e continue } + // log out-going metrics + host.metrics.LogSentMessage(int64(len(msg))) } + host.logger.Info(). + Int64("TotalOut", host.GetBandwidthTotals().TotalOut). + Float64("RateOut", host.GetBandwidthTotals().RateOut). + Msg("[metrics][p2p] traffic out in bytes") return err } @@ -245,6 +253,21 @@ func (host *HostV2) GetPeerCount() int { return host.h.Peerstore().Peers().Len() } +// GetBandwidthTotals returns total bandwidth of a node +func (host *HostV2) GetBandwidthTotals() libp2p_metrics.Stats { + return host.metrics.GetBandwidthTotals() +} + +// LogRecvMessage logs received message on node +func (host *HostV2) LogRecvMessage(msg []byte) { + host.metrics.LogRecvMessage(int64(len(msg))) +} + +// ResetMetrics resets metrics counters +func (host *HostV2) ResetMetrics() { + host.metrics.Reset() +} + // ConnectHostPeer connects to peer host func (host *HostV2) ConnectHostPeer(peer Peer) error { ctx := context.Background() @@ -267,19 +290,13 @@ func (host *HostV2) ConnectHostPeer(peer Peer) error { return nil } -// NamedTopic .. -type NamedTopic struct { - Name string - Topic *libp2p_pubsub.Topic -} - // AllTopics .. 
-func (host *HostV2) AllTopics() []NamedTopic { +func (host *HostV2) AllTopics() []*libp2p_pubsub.Topic { host.lock.Lock() defer host.lock.Unlock() - var topics []NamedTopic - for k, g := range host.joined { - topics = append(topics, NamedTopic{k, g}) + topics := []*libp2p_pubsub.Topic{} + for _, g := range host.joined { + topics = append(topics, g) } return topics } From b5cc34f1e66d71dc3d9fd214d4af575f345a304b Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Wed, 13 May 2020 19:33:54 +0000 Subject: [PATCH 02/35] [node.sh] chmod +x node.sh after download Signed-off-by: Leo Chen --- scripts/node.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/node.sh b/scripts/node.sh index e39d6e2a2..c35c5788b 100755 --- a/scripts/node.sh +++ b/scripts/node.sh @@ -493,7 +493,7 @@ download_binaries() { verify_checksum "${outdir}" "${bin}" md5sum.txt || return $? msg "downloaded ${bin}" done - chmod +x "${outdir}/harmony" + chmod +x "${outdir}/harmony" "${outdir}/node.sh" (cd "${outdir}" && exec openssl sha256 $(cut -c35- md5sum.txt)) > "${outdir}/harmony-checksums.txt" } From 7c3c5b1d0b4c2b9ef045d1a08582ebec406cdc83 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Wed, 13 May 2020 14:04:01 -0700 Subject: [PATCH 03/35] add 185 prestaking and 186 staking epoch --- internal/configs/sharding/mainnet.go | 7 ++++++- internal/params/config.go | 6 +++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/internal/configs/sharding/mainnet.go b/internal/configs/sharding/mainnet.go index dbeba768a..db5923ede 100644 --- a/internal/configs/sharding/mainnet.go +++ b/internal/configs/sharding/mainnet.go @@ -28,6 +28,7 @@ const ( mainnetV1_3Epoch = 36 mainnetV1_4Epoch = 46 mainnetV1_5Epoch = 54 + mainnetV2_0Epoch = 185 // prestaking epoch // MainNetHTTPPattern is the http pattern for mainnet. 
MainNetHTTPPattern = "https://api.s%d.t.hmny.io" @@ -42,6 +43,9 @@ type mainnetSchedule struct{} func (mainnetSchedule) InstanceForEpoch(epoch *big.Int) Instance { switch { + case epoch.Cmp(big.NewInt(mainnetV2_0Epoch)) >= 0: + // 185 resharding epoch (for shard 0) around 14/05/2020 ~15:00 PDT + return mainnetV2_0 case epoch.Cmp(big.NewInt(mainnetV1_5Epoch)) >= 0: // 54 resharding epoch (for shard 0) around 23/10/2019 ~10:05 PDT return mainnetV1_5 @@ -137,7 +141,7 @@ func (ms mainnetSchedule) GetShardingStructure(numShard, shardID int) []map[stri return genShardingStructure(numShard, shardID, MainNetHTTPPattern, MainNetWSPattern) } -var mainnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(mainnetV0_1Epoch), big.NewInt(mainnetV0_2Epoch), big.NewInt(mainnetV0_3Epoch), big.NewInt(mainnetV0_4Epoch), big.NewInt(mainnetV1Epoch), big.NewInt(mainnetV1_1Epoch), big.NewInt(mainnetV1_2Epoch), big.NewInt(mainnetV1_3Epoch), big.NewInt(mainnetV1_4Epoch), big.NewInt(mainnetV1_5Epoch)} +var mainnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(mainnetV0_1Epoch), big.NewInt(mainnetV0_2Epoch), big.NewInt(mainnetV0_3Epoch), big.NewInt(mainnetV0_4Epoch), big.NewInt(mainnetV1Epoch), big.NewInt(mainnetV1_1Epoch), big.NewInt(mainnetV1_2Epoch), big.NewInt(mainnetV1_3Epoch), big.NewInt(mainnetV1_4Epoch), big.NewInt(mainnetV1_5Epoch), big.NewInt(mainnetV2_0Epoch)} var ( mainnetV0 = MustNewInstance(4, 150, 112, numeric.OneDec(), genesis.HarmonyAccounts, genesis.FoundationalNodeAccounts, mainnetReshardingEpoch, MainnetSchedule.BlocksPerEpoch()) @@ -151,4 +155,5 @@ var ( mainnetV1_3 = MustNewInstance(4, 250, 170, numeric.OneDec(), genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV1_3, mainnetReshardingEpoch, MainnetSchedule.BlocksPerEpoch()) mainnetV1_4 = MustNewInstance(4, 250, 170, numeric.OneDec(), genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV1_4, mainnetReshardingEpoch, MainnetSchedule.BlocksPerEpoch()) mainnetV1_5 = MustNewInstance(4, 250, 170, numeric.OneDec(), genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV1_5, mainnetReshardingEpoch, MainnetSchedule.BlocksPerEpoch()) + mainnetV2_0 = MustNewInstance(4, 250, 170, numeric.MustNewDecFromStr("0.68"), genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV1_5, mainnetReshardingEpoch, MainnetSchedule.BlocksPerEpoch()) ) diff --git a/internal/params/config.go b/internal/params/config.go index a05e96f5b..32acd0117 100644 --- a/internal/params/config.go +++ b/internal/params/config.go @@ -27,9 +27,9 @@ var ( MainnetChainConfig = &ChainConfig{ ChainID: MainnetChainID, CrossTxEpoch: big.NewInt(28), - CrossLinkEpoch: EpochTBD, - StakingEpoch: EpochTBD, - PreStakingEpoch: EpochTBD, + CrossLinkEpoch: big.NewInt(186), + StakingEpoch: big.NewInt(185), + PreStakingEpoch: big.NewInt(185), EIP155Epoch: big.NewInt(28), S3Epoch: big.NewInt(28), ReceiptLogEpoch: big.NewInt(101), From 053ded89ee6aa61d70e8ab55bfce44d623453df8 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Wed, 13 May 2020 14:05:27 -0700 Subject: [PATCH 04/35] fix staking epoch number --- internal/params/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/params/config.go b/internal/params/config.go index 32acd0117..536ac8040 100644 --- a/internal/params/config.go +++ b/internal/params/config.go @@ -28,7 +28,7 @@ var ( ChainID: MainnetChainID, CrossTxEpoch: big.NewInt(28), CrossLinkEpoch: big.NewInt(186), - StakingEpoch: big.NewInt(185), + StakingEpoch: big.NewInt(186), PreStakingEpoch: big.NewInt(185), EIP155Epoch: big.NewInt(28), 
S3Epoch: big.NewInt(28), From 75d53190d1f7eef5a65656983eedc8d43cc3d49f Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Wed, 13 May 2020 21:11:01 +0000 Subject: [PATCH 05/35] [p2p] use debug level for p2p metrics data Signed-off-by: Leo Chen --- node/node_handler.go | 4 ++-- p2p/host.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/node/node_handler.go b/node/node_handler.go index c3fda4e2f..fbcfde02e 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -59,7 +59,7 @@ func (node *Node) processSkippedMsgTypeByteValue( func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) { // log in-coming metrics node.host.LogRecvMessage(content) - utils.Logger().Info(). + utils.Logger().Debug(). Int64("TotalIn", node.host.GetBandwidthTotals().TotalIn). Float64("RateIn", node.host.GetBandwidthTotals().RateIn). Msg("[metrics][p2p] traffic in in bytes") @@ -453,7 +453,7 @@ func (node *Node) PostConsensusProcessing( // Clear metrics after one consensus cycle node.host.ResetMetrics() - utils.Logger().Info().Msg("[metrics][p2p] Reset after 1 consensus cycle") + utils.Logger().Debug().Msg("[metrics][p2p] Reset after 1 consensus cycle") // Update consensus keys at last so the change of leader status doesn't mess up normal flow if len(newBlock.Header().ShardState()) > 0 { diff --git a/p2p/host.go b/p2p/host.go index 3e77d4303..a881da7c0 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -193,7 +193,7 @@ func (host *HostV2) SendMessageToGroups(groups []nodeconfig.GroupID, msg []byte) // log out-going metrics host.metrics.LogSentMessage(int64(len(msg))) } - host.logger.Info(). + host.logger.Debug(). Int64("TotalOut", host.GetBandwidthTotals().TotalOut). Float64("RateOut", host.GetBandwidthTotals().RateOut). Msg("[metrics][p2p] traffic out in bytes") From f5089b55634ebd4e99785fc2b8b9b52db7090ddf Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Wed, 13 May 2020 21:12:26 +0000 Subject: [PATCH 06/35] [node.sh] add -L to set logging verbosity level Signed-off-by: Leo Chen --- scripts/node.sh | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/scripts/node.sh b/scripts/node.sh index c35c5788b..f8f2fed25 100755 --- a/scripts/node.sh +++ b/scripts/node.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -version="v1 20200507.0" +version="v1 20200513.0" unset -v progname progname="${0##*/}" @@ -188,6 +188,7 @@ options: -r address start a pprof profiling server listening on the specified address -I use statically linked Harmony binary (default: true) -R tracefile enable p2p trace using tracefile (default: off) + -L log_level logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: $log_level) examples: @@ -239,7 +240,7 @@ BUCKET=pub.harmony.one OS=$(uname -s) unset start_clean loop run_as_root blspass do_not_download download_only network node_type shard_id download_harmony_db db_file_to_dl -unset upgrade_rel public_rpc staking_mode pub_port multi_key blsfolder blacklist verify TRACEFILE minpeers max_bls_keys_per_node +unset upgrade_rel public_rpc staking_mode pub_port multi_key blsfolder blacklist verify TRACEFILE minpeers max_bls_keys_per_node log_level start_clean=false loop=true run_as_root=true @@ -260,12 +261,13 @@ static=true verify=false minpeers=6 max_bls_keys_per_node=10 +log_level=3 ${BLSKEYFILE=} ${TRACEFILE=} unset OPTIND OPTARG opt OPTIND=1 -while getopts :1chk:sSp:dDN:T:i:ba:U:PvVyzn:MAIB:r:Y:f:R:m: opt +while getopts :1chk:sSp:dDN:T:i:ba:U:PvVyzn:MAIB:r:Y:f:R:m:L: opt do case "${opt}" in '?') usage "unrecognized option -${OPTARG}";; 
@@ -302,6 +304,7 @@ do y) staking_mode=false;; A) archival=true;; R) TRACEFILE="${OPTARG}";; + L) log_level="${OPTARG}";; *) err 70 "unhandled option -${OPTARG}";; # EX_SOFTWARE esac done @@ -911,6 +914,7 @@ do -blacklist="${blacklist}" -min_peers="${minpeers}" -max_bls_keys_per_node="${max_bls_keys_per_node}" + -verbosity="${log_level}" ) args+=( -is_archival="${archival}" From ac977cfd9d90473bf536a96e300a54d9561f5914 Mon Sep 17 00:00:00 2001 From: Ganesha Upadhyaya Date: Wed, 13 May 2020 17:38:07 -0700 Subject: [PATCH 07/35] fix signers share computation to use only non-harmony nodes (#3011) --- internal/chain/reward.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/internal/chain/reward.go b/internal/chain/reward.go index 62b04ee72..9e56d41b6 100644 --- a/internal/chain/reward.go +++ b/internal/chain/reward.go @@ -197,8 +197,11 @@ func AccumulateRewardsAndCountSigs( allSignersShare := numeric.ZeroDec() for j := range payable { - voterShare := votingPower.Voters[payable[j].BLSPublicKey].OverallPercent - allSignersShare = allSignersShare.Add(voterShare) + voter := votingPower.Voters[payable[j].BLSPublicKey] + if !voter.IsHarmonyNode { + voterShare := voter.OverallPercent + allSignersShare = allSignersShare.Add(voterShare) + } } for beaconMember := range payable { // TODO Give out whatever leftover to the last voter/handle @@ -298,8 +301,11 @@ func AccumulateRewardsAndCountSigs( allSignersShare := numeric.ZeroDec() for j := range payableSigners { - voterShare := votingPower.Voters[payableSigners[j].BLSPublicKey].OverallPercent - allSignersShare = allSignersShare.Add(voterShare) + voter := votingPower.Voters[payableSigners[j].BLSPublicKey] + if !voter.IsHarmonyNode { + voterShare := voter.OverallPercent + allSignersShare = allSignersShare.Add(voterShare) + } } for j := range payableSigners { From c09b3c3af70ea0441f93fd376cbb88c8021ca412 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Thu, 14 May 2020 09:09:20 -0700 Subject: [PATCH 08/35] Add LastEpochInCommittee at prestaking epoch too (#3016) --- internal/chain/engine.go | 3 ++- staking/availability/measure.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/chain/engine.go b/internal/chain/engine.go index 11926530f..85815eb41 100644 --- a/internal/chain/engine.go +++ b/internal/chain/engine.go @@ -248,11 +248,12 @@ func (e *engineImpl) Finalize( isBeaconChain := header.ShardID() == shard.BeaconChainShardID isNewEpoch := len(header.ShardState()) > 0 + inPreStakingEra := chain.Config().IsPreStaking(header.Epoch()) inStakingEra := chain.Config().IsStaking(header.Epoch()) // Process Undelegations, set LastEpochInCommittee and set EPoS status // Needs to be before AccumulateRewardsAndCountSigs - if isBeaconChain && isNewEpoch && inStakingEra { + if isBeaconChain && isNewEpoch && inPreStakingEra { if err := payoutUndelegations(chain, header, state); err != nil { return nil, nil, err } diff --git a/staking/availability/measure.go b/staking/availability/measure.go index db70880d6..1643254cd 100644 --- a/staking/availability/measure.go +++ b/staking/availability/measure.go @@ -205,6 +205,7 @@ func ComputeAndMutateEPOSStatus( wrapper.Status = effective.Inactive utils.Logger().Info(). Str("threshold", measure.String()). + Interface("computed", computed). 
Msg("validator failed availability threshold, set to inactive") default: // Default is no-op so validator who wants From 8703dcebb8a3e5607bf75ac5ac7cc2316fde4070 Mon Sep 17 00:00:00 2001 From: Daniel Van Der Maden Date: Thu, 14 May 2020 12:56:53 -0700 Subject: [PATCH 09/35] [txpool] Correct logging level --- core/tx_pool.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 73be23497..1cee48a92 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -920,7 +920,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) { pool.priced.Put(tx) pool.journalTx(from, tx) - logger.Warn(). + logger.Info(). Str("hash", tx.Hash().Hex()). Interface("from", from). Interface("to", tx.To()). @@ -945,7 +945,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) { } pool.journalTx(from, tx) - logger.Warn(). + logger.Info(). Str("hash", hash.Hex()). Interface("from", from). Interface("to", tx.To()). @@ -1220,7 +1220,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { nonce := pool.currentState.GetNonce(addr) for _, tx := range list.Forward(nonce) { hash := tx.Hash() - logger.Warn().Str("hash", hash.Hex()).Msg("Removed old queued transaction") + logger.Info().Str("hash", hash.Hex()).Msg("Removed old queued transaction") pool.all.Remove(hash) pool.priced.Removed() } @@ -1237,7 +1237,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { hash := tx.Hash() if pool.promoteTx(addr, hash, tx) { - logger.Warn().Str("hash", hash.Hex()).Msg("Promoting queued transaction") + logger.Info().Str("hash", hash.Hex()).Msg("Promoting queued transaction") promoted = append(promoted, tx) } } @@ -1391,7 +1391,7 @@ func (pool *TxPool) demoteUnexecutables() { // Drop all transactions that are deemed too old (low nonce) for _, tx := range list.Forward(nonce) { hash := tx.Hash() - logger.Warn().Str("hash", hash.Hex()).Msg("Removed old pending transaction") + logger.Info().Str("hash", hash.Hex()).Msg("Removed old pending transaction") pool.all.Remove(hash) pool.priced.Removed() } @@ -1406,7 +1406,7 @@ func (pool *TxPool) demoteUnexecutables() { } for _, tx := range invalids { hash := tx.Hash() - logger.Warn().Str("hash", hash.Hex()).Msg("Demoting pending transaction") + logger.Info().Str("hash", hash.Hex()).Msg("Demoting pending transaction") if _, err := pool.enqueueTx(hash, tx); err != nil { pool.txErrorSink.Add(tx, err) } From 436de05beaf1c0f19055bd1db85282fc52c6c862 Mon Sep 17 00:00:00 2001 From: Ganesha Upadhyaya Date: Thu, 14 May 2020 16:01:53 -0700 Subject: [PATCH 10/35] write validator stats at the end of prestaking epoch (#3020) * write validator stats at the end of prestaking epoch * also write out block reward accumulator --- core/offchain.go | 68 ++++++++++++++++++++++++++++------------------ hmy/api_backend.go | 9 ++++-- 2 files changed, 47 insertions(+), 30 deletions(-) diff --git a/core/offchain.go b/core/offchain.go index de987175e..71cb50864 100644 --- a/core/offchain.go +++ b/core/offchain.go @@ -35,6 +35,7 @@ func (bc *BlockChain) CommitOffChainData( rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) isBeaconChain := bc.CurrentHeader().ShardID() == shard.BeaconChainShardID isStaking := bc.chainConfig.IsStaking(block.Epoch()) + isPreStaking := bc.chainConfig.IsPreStaking(block.Epoch()) header := block.Header() isNewEpoch := len(header.ShardState()) > 0 // Cross-shard txns @@ 
-253,34 +254,8 @@ func (bc *BlockChain) CommitOffChainData( } } - type t struct { - addr common.Address - stats *staking.ValidatorStats - } - - sortedStats, i := make([]t, len(tempValidatorStats)), 0 - for key := range tempValidatorStats { - sortedStats[i] = t{key, tempValidatorStats[key]} - i++ - } - sort.SliceStable( - sortedStats, - func(i, j int) bool { - return bytes.Compare( - sortedStats[i].addr[:], sortedStats[j].addr[:], - ) == -1 - }, - ) - for _, stat := range sortedStats { - if err := rawdb.WriteValidatorStats( - batch, stat.addr, stat.stats, - ); err != nil { - utils.Logger().Info().Err(err). - Str("validator address", stat.addr.Hex()). - Msg("could not update stats for validator") - } - } + bc.writeValidatorStats(tempValidatorStats, batch) records := slash.Records{} if s := header.Slashes(); len(s) > 0 { @@ -292,6 +267,11 @@ func (bc *BlockChain) CommitOffChainData( } } } else { + if isNewEpoch && isPreStaking { + // if prestaking and last block, write out the validator stats + // so that it is available for the staking epoch + bc.writeValidatorStats(tempValidatorStats, batch) + } // block reward never accumulate before staking bc.WriteBlockRewardAccumulator(batch, common.Big0, block.Number().Uint64()) } @@ -300,6 +280,40 @@ func (bc *BlockChain) CommitOffChainData( return CanonStatTy, nil } +func (bc *BlockChain) writeValidatorStats( + tempValidatorStats map[common.Address]*staking.ValidatorStats, + batch rawdb.DatabaseWriter, +) { + type t struct { + addr common.Address + stats *staking.ValidatorStats + } + + sortedStats, i := make([]t, len(tempValidatorStats)), 0 + for key := range tempValidatorStats { + sortedStats[i] = t{key, tempValidatorStats[key]} + i++ + } + + sort.SliceStable( + sortedStats, + func(i, j int) bool { + return bytes.Compare( + sortedStats[i].addr[:], sortedStats[j].addr[:], + ) == -1 + }, + ) + for _, stat := range sortedStats { + if err := rawdb.WriteValidatorStats( + batch, stat.addr, stat.stats, + ); err != nil { + utils.Logger().Info().Err(err). + Str("validator address", stat.addr.Hex()). 
+ Msg("could not update stats for validator") + } + } +} + func (bc *BlockChain) getNextBlockEpoch(header *block.Header) (*big.Int, error) { nextBlockEpoch := header.Epoch() if len(header.ShardState()) > 0 { diff --git a/hmy/api_backend.go b/hmy/api_backend.go index 0ebac5d4f..4ec576b71 100644 --- a/hmy/api_backend.go +++ b/hmy/api_backend.go @@ -433,6 +433,12 @@ func (b *APIBackend) GetValidatorInformation( ) % shard.Schedule.BlocksPerEpoch() computed.BlocksLeftInEpoch = shard.Schedule.BlocksPerEpoch() - beaconChainBlocks + if defaultReply.CurrentlyInCommittee { + defaultReply.Performance = &staking.CurrentEpochPerformance{ + CurrentSigningPercentage: *computed, + } + } + stats, err := bc.ReadValidatorStats(addr) if err != nil { // when validator has no stats, default boot-status to not booted @@ -491,9 +497,6 @@ func (b *APIBackend) GetValidatorInformation( } if defaultReply.CurrentlyInCommittee { - defaultReply.Performance = &staking.CurrentEpochPerformance{ - CurrentSigningPercentage: *computed, - } defaultReply.ComputedMetrics = stats defaultReply.EPoSWinningStake = &stats.TotalEffectiveStake } From 1b62e28073d676b6abb4b77dece7c0e4f732fc63 Mon Sep 17 00:00:00 2001 From: Edgar Aroutiounian Date: Sat, 16 May 2020 11:00:26 -0700 Subject: [PATCH 11/35] [node] Bump handlers by order of magnitude (#3026) --- node/node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/node.go b/node/node.go index bf5325f5d..64c20bcd0 100644 --- a/node/node.go +++ b/node/node.go @@ -357,7 +357,7 @@ func (node *Node) Start() error { return errors.New("have no topics to listen to") } weighted := make([]*semaphore.Weighted, len(allTopics)) - const maxMessageHandlers = 200 + const maxMessageHandlers = 2000 ctx := context.Background() ownID := node.host.GetID() errChan := make(chan error) From db21d9205cb864bb36c82ee2c8812bd7fcccd554 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Sat, 16 May 2020 11:26:26 -0700 Subject: [PATCH 12/35] [log] use proper log level on messages Signed-off-by: Leo Chen --- core/offchain.go | 3 +-- node/node.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/offchain.go b/core/offchain.go index 71cb50864..85400151b 100644 --- a/core/offchain.go +++ b/core/offchain.go @@ -187,8 +187,7 @@ func (bc *BlockChain) CommitOffChainData( epoch, ).NumShards(); i < c; i++ { if err := bc.LastContinuousCrossLink(batch, i); err != nil { - utils.Logger().Info(). - Err(err).Msg("Could not roll up last continuous crosslink") + utils.Logger().Info().Msg("Could not roll up last continuous crosslink") } } } diff --git a/node/node.go b/node/node.go index 64c20bcd0..bda6054a7 100644 --- a/node/node.go +++ b/node/node.go @@ -384,7 +384,7 @@ func (node *Node) Start() error { sem.Release(1) }() } else { - utils.Logger().Info(). + utils.Logger().Warn(). 
Msg("could not acquire semaphore to process incoming message") } } From b8ab786244e91c3cf848e46eacf17bd9c5361d06 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sat, 16 May 2020 14:30:43 -0700 Subject: [PATCH 13/35] add nil check on getHeader (#3030) --- consensus/consensus_v2.go | 7 +++ core/blockchain.go | 70 ++++++++++++++++------------- internal/chain/engine.go | 8 ++++ internal/hmyapi/apiv1/blockchain.go | 3 ++ 4 files changed, 56 insertions(+), 32 deletions(-) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 63833393e..0fa5b7fb9 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -527,6 +527,9 @@ func (consensus *Consensus) GenerateVrfAndProof(newBlock *types.Block, vrfBlockN previousHeader := consensus.ChainReader.GetHeaderByNumber( newBlock.NumberU64() - 1, ) + if previousHeader == nil { + return vrfBlockNumbers + } previousHash := previousHeader.Hash() copy(blockHash[:], previousHash[:]) @@ -549,6 +552,10 @@ func (consensus *Consensus) ValidateVrfAndProof(headerObj *block.Header) bool { previousHeader := consensus.ChainReader.GetHeaderByNumber( headerObj.Number().Uint64() - 1, ) + if previousHeader == nil { + return false + } + previousHash := previousHeader.Hash() copy(blockHash[:], previousHash[:]) vrfProof := [96]byte{} diff --git a/core/blockchain.go b/core/blockchain.go index 2b9286244..000a99ab4 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -833,14 +833,15 @@ func (bc *BlockChain) Stop() { for _, offset := range []uint64{0, 1, triesInMemory - 1} { if number := bc.CurrentBlock().NumberU64(); number > offset { recent := bc.GetHeaderByNumber(number - offset) - - utils.Logger().Info(). - Str("block", recent.Number().String()). - Str("hash", recent.Hash().Hex()). - Str("root", recent.Root().Hex()). - Msg("Writing cached state to disk") - if err := triedb.Commit(recent.Root(), true); err != nil { - utils.Logger().Error().Err(err).Msg("Failed to commit recent state trie") + if recent != nil { + utils.Logger().Info(). + Str("block", recent.Number().String()). + Str("hash", recent.Hash().Hex()). + Str("root", recent.Root().Hex()). + Msg("Writing cached state to disk") + if err := triedb.Commit(recent.Root(), true); err != nil { + utils.Logger().Error().Err(err).Msg("Failed to commit recent state trie") + } } } } @@ -1137,32 +1138,34 @@ func (bc *BlockChain) WriteBlockWithState( } // Find the next state trie we need to commit header := bc.GetHeaderByNumber(current - triesInMemory) - chosen := header.Number().Uint64() - - // If we exceeded out time allowance, flush an entire trie to disk - if bc.gcproc > bc.cacheConfig.TrieTimeLimit { - // If we're exceeding limits but haven't reached a large enough memory gap, - // warn the user that the system is becoming unstable. - if chosen < lastWrite+triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { - utils.Logger().Info(). - Dur("time", bc.gcproc). - Dur("allowance", bc.cacheConfig.TrieTimeLimit). - Float64("optimum", float64(chosen-lastWrite)/triesInMemory). - Msg("State in memory for too long, committing") + if header != nil { + chosen := header.Number().Uint64() + + // If we exceeded out time allowance, flush an entire trie to disk + if bc.gcproc > bc.cacheConfig.TrieTimeLimit { + // If we're exceeding limits but haven't reached a large enough memory gap, + // warn the user that the system is becoming unstable. + if chosen < lastWrite+triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { + utils.Logger().Info(). + Dur("time", bc.gcproc). 
+ Dur("allowance", bc.cacheConfig.TrieTimeLimit). + Float64("optimum", float64(chosen-lastWrite)/triesInMemory). + Msg("State in memory for too long, committing") + } + // Flush an entire trie and restart the counters + triedb.Commit(header.Root(), true) + lastWrite = chosen + bc.gcproc = 0 } - // Flush an entire trie and restart the counters - triedb.Commit(header.Root(), true) - lastWrite = chosen - bc.gcproc = 0 - } - // Garbage collect anything below our required write retention - for !bc.triegc.Empty() { - root, number := bc.triegc.Pop() - if uint64(-number) > chosen { - bc.triegc.Push(root, number) - break + // Garbage collect anything below our required write retention + for !bc.triegc.Empty() { + root, number := bc.triegc.Pop() + if uint64(-number) > chosen { + bc.triegc.Push(root, number) + break + } + triedb.Dereference(root.(common.Hash)) } - triedb.Dereference(root.(common.Hash)) } } } @@ -2128,6 +2131,9 @@ func (bc *BlockChain) IsSameLeaderAsPreviousBlock(block *types.Block) bool { } previousHeader := bc.GetHeaderByNumber(block.NumberU64() - 1) + if previousHeader == nil { + return false + } return block.Coinbase() == previousHeader.Coinbase() } diff --git a/internal/chain/engine.go b/internal/chain/engine.go index 85815eb41..88e62dea4 100644 --- a/internal/chain/engine.go +++ b/internal/chain/engine.go @@ -188,6 +188,11 @@ func (e *engineImpl) VerifySeal(chain engine.ChainReader, header *block.Header) } parentHash := header.ParentHash() parentHeader := chain.GetHeader(parentHash, header.Number().Uint64()-1) + if parentHeader == nil { + return errors.New( + "[VerifySeal] no parent header found", + ) + } if chain.Config().IsStaking(parentHeader.Epoch()) { slotList, err := chain.ReadShardState(parentHeader.Epoch()) if err != nil { @@ -554,6 +559,9 @@ func (e *engineImpl) VerifyHeaderWithSignature(chain engine.ChainReader, header func GetPublicKeys( chain engine.ChainReader, header *block.Header, reCalculate bool, ) ([]*bls.PublicKey, error) { + if header == nil { + return nil, errors.New("nil header provided") + } shardState := new(shard.State) var err error if reCalculate { diff --git a/internal/hmyapi/apiv1/blockchain.go b/internal/hmyapi/apiv1/blockchain.go index ff44a2bf5..a8129748d 100644 --- a/internal/hmyapi/apiv1/blockchain.go +++ b/internal/hmyapi/apiv1/blockchain.go @@ -447,6 +447,9 @@ func (s *PublicBlockChainAPI) GetBalance(ctx context.Context, address string, bl // BlockNumber returns the block number of the chain head. 
func (s *PublicBlockChainAPI) BlockNumber() hexutil.Uint64 { header, _ := s.b.HeaderByNumber(context.Background(), rpc.LatestBlockNumber) // latest header should always be available + if header == nil { + return 0 + } return hexutil.Uint64(header.Number().Uint64()) } From eb4fcc93ab922f371ba6b4948db7a8b70ea9aa9b Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sat, 16 May 2020 18:32:28 -0700 Subject: [PATCH 14/35] Optimize crosslink verification logic and add more cache (#3032) --- consensus/quorum/one-node-staked-vote.go | 13 ++- consensus/quorum/quorum.go | 25 +++-- node/node_cross_link.go | 119 ++++++++++++++++++++--- node/node_newblock.go | 9 +- staking/verify/verify.go | 11 +-- 5 files changed, 137 insertions(+), 40 deletions(-) diff --git a/consensus/quorum/one-node-staked-vote.go b/consensus/quorum/one-node-staked-vote.go index 1ad1b5046..44bcfb360 100644 --- a/consensus/quorum/one-node-staked-vote.go +++ b/consensus/quorum/one-node-staked-vote.go @@ -2,6 +2,7 @@ package quorum import ( "encoding/json" + errors2 "errors" "math/big" "github.com/harmony-one/harmony/consensus/votepower" @@ -78,8 +79,6 @@ func (v *stakedVoteWeight) IsQuorumAchievedByMask(mask *bls_cosi.Mask) bool { return (*currentTotalPower).GT(threshold) } func (v *stakedVoteWeight) computeCurrentTotalPower(p Phase) (*numeric.Dec, error) { - w := shard.BLSPublicKey{} - members := v.Participants() ballot := func() *voteBox { switch p { case Prepare: @@ -94,10 +93,14 @@ func (v *stakedVoteWeight) computeCurrentTotalPower(p Phase) (*numeric.Dec, erro } }() + members := v.Participants() + membersKeys := v.ParticipantsKeyBytes() + if len(members) != len(membersKeys) { + return nil, errors2.New("Participant keys are not matching") + } + for i := range members { - if err := w.FromLibBLSPublicKey(members[i]); err != nil { - return nil, err - } + w := membersKeys[i] if _, didVote := ballot.voters[w]; !didVote && v.ReadBallot(p, members[i]) != nil { ballot.currentTotal = ballot.currentTotal.Add( diff --git a/consensus/quorum/quorum.go b/consensus/quorum/quorum.go index 7bd0c2fea..71082884c 100644 --- a/consensus/quorum/quorum.go +++ b/consensus/quorum/quorum.go @@ -68,6 +68,7 @@ func (p Policy) String() string { // ParticipantTracker .. type ParticipantTracker interface { Participants() []*bls.PublicKey + ParticipantsKeyBytes() []shard.BLSPublicKey IndexOf(*bls.PublicKey) int ParticipantsCount() int64 NextAfter(*bls.PublicKey) (bool, *bls.PublicKey) @@ -145,9 +146,10 @@ type Transition struct { // and values are BLS private key signed signatures type cIdentities struct { // Public keys of the committee including leader and validators - publicKeys []*bls.PublicKey - prepare *votepower.Round - commit *votepower.Round + publicKeys []*bls.PublicKey + publicKeysByte []shard.BLSPublicKey + prepare *votepower.Round + commit *votepower.Round // viewIDSigs: every validator // sign on |viewID|blockHash| in view changing message viewChange *votepower.Round @@ -196,12 +198,20 @@ func (s *cIdentities) Participants() []*bls.PublicKey { return s.publicKeys } +func (s *cIdentities) ParticipantsKeyBytes() []shard.BLSPublicKey { + return s.publicKeysByte +} + func (s *cIdentities) UpdateParticipants(pubKeys []*bls.PublicKey) { + keyBytes := []shard.BLSPublicKey{} for i := range pubKeys { k := shard.BLSPublicKey{} k.FromLibBLSPublicKey(pubKeys[i]) + + keyBytes = append(keyBytes, k) } s.publicKeys = append(pubKeys[:0:0], pubKeys...) 
+ s.publicKeysByte = keyBytes } func (s *cIdentities) ParticipantsCount() int64 { @@ -306,10 +316,11 @@ func (s *cIdentities) ReadAllBallots(p Phase) []*votepower.Ballot { func newBallotsBackedSignatureReader() *cIdentities { return &cIdentities{ - publicKeys: []*bls.PublicKey{}, - prepare: votepower.NewRound(), - commit: votepower.NewRound(), - viewChange: votepower.NewRound(), + publicKeys: []*bls.PublicKey{}, + publicKeysByte: []shard.BLSPublicKey{}, + prepare: votepower.NewRound(), + commit: votepower.NewRound(), + viewChange: votepower.NewRound(), } } diff --git a/node/node_cross_link.go b/node/node_cross_link.go index d14048aab..98852b25d 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -1,22 +1,32 @@ package node import ( + "fmt" + "math/big" + "time" + + common2 "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/consensus/quorum" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/multibls" "github.com/harmony-one/harmony/shard" "github.com/harmony-one/harmony/staking/verify" "github.com/pkg/errors" + "golang.org/x/sync/singleflight" ) const ( maxPendingCrossLinkSize = 1000 - crossLinkBatchSize = 10 + crossLinkBatchSize = 3 ) var ( errAlreadyExist = errors.New("crosslink already exist") + deciderCache singleflight.Group + committeeCache singleflight.Group ) // VerifyBlockCrossLinks verifies the cross links of the block @@ -63,6 +73,11 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { return } + existingCLs := map[common2.Hash]struct{}{} + for _, pending := range pendingCLs { + existingCLs[pending.Hash()] = struct{}{} + } + crosslinks := []types.CrossLink{} if err := rlp.DecodeBytes(msgPayload, &crosslinks); err != nil { utils.Logger().Error(). @@ -79,6 +94,14 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { if i > crossLinkBatchSize { break } + + if _, ok := existingCLs[cl.Hash()]; ok { + utils.Logger().Err(err). + Msgf("[ProcessingCrossLink] Cross Link already exists in pending queue, pass. Beacon Epoch: %d, Block num: %d, Epoch: %d, shardID %d", + node.Blockchain().CurrentHeader().Epoch(), cl.Number(), cl.Epoch(), cl.ShardID()) + continue + } + exist, err := node.Blockchain().ReadCrossLink(cl.ShardID(), cl.Number().Uint64()) if err == nil && exist != nil { utils.Logger().Err(err). 
@@ -122,19 +145,6 @@ func (node *Node) VerifyCrossLink(cl types.CrossLink) error { ) } - // Verify signature of the new cross link header - // TODO: check whether to recalculate shard state - shardState, err := node.Blockchain().ReadShardState(cl.Epoch()) - if err != nil { - return err - } - - committee, err := shardState.FindCommitteeByID(cl.ShardID()) - - if err != nil { - return err - } - aggSig := &bls.Sign{} sig := cl.Signature() if err := aggSig.Deserialize(sig[:]); err != nil { @@ -144,7 +154,86 @@ func (node *Node) VerifyCrossLink(cl types.CrossLink) error { ) } + committee, err := node.lookupCommittee(cl.Epoch(), cl.ShardID()) + if err != nil { + return err + } + decider, err := node.lookupDecider(cl.Epoch(), cl.ShardID()) + if err != nil { + return err + } + return verify.AggregateSigForCommittee( - node.Blockchain(), committee, aggSig, cl.Hash(), cl.BlockNum(), cl.ViewID().Uint64(), cl.Epoch(), cl.Bitmap(), + node.Blockchain(), committee, decider, aggSig, cl.Hash(), cl.BlockNum(), cl.ViewID().Uint64(), cl.Epoch(), cl.Bitmap(), ) } + +func (node *Node) lookupDecider( + epoch *big.Int, shardID uint32, +) (quorum.Decider, error) { + + key := fmt.Sprintf("decider-%d-%d", epoch.Uint64(), shardID) + result, err, _ := deciderCache.Do( + key, func() (interface{}, error) { + + committee, err := node.lookupCommittee(epoch, shardID) + if err != nil { + return nil, err + } + + decider := quorum.NewDecider( + quorum.SuperMajorityStake, committee.ShardID, + ) + + decider.SetMyPublicKeyProvider(func() (*multibls.PublicKey, error) { + return nil, nil + }) + + if _, err := decider.SetVoters(committee, epoch); err != nil { + return nil, err + } + + go func() { + time.Sleep(120 * time.Minute) + deciderCache.Forget(key) + }() + return decider, nil + }, + ) + if err != nil { + return nil, err + } + + return result.(quorum.Decider), nil +} + +func (node *Node) lookupCommittee( + epoch *big.Int, shardID uint32, +) (*shard.Committee, error) { + + key := fmt.Sprintf("committee-%d-%d", epoch.Uint64(), shardID) + result, err, _ := committeeCache.Do( + key, func() (interface{}, error) { + shardState, err := node.Blockchain().ReadShardState(epoch) + if err != nil { + return nil, err + } + + committee, err := shardState.FindCommitteeByID(shardID) + if err != nil { + return nil, err + } + + go func() { + time.Sleep(120 * time.Minute) + committeeCache.Forget(key) + }() + return committee, nil + }, + ) + if err != nil { + return nil, err + } + + return result.(*shard.Committee), nil +} diff --git a/node/node_newblock.go b/node/node_newblock.go index dcbce6a66..5e2b862e3 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -186,12 +186,15 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { AnErr("[proposeNewBlock] pending crosslink is already committed onchain", err) continue } - if err := node.VerifyCrossLink(pending); err != nil { - invalidToDelete = append(invalidToDelete, pending) + + // Crosslink is already verified before it's accepted to pending, + // no need to verify again in proposal. + if !node.Blockchain().Config().IsCrossLink(pending.Epoch()) { utils.Logger().Debug(). - AnErr("[proposeNewBlock] pending crosslink verification failed", err) + AnErr("[proposeNewBlock] pending crosslink that's before crosslink epoch", err) continue } + crossLinksToPropose = append(crossLinksToPropose, pending) } utils.Logger().Debug(). 
diff --git a/staking/verify/verify.go b/staking/verify/verify.go index 642873a47..64d869d4c 100644 --- a/staking/verify/verify.go +++ b/staking/verify/verify.go @@ -9,7 +9,6 @@ import ( "github.com/harmony-one/harmony/consensus/signature" "github.com/harmony-one/harmony/core" bls_cosi "github.com/harmony-one/harmony/crypto/bls" - "github.com/harmony-one/harmony/multibls" "github.com/harmony-one/harmony/shard" "github.com/pkg/errors" ) @@ -23,6 +22,7 @@ var ( func AggregateSigForCommittee( chain *core.BlockChain, committee *shard.Committee, + decider quorum.Decider, aggSignature *bls.Sign, hash common.Hash, blockNum, viewID uint64, @@ -41,15 +41,6 @@ func AggregateSigForCommittee( return err } - decider := quorum.NewDecider( - quorum.SuperMajorityStake, committee.ShardID, - ) - decider.SetMyPublicKeyProvider(func() (*multibls.PublicKey, error) { - return nil, nil - }) - if _, err := decider.SetVoters(committee, epoch); err != nil { - return err - } if !decider.IsQuorumAchievedByMask(mask) { return errQuorumVerifyAggSign } From afa6ce3d5fa23b4b0a624a60a6f9637587830910 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Sun, 17 May 2020 12:51:39 -0700 Subject: [PATCH 15/35] make crosslink broadcast smarter and more efficient (#3036) --- node/node_cross_link.go | 2 +- node/node_handler.go | 45 ++++++++++++++++++++++++++++++----------- node/node_syncing.go | 4 ++++ 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/node/node_cross_link.go b/node/node_cross_link.go index 98852b25d..f0fdff560 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -91,7 +91,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { Msgf("[ProcessingCrossLink] Received crosslinks: %d", len(crosslinks)) for i, cl := range crosslinks { - if i > crossLinkBatchSize { + if i > crossLinkBatchSize*10 { // A sanity check to prevent spamming break } diff --git a/node/node_handler.go b/node/node_handler.go index fbcfde02e..ea6b70a74 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -236,7 +236,18 @@ func (node *Node) BroadcastSlash(witness *slash.Record) { // BroadcastCrossLink is called by consensus leader to // send the new header as cross link to beacon chain. 
-func (node *Node) BroadcastCrossLink(newBlock *types.Block) { +func (node *Node) BroadcastCrossLink() { + curBlock := node.Blockchain().CurrentBlock() + if curBlock == nil { + return + } + + if node.NodeConfig.ShardID == shard.BeaconChainShardID || + !node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) { + // no need to broadcast crosslink if it's beacon chain or it's not crosslink epoch + return + } + // no point to broadcast the crosslink if we aren't even in the right epoch yet if !node.Blockchain().Config().IsCrossLink( node.Blockchain().CurrentHeader().Epoch(), @@ -249,36 +260,50 @@ func (node *Node) BroadcastCrossLink(newBlock *types.Block) { nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID), ) headers := []*block.Header{} - lastLink, err := node.Beaconchain().ReadShardLastCrossLink(newBlock.ShardID()) + lastLink, err := node.Beaconchain().ReadShardLastCrossLink(curBlock.ShardID()) var latestBlockNum uint64 // TODO chao: record the missing crosslink in local database instead of using latest crosslink // if cannot find latest crosslink, broadcast latest 3 block headers if err != nil { utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLink] ReadShardLastCrossLink Failed") - header := node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 2) + header := node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 2) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) } - header = node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 1) + header = node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 1) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) } - headers = append(headers, newBlock.Header()) + headers = append(headers, curBlock.Header()) } else { latestBlockNum = lastLink.BlockNum() - for blockNum := latestBlockNum + 1; blockNum <= newBlock.NumberU64(); blockNum++ { + + batchSize := crossLinkBatchSize + diff := curBlock.Number().Uint64() - latestBlockNum + + if diff > 100 { + // Increase batch size by 1 for every 100 blocks beyond + batchSize += int(diff-100) / 100 + } + + // Cap at a sane size to avoid overload network + if batchSize > crossLinkBatchSize*10 { + batchSize = crossLinkBatchSize * 10 + } + + for blockNum := latestBlockNum + 1; blockNum <= curBlock.NumberU64(); blockNum++ { header := node.Blockchain().GetHeaderByNumber(blockNum) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) - if len(headers) == crossLinkBatchSize { + if len(headers) == batchSize { break } } } } - utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers)) + utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, curBlock.NumberU64(), len(headers)) for _, header := range headers { utils.Logger().Debug().Msgf( "[BroadcastCrossLink] Broadcasting %d", @@ -419,10 +444,6 @@ func (node *Node) PostConsensusProcessing( if node.NodeConfig.ShardID == shard.BeaconChainShardID { node.BroadcastNewBlock(newBlock) } - if node.NodeConfig.ShardID != shard.BeaconChainShardID && - node.Blockchain().Config().IsCrossLink(newBlock.Epoch()) { - node.BroadcastCrossLink(newBlock) - } node.BroadcastCXReceipts(newBlock) } else { if node.Consensus.Mode() != consensus.Listening { diff --git 
a/node/node_syncing.go b/node/node_syncing.go index 641e27b11..c991c7b0b 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -168,6 +168,10 @@ func (node *Node) DoBeaconSyncing() { ) if err != nil { node.beaconSync.AddLastMileBlock(beaconBlock) + if node.Consensus.IsLeader() { + // Only leader broadcast crosslink to avoid spamming p2p + node.BroadcastCrossLink() + } } } } From 33ea179b60b51c5ad66b84065f6aec01bf6bfc1b Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Mon, 18 May 2020 10:48:14 -0700 Subject: [PATCH 16/35] Revert "make crosslink broadcast smarter and more efficient (#3036)" This reverts commit afa6ce3d5fa23b4b0a624a60a6f9637587830910. --- node/node_cross_link.go | 2 +- node/node_handler.go | 45 +++++++++++------------------------------ node/node_syncing.go | 4 ---- 3 files changed, 13 insertions(+), 38 deletions(-) diff --git a/node/node_cross_link.go b/node/node_cross_link.go index f0fdff560..98852b25d 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -91,7 +91,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { Msgf("[ProcessingCrossLink] Received crosslinks: %d", len(crosslinks)) for i, cl := range crosslinks { - if i > crossLinkBatchSize*10 { // A sanity check to prevent spamming + if i > crossLinkBatchSize { break } diff --git a/node/node_handler.go b/node/node_handler.go index ea6b70a74..fbcfde02e 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -236,18 +236,7 @@ func (node *Node) BroadcastSlash(witness *slash.Record) { // BroadcastCrossLink is called by consensus leader to // send the new header as cross link to beacon chain. -func (node *Node) BroadcastCrossLink() { - curBlock := node.Blockchain().CurrentBlock() - if curBlock == nil { - return - } - - if node.NodeConfig.ShardID == shard.BeaconChainShardID || - !node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) { - // no need to broadcast crosslink if it's beacon chain or it's not crosslink epoch - return - } - +func (node *Node) BroadcastCrossLink(newBlock *types.Block) { // no point to broadcast the crosslink if we aren't even in the right epoch yet if !node.Blockchain().Config().IsCrossLink( node.Blockchain().CurrentHeader().Epoch(), @@ -260,50 +249,36 @@ func (node *Node) BroadcastCrossLink() { nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID), ) headers := []*block.Header{} - lastLink, err := node.Beaconchain().ReadShardLastCrossLink(curBlock.ShardID()) + lastLink, err := node.Beaconchain().ReadShardLastCrossLink(newBlock.ShardID()) var latestBlockNum uint64 // TODO chao: record the missing crosslink in local database instead of using latest crosslink // if cannot find latest crosslink, broadcast latest 3 block headers if err != nil { utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLink] ReadShardLastCrossLink Failed") - header := node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 2) + header := node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 2) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) } - header = node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 1) + header = node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 1) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) } - headers = append(headers, curBlock.Header()) + headers = append(headers, newBlock.Header()) } else { latestBlockNum = lastLink.BlockNum() - - batchSize := crossLinkBatchSize - diff := curBlock.Number().Uint64() 
- latestBlockNum - - if diff > 100 { - // Increase batch size by 1 for every 100 blocks beyond - batchSize += int(diff-100) / 100 - } - - // Cap at a sane size to avoid overload network - if batchSize > crossLinkBatchSize*10 { - batchSize = crossLinkBatchSize * 10 - } - - for blockNum := latestBlockNum + 1; blockNum <= curBlock.NumberU64(); blockNum++ { + for blockNum := latestBlockNum + 1; blockNum <= newBlock.NumberU64(); blockNum++ { header := node.Blockchain().GetHeaderByNumber(blockNum) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) - if len(headers) == batchSize { + if len(headers) == crossLinkBatchSize { break } } } } - utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, curBlock.NumberU64(), len(headers)) + utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers)) for _, header := range headers { utils.Logger().Debug().Msgf( "[BroadcastCrossLink] Broadcasting %d", @@ -444,6 +419,10 @@ func (node *Node) PostConsensusProcessing( if node.NodeConfig.ShardID == shard.BeaconChainShardID { node.BroadcastNewBlock(newBlock) } + if node.NodeConfig.ShardID != shard.BeaconChainShardID && + node.Blockchain().Config().IsCrossLink(newBlock.Epoch()) { + node.BroadcastCrossLink(newBlock) + } node.BroadcastCXReceipts(newBlock) } else { if node.Consensus.Mode() != consensus.Listening { diff --git a/node/node_syncing.go b/node/node_syncing.go index c991c7b0b..641e27b11 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -168,10 +168,6 @@ func (node *Node) DoBeaconSyncing() { ) if err != nil { node.beaconSync.AddLastMileBlock(beaconBlock) - if node.Consensus.IsLeader() { - // Only leader broadcast crosslink to avoid spamming p2p - node.BroadcastCrossLink() - } } } } From cdf98123c2b49944de753a29809df6492dcd214e Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 May 2020 11:07:06 -0700 Subject: [PATCH 17/35] Revert "Revert "make crosslink broadcast smarter and more efficient (#3036)"" This reverts commit 33ea179b60b51c5ad66b84065f6aec01bf6bfc1b. --- node/node_cross_link.go | 2 +- node/node_handler.go | 45 ++++++++++++++++++++++++++++++----------- node/node_syncing.go | 4 ++++ 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/node/node_cross_link.go b/node/node_cross_link.go index 98852b25d..f0fdff560 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -91,7 +91,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { Msgf("[ProcessingCrossLink] Received crosslinks: %d", len(crosslinks)) for i, cl := range crosslinks { - if i > crossLinkBatchSize { + if i > crossLinkBatchSize*10 { // A sanity check to prevent spamming break } diff --git a/node/node_handler.go b/node/node_handler.go index fbcfde02e..ea6b70a74 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -236,7 +236,18 @@ func (node *Node) BroadcastSlash(witness *slash.Record) { // BroadcastCrossLink is called by consensus leader to // send the new header as cross link to beacon chain. 
-func (node *Node) BroadcastCrossLink(newBlock *types.Block) { +func (node *Node) BroadcastCrossLink() { + curBlock := node.Blockchain().CurrentBlock() + if curBlock == nil { + return + } + + if node.NodeConfig.ShardID == shard.BeaconChainShardID || + !node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) { + // no need to broadcast crosslink if it's beacon chain or it's not crosslink epoch + return + } + // no point to broadcast the crosslink if we aren't even in the right epoch yet if !node.Blockchain().Config().IsCrossLink( node.Blockchain().CurrentHeader().Epoch(), @@ -249,36 +260,50 @@ func (node *Node) BroadcastCrossLink(newBlock *types.Block) { nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID), ) headers := []*block.Header{} - lastLink, err := node.Beaconchain().ReadShardLastCrossLink(newBlock.ShardID()) + lastLink, err := node.Beaconchain().ReadShardLastCrossLink(curBlock.ShardID()) var latestBlockNum uint64 // TODO chao: record the missing crosslink in local database instead of using latest crosslink // if cannot find latest crosslink, broadcast latest 3 block headers if err != nil { utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLink] ReadShardLastCrossLink Failed") - header := node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 2) + header := node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 2) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) } - header = node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 1) + header = node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 1) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) } - headers = append(headers, newBlock.Header()) + headers = append(headers, curBlock.Header()) } else { latestBlockNum = lastLink.BlockNum() - for blockNum := latestBlockNum + 1; blockNum <= newBlock.NumberU64(); blockNum++ { + + batchSize := crossLinkBatchSize + diff := curBlock.Number().Uint64() - latestBlockNum + + if diff > 100 { + // Increase batch size by 1 for every 100 blocks beyond + batchSize += int(diff-100) / 100 + } + + // Cap at a sane size to avoid overload network + if batchSize > crossLinkBatchSize*10 { + batchSize = crossLinkBatchSize * 10 + } + + for blockNum := latestBlockNum + 1; blockNum <= curBlock.NumberU64(); blockNum++ { header := node.Blockchain().GetHeaderByNumber(blockNum) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) - if len(headers) == crossLinkBatchSize { + if len(headers) == batchSize { break } } } } - utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers)) + utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, curBlock.NumberU64(), len(headers)) for _, header := range headers { utils.Logger().Debug().Msgf( "[BroadcastCrossLink] Broadcasting %d", @@ -419,10 +444,6 @@ func (node *Node) PostConsensusProcessing( if node.NodeConfig.ShardID == shard.BeaconChainShardID { node.BroadcastNewBlock(newBlock) } - if node.NodeConfig.ShardID != shard.BeaconChainShardID && - node.Blockchain().Config().IsCrossLink(newBlock.Epoch()) { - node.BroadcastCrossLink(newBlock) - } node.BroadcastCXReceipts(newBlock) } else { if node.Consensus.Mode() != consensus.Listening { diff --git 
a/node/node_syncing.go b/node/node_syncing.go index 641e27b11..c991c7b0b 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -168,6 +168,10 @@ func (node *Node) DoBeaconSyncing() { ) if err != nil { node.beaconSync.AddLastMileBlock(beaconBlock) + if node.Consensus.IsLeader() { + // Only leader broadcast crosslink to avoid spamming p2p + node.BroadcastCrossLink() + } } } } From dfd5a9f09e9451e619629c094ef58feb322297b8 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 May 2020 11:09:40 -0700 Subject: [PATCH 18/35] adjust catch up speed to be a sane number --- node/node_cross_link.go | 2 +- node/node_handler.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/node/node_cross_link.go b/node/node_cross_link.go index f0fdff560..0d076524e 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -91,7 +91,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { Msgf("[ProcessingCrossLink] Received crosslinks: %d", len(crosslinks)) for i, cl := range crosslinks { - if i > crossLinkBatchSize*10 { // A sanity check to prevent spamming + if i > crossLinkBatchSize*2 { // A sanity check to prevent spamming break } diff --git a/node/node_handler.go b/node/node_handler.go index ea6b70a74..98224c95e 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -288,8 +288,8 @@ func (node *Node) BroadcastCrossLink() { } // Cap at a sane size to avoid overload network - if batchSize > crossLinkBatchSize*10 { - batchSize = crossLinkBatchSize * 10 + if batchSize > crossLinkBatchSize*2 { + batchSize = crossLinkBatchSize * 2 } for blockNum := latestBlockNum + 1; blockNum <= curBlock.NumberU64(); blockNum++ { From 9663128d0a883c3f5346789757cfc0347c9a7c65 Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Mon, 18 May 2020 12:05:38 -0700 Subject: [PATCH 19/35] Fix crosslink broadcast condition (#3041) --- node/node_syncing.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/node/node_syncing.go b/node/node_syncing.go index c991c7b0b..f66350cac 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -168,10 +168,9 @@ func (node *Node) DoBeaconSyncing() { ) if err != nil { node.beaconSync.AddLastMileBlock(beaconBlock) - if node.Consensus.IsLeader() { - // Only leader broadcast crosslink to avoid spamming p2p - node.BroadcastCrossLink() - } + } else if node.Consensus.IsLeader() { + // Only leader broadcast crosslink to avoid spamming p2p + node.BroadcastCrossLink() } } } From c0d84f7a30193120f2e6c76bde23570e754ee7ad Mon Sep 17 00:00:00 2001 From: Ganesha Upadhyaya Date: Mon, 18 May 2020 16:05:13 -0700 Subject: [PATCH 20/35] use snapshot total delegation instead of current total delegation --- staking/apr/compute.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/staking/apr/compute.go b/staking/apr/compute.go index 5b016eedd..8afd293e2 100644 --- a/staking/apr/compute.go +++ b/staking/apr/compute.go @@ -20,6 +20,8 @@ var ( ErrInsufficientEpoch = errors.New("insufficient past epochs to compute apr") // ErrCouldNotRetreiveHeaderByNumber is returned when fail to retrieve header by number ErrCouldNotRetreiveHeaderByNumber = errors.New("could not retrieve header by number") + // ErrZeroStakeOneEpochAgo is returned when total delegation is zero for one epoch ago + ErrZeroStakeOneEpochAgo = errors.New("zero total delegation one epoch ago") ) // Reader .. 
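[Editor's sketch] The hunk above introduces the ErrZeroStakeOneEpochAgo sentinel and the hunk below wraps it with errors.Wrapf, so callers can still branch on the underlying cause via github.com/pkg/errors. A minimal, self-contained sketch of that pattern, under the assumption that callers treat zero stake as "no APR yet" rather than a hard failure; errZeroStake and computeAPR are hypothetical stand-ins for the real identifiers:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// errZeroStake is a hypothetical local stand-in for ErrZeroStakeOneEpochAgo.
var errZeroStake = errors.New("zero total delegation one epoch ago")

func computeAPR(totalDelegation int64) error {
	if totalDelegation == 0 {
		// Wrapf adds context while keeping the sentinel as the cause.
		return errors.Wrapf(errZeroStake, "current epoch %d, one-epoch-ago %d", 10, 9)
	}
	return nil
}

func main() {
	if err := computeAPR(0); errors.Cause(err) == errZeroStake {
		fmt.Println("no APR available yet:", err)
	}
}
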
@@ -132,7 +134,16 @@ func ComputeForValidator( return &zero, nil } - total := numeric.NewDecFromBigInt(validatorNow.TotalDelegation()) + total := numeric.NewDecFromBigInt(oneSnapshotAgo.Validator.TotalDelegation()) + if total.IsZero() { + return nil, errors.Wrapf( + ErrZeroStakeOneEpochAgo, + "current epoch %d, one-epoch-ago %d", + block.Epoch().Uint64(), + oneEpochAgo.Uint64(), + ) + } + result := numeric.NewDecFromBigInt(estimatedRewardPerYear).Quo( total, ) From 8d76969579086a8732b59191f214fa959a5b2837 Mon Sep 17 00:00:00 2001 From: Daniel Van Der Maden Date: Mon, 18 May 2020 20:15:50 -0700 Subject: [PATCH 21/35] [t3] Tx pool error report fix & local build fix (#3047) * [tx_list] Add transaction after you can fetch tx cost * [tx_pool] Add error sink reports for removed tx * Correct log levels for known transactions * Remove redundant hash to `enqueue` and `add` signatures * Update tests for signature changes * [build] Fix local build for MacOs * [tx_pool] Correct typo & logging level for known tx * [tx_pool] Fix for go imports --- Makefile | 2 +- core/tx_list.go | 2 +- core/tx_pool.go | 96 ++++++++++++++++++++++++---------- core/tx_pool_test.go | 26 ++++----- scripts/go_executable_build.sh | 2 +- test/deploy.sh | 2 +- 6 files changed, 85 insertions(+), 45 deletions(-) diff --git a/Makefile b/Makefile index db3dff4f2..0bac63457 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ export GO111MODULE:=on .PHONY: all libs exe test all: libs - ./scripts/go_executable_build.sh + ./scripts/go_executable_build.sh -S libs: make -C $(TOP)/mcl -j8 diff --git a/core/tx_list.go b/core/tx_list.go index b35d049cf..0aa78f841 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -262,11 +262,11 @@ func (l *txList) Add(tx types.PoolTransaction, priceBump uint64) (bool, types.Po } } // Otherwise overwrite the old transaction with the current one - l.txs.Put(tx) cost, err := tx.Cost() if err != nil { return false, nil } + l.txs.Put(tx) if l.costcap.Cmp(cost) < 0 { l.costcap = cost } diff --git a/core/tx_pool.go b/core/tx_pool.go index 1cee48a92..f06127e15 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -369,8 +369,13 @@ func (pool *TxPool) loop() { } // Any non-locals old enough should be removed if time.Since(pool.beats[addr]) > pool.config.Lifetime { + b32addr, err := hmyCommon.AddressToBech32(addr) + if err != nil { + b32addr = "unknown" + } for _, tx := range pool.queue[addr].Flatten() { pool.removeTx(tx.Hash(), true) + pool.txErrorSink.Add(tx, fmt.Errorf("removed transaction for inactive account %v", b32addr)) } } } @@ -557,6 +562,8 @@ func (pool *TxPool) SetGasPrice(price *big.Int) { pool.gasPrice = price for _, tx := range pool.priced.Cap(price, pool.locals) { pool.removeTx(tx.Hash(), false) + pool.txErrorSink.Add(tx, + fmt.Errorf("dropped transaction below new gas price threshold of %v", price.String())) } utils.Logger().Info().Str("price", price.String()).Msg("Transaction pool price threshold updated") } @@ -868,7 +875,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all.Get(hash) != nil { - logger.Warn().Str("hash", hash.Hex()).Msg("Discarding already known transaction") + logger.Info().Str("hash", hash.Hex()).Msg("Discarding already known transaction") return false, errors.WithMessagef(ErrKnownTransaction, "transaction hash %x", hash) } // If the transaction fails basic validation, discard it @@ -893,12 +900,16 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) 
(bool, error) { // New transaction is better than our worse ones, make room for it drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) for _, tx := range drop { + gasPrice := new(big.Float).SetInt64(tx.GasPrice().Int64()) + gasPrice = gasPrice.Mul(gasPrice, new(big.Float).SetFloat64(1e-9)) // Gas-price is in Nano + pool.removeTx(tx.Hash(), false) + underpricedTxCounter.Inc(1) + pool.txErrorSink.Add(tx, + errors.WithMessagef(ErrUnderpriced, "transaction gas-price is %.18f ONE in full transaction pool", gasPrice)) logger.Warn(). Str("hash", tx.Hash().Hex()). Str("price", tx.GasPrice().String()). Msg("Discarding freshly underpriced transaction") - underpricedTxCounter.Inc(1) - pool.removeTx(tx.Hash(), false) } } // If the transaction is replacing an already pending one, do directly @@ -915,15 +926,23 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) { pool.all.Remove(old.Hash()) pool.priced.Removed() pendingReplaceCounter.Inc(1) + pool.txErrorSink.Add(old, + fmt.Errorf("replaced transaction, new transaction %v has same nonce & higher price", tx.Hash().String())) + logger.Info(). + Str("hash", old.Hash().String()). + Str("new-tx-hash", tx.Hash().String()). + Str("price", old.GasPrice().String()). + Msg("Replaced transaction") } pool.all.Add(tx) pool.priced.Put(tx) pool.journalTx(from, tx) - logger.Info(). + logger.Warn(). Str("hash", tx.Hash().Hex()). Interface("from", from). Interface("to", tx.To()). + Str("price", tx.GasPrice().String()). Msg("Pooled new executable transaction") // We've directly injected a replacement transaction, notify subsystems @@ -932,7 +951,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) { return old != nil, nil } // New transaction isn't replacing a pending one, push into queue - replace, err := pool.enqueueTx(hash, tx) + replace, err := pool.enqueueTx(tx) if err != nil { return false, err } @@ -945,7 +964,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) { } pool.journalTx(from, tx) - logger.Info(). + logger.Warn(). Str("hash", hash.Hex()). Interface("from", from). Interface("to", tx.To()). @@ -956,7 +975,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) { // enqueueTx inserts a new transaction into the non-executable transaction queue. // // Note, this method assumes the pool lock is held! -func (pool *TxPool) enqueueTx(hash common.Hash, tx types.PoolTransaction) (bool, error) { +func (pool *TxPool) enqueueTx(tx types.PoolTransaction) (bool, error) { // Try to insert the transaction into the future queue from, _ := tx.SenderAddress() // already validated if pool.queue[from] == nil { @@ -973,8 +992,15 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx types.PoolTransaction) (bool, pool.all.Remove(old.Hash()) pool.priced.Removed() queuedReplaceCounter.Inc(1) - } - if pool.all.Get(hash) == nil { + pool.txErrorSink.Add(old, + fmt.Errorf("replaced enqueued non-executable transaction, new transaction %v has same nonce & higher price", tx.Hash().String())) + utils.Logger().Info(). + Str("hash", old.Hash().String()). + Str("new-tx-hash", tx.Hash().String()). + Str("price", old.GasPrice().String()). 
+ Msg("Replaced enqueued non-executable transaction") + } + if pool.all.Get(tx.Hash()) == nil { pool.all.Add(tx) pool.priced.Put(tx) } @@ -997,7 +1023,7 @@ func (pool *TxPool) journalTx(from common.Address, tx types.PoolTransaction) { // and returns whether it was inserted or an older was better. // // Note, this method assumes the pool lock is held! -func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx types.PoolTransaction) bool { +func (pool *TxPool) promoteTx(addr common.Address, tx types.PoolTransaction) bool { // Try to insert the transaction into the pending queue if pool.pending[addr] == nil { pool.pending[addr] = newTxList(true) @@ -1007,21 +1033,29 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx types.Po inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { // An older transaction was better, discard this - pool.all.Remove(hash) + pool.all.Remove(tx.Hash()) pool.priced.Removed() - pendingDiscardCounter.Inc(1) + pool.txErrorSink.Add(tx, fmt.Errorf("could not promote to executable")) + utils.Logger().Info(). + Str("hash", tx.Hash().String()). + Msg("Could not promote to executable") return false } // Otherwise discard any previous transaction and mark this if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed() - pendingReplaceCounter.Inc(1) + pool.txErrorSink.Add(old, + fmt.Errorf("did not promote to executable, existing transaction %v has same nonce & higher price", tx.Hash().String())) + utils.Logger().Info(). + Str("hash", old.Hash().String()). + Str("existing-tx-hash", tx.Hash().String()). + Msg("Did not promote to executable, new transaction has higher price") } // Failsafe to work around direct pending inserts (tests) - if pool.all.Get(hash) == nil { + if pool.all.Get(tx.Hash()) == nil { pool.all.Add(tx) pool.priced.Put(tx) } @@ -1069,6 +1103,7 @@ func (pool *TxPool) addTx(tx types.PoolTransaction, local bool) error { replace, err := pool.add(tx, local) if err != nil { errCause := errors.Cause(err) + // Ignore known transaction for tx rebroadcast case. if errCause != ErrKnownTransaction { pool.txErrorSink.Add(tx, err) } @@ -1104,6 +1139,7 @@ func (pool *TxPool) addTxsLocked(txs types.PoolTransactions, local bool) []error dirty[from] = struct{}{} } errCause := errors.Cause(err) + // Ignore known transaction for tx rebroadcast case. if err != nil && errCause != ErrKnownTransaction { pool.txErrorSink.Add(tx, err) } @@ -1173,7 +1209,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { } // Postpone any invalidated transactions for _, tx := range invalids { - if _, err := pool.enqueueTx(tx.Hash(), tx); err != nil { + if _, err := pool.enqueueTx(tx); err != nil { pool.txErrorSink.Add(tx, err) } } @@ -1220,24 +1256,26 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { nonce := pool.currentState.GetNonce(addr) for _, tx := range list.Forward(nonce) { hash := tx.Hash() - logger.Info().Str("hash", hash.Hex()).Msg("Removed old queued transaction") pool.all.Remove(hash) pool.priced.Removed() + logger.Info().Str("hash", hash.Hex()).Msg("Removed old queued transaction") + // Do not report to error sink as old txs are on chain or meaningful error caught elsewhere. 
} // Drop all transactions that are too costly (low balance or out of gas) drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() - logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable queued transaction") pool.all.Remove(hash) pool.priced.Removed() queuedNofundsCounter.Inc(1) + pool.txErrorSink.Add(tx, fmt.Errorf("removed unpayable queued transaction")) + logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable queued transaction") } // Gather all executable transactions and promote them for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { hash := tx.Hash() - if pool.promoteTx(addr, hash, tx) { - logger.Info().Str("hash", hash.Hex()).Msg("Promoting queued transaction") + if pool.promoteTx(addr, tx) { + logger.Warn().Str("hash", hash.Hex()).Msg("Promoting queued transaction") promoted = append(promoted, tx) } } @@ -1245,11 +1283,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { if !pool.locals.contains(addr) { for _, tx := range list.Cap(int(pool.config.AccountQueue)) { hash := tx.Hash() - logger.Warn().Str("hash", hash.Hex()).Msg("Removed cap-exceeding queued transaction") - pool.txErrorSink.Add(tx, fmt.Errorf("exceeds cap for queued transactions for account %s", addr.String())) pool.all.Remove(hash) pool.priced.Removed() queuedRateLimitCounter.Inc(1) + pool.txErrorSink.Add(tx, fmt.Errorf("exceeds cap for queued transactions for account %s", addr.String())) + logger.Warn().Str("hash", hash.Hex()).Msg("Removed cap-exceeding queued transaction") } } // Delete the entire queue entry if it became empty. @@ -1295,9 +1333,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { for _, tx := range list.Cap(list.Len() - 1) { // Drop the transaction from the global pools too hash := tx.Hash() - pool.txErrorSink.Add(tx, fmt.Errorf("fairness-exceeding pending transaction")) pool.all.Remove(hash) pool.priced.Removed() + pool.txErrorSink.Add(tx, fmt.Errorf("fairness-exceeding pending transaction")) // Update the account nonce to the dropped transaction if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce { @@ -1318,9 +1356,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { for _, tx := range list.Cap(list.Len() - 1) { // Drop the transaction from the global pools too hash := tx.Hash() - pool.txErrorSink.Add(tx, fmt.Errorf("fairness-exceeding pending transaction")) pool.all.Remove(hash) pool.priced.Removed() + pool.txErrorSink.Add(tx, fmt.Errorf("fairness-exceeding pending transaction")) // Update the account nonce to the dropped transaction if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { @@ -1391,23 +1429,25 @@ func (pool *TxPool) demoteUnexecutables() { // Drop all transactions that are deemed too old (low nonce) for _, tx := range list.Forward(nonce) { hash := tx.Hash() - logger.Info().Str("hash", hash.Hex()).Msg("Removed old pending transaction") pool.all.Remove(hash) pool.priced.Removed() + logger.Info().Str("hash", hash.Hex()).Msg("Removed old pending transaction") + // Do not report to error sink as old txs are on chain or meaningful error caught elsewhere. 
} // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() - logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable pending transaction") pool.all.Remove(hash) pool.priced.Removed() pendingNofundsCounter.Inc(1) + pool.txErrorSink.Add(tx, fmt.Errorf("removed unpayable pending transaction")) + logger.Warn().Str("hash", hash.Hex()).Msg("Removed unpayable pending transaction") } for _, tx := range invalids { hash := tx.Hash() - logger.Info().Str("hash", hash.Hex()).Msg("Demoting pending transaction") - if _, err := pool.enqueueTx(hash, tx); err != nil { + logger.Warn().Str("hash", hash.Hex()).Msg("Demoting pending transaction") + if _, err := pool.enqueueTx(tx); err != nil { pool.txErrorSink.Add(tx, err) } } @@ -1416,7 +1456,7 @@ func (pool *TxPool) demoteUnexecutables() { for _, tx := range list.Cap(0) { hash := tx.Hash() logger.Error().Str("hash", hash.Hex()).Msg("Demoting invalidated transaction") - if _, err := pool.enqueueTx(hash, tx); err != nil { + if _, err := pool.enqueueTx(tx); err != nil { pool.txErrorSink.Add(tx, err) } } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index f7a478b91..55a999a1d 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -505,7 +505,7 @@ func TestTransactionQueue(t *testing.T) { from, _ := deriveSender(tx) pool.currentState.AddBalance(from, big.NewInt(1000)) pool.lockedReset(nil, nil) - pool.enqueueTx(tx.Hash(), tx) + pool.enqueueTx(tx) pool.promoteExecutables([]common.Address{from}) if len(pool.pending) != 1 { @@ -515,7 +515,7 @@ func TestTransactionQueue(t *testing.T) { tx = transaction(0, 1, 100, key) from, _ = deriveSender(tx) pool.currentState.SetNonce(from, 2) - pool.enqueueTx(tx.Hash(), tx) + pool.enqueueTx(tx) pool.promoteExecutables([]common.Address{from}) if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") @@ -535,9 +535,9 @@ func TestTransactionQueue(t *testing.T) { pool.currentState.AddBalance(from, big.NewInt(1000)) pool.lockedReset(nil, nil) - pool.enqueueTx(tx.Hash(), tx1) - pool.enqueueTx(tx.Hash(), tx2) - pool.enqueueTx(tx.Hash(), tx3) + pool.enqueueTx(tx1) + pool.enqueueTx(tx2) + pool.enqueueTx(tx3) pool.promoteExecutables([]common.Address{from}) @@ -718,12 +718,12 @@ func TestTransactionDropping(t *testing.T) { tx11 = transaction(0, 11, 200, key) tx12 = transaction(0, 12, 300, key) ) - pool.promoteTx(account, tx0.Hash(), tx0) - pool.promoteTx(account, tx1.Hash(), tx1) - pool.promoteTx(account, tx2.Hash(), tx2) - pool.enqueueTx(tx10.Hash(), tx10) - pool.enqueueTx(tx11.Hash(), tx11) - pool.enqueueTx(tx12.Hash(), tx12) + pool.promoteTx(account, tx0) + pool.promoteTx(account, tx1) + pool.promoteTx(account, tx2) + pool.enqueueTx(tx10) + pool.enqueueTx(tx11) + pool.enqueueTx(tx12) // Check that pre and post validations leave the pool as is if pool.pending[account].Len() != 3 { @@ -1524,7 +1524,7 @@ func benchmarkPendingDemotion(b *testing.B, size int) { for i := 0; i < size; i++ { tx := transaction(0, uint64(i), 100000, key) - pool.promoteTx(account, tx.Hash(), tx) + pool.promoteTx(account, tx) } // Benchmark the speed of pool validation b.ResetTimer() @@ -1549,7 +1549,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { for i := 0; i < size; i++ { tx := transaction(0, uint64(1+i), 100000, key) - pool.enqueueTx(tx.Hash(), tx) + pool.enqueueTx(tx) } // Benchmark the speed 
of pool validation b.ResetTimer() diff --git a/scripts/go_executable_build.sh b/scripts/go_executable_build.sh index cef44c7ba..d42e0c513 100755 --- a/scripts/go_executable_build.sh +++ b/scripts/go_executable_build.sh @@ -244,7 +244,7 @@ function release ################################ MAIN FUNCTION ############################## -while getopts "hp:a:o:b:f:rtvsd" option; do +while getopts "hp:a:o:b:f:rtvsdS" option; do case $option in h) usage ;; p) PROFILE=$OPTARG ;; diff --git a/test/deploy.sh b/test/deploy.sh index d1045dcc4..c1d33d3e8 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -126,7 +126,7 @@ cleanup # Also it's recommended to use `go build` for testing the whole exe. if [ "${NOBUILD}" != "true" ]; then pushd $ROOT - scripts/go_executable_build.sh + scripts/go_executable_build.sh -S popd fi From cc965edba48fab52ddd059b03be79771b8640430 Mon Sep 17 00:00:00 2001 From: Ganesha Upadhyaya Date: Mon, 18 May 2020 20:36:18 -0700 Subject: [PATCH 22/35] should not use snapshot from last epoch, use current epoch snapshot (#3049) Co-authored-by: Rongjian Lan --- staking/apr/compute.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/staking/apr/compute.go b/staking/apr/compute.go index 8afd293e2..2d5f21c37 100644 --- a/staking/apr/compute.go +++ b/staking/apr/compute.go @@ -48,12 +48,12 @@ var ( func expectedRewardPerYear( now, oneEpochAgo *block.Header, - curValidator, snapshotLastEpoch *staking.ValidatorWrapper, + wrapper, snapshot *staking.ValidatorWrapper, ) (*big.Int, error) { timeNow, oneTAgo := now.Time(), oneEpochAgo.Time() diffTime, diffReward := new(big.Int).Sub(timeNow, oneTAgo), - new(big.Int).Sub(curValidator.BlockReward, snapshotLastEpoch.BlockReward) + new(big.Int).Sub(wrapper.BlockReward, snapshot.BlockReward) // impossibility but keep sane if diffTime.Sign() == -1 { @@ -66,7 +66,7 @@ func expectedRewardPerYear( // TODO some more sanity checks of some sort? expectedValue := new(big.Int).Div(diffReward, diffTime) expectedPerYear := new(big.Int).Mul(expectedValue, oneYear) - utils.Logger().Info().Interface("now", curValidator).Interface("before", snapshotLastEpoch). + utils.Logger().Info().Interface("now", wrapper).Interface("before", snapshot). Uint64("diff-reward", diffReward.Uint64()). Uint64("diff-time", diffTime.Uint64()). Interface("expected-value", expectedValue). @@ -79,7 +79,7 @@ func expectedRewardPerYear( func ComputeForValidator( bc Reader, block *types.Block, - validatorNow *staking.ValidatorWrapper, + wrapper *staking.ValidatorWrapper, ) (*numeric.Dec, error) { oneEpochAgo, zero := new(big.Int).Sub(block.Epoch(), common.Big1), @@ -90,9 +90,9 @@ func ComputeForValidator( Uint64("one-epoch-ago", oneEpochAgo.Uint64()). 
Msg("apr - begin compute for validator ") - oneSnapshotAgo, err := bc.ReadValidatorSnapshotAtEpoch( - oneEpochAgo, - validatorNow.Address, + snapshot, err := bc.ReadValidatorSnapshotAtEpoch( + block.Epoch(), + wrapper.Address, ) if err != nil { @@ -123,7 +123,7 @@ func ComputeForValidator( estimatedRewardPerYear, err := expectedRewardPerYear( block.Header(), headerOneEpochAgo, - validatorNow, oneSnapshotAgo.Validator, + wrapper, snapshot.Validator, ) if err != nil { @@ -134,7 +134,7 @@ func ComputeForValidator( return &zero, nil } - total := numeric.NewDecFromBigInt(oneSnapshotAgo.Validator.TotalDelegation()) + total := numeric.NewDecFromBigInt(snapshot.Validator.TotalDelegation()) if total.IsZero() { return nil, errors.Wrapf( ErrZeroStakeOneEpochAgo, From 3c6b574b08ed5485575ae5939ddc6c48bff0f59d Mon Sep 17 00:00:00 2001 From: Ganesha Upadhyaya Date: Tue, 19 May 2020 11:32:40 -0700 Subject: [PATCH 23/35] [multi-bls] validator sign and send message in parallel (#3054) --- consensus/validator.go | 87 ++++++++++++++++++++++++------------------ 1 file changed, 50 insertions(+), 37 deletions(-) diff --git a/consensus/validator.go b/consensus/validator.go index c52b291ff..70d2c01d9 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -3,6 +3,7 @@ package consensus import ( "bytes" "encoding/hex" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -58,30 +59,36 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { } func (consensus *Consensus) prepare() { + var wg sync.WaitGroup + wg.Add(len(consensus.PubKey.PublicKey)) groupID := []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))} for i, key := range consensus.PubKey.PublicKey { - networkMessage, err := consensus.construct(msg_pb.MessageType_PREPARE, nil, key, consensus.priKey.PrivateKey[i]) - if err != nil { - consensus.getLogger().Err(err). - Str("message-type", msg_pb.MessageType_PREPARE.String()). - Msg("could not construct message") - return - } + go func(i int) { + defer wg.Done() + networkMessage, err := consensus.construct(msg_pb.MessageType_PREPARE, nil, key, consensus.priKey.PrivateKey[i]) + if err != nil { + consensus.getLogger().Err(err). + Str("message-type", msg_pb.MessageType_PREPARE.String()). + Msg("could not construct message") + return + } - // TODO: this will not return immediatey, may block - if consensus.current.Mode() != Listening { - if err := consensus.msgSender.SendWithoutRetry( - groupID, - p2p.ConstructMessage(networkMessage.Bytes), - ); err != nil { - consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") - } else { - consensus.getLogger().Info(). - Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). - Msg("[OnAnnounce] Sent Prepare Message!!") + // TODO: this will not return immediatey, may block + if consensus.current.Mode() != Listening { + if err := consensus.msgSender.SendWithoutRetry( + groupID, + p2p.ConstructMessage(networkMessage.Bytes), + ); err != nil { + consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") + } else { + consensus.getLogger().Info(). + Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). + Msg("[OnAnnounce] Sent Prepare Message!!") + } } - } + }(i) } + wg.Wait() consensus.getLogger().Debug(). Str("From", consensus.phase.String()). Str("To", FBFTPrepare.String()). 
@@ -207,27 +214,33 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { groupID := []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), } + var wg sync.WaitGroup + wg.Add(len(consensus.PubKey.PublicKey)) for i, key := range consensus.PubKey.PublicKey { - networkMessage, _ := consensus.construct( - msg_pb.MessageType_COMMIT, - commitPayload, - key, consensus.priKey.PrivateKey[i], - ) - - if consensus.current.Mode() != Listening { - if err := consensus.msgSender.SendWithoutRetry( - groupID, - p2p.ConstructMessage(networkMessage.Bytes), - ); err != nil { - consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") - } else { - consensus.getLogger().Info(). - Uint64("blockNum", consensus.blockNum). - Hex("blockHash", consensus.blockHash[:]). - Msg("[OnPrepared] Sent Commit Message!!") + go func(i int) { + defer wg.Done() + networkMessage, _ := consensus.construct( + msg_pb.MessageType_COMMIT, + commitPayload, + key, consensus.priKey.PrivateKey[i], + ) + + if consensus.current.Mode() != Listening { + if err := consensus.msgSender.SendWithoutRetry( + groupID, + p2p.ConstructMessage(networkMessage.Bytes), + ); err != nil { + consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") + } else { + consensus.getLogger().Info(). + Uint64("blockNum", consensus.blockNum). + Hex("blockHash", consensus.blockHash[:]). + Msg("[OnPrepared] Sent Commit Message!!") + } } - } + }(i) } + wg.Wait() consensus.getLogger().Debug(). Str("From", consensus.phase.String()). Str("To", FBFTCommit.String()). From e85c1f306462c9816494d04c161778b3a6db984a Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Tue, 19 May 2020 20:44:33 -0700 Subject: [PATCH 24/35] Fix log levels (#3058) --- api/service/syncing/downloader/client.go | 2 +- api/service/syncing/syncing.go | 2 +- consensus/consensus_service.go | 2 +- consensus/leader.go | 10 +++++----- consensus/threshold.go | 2 +- consensus/validator.go | 4 ++-- core/offchain.go | 4 +--- node/node.go | 4 +++- node/node_cross_link.go | 4 ++-- node/node_cross_shard.go | 4 ++-- node/node_explorer.go | 8 +++++--- node/node_newblock.go | 6 +++--- 12 files changed, 27 insertions(+), 25 deletions(-) diff --git a/api/service/syncing/downloader/client.go b/api/service/syncing/downloader/client.go index 17c90171c..7e44e857a 100644 --- a/api/service/syncing/downloader/client.go +++ b/api/service/syncing/downloader/client.go @@ -27,7 +27,7 @@ func ClientSetup(ip, port string) *Client { utils.Logger().Error().Err(err).Str("ip", ip).Msg("[SYNC] client.go:ClientSetup fail to dial") return nil } - utils.Logger().Info().Str("ip", ip).Msg("[SYNC] grpc connect successfully") + utils.Logger().Debug().Str("ip", ip).Msg("[SYNC] grpc connect successfully") client.dlClient = pb.NewDownloaderClient(client.conn) return &client } diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index e1fe91e6f..8a04c4503 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -780,7 +780,7 @@ func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, isBeac isBeacon, bc.ShardID(), otherHeight, currentHeight) return } - utils.Logger().Debug(). + utils.Logger().Info(). 
Msgf("[SYNC] Node is OUT OF SYNC (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)", isBeacon, bc.ShardID(), otherHeight, currentHeight) diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index c6047a061..b90851933 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -93,7 +93,7 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys []*bls.PublicKey) int64 { consensus.Decider.UpdateParticipants(pubKeys) utils.Logger().Info().Msg("My Committee updated") for i := range pubKeys { - utils.Logger().Info(). + utils.Logger().Debug(). Int("index", i). Str("BLSPubKey", pubKeys[i].SerializeToHexStr()). Msg("Member") diff --git a/consensus/leader.go b/consensus/leader.go index 0c002df84..cba41c2ca 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -164,7 +164,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { logger = logger.With(). Int64("NumReceivedSoFar", consensus.Decider.SignersCount(quorum.Prepare)). Int64("PublicKeys", consensus.Decider.ParticipantsCount()).Logger() - logger.Info().Msg("[OnPrepare] Received New Prepare Signature") + logger.Debug().Msg("[OnPrepare] Received New Prepare Signature") if _, err := consensus.Decider.SubmitVote( quorum.Prepare, validatorPubKey, &sign, recvMsg.BlockHash, @@ -224,7 +224,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { // Must have the corresponding block to verify committed message. blockObj := consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash) if blockObj == nil { - consensus.getLogger().Debug(). + consensus.getLogger().Info(). Uint64("blockNum", recvMsg.BlockNum). Uint64("viewID", recvMsg.ViewID). Str("blockHash", recvMsg.BlockHash.Hex()). @@ -246,7 +246,7 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { logger = logger.With(). Int64("numReceivedSoFar", consensus.Decider.SignersCount(quorum.Commit)). 
Logger() - logger.Info().Msg("[OnCommit] Received new commit message") + logger.Debug().Msg("[OnCommit] Received new commit message") if _, err := consensus.Decider.SubmitVote( quorum.Commit, validatorPubKey, @@ -266,14 +266,14 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { if !quorumWasMet && quorumIsMet { logger.Info().Msg("[OnCommit] 2/3 Enough commits received") go func(viewID uint64) { - consensus.getLogger().Debug().Msg("[OnCommit] Starting Grace Period") + consensus.getLogger().Info().Msg("[OnCommit] Starting Grace Period") // Always wait for 2 seconds as minimum grace period time.Sleep(2 * time.Second) if n := time.Now(); n.Before(consensus.NextBlockDue) { // Sleep to wait for the full block time time.Sleep(consensus.NextBlockDue.Sub(n)) } - logger.Debug().Msg("[OnCommit] Commit Grace Period Ended") + logger.Info().Msg("[OnCommit] Commit Grace Period Ended") consensus.commitFinishChan <- viewID }(consensus.viewID) diff --git a/consensus/threshold.go b/consensus/threshold.go index f0b223a3b..4c4367065 100644 --- a/consensus/threshold.go +++ b/consensus/threshold.go @@ -13,7 +13,7 @@ import ( func (consensus *Consensus) didReachPrepareQuorum() error { logger := utils.Logger() - logger.Debug().Msg("[OnPrepare] Received Enough Prepare Signatures") + logger.Info().Msg("[OnPrepare] Received Enough Prepare Signatures") leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey() if err != nil { utils.Logger().Warn().Err(err).Msg("[OnPrepare] leader not found") diff --git a/consensus/validator.go b/consensus/validator.go index 70d2c01d9..f11e721b8 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -299,7 +299,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.commitBitmap = mask if recvMsg.BlockNum-consensus.blockNum > consensusBlockNumBuffer { - consensus.getLogger().Debug().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC") + consensus.getLogger().Info().Uint64("MsgBlockNum", recvMsg.BlockNum).Msg("[OnCommitted] OUT OF SYNC") go func() { select { case consensus.BlockNumLowChan <- struct{}{}: @@ -315,7 +315,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.tryCatchup() if consensus.current.Mode() == ViewChanging { - consensus.getLogger().Debug().Msg("[OnCommitted] Still in ViewChanging mode, Exiting!!") + consensus.getLogger().Info().Msg("[OnCommitted] Still in ViewChanging mode, Exiting!!") return } diff --git a/core/offchain.go b/core/offchain.go index 85400151b..3302b4cf3 100644 --- a/core/offchain.go +++ b/core/offchain.go @@ -186,9 +186,7 @@ func (bc *BlockChain) CommitOffChainData( for i, c := uint32(0), shard.Schedule.InstanceForEpoch( epoch, ).NumShards(); i < c; i++ { - if err := bc.LastContinuousCrossLink(batch, i); err != nil { - utils.Logger().Info().Msg("Could not roll up last continuous crosslink") - } + bc.LastContinuousCrossLink(batch, i) } } diff --git a/node/node.go b/node/node.go index bda6054a7..7660951ac 100644 --- a/node/node.go +++ b/node/node.go @@ -225,10 +225,11 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) []error { pendingCount, queueCount := node.TxPool.Stats() utils.Logger().Info(). + Interface("err", errs). Int("length of newTxs", len(newTxs)). Int("totalPending", pendingCount). Int("totalQueued", queueCount). 
- Msg("Got more transactions") + Msg("[addPendingTransactions] Adding more transactions") return errs } @@ -276,6 +277,7 @@ func (node *Node) AddPendingTransaction(newTx *types.Transaction) error { errs := node.addPendingTransactions(types.Transactions{newTx}) for i := range errs { if errs[i] != nil { + utils.Logger().Info().Err(errs[i]).Msg("[AddPendingTransaction] Failed adding new transaction") return errs[i] } } diff --git a/node/node_cross_link.go b/node/node_cross_link.go index 0d076524e..55e274117 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -96,7 +96,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { } if _, ok := existingCLs[cl.Hash()]; ok { - utils.Logger().Err(err). + utils.Logger().Debug().Err(err). Msgf("[ProcessingCrossLink] Cross Link already exists in pending queue, pass. Beacon Epoch: %d, Block num: %d, Epoch: %d, shardID %d", node.Blockchain().CurrentHeader().Epoch(), cl.Number(), cl.Epoch(), cl.ShardID()) continue @@ -104,7 +104,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { exist, err := node.Blockchain().ReadCrossLink(cl.ShardID(), cl.Number().Uint64()) if err == nil && exist != nil { - utils.Logger().Err(err). + utils.Logger().Debug().Err(err). Msgf("[ProcessingCrossLink] Cross Link already exists, pass. Beacon Epoch: %d, Block num: %d, Epoch: %d, shardID %d", node.Blockchain().CurrentHeader().Epoch(), cl.Number(), cl.Epoch(), cl.ShardID()) continue } diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 970ef0aa0..8738ecbb1 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -46,7 +46,7 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block) { // BroadcastCXReceiptsWithShardID broadcasts cross shard receipts to given ToShardID func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig []byte, commitBitmap []byte, toShardID uint32) { myShardID := node.Consensus.ShardID - utils.Logger().Info(). + utils.Logger().Debug(). Uint32("toShardID", toShardID). Uint32("myShardID", myShardID). Uint64("blockNum", block.NumberU64()). @@ -54,7 +54,7 @@ func (node *Node) BroadcastCXReceiptsWithShardID(block *types.Block, commitSig [ cxReceipts, err := node.Blockchain().ReadCXReceipts(toShardID, block.NumberU64(), block.Hash()) if err != nil || len(cxReceipts) == 0 { - utils.Logger().Info().Uint32("ToShardID", toShardID). + utils.Logger().Debug().Uint32("ToShardID", toShardID). Int("numCXReceipts", len(cxReceipts)). Msg("[CXMerkleProof] No receipts found for the destination shard") return diff --git a/node/node_explorer.go b/node/node_explorer.go index 574bf3fdc..88f94a650 100644 --- a/node/node_explorer.go +++ b/node/node_explorer.go @@ -118,7 +118,7 @@ func (node *Node) ExplorerMessageHandler(payload []byte) { // AddNewBlockForExplorer add new block for explorer. 
func (node *Node) AddNewBlockForExplorer(block *types.Block) { - utils.Logger().Debug().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node") + utils.Logger().Info().Uint64("blockHeight", block.NumberU64()).Msg("[Explorer] Adding new block for explorer node") if _, err := node.Blockchain().InsertChain([]*types.Block{block}, true); err == nil { if len(block.Header().ShardState()) > 0 { node.Consensus.UpdateConsensusInformation() @@ -165,7 +165,8 @@ func (node *Node) GetTransactionsHistory(address, txType, order string) ([]commo key := explorer.GetAddressKey(address) bytes, err := explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, false).GetDB().Get([]byte(key), nil) if err != nil { - utils.Logger().Error().Err(err).Msg("[Explorer] Cannot get storage db instance") + utils.Logger().Debug().Err(err). + Msgf("[Explorer] Error retrieving transaction history for address %s", address) return make([]common.Hash, 0), nil } if err = rlp.DecodeBytes(bytes, &addressData); err != nil { @@ -197,7 +198,8 @@ func (node *Node) GetStakingTransactionsHistory(address, txType, order string) ( key := explorer.GetAddressKey(address) bytes, err := explorer.GetStorageInstance(node.SelfPeer.IP, node.SelfPeer.Port, false).GetDB().Get([]byte(key), nil) if err != nil { - utils.Logger().Error().Err(err).Msg("[Explorer] Cannot get storage db instance") + utils.Logger().Debug().Err(err). + Msgf("[Explorer] Staking transaction history for address %s not found", address) return make([]common.Hash, 0), nil } if err = rlp.DecodeBytes(bytes, &addressData); err != nil { diff --git a/node/node_newblock.go b/node/node_newblock.go index 5e2b862e3..61668822b 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -45,7 +45,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan ch for node.Consensus != nil && node.Consensus.IsLeader() { time.Sleep(SleepPeriod) - utils.Logger().Debug(). + utils.Logger().Info(). Uint64("blockNum", node.Blockchain().CurrentBlock().NumberU64()+1). Msg("PROPOSING NEW BLOCK ------------------------------------------------") @@ -56,7 +56,7 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan struct{}, stopChan ch err = node.Blockchain().Validator().ValidateHeader(newBlock, true) if err == nil { - utils.Logger().Debug(). + utils.Logger().Info(). Uint64("blockNum", newBlock.NumberU64()). Uint64("epoch", newBlock.Epoch().Uint64()). Uint64("viewID", newBlock.Header().ViewID().Uint64()). @@ -197,7 +197,7 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { crossLinksToPropose = append(crossLinksToPropose, pending) } - utils.Logger().Debug(). + utils.Logger().Info(). Msgf("[proposeNewBlock] Proposed %d crosslinks from %d pending crosslinks", len(crossLinksToPropose), len(allPending), ) From 756c7750b703726c950199ea0685d4b4e2046a3a Mon Sep 17 00:00:00 2001 From: Janet Liang <56005637+janet-harmony@users.noreply.github.com> Date: Tue, 19 May 2020 20:45:28 -0700 Subject: [PATCH 25/35] [main] Enable beacon sync for explorer nodes (#3059) --- cmd/harmony/main.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index c374c45e2..2889e42c2 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -714,8 +714,7 @@ func main() { } }() - if nodeConfig.ShardID != shard.BeaconChainShardID && - currentNode.NodeConfig.Role() != nodeconfig.ExplorerNode { + if nodeConfig.ShardID != shard.BeaconChainShardID { utils.Logger().Info(). 
Uint32("shardID", currentNode.Blockchain().ShardID()). Uint32("shardID", nodeConfig.ShardID).Msg("SupportBeaconSyncing") From 734d9c11d45078116bb629a8fcfad401d80f8e4c Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Tue, 19 May 2020 23:05:07 -0700 Subject: [PATCH 26/35] Revert "[multi-bls] validator sign and send message in parallel (#3054)" This reverts commit 3c6b574b08ed5485575ae5939ddc6c48bff0f59d. --- consensus/validator.go | 87 ++++++++++++++++++------------------------ 1 file changed, 37 insertions(+), 50 deletions(-) diff --git a/consensus/validator.go b/consensus/validator.go index f11e721b8..cf5a4aec7 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -3,7 +3,6 @@ package consensus import ( "bytes" "encoding/hex" - "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -59,36 +58,30 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { } func (consensus *Consensus) prepare() { - var wg sync.WaitGroup - wg.Add(len(consensus.PubKey.PublicKey)) groupID := []nodeconfig.GroupID{nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))} for i, key := range consensus.PubKey.PublicKey { - go func(i int) { - defer wg.Done() - networkMessage, err := consensus.construct(msg_pb.MessageType_PREPARE, nil, key, consensus.priKey.PrivateKey[i]) - if err != nil { - consensus.getLogger().Err(err). - Str("message-type", msg_pb.MessageType_PREPARE.String()). - Msg("could not construct message") - return - } + networkMessage, err := consensus.construct(msg_pb.MessageType_PREPARE, nil, key, consensus.priKey.PrivateKey[i]) + if err != nil { + consensus.getLogger().Err(err). + Str("message-type", msg_pb.MessageType_PREPARE.String()). + Msg("could not construct message") + return + } - // TODO: this will not return immediatey, may block - if consensus.current.Mode() != Listening { - if err := consensus.msgSender.SendWithoutRetry( - groupID, - p2p.ConstructMessage(networkMessage.Bytes), - ); err != nil { - consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") - } else { - consensus.getLogger().Info(). - Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). - Msg("[OnAnnounce] Sent Prepare Message!!") - } + // TODO: this will not return immediatey, may block + if consensus.current.Mode() != Listening { + if err := consensus.msgSender.SendWithoutRetry( + groupID, + p2p.ConstructMessage(networkMessage.Bytes), + ); err != nil { + consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") + } else { + consensus.getLogger().Info(). + Str("blockHash", hex.EncodeToString(consensus.blockHash[:])). + Msg("[OnAnnounce] Sent Prepare Message!!") } - }(i) + } } - wg.Wait() consensus.getLogger().Debug(). Str("From", consensus.phase.String()). Str("To", FBFTPrepare.String()). 
@@ -214,33 +207,27 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { groupID := []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), } - var wg sync.WaitGroup - wg.Add(len(consensus.PubKey.PublicKey)) for i, key := range consensus.PubKey.PublicKey { - go func(i int) { - defer wg.Done() - networkMessage, _ := consensus.construct( - msg_pb.MessageType_COMMIT, - commitPayload, - key, consensus.priKey.PrivateKey[i], - ) - - if consensus.current.Mode() != Listening { - if err := consensus.msgSender.SendWithoutRetry( - groupID, - p2p.ConstructMessage(networkMessage.Bytes), - ); err != nil { - consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") - } else { - consensus.getLogger().Info(). - Uint64("blockNum", consensus.blockNum). - Hex("blockHash", consensus.blockHash[:]). - Msg("[OnPrepared] Sent Commit Message!!") - } + networkMessage, _ := consensus.construct( + msg_pb.MessageType_COMMIT, + commitPayload, + key, consensus.priKey.PrivateKey[i], + ) + + if consensus.current.Mode() != Listening { + if err := consensus.msgSender.SendWithoutRetry( + groupID, + p2p.ConstructMessage(networkMessage.Bytes), + ); err != nil { + consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") + } else { + consensus.getLogger().Info(). + Uint64("blockNum", consensus.blockNum). + Hex("blockHash", consensus.blockHash[:]). + Msg("[OnPrepared] Sent Commit Message!!") } - }(i) + } } - wg.Wait() consensus.getLogger().Debug(). Str("From", consensus.phase.String()). Str("To", FBFTCommit.String()). From 5d46fce22521b5280c8bbb32a8ed009889cbc01c Mon Sep 17 00:00:00 2001 From: Nye Liu Date: Wed, 20 May 2020 18:30:33 +0000 Subject: [PATCH 27/35] Change common messages to Info() to prevent false positive alerts Was Warn(), now Info(): "Pooled new executable transaction" "Pooled new future transaction" "Promoting queued transaction" --- core/tx_pool.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index f06127e15..a9e3c0d40 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -938,7 +938,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) { pool.priced.Put(tx) pool.journalTx(from, tx) - logger.Warn(). + logger.Info(). Str("hash", tx.Hash().Hex()). Interface("from", from). Interface("to", tx.To()). @@ -964,7 +964,7 @@ func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) { } pool.journalTx(from, tx) - logger.Warn(). + logger.Info(). Str("hash", hash.Hex()). Interface("from", from). Interface("to", tx.To()). 
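[Editor's sketch] The rationale of this commit is that level-based alerting fires on Warn and above, so routine, high-volume pool events belong at Info. A tiny zerolog sketch of the distinction, assuming JSON output is scraped by an alerting rule on the level field; the hashes are placeholders:

package main

import (
	"os"

	"github.com/rs/zerolog"
)

func main() {
	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

	// Routine event: Info, so it does not trip level-based alerts.
	logger.Info().Str("hash", "0xabc").Msg("Pooled new executable transaction")

	// Genuinely suspicious event: keep at Warn.
	logger.Warn().Str("hash", "0xdef").Msg("Discarding freshly underpriced transaction")
}
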
@@ -1275,7 +1275,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { hash := tx.Hash() if pool.promoteTx(addr, tx) { - logger.Warn().Str("hash", hash.Hex()).Msg("Promoting queued transaction") + logger.Info().Str("hash", hash.Hex()).Msg("Promoting queued transaction") promoted = append(promoted, tx) } } From 44f6cd65769a778799a68c5d902f2318a0ea76bf Mon Sep 17 00:00:00 2001 From: Ganesha Upadhyaya Date: Thu, 21 May 2020 10:35:11 -0700 Subject: [PATCH 28/35] [apr] fix wrong staking apr in the third staking epoch (#3070) * [apr] fix wrong staking apr in the third staking epoch * fixing wrong condition * update only first staking epoch apr * iterate over APR entires and fix the staking epoch apr --- core/offchain.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/core/offchain.go b/core/offchain.go index 3302b4cf3..100c0bd27 100644 --- a/core/offchain.go +++ b/core/offchain.go @@ -17,6 +17,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/shard" + "github.com/harmony-one/harmony/staking/apr" "github.com/harmony-one/harmony/staking/slash" staking "github.com/harmony-one/harmony/staking/types" "github.com/pkg/errors" @@ -213,6 +214,44 @@ func (bc *BlockChain) CommitOffChainData( Err(err). Msg("[UpdateValidatorVotingPower] Failed to decode shard state") } + // fix the possible wrong apr in the staking epoch + stakingEpoch := bc.Config().StakingEpoch + secondStakingEpoch := big.NewInt(0).Add(stakingEpoch, common.Big1) + thirdStakingEpoch := big.NewInt(0).Add(secondStakingEpoch, common.Big1) + isThirdStakingEpoch := block.Epoch().Cmp(thirdStakingEpoch) == 0 + if isThirdStakingEpoch { + // we have to do it for all validators, not only currently elected + if validators, err := bc.ReadValidatorList(); err == nil { + for _, addr := range validators { + // get wrapper from the second staking epoch + if snapshot, err := bc.ReadValidatorSnapshotAtEpoch( + secondStakingEpoch, addr, + ); err == nil { + if block := bc.GetBlockByNumber( + shard.Schedule.EpochLastBlock(stakingEpoch.Uint64()), + ); block != nil { + if aprComputed, err := apr.ComputeForValidator( + bc, block, snapshot.Validator, + ); err == nil { + stats, ok := tempValidatorStats[addr] + if !ok { + stats, err = bc.ReadValidatorStats(addr) + if err != nil { + continue + } + } + for i := range stats.APRs { + if stats.APRs[i].Epoch.Cmp(stakingEpoch) == 0 { + stats.APRs[i] = staking.APREntry{stakingEpoch, *aprComputed} + } + } + tempValidatorStats[addr] = stats + } + } + } + } + } + } } // Update block reward accumulator and slashes From 75da8cb42446a47a4e91d896cd37bbce878a68d9 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Thu, 21 May 2020 16:54:00 -0700 Subject: [PATCH 29/35] [node.sh] randomize auto-update time Signed-off-by: Leo Chen --- scripts/node.sh | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/scripts/node.sh b/scripts/node.sh index f8f2fed25..506f43278 100755 --- a/scripts/node.sh +++ b/scripts/node.sh @@ -23,6 +23,19 @@ err() { exit "${code}" } +# b: beginning +# r: range +# return random number between b ~ b+r +random() { + local b=$1 + local r=$2 + if [ $r -le 0 ]; then + r=100 + fi + local rand=$(( $(od -A n -t d -N 3 /dev/urandom | grep -oE '[0-9]+') % r )) + echo $(( b + rand )) +} + # https://www.linuxjournal.com/content/validating-ip-address-bash-script function 
valid_ip() { @@ -596,10 +609,12 @@ any_new_binaries() { ${do_not_download} && return 0 outdir="${1}" mkdir -p "${outdir}" - curl -L https://harmony.one/pubkey -o "${outdir}/harmony_pubkey.pem" - if ! grep -q "BEGIN\ PUBLIC\ KEY" "${outdir}/harmony_pubkey.pem"; then - msg "failed to downloaded harmony public signing key" - return 1 + if ${verify}; then + curl -L https://harmony.one/pubkey -o "${outdir}/harmony_pubkey.pem" + if ! grep -q "BEGIN\ PUBLIC\ KEY" "${outdir}/harmony_pubkey.pem"; then + msg "failed to downloaded harmony public signing key" + return 1 + fi fi curl -sSf http://${BUCKET}.s3.amazonaws.com/${FOLDER}/md5sum.txt -o "${outdir}/md5sum.txt.new" || return $? if diff $outdir/md5sum.txt.new md5sum.txt @@ -865,8 +880,9 @@ rm_bls_pass() { { while ${loop} do - msg "re-downloading binaries in 5m" - sleep 300 + msg "re-downloading binaries in 5~10m" + redl_sec=$( random 300 300 ) + sleep $redl_sec if any_new_binaries staging then msg "binaries did not change" @@ -874,8 +890,9 @@ rm_bls_pass() { fi while ! download_binaries staging do - msg "staging download failed; retrying in 30s" - sleep 30 + msg "staging download failed; retrying in 30~60s" + retry_sec=$( random 30 30 ) + sleep $retry_sec done if diff staging/harmony-checksums.txt harmony-checksums.txt then @@ -971,7 +988,7 @@ do *) ld_path_var=LD_LIBRARY_PATH;; esac run() { - (sleep 30 && rm_bls_pass)& + (sleep 60 && rm_bls_pass)& env "${ld_path_var}=$(pwd)" ./harmony "${args[@]}" "${@}" } case "${blspass:+set}" in From c2b271cde6a00bcac926bf5d73bab724649c7558 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Thu, 21 May 2020 16:55:54 -0700 Subject: [PATCH 30/35] [node.sh] remove un-supported download db function Signed-off-by: Leo Chen --- scripts/node.sh | 66 ++----------------------------------------------- 1 file changed, 2 insertions(+), 64 deletions(-) diff --git a/scripts/node.sh b/scripts/node.sh index 506f43278..e0e4ead82 100755 --- a/scripts/node.sh +++ b/scripts/node.sh @@ -184,7 +184,6 @@ options: -n port specify the public base port of the node (default: 9000) -T nodetype specify the node type (validator, explorer; default: validator) -i shardid specify the shard id (valid only with explorer node; default: 1) - -b download harmony_db files from shard specified by -i (default: off) -a dbfile specify the db file to download (default:off) -U FOLDER specify the upgrade folder to download binaries -P enable public rpc end point (default:off) @@ -252,7 +251,7 @@ usage() { BUCKET=pub.harmony.one OS=$(uname -s) -unset start_clean loop run_as_root blspass do_not_download download_only network node_type shard_id download_harmony_db db_file_to_dl +unset start_clean loop run_as_root blspass do_not_download download_only network node_type shard_id db_file_to_dl unset upgrade_rel public_rpc staking_mode pub_port multi_key blsfolder blacklist verify TRACEFILE minpeers max_bls_keys_per_node log_level start_clean=false loop=true @@ -262,7 +261,6 @@ download_only=false network=mainnet node_type=validator shard_id=-1 -download_harmony_db=false public_rpc=false staking_mode=false multi_key=false @@ -280,12 +278,11 @@ ${TRACEFILE=} unset OPTIND OPTARG opt OPTIND=1 -while getopts :1chk:sSp:dDN:T:i:ba:U:PvVyzn:MAIB:r:Y:f:R:m:L: opt +while getopts :1chk:sSp:dDN:T:i:a:U:PvVyzn:MAIB:r:Y:f:R:m:L: opt do case "${opt}" in '?') usage "unrecognized option -${OPTARG}";; ':') usage "missing argument for -${OPTARG}";; - b) download_harmony_db=true;; c) start_clean=true;; 1) loop=false;; h) print_usage; exit 0;; @@ -552,58 +549,6 @@ _curl_download() { fi 
} -download_harmony_db_file() { - local shard_id - shard_id="${1}" - local file_to_dl="${2}" - local outdir=db - if ! check_free_disk; then - err 70 "do not have enough free disk space to download db tarball" - fi - - url="http://${BUCKET}.s3.amazonaws.com/${FOLDER}/db/md5sum.txt" - rm -f "${outdir}/md5sum.txt" - if ! _curl_download $url "${outdir}" md5sum.txt; then - err 70 "cannot download md5sum.txt" - fi - - if [ -n "${file_to_dl}" ]; then - if grep -q "${file_to_dl}" "${outdir}/md5sum.txt"; then - url="http://${BUCKET}.s3.amazonaws.com/${FOLDER}/db/${file_to_dl}" - if _curl_download $url "${outdir}" ${file_to_dl}; then - verify_checksum "${outdir}" "${file_to_dl}" md5sum.txt || return $? - msg "downloaded ${file_to_dl}, extracting ..." - tar -C "${outdir}" -xvf "${outdir}/${file_to_dl}" - else - msg "can't download ${file_to_dl}" - fi - fi - return - fi - - files=$(awk '{ print $2 }' ${outdir}/md5sum.txt) - echo "[available harmony db files for shard ${shard_id}]" - grep -oE "harmony_db_${shard_id}"-.*.tar "${outdir}/md5sum.txt" - echo - for file in $files; do - if [[ $file =~ "harmony_db_${shard_id}" ]]; then - echo -n "Do you want to download ${file} (choose one only) [y/n]?" - read yesno - if [[ "$yesno" = "y" || "$yesno" = "Y" ]]; then - url="http://${BUCKET}.s3.amazonaws.com/${FOLDER}/db/$file" - if _curl_download $url "${outdir}" $file; then - verify_checksum "${outdir}" "${file}" md5sum.txt || return $? - msg "downloaded $file, extracting ..." - tar -C "${outdir}" -xvf "${outdir}/${file}" - else - msg "can't download $file" - fi - break - fi - fi - done -} - any_new_binaries() { local outdir ${do_not_download} && return 0 @@ -637,11 +582,6 @@ if ${download_only}; then exit 0 fi -if ${download_harmony_db}; then - download_harmony_db_file "${shard_id}" "${db_file_to_dl}" || err 70 "download harmony_db file failed" - exit 0 -fi - if ${run_as_root}; then check_root fi @@ -688,8 +628,6 @@ fi NODE_PORT=${pub_port:-9000} PUB_IP= -PUSHGATEWAY_IP= -PUSHGATEWAY_PORT= if [ "$OS" == "Linux" ]; then if ${run_as_root}; then From 8a370c64ff865a2c01ff1c51e871c95b4538efb3 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Thu, 21 May 2020 16:57:12 -0700 Subject: [PATCH 31/35] [node.sh] update version of node.sh Signed-off-by: Leo Chen --- scripts/node.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/node.sh b/scripts/node.sh index e0e4ead82..16c0860f4 100755 --- a/scripts/node.sh +++ b/scripts/node.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -version="v1 20200513.0" +version="v1 20200521.0" unset -v progname progname="${0##*/}" From 7839091fa9525ff226433832f2eedb265561a4a1 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Thu, 21 May 2020 16:57:51 -0700 Subject: [PATCH 32/35] [node.sh] remove unused fuser Signed-off-by: Leo Chen --- scripts/node.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/scripts/node.sh b/scripts/node.sh index 16c0860f4..c8c4259f1 100755 --- a/scripts/node.sh +++ b/scripts/node.sh @@ -633,8 +633,6 @@ if [ "$OS" == "Linux" ]; then if ${run_as_root}; then setup_env fi -# Kill existing soldier/node - fuser -k -n tcp $NODE_PORT fi # find my public ip address From cde64e52b2dc5358445e64c8da53b02cf93093ba Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Thu, 21 May 2020 23:58:24 -0700 Subject: [PATCH 33/35] [node.sh] remove unused function Signed-off-by: Leo Chen --- scripts/node.sh | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/scripts/node.sh b/scripts/node.sh index c8c4259f1..e64e6f2e3 100755 --- a/scripts/node.sh +++ b/scripts/node.sh @@ 
-510,20 +510,6 @@ download_binaries() { (cd "${outdir}" && exec openssl sha256 $(cut -c35- md5sum.txt)) > "${outdir}/harmony-checksums.txt" } -check_free_disk() { - local dir - dir="${1:-.}" - local free_disk=$(df -BG $dir | tail -n 1 | awk ' { print $4 } ' | tr -d G) - # need at least 50G free disk space - local need_disk=50 - - if [ $free_disk -gt $need_disk ]; then - return 0 - else - return 1 - fi -} - _curl_check_exist() { local url=$1 local statuscode=$(curl -I --silent --output /dev/null --write-out "%{http_code}" $url) From a6c29d24751feec6940bc685f6591f188f93bc91 Mon Sep 17 00:00:00 2001 From: Ganesha Upadhyaya Date: Fri, 22 May 2020 11:07:06 -0700 Subject: [PATCH 34/35] [apr][rpc] validator information return all apr entries (#3082) * [apr][rpc] validator information return all entries * disable average logic and use latest apr --- hmy/api_backend.go | 89 +++++++++++++++++++++----------------- staking/types/validator.go | 1 + 2 files changed, 50 insertions(+), 40 deletions(-) diff --git a/hmy/api_backend.go b/hmy/api_backend.go index 4ec576b71..0d53debc6 100644 --- a/hmy/api_backend.go +++ b/hmy/api_backend.go @@ -414,6 +414,7 @@ func (b *APIBackend) GetValidatorInformation( wrapper.BlockReward, wrapper.Counters, zero, + nil, }, } @@ -447,54 +448,62 @@ func (b *APIBackend) GetValidatorInformation( return defaultReply, nil } + latestAPR := numeric.ZeroDec() + l := len(stats.APRs) + if l > 0 { + latestAPR = stats.APRs[l-1].Value + } + defaultReply.Lifetime.APR = latestAPR + defaultReply.Lifetime.EpochAPRs = stats.APRs + // average apr cache keys - key := fmt.Sprintf("apr-%s-%d", addr.Hex(), now.Uint64()) - prevKey := fmt.Sprintf("apr-%s-%d", addr.Hex(), now.Uint64()-1) + // key := fmt.Sprintf("apr-%s-%d", addr.Hex(), now.Uint64()) + // prevKey := fmt.Sprintf("apr-%s-%d", addr.Hex(), now.Uint64()-1) // delete entry for previous epoch - b.apiCache.Forget(prevKey) + // b.apiCache.Forget(prevKey) // calculate last APRHistoryLength epochs for averaging APR - epochFrom := bc.Config().StakingEpoch - nowMinus := big.NewInt(0).Sub(now, big.NewInt(staking.APRHistoryLength)) - if nowMinus.Cmp(epochFrom) > 0 { - epochFrom = nowMinus - } - - if len(stats.APRs) > 0 && stats.APRs[0].Epoch.Cmp(epochFrom) > 0 { - epochFrom = stats.APRs[0].Epoch - } - - epochToAPRs := map[int64]numeric.Dec{} - for i := 0; i < len(stats.APRs); i++ { - entry := stats.APRs[i] - epochToAPRs[entry.Epoch.Int64()] = entry.Value - } + // epochFrom := bc.Config().StakingEpoch + // nowMinus := big.NewInt(0).Sub(now, big.NewInt(staking.APRHistoryLength)) + // if nowMinus.Cmp(epochFrom) > 0 { + // epochFrom = nowMinus + // } + + // if len(stats.APRs) > 0 && stats.APRs[0].Epoch.Cmp(epochFrom) > 0 { + // epochFrom = stats.APRs[0].Epoch + // } + + // epochToAPRs := map[int64]numeric.Dec{} + // for i := 0; i < len(stats.APRs); i++ { + // entry := stats.APRs[i] + // epochToAPRs[entry.Epoch.Int64()] = entry.Value + // } // at this point, validator is active and has apr's for the recent 100 epochs // compute average apr over history - if avgAPR, err := b.SingleFlightRequest( - key, func() (interface{}, error) { - total := numeric.ZeroDec() - count := 0 - for i := epochFrom.Int64(); i < now.Int64(); i++ { - if apr, ok := epochToAPRs[i]; ok { - total = total.Add(apr) - } - count++ - } - if count == 0 { - return nil, errors.New("no apr snapshots available") - } - return total.QuoInt64(int64(count)), nil - }, - ); err != nil { - // could not compute average apr from snapshot - // assign the latest apr available from stats - 
defaultReply.Lifetime.APR = numeric.ZeroDec() - } else { - defaultReply.Lifetime.APR = avgAPR.(numeric.Dec) - } + // if avgAPR, err := b.SingleFlightRequest( + // key, func() (interface{}, error) { + // total := numeric.ZeroDec() + // count := 0 + // for i := epochFrom.Int64(); i < now.Int64(); i++ { + // if apr, ok := epochToAPRs[i]; ok { + // total = total.Add(apr) + // } + // count++ + // } + // if count == 0 { + // return nil, errors.New("no apr snapshots available") + // } + // return total.QuoInt64(int64(count)), nil + // }, + // ); err != nil { + // // could not compute average apr from snapshot + // // assign the latest apr available from stats + // defaultReply.Lifetime.APR = numeric.ZeroDec() + // } else { + // defaultReply.Lifetime.APR = avgAPR.(numeric.Dec) + // } if defaultReply.CurrentlyInCommittee { defaultReply.ComputedMetrics = stats diff --git a/staking/types/validator.go b/staking/types/validator.go index 178297b53..f2697e71f 100644 --- a/staking/types/validator.go +++ b/staking/types/validator.go @@ -156,6 +156,7 @@ type AccumulatedOverLifetime struct { BlockReward *big.Int `json:"reward-accumulated"` Signing counters `json:"blocks"` APR numeric.Dec `json:"apr"` + EpochAPRs []APREntry `json:"epoch-apr"` } func (w ValidatorWrapper) String() string { From 433792662f2d98c051d0463aba724a9bdeeb3e4b Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Thu, 21 May 2020 10:44:29 +0000 Subject: [PATCH 35/35] [go.mod] update go.mod for libp2p Signed-off-by: Leo Chen --- go.mod | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 313f5d875..8dfa9e063 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/golangci/golangci-lint v1.22.2 github.com/gorilla/handlers v1.4.0 // indirect github.com/gorilla/mux v1.7.2 + github.com/gorilla/websocket v1.4.2 github.com/harmony-ek/gencodec v0.0.0-20190215044613-e6740dbdd846 github.com/harmony-one/bls v0.0.6 github.com/harmony-one/taggedrlp v0.1.4 @@ -34,18 +35,18 @@ require ( github.com/karalabe/hid v1.0.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/libp2p/go-addr-util v0.0.2 // indirect - github.com/libp2p/go-libp2p v0.7.4 - github.com/libp2p/go-libp2p-core v0.5.1 + github.com/libp2p/go-libp2p v0.9.2 + github.com/libp2p/go-libp2p-core v0.5.6 github.com/libp2p/go-libp2p-crypto v0.1.0 - github.com/libp2p/go-libp2p-discovery v0.3.0 + github.com/libp2p/go-libp2p-discovery v0.4.0 github.com/libp2p/go-libp2p-host v0.1.0 github.com/libp2p/go-libp2p-kad-dht v0.5.0 github.com/libp2p/go-libp2p-net v0.1.0 github.com/libp2p/go-libp2p-peer v0.2.0 - github.com/libp2p/go-libp2p-peerstore v0.2.3 - github.com/libp2p/go-libp2p-pubsub v0.2.7-0.20200325112436-d3d43e32bef3 - github.com/multiformats/go-multiaddr v0.2.1 - github.com/multiformats/go-multiaddr-net v0.1.4 + github.com/libp2p/go-libp2p-peerstore v0.2.4 + github.com/libp2p/go-libp2p-pubsub v0.3.0 + github.com/multiformats/go-multiaddr v0.2.2 + github.com/multiformats/go-multiaddr-net v0.1.5 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/pborman/uuid v1.2.0 github.com/pkg/errors v0.9.1 @@ -60,12 +61,9 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20190923125748-758128399b1d github.com/uber/jaeger-client-go v2.20.1+incompatible // indirect github.com/uber/jaeger-lib v2.2.0+incompatible // indirect - go.uber.org/zap v1.14.1 // indirect - golang.org/x/crypto v0.0.0-20200406173513-056763e48d71 + golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 golang.org/x/lint 
v0.0.0-20200302205851-738671d3881b - golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a - golang.org/x/sys v0.0.0-20200413165638-669c56c373c4 // indirect golang.org/x/tools v0.0.0-20200408032209-46bd65c8538f google.golang.org/grpc v1.28.1 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
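
Note on PATCH 34 ([apr][rpc] validator information return all apr entries): the diff above drops the cached per-epoch averaging in hmy/api_backend.go (left commented out rather than deleted) and instead reports the most recent APR entry together with the full per-epoch history via the new EpochAPRs field. The following is a minimal, self-contained sketch of that selection logic only; it is an editorial illustration, not code from the patches, and the aprEntry type and float64 values are simplified stand-ins assumed here in place of staking/types.APREntry and numeric.Dec.

package main

import (
	"fmt"
	"math/big"
)

// aprEntry is a hypothetical, simplified stand-in for staking/types.APREntry.
type aprEntry struct {
	Epoch *big.Int
	Value float64
}

// latestAPR mirrors the patched behaviour: rather than averaging the APR
// history, report the most recent entry (zero when no history exists) and
// hand the full slice back to the caller for the RPC response.
func latestAPR(aprs []aprEntry) (float64, []aprEntry) {
	latest := 0.0
	if n := len(aprs); n > 0 {
		latest = aprs[n-1].Value
	}
	return latest, aprs
}

func main() {
	history := []aprEntry{
		{Epoch: big.NewInt(186), Value: 0.102},
		{Epoch: big.NewInt(187), Value: 0.097},
	}
	apr, all := latestAPR(history)
	fmt.Printf("latest apr: %v across %d epoch entries\n", apr, len(all))
}

Because the averaging path is only commented out in the patch, the older behaviour can be restored later without reconstructing it; the sketch above covers just the replacement logic that is active after PATCH 34.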