Revert "Revert "make crosslink broadcast smarter and more efficient (#3036)""

This reverts commit 33ea179b60.
pull/3040/head
Rongjian Lan 5 years ago
parent 33ea179b60
commit cdf98123c2
  1. 2
      node/node_cross_link.go
  2. 45
      node/node_handler.go
  3. 4
      node/node_syncing.go

@ -91,7 +91,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
Msgf("[ProcessingCrossLink] Received crosslinks: %d", len(crosslinks))
for i, cl := range crosslinks {
if i > crossLinkBatchSize {
if i > crossLinkBatchSize*10 { // A sanity check to prevent spamming
break
}

@ -236,7 +236,18 @@ func (node *Node) BroadcastSlash(witness *slash.Record) {
// BroadcastCrossLink is called by consensus leader to
// send the new header as cross link to beacon chain.
func (node *Node) BroadcastCrossLink(newBlock *types.Block) {
func (node *Node) BroadcastCrossLink() {
curBlock := node.Blockchain().CurrentBlock()
if curBlock == nil {
return
}
if node.NodeConfig.ShardID == shard.BeaconChainShardID ||
!node.Blockchain().Config().IsCrossLink(curBlock.Epoch()) {
// no need to broadcast crosslink if it's beacon chain or it's not crosslink epoch
return
}
// no point to broadcast the crosslink if we aren't even in the right epoch yet
if !node.Blockchain().Config().IsCrossLink(
node.Blockchain().CurrentHeader().Epoch(),
@ -249,36 +260,50 @@ func (node *Node) BroadcastCrossLink(newBlock *types.Block) {
nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID),
)
headers := []*block.Header{}
lastLink, err := node.Beaconchain().ReadShardLastCrossLink(newBlock.ShardID())
lastLink, err := node.Beaconchain().ReadShardLastCrossLink(curBlock.ShardID())
var latestBlockNum uint64
// TODO chao: record the missing crosslink in local database instead of using latest crosslink
// if cannot find latest crosslink, broadcast latest 3 block headers
if err != nil {
utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLink] ReadShardLastCrossLink Failed")
header := node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 2)
header := node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 2)
if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) {
headers = append(headers, header)
}
header = node.Blockchain().GetHeaderByNumber(newBlock.NumberU64() - 1)
header = node.Blockchain().GetHeaderByNumber(curBlock.NumberU64() - 1)
if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) {
headers = append(headers, header)
}
headers = append(headers, newBlock.Header())
headers = append(headers, curBlock.Header())
} else {
latestBlockNum = lastLink.BlockNum()
for blockNum := latestBlockNum + 1; blockNum <= newBlock.NumberU64(); blockNum++ {
batchSize := crossLinkBatchSize
diff := curBlock.Number().Uint64() - latestBlockNum
if diff > 100 {
// Increase batch size by 1 for every 100 blocks beyond
batchSize += int(diff-100) / 100
}
// Cap at a sane size to avoid overload network
if batchSize > crossLinkBatchSize*10 {
batchSize = crossLinkBatchSize * 10
}
for blockNum := latestBlockNum + 1; blockNum <= curBlock.NumberU64(); blockNum++ {
header := node.Blockchain().GetHeaderByNumber(blockNum)
if header != nil && node.Blockchain().Config().IsCrossLink(header.Epoch()) {
headers = append(headers, header)
if len(headers) == crossLinkBatchSize {
if len(headers) == batchSize {
break
}
}
}
}
utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers))
utils.Logger().Info().Msgf("[BroadcastCrossLink] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, curBlock.NumberU64(), len(headers))
for _, header := range headers {
utils.Logger().Debug().Msgf(
"[BroadcastCrossLink] Broadcasting %d",
@ -419,10 +444,6 @@ func (node *Node) PostConsensusProcessing(
if node.NodeConfig.ShardID == shard.BeaconChainShardID {
node.BroadcastNewBlock(newBlock)
}
if node.NodeConfig.ShardID != shard.BeaconChainShardID &&
node.Blockchain().Config().IsCrossLink(newBlock.Epoch()) {
node.BroadcastCrossLink(newBlock)
}
node.BroadcastCXReceipts(newBlock)
} else {
if node.Consensus.Mode() != consensus.Listening {

@ -168,6 +168,10 @@ func (node *Node) DoBeaconSyncing() {
)
if err != nil {
node.beaconSync.AddLastMileBlock(beaconBlock)
if node.Consensus.IsLeader() {
// Only leader broadcast crosslink to avoid spamming p2p
node.BroadcastCrossLink()
}
}
}
}

Loading…
Cancel
Save