From 33ea179b60b51c5ad66b84065f6aec01bf6bfc1b Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Mon, 18 May 2020 10:48:14 -0700 Subject: [PATCH] Revert "make crosslink broadcast smarter and more efficient (#3036)" This reverts commit afa6ce3d5fa23b4b0a624a60a6f9637587830910. --- node/node_cross_link.go | 2 +- node/node_handler.go | 45 +++++++++++------------------------------ node/node_syncing.go | 4 ---- 3 files changed, 13 insertions(+), 38 deletions(-) diff --git a/node/node_cross_link.go b/node/node_cross_link.go index f0fdff560..98852b25d 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -91,7 +91,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { Msgf("[ProcessingCrossLink] Received crosslinks: %d", len(crosslinks)) for i, cl := range crosslinks { - if i > crossLinkBatchSize*10 { // A sanity check to prevent spamming + if i > crossLinkBatchSize { break } diff --git a/node/node_handler.go b/node/node_handler.go index ea6b70a74..fbcfde02e 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -236,18 +236,7 @@ func (node *Node) BroadcastSlash(witness *slash.Record) { // BroadcastCrossLink is called by consensus leader to // send the new header as cross link to beacon chain. -func (node *Node) BroadcastCrossLink() { - curBlock := node.Blockchain().CurrentBlock() - if curBlock == nil { - return - } - - if node.NodeConfig.ShardID == shard.BeaconChainShardID || - !node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) { - // no need to broadcast crosslink if it's beacon chain or it's not crosslink epoch - return - } - +func (node *Node) BroadcastCrossLink(newBlock *types.Block) { // no point to broadcast the crosslink if we aren't even in the right epoch yet if !node.Blockchain().Config().IsCrossLink( node.Blockchain().CurrentHeader().Epoch(), @@ -260,50 +249,36 @@ func (node *Node) BroadcastCrossLink() { nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID), ) headers := []*block.Header{} - lastLink, err := node.Beaconchain().ReadShardLastCrossLink(curBlock.ShardID()) + lastLink, err := node.Beaconchain().ReadShardLastCrossLink(newBlock.ShardID()) var latestBlockNum uint64 // TODO chao: record the missing crosslink in local database instead of using latest crosslink // if cannot find latest crosslink, broadcast latest 3 block headers if err != nil { utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLink] ReadShardLastCrossLink Failed") - header := node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 2) + header := node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 2) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) } - header = node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 1) + header = node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 1) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) } - headers = append(headers, curBlock.Header()) + headers = append(headers, newBlock.Header()) } else { latestBlockNum = lastLink.BlockNum() - - batchSize := crossLinkBatchSize - diff := curBlock.Number().Uint64() - latestBlockNum - - if diff > 100 { - // Increase batch size by 1 for every 100 blocks beyond - batchSize += int(diff-100) / 100 - } - - // Cap at a sane size to avoid overload network - if batchSize > crossLinkBatchSize*10 { - batchSize = crossLinkBatchSize * 10 - } - - for blockNum := latestBlockNum + 1; blockNum <= curBlock.NumberU64(); blockNum++ { + for blockNum := latestBlockNum + 1; blockNum <= newBlock.NumberU64(); blockNum++ { header := node.Blockchain().GetHeaderByNumber(blockNum) if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) { headers = append(headers, header) - if len(headers) == batchSize { + if len(headers) == crossLinkBatchSize { break } } } } - utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, curBlock.NumberU64(), len(headers)) + utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers)) for _, header := range headers { utils.Logger().Debug().Msgf( "[BroadcastCrossLink] Broadcasting %d", @@ -444,6 +419,10 @@ func (node *Node) PostConsensusProcessing( if node.NodeConfig.ShardID == shard.BeaconChainShardID { node.BroadcastNewBlock(newBlock) } + if node.NodeConfig.ShardID != shard.BeaconChainShardID && + node.Blockchain().Config().IsCrossLink(newBlock.Epoch()) { + node.BroadcastCrossLink(newBlock) + } node.BroadcastCXReceipts(newBlock) } else { if node.Consensus.Mode() != consensus.Listening { diff --git a/node/node_syncing.go b/node/node_syncing.go index c991c7b0b..641e27b11 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -168,10 +168,6 @@ func (node *Node) DoBeaconSyncing() { ) if err != nil { node.beaconSync.AddLastMileBlock(beaconBlock) - if node.Consensus.IsLeader() { - // Only leader broadcast crosslink to avoid spamming p2p - node.BroadcastCrossLink() - } } } }