diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 5cdc7bd2e..2b560e567 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -39,6 +39,7 @@ func (node *Node) ProcessHeaderMessage(msgPayload []byte) { for _, header := range headers { if header.Number.Uint64() >= firstCrossLinkBlock { // Only process cross link starting from FirstCrossLinkBlock + utils.Logger().Debug().Msgf("[ProcessHeaderMessage] Add Pending CrossLink, shardID %d, blockNum %d", header.ShardID, header.Number) crossLinkHeadersToProcess = append(crossLinkHeadersToProcess, header) } } @@ -48,10 +49,13 @@ func (node *Node) ProcessHeaderMessage(msgPayload []byte) { headersToQuque := []*types.Header{} for _, header := range crossLinkHeadersToProcess { + if len(headersToQuque) > crossLinkBatchSize { + break + } exist, err := node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64(), false) if err == nil && exist != nil { utils.Logger().Debug(). - Msgf("[ProcessingHeader] Cross Link already exists, pass. Block num: %d", header.Number) + Msgf("[ProcessingHeader] Cross Link already exists, pass. Block num: %d, shardID %d", header.Number, header.ShardID) continue } @@ -63,7 +67,7 @@ func (node *Node) ProcessHeaderMessage(msgPayload []byte) { if err != nil { headersToQuque = append(headersToQuque, header) utils.Logger().Error().Err(err). - Msg("[ProcessingHeader] ReadCrossLink cannot read previousLink") + Msgf("[ProcessingHeader] ReadCrossLink cannot read previousLink with number %d, shardID %d", header.Number.Uint64()-1, header.ShardID) continue } } diff --git a/node/node_handler.go b/node/node_handler.go index 1eacb6ad5..3ba0e405e 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -36,7 +36,8 @@ import ( ) const ( - consensusTimeout = 30 * time.Second + consensusTimeout = 30 * time.Second + crossLinkBatchSize = 7 ) // ReceiveGlobalMessage use libp2p pubsub mechanism to receive global broadcast messages @@ -263,19 +264,40 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) { // BroadcastCrossLinkHeader is called by consensus leader to send the new header as cross link to beacon chain. func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) { utils.Logger().Info().Msgf("Broadcasting new header to beacon chain groupID %s", node.NodeConfig.GetBeaconGroupID()) - lastThreeHeaders := []*types.Header{} + headers := []*types.Header{} + lastLink, err := node.Beaconchain().ReadShardLastCrossLink(newBlock.ShardID()) + var latestBlockNum uint64 - block := node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 2) - if block != nil { - lastThreeHeaders = append(lastThreeHeaders, block.Header()) - } - block = node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 1) - if block != nil { - lastThreeHeaders = append(lastThreeHeaders, block.Header()) + // if cannot find latest crosslink header, broadcast latest 3 block headers + if err != nil { + utils.Logger().Debug().Err(err).Msg("[BroadcastCrossLinkHeader] ReadShardLastCrossLink Failed") + block := node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 2) + if block != nil { + headers = append(headers, block.Header()) + } + block = node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 1) + if block != nil { + headers = append(headers, block.Header()) + } + headers = append(headers, newBlock.Header()) + } else { + latestBlockNum = lastLink.BlockNum().Uint64() + for blockNum := latestBlockNum + 1; blockNum <= newBlock.NumberU64(); blockNum++ { + if blockNum > latestBlockNum+crossLinkBatchSize { + break + } + block := node.Blockchain().GetBlockByNumber(blockNum) + if block != nil { + headers = append(headers, block.Header()) + } + } } - lastThreeHeaders = append(lastThreeHeaders, newBlock.Header()) - node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(lastThreeHeaders))) + utils.Logger().Info().Msgf("[BroadcastCrossLinkHeader] Broadcasting Block Headers, latestBlockNum %d, currentBlockNum %d, Number of Headers %d", latestBlockNum, newBlock.NumberU64(), len(headers)) + for _, header := range headers { + utils.Logger().Debug().Msgf("[BroadcastCrossLinkHeader] Broadcasting %d", header.Number.Uint64()) + } + node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(headers))) } // BroadcastCXReceipts broadcasts cross shard receipts to correspoding