Merge pull request #1449 from chaosma/master

make crosslink broadcast more stable with retry logic
pull/1455/head
chaosma 5 years ago committed by GitHub
commit df37360c3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      node/node_cross_shard.go
  2. 44
      node/node_handler.go

@ -39,6 +39,7 @@ func (node *Node) ProcessHeaderMessage(msgPayload []byte) {
for _, header := range headers {
if header.Number.Uint64() >= firstCrossLinkBlock {
// Only process cross link starting from FirstCrossLinkBlock
utils.Logger().Debug().Msgf("[ProcessHeaderMessage] Add Pending CrossLink, shardID %d, blockNum %d", header.ShardID, header.Number)
crossLinkHeadersToProcess = append(crossLinkHeadersToProcess, header)
}
}
@ -48,10 +49,13 @@ func (node *Node) ProcessHeaderMessage(msgPayload []byte) {
headersToQuque := []*types.Header{}
for _, header := range crossLinkHeadersToProcess {
if len(headersToQuque) > crossLinkBatchSize {
break
}
exist, err := node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64(), false)
if err == nil && exist != nil {
utils.Logger().Debug().
Msgf("[ProcessingHeader] Cross Link already exists, pass. Block num: %d", header.Number)
Msgf("[ProcessingHeader] Cross Link already exists, pass. Block num: %d, shardID %d", header.Number, header.ShardID)
continue
}
@ -63,7 +67,7 @@ func (node *Node) ProcessHeaderMessage(msgPayload []byte) {
if err != nil {
headersToQuque = append(headersToQuque, header)
utils.Logger().Error().Err(err).
Msg("[ProcessingHeader] ReadCrossLink cannot read previousLink")
Msgf("[ProcessingHeader] ReadCrossLink cannot read previousLink with number %d, shardID %d", header.Number.Uint64()-1, header.ShardID)
continue
}
}

@ -36,7 +36,8 @@ import (
)
const (
consensusTimeout = 30 * time.Second
consensusTimeout = 30 * time.Second
crossLinkBatchSize = 7
)
// ReceiveGlobalMessage use libp2p pubsub mechanism to receive global broadcast messages
@ -263,19 +264,40 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
// BroadcastCrossLinkHeader is called by consensus leader to send the new header as cross link to beacon chain.
func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) {
utils.Logger().Info().Msgf("Broadcasting new header to beacon chain groupID %s", node.NodeConfig.GetBeaconGroupID())
lastThreeHeaders := []*types.Header{}
headers := []*types.Header{}
lastLink, err := node.Beaconchain().ReadShardLastCrossLink(newBlock.ShardID())
var latestBlockNum uint64
block := node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 2)
if block != nil {
lastThreeHeaders = append(lastThreeHeaders, block.Header())
}
block = node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 1)
if block != nil {
lastThreeHeaders = append(lastThreeHeaders, block.Header())
// if cannot find latest crosslink header, broadcast latest 3 block headers
if err != nil {
utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLinkHeader] ReadShardLastCrossLink Failed")
block := node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 2)
if block != nil {
headers = append(headers, block.Header())
}
block = node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 1)
if block != nil {
headers = append(headers, block.Header())
}
headers = append(headers, newBlock.Header())
} else {
latestBlockNum = lastLink.BlockNum().Uint64()
for blockNum := latestBlockNum + 1; blockNum <= newBlock.NumberU64(); blockNum++ {
if blockNum > latestBlockNum+crossLinkBatchSize {
break
}
block := node.Blockchain().GetBlockByNumber(blockNum)
if block != nil {
headers = append(headers, block.Header())
}
}
}
lastThreeHeaders = append(lastThreeHeaders, newBlock.Header())
node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(lastThreeHeaders)))
utils.Logger().Info().Msgf("[BroadcastCrossLinkHeader] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers))
for _, header := range headers {
utils.Logger().Debug().Msgf("[BroadcastCrossLinkHeader] Broadcasting %d", header.Number.Uint64())
}
node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(headers)))
}
// BroadcastCXReceipts broadcasts cross shard receipts to correspoding

Loading…
Cancel
Save