Merge pull request #1363 from harmony-one/cross_shard_tx

Add first cross link block for mainnet; more refactors and checks.
pull/1387/head
Rongjian Lan 5 years ago committed by GitHub
commit 292463ed22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      core/blockchain.go
  2. 7
      core/types/crosslink.go
  3. 4
      internal/configs/sharding/fixedschedule.go
  4. 6
      internal/configs/sharding/localnet.go
  5. 6
      internal/configs/sharding/mainnet.go
  6. 3
      internal/configs/sharding/shardingconfig.go
  7. 6
      internal/configs/sharding/testnet.go
  8. 64
      internal/utils/gomock_reflect_579506979/prog.go
  9. 134
      node/node_cross_shard.go
  10. 207
      node/node_handler.go
  11. 2
      node/node_newblock.go

@ -1129,6 +1129,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain] cannot parse cross links")
return n, err
}
if !crossLinks.IsSorted() {
header.Logger(utils.Logger()).Warn().Err(err).Msg("[insertChain] cross links are not sorted")
return n, errors.New("proposed cross links are not sorted")
}
for _, crossLink := range *crossLinks {
bc.WriteCrossLinks(types.CrossLinks{crossLink}, false)
bc.WriteShardLastCrossLink(crossLink.ShardID(), crossLink)

@ -74,3 +74,10 @@ func (cls CrossLinks) Sort() {
return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].BlockNum().Cmp(cls[j].BlockNum()) < 0)
})
}
// IsSorted checks whether the cross links are sorted
func (cls CrossLinks) IsSorted() bool {
return sort.SliceIsSorted(cls, func(i, j int) bool {
return cls[i].ShardID() < cls[j].ShardID() || (cls[i].ShardID() == cls[j].ShardID() && cls[i].BlockNum().Cmp(cls[j].BlockNum()) < 0)
})
}

@ -36,6 +36,10 @@ func (s fixedSchedule) VdfDifficulty() int {
return mainnetVdfDifficulty
}
func (s fixedSchedule) FirstCrossLinkBlock() uint64 {
return mainnetFirstCrossLinkBlock
}
// ConsensusRatio ratio of new nodes vs consensus total nodes
func (s fixedSchedule) ConsensusRatio() float64 {
return mainnetConsensusRatio

@ -21,6 +21,8 @@ const (
localnetVdfDifficulty = 5000 // This takes about 10s to finish the vdf
localnetConsensusRatio = float64(0.1)
localnetFirstCrossLinkBlock = 13
)
func (localnetSchedule) InstanceForEpoch(epoch *big.Int) Instance {
@ -64,6 +66,10 @@ func (ls localnetSchedule) VdfDifficulty() int {
return localnetVdfDifficulty
}
func (ls localnetSchedule) FirstCrossLinkBlock() uint64 {
return localnetFirstCrossLinkBlock
}
// ConsensusRatio ratio of new nodes vs consensus total nodes
func (ls localnetSchedule) ConsensusRatio() float64 {
return localnetConsensusRatio

@ -14,6 +14,8 @@ const (
mainnetVdfDifficulty = 50000 // This takes about 100s to finish the vdf
mainnetConsensusRatio = float64(0.66)
mainnetFirstCrossLinkBlock = 524288 // 32 * 2^14
)
// MainnetSchedule is the mainnet sharding configuration schedule.
@ -64,6 +66,10 @@ func (ms mainnetSchedule) VdfDifficulty() int {
return mainnetVdfDifficulty
}
func (ms mainnetSchedule) FirstCrossLinkBlock() uint64 {
return mainnetFirstCrossLinkBlock
}
// ConsensusRatio ratio of new nodes vs consensus total nodes
func (ms mainnetSchedule) ConsensusRatio() float64 {
return mainnetConsensusRatio

@ -27,6 +27,9 @@ type Schedule interface {
// ConsensusRatio ratio of new nodes vs consensus total nodes
ConsensusRatio() float64
// FirstCrossLinkBlock returns the first cross link block number that will be accepted into beacon chain
FirstCrossLinkBlock() uint64
}
// Instance is one sharding configuration instance.

@ -20,6 +20,8 @@ const (
threeOne = 111
testnetVdfDifficulty = 10000 // This takes about 20s to finish the vdf
testnetFirstCrossLinkBlock = 100
)
func (testnetSchedule) InstanceForEpoch(epoch *big.Int) Instance {
@ -64,6 +66,10 @@ func (ts testnetSchedule) VdfDifficulty() int {
return testnetVdfDifficulty
}
func (ts testnetSchedule) FirstCrossLinkBlock() uint64 {
return testnetFirstCrossLinkBlock
}
// ConsensusRatio ratio of new nodes vs consensus total nodes
func (ts testnetSchedule) ConsensusRatio() float64 {
return mainnetConsensusRatio

@ -0,0 +1,64 @@
package main
import (
"encoding/gob"
"flag"
"fmt"
"os"
"path"
"reflect"
"github.com/golang/mock/mockgen/model"
pkg_ "github.com/ethereum/go-ethereum/log"
)
var output = flag.String("output", "", "The output file name, or empty to use stdout.")
func main() {
flag.Parse()
its := []struct {
sym string
typ reflect.Type
}{
{"Handler", reflect.TypeOf((*pkg_.Handler)(nil)).Elem()},
}
pkg := &model.Package{
// NOTE: This behaves contrary to documented behaviour if the
// package name is not the final component of the import path.
// The reflect package doesn't expose the package name, though.
Name: path.Base("github.com/ethereum/go-ethereum/log"),
}
for _, it := range its {
intf, err := model.InterfaceFromInterfaceType(it.typ)
if err != nil {
fmt.Fprintf(os.Stderr, "Reflection: %v\n", err)
os.Exit(1)
}
intf.Name = it.sym
pkg.Interfaces = append(pkg.Interfaces, intf)
}
outfile := os.Stdout
if len(*output) != 0 {
var err error
outfile, err = os.Create(*output)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to open output file %q", *output)
}
defer func() {
if err := outfile.Close(); err != nil {
fmt.Fprintf(os.Stderr, "failed to close output file %q", *output)
os.Exit(1)
}
}()
}
if err := gob.NewEncoder(outfile).Encode(pkg); err != nil {
fmt.Fprintf(os.Stderr, "gob encode: %v\n", err)
os.Exit(1)
}
}

@ -31,33 +31,35 @@ func (node *Node) ProcessHeaderMessage(msgPayload []byte) {
if err != nil {
utils.Logger().Error().
Err(err).
Msg("Crosslink Headers Broadcast Unable to Decode")
Msg("[ProcessingHeader] Crosslink Headers Broadcast Unable to Decode")
return
}
utils.Logger().Debug().
Msgf("[ProcessingHeader NUM] %d", len(headers))
// Try to reprocess all the pending cross links
node.pendingClMutex.Lock()
crossLinkHeadersToProcess := node.pendingCrossLinks
node.pendingCrossLinks = []*types.Header{}
node.pendingClMutex.Unlock()
crossLinkHeadersToProcess = append(crossLinkHeadersToProcess, headers...)
firstCrossLinkBlock := core.ShardingSchedule.FirstCrossLinkBlock()
for _, header := range headers {
if header.Number.Uint64() >= firstCrossLinkBlock {
// Only process cross link starting from FirstCrossLinkBlock
crossLinkHeadersToProcess = append(crossLinkHeadersToProcess, header)
}
}
headersToQuque := []*types.Header{}
for _, header := range crossLinkHeadersToProcess {
utils.Logger().Debug().
Msgf("[ProcessingHeader] 1 shardID %d, blockNum %d", header.ShardID, header.Number.Uint64())
exist, err := node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64(), false)
if err == nil && exist != nil {
// Cross link already exists, skip
utils.Logger().Debug().
Msgf("[ProcessingHeader] Cross Link already exists, pass. Block num: %d", header.Number)
continue
}
if header.Number.Uint64() > 0 { // Blindly trust the first cross-link
if header.Number.Uint64() > firstCrossLinkBlock { // Directly trust the first cross-link
// Sanity check on the previous link with the new link
previousLink, err := node.Blockchain().ReadCrossLink(header.ShardID, header.Number.Uint64()-1, false)
if err != nil {
@ -72,14 +74,14 @@ func (node *Node) ProcessHeaderMessage(msgPayload []byte) {
if err != nil {
utils.Logger().Warn().
Err(err).
Msgf("Failed to verify new cross link header for shardID %d, blockNum %d", header.ShardID, header.Number)
Msgf("[ProcessingHeader] Failed to verify new cross link header for shardID %d, blockNum %d", header.ShardID, header.Number)
continue
}
}
crossLink := types.NewCrossLink(header)
utils.Logger().Debug().
Msgf("[ProcessingHeader] committing shardID %d, blockNum %d", header.ShardID, header.Number.Uint64())
Msgf("[ProcessingHeader] committing for shardID %d, blockNum %d", header.ShardID, header.Number.Uint64())
node.Blockchain().WriteCrossLinks(types.CrossLinks{crossLink}, true)
}
@ -146,6 +148,63 @@ func (node *Node) VerifyCrosslinkHeader(prevHeader, header *types.Header) error
return nil
}
// ProposeCrossLinkDataForBeaconchain propose cross links for beacon chain new block
func (node *Node) ProposeCrossLinkDataForBeaconchain() (types.CrossLinks, error) {
utils.Logger().Info().
Uint64("blockNum", node.Blockchain().CurrentBlock().NumberU64()+1).
Msg("Proposing cross links ...")
curBlock := node.Blockchain().CurrentBlock()
numShards := core.ShardingSchedule.InstanceForEpoch(curBlock.Header().Epoch).NumShards()
shardCrossLinks := make([]types.CrossLinks, numShards)
firstCrossLinkBlock := core.ShardingSchedule.FirstCrossLinkBlock()
for i := 0; i < int(numShards); i++ {
curShardID := uint32(i)
lastLink, err := node.Blockchain().ReadShardLastCrossLink(curShardID)
lastLinkblockNum := big.NewInt(int64(firstCrossLinkBlock))
blockNumoffset := 0
if err == nil && lastLink != nil {
blockNumoffset = 1
lastLinkblockNum = lastLink.BlockNum()
}
for true {
link, err := node.Blockchain().ReadCrossLink(curShardID, lastLinkblockNum.Uint64()+uint64(blockNumoffset), true)
if err != nil || link == nil {
break
}
if link.BlockNum().Uint64() > firstCrossLinkBlock {
err := node.VerifyCrosslinkHeader(lastLink.Header(), link.Header())
if err != nil {
utils.Logger().Debug().
Err(err).
Msgf("[CrossLink] Failed verifying temp cross link %d", link.BlockNum().Uint64())
break
}
lastLink = link
}
shardCrossLinks[i] = append(shardCrossLinks[i], *link)
blockNumoffset++
}
}
crossLinksToPropose := types.CrossLinks{}
for _, crossLinks := range shardCrossLinks {
crossLinksToPropose = append(crossLinksToPropose, crossLinks...)
}
if len(crossLinksToPropose) != 0 {
crossLinksToPropose.Sort()
return crossLinksToPropose, nil
}
return types.CrossLinks{}, errors.New("No cross link to propose")
}
// ProcessReceiptMessage store the receipts and merkle proof in local data store
func (node *Node) ProcessReceiptMessage(msgPayload []byte) {
cxmsg := proto_node.CXReceiptsMessage{}
@ -210,56 +269,3 @@ func (node *Node) ProcessReceiptMessage(msgPayload []byte) {
func (node *Node) ProcessCrossShardTx(blocks []*types.Block) {
// TODO: add logic
}
// ProposeCrossLinkDataForBeaconchain propose cross links for beacon chain new block
func (node *Node) ProposeCrossLinkDataForBeaconchain() (types.CrossLinks, error) {
curBlock := node.Blockchain().CurrentBlock()
numShards := core.ShardingSchedule.InstanceForEpoch(curBlock.Header().Epoch).NumShards()
shardCrossLinks := make([]types.CrossLinks, numShards)
for i := 0; i < int(numShards); i++ {
curShardID := uint32(i)
lastLink, err := node.Blockchain().ReadShardLastCrossLink(curShardID)
blockNum := big.NewInt(0)
blockNumoffset := 0
if err == nil && lastLink != nil {
blockNumoffset = 1
blockNum = lastLink.BlockNum()
}
for true {
link, err := node.Blockchain().ReadCrossLink(curShardID, blockNum.Uint64()+uint64(blockNumoffset), true)
if err != nil || link == nil {
break
}
if link.BlockNum().Uint64() > 1 {
err := node.VerifyCrosslinkHeader(lastLink.Header(), link.Header())
if err != nil {
utils.Logger().Debug().
Err(err).
Msgf("[CrossLink] Failed verifying temp cross link %d", link.BlockNum().Uint64())
break
}
lastLink = link
}
shardCrossLinks[i] = append(shardCrossLinks[i], *link)
blockNumoffset++
}
}
crossLinksToPropose := types.CrossLinks{}
for _, crossLinks := range shardCrossLinks {
crossLinksToPropose = append(crossLinksToPropose, crossLinks...)
}
if len(crossLinksToPropose) != 0 {
crossLinksToPropose.Sort()
return crossLinksToPropose, nil
}
return types.CrossLinks{}, errors.New("No cross link to propose")
}

@ -339,59 +339,13 @@ func (node *Node) BroadcastCXReceipts(newBlock *types.Block) {
func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
// TODO ek – where do we verify parent-child invariants,
// e.g. "child.Number == child.IsGenesis() ? 0 : parent.Number+1"?
// Verify lastCommitSig
if newBlock.NumberU64() > 1 {
header := newBlock.Header()
parentBlock := node.Blockchain().GetBlockByNumber(newBlock.NumberU64() - 1)
if parentBlock == nil {
return ctxerror.New("[VerifyNewBlock] Failed to get parent block", "shardID", header.ShardID, "blockNum", header.Number)
}
parentHeader := parentBlock.Header()
shardState, err := node.Blockchain().ReadShardState(parentHeader.Epoch)
committee := shardState.FindCommitteeByID(parentHeader.ShardID)
if err != nil || committee == nil {
return ctxerror.New("[VerifyNewBlock] Failed to read shard state for cross link header", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
var committerKeys []*bls.PublicKey
parseKeysSuccess := true
for _, member := range committee.NodeList {
committerKey := new(bls.PublicKey)
err = member.BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
parseKeysSuccess = false
break
}
committerKeys = append(committerKeys, committerKey)
}
if !parseKeysSuccess {
return ctxerror.New("[VerifyNewBlock] cannot convert BLS public key", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
mask, err := bls_cosi.NewMask(committerKeys, nil)
if err != nil {
return ctxerror.New("[VerifyNewBlock] cannot create group sig mask", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
if err := mask.SetMask(header.LastCommitBitmap); err != nil {
return ctxerror.New("[VerifyNewBlock] cannot set group sig mask bits", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
aggSig := bls.Sign{}
err = aggSig.Deserialize(header.LastCommitSignature[:])
if newBlock.NumberU64() > 1 {
err := node.VerifyBlockLastCommitSigs(newBlock)
if err != nil {
return ctxerror.New("[VerifyNewBlock] unable to deserialize multi-signature from payload").WithCause(err)
}
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, header.Number.Uint64()-1)
commitPayload := append(blockNumBytes, header.ParentHash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
return ctxerror.New("[VerifyNewBlock] Failed to verify the signature for last commit sig", "shardID", header.ShardID, "blockNum", header.Number)
return err
}
}
// End Verify lastCommitSig
if newBlock.ShardID() != node.Blockchain().ShardID() {
return ctxerror.New("wrong shard ID",
"my shard ID", node.Blockchain().ShardID(),
@ -406,63 +360,138 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
}
// Verify cross links
if node.NodeConfig.ShardID == 0 && len(newBlock.Header().CrossLinks) > 0 {
crossLinks := &types.CrossLinks{}
err := rlp.DecodeBytes(newBlock.Header().CrossLinks, crossLinks)
if node.NodeConfig.ShardID == 0 {
err := node.VerifyBlockCrossLinks(newBlock)
if err != nil {
return ctxerror.New("[CrossLinkVerification] failed to decode cross links",
"blockHash", newBlock.Hash(),
"crossLinks", len(newBlock.Header().CrossLinks),
).WithCause(err)
return err
}
for i, crossLink := range *crossLinks {
lastLink := &types.CrossLink{}
if i == 0 {
if crossLink.BlockNum().Uint64() > 0 {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 1",
"blockHash", newBlock.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
} else {
lastLink = &crossLink
}
// TODO: verify the vrf randomness
// _ = newBlock.Header().Vrf
// TODO: uncomment 4 lines after we finish staking mechanism
//err = node.validateNewShardState(newBlock, &node.CurrentStakes)
// if err != nil {
// return ctxerror.New("failed to verify sharding state").WithCause(err)
// }
return nil
}
// VerifyBlockCrossLinks verifies the cross links of the block
func (node *Node) VerifyBlockCrossLinks(block *types.Block) error {
if len(block.Header().CrossLinks) == 0 {
return nil
}
crossLinks := &types.CrossLinks{}
err := rlp.DecodeBytes(block.Header().CrossLinks, crossLinks)
if err != nil {
return ctxerror.New("[CrossLinkVerification] failed to decode cross links",
"blockHash", block.Hash(),
"crossLinks", len(block.Header().CrossLinks),
).WithCause(err)
}
if !crossLinks.IsSorted() {
return ctxerror.New("[CrossLinkVerification] cross links are not sorted",
"blockHash", block.Hash(),
"crossLinks", len(block.Header().CrossLinks),
)
}
firstCrossLinkBlock := core.ShardingSchedule.FirstCrossLinkBlock()
for i, crossLink := range *crossLinks {
lastLink := &types.CrossLink{}
if i == 0 {
if crossLink.BlockNum().Uint64() > firstCrossLinkBlock {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 1",
"blockHash", block.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
} else {
if (*crossLinks)[i-1].Header().ShardID != crossLink.Header().ShardID {
}
} else {
if (*crossLinks)[i-1].Header().ShardID != crossLink.Header().ShardID {
if crossLink.BlockNum().Uint64() > firstCrossLinkBlock {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 2",
"blockHash", newBlock.Hash(),
"blockHash", block.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
} else {
lastLink = &(*crossLinks)[i-1]
}
} else {
lastLink = &(*crossLinks)[i-1]
}
}
if crossLink.BlockNum().Uint64() != 0 { // TODO: verify genesis block
err = node.VerifyCrosslinkHeader(lastLink.Header(), crossLink.Header())
if err != nil {
return ctxerror.New("cannot ValidateNewBlock",
"blockHash", newBlock.Hash(),
"numTx", len(newBlock.Transactions()),
).WithCause(err)
}
if crossLink.BlockNum().Uint64() > firstCrossLinkBlock { // TODO: verify genesis block
err = node.VerifyCrosslinkHeader(lastLink.Header(), crossLink.Header())
if err != nil {
return ctxerror.New("cannot ValidateNewBlock",
"blockHash", block.Hash(),
"numTx", len(block.Transactions()),
).WithCause(err)
}
}
}
return nil
}
// TODO: verify the vrf randomness
// _ = newBlock.Header().Vrf
// VerifyBlockLastCommitSigs verifies the last commit sigs of the block
func (node *Node) VerifyBlockLastCommitSigs(block *types.Block) error {
header := block.Header()
parentBlock := node.Blockchain().GetBlockByNumber(block.NumberU64() - 1)
if parentBlock == nil {
return ctxerror.New("[VerifyNewBlock] Failed to get parent block", "shardID", header.ShardID, "blockNum", header.Number)
}
parentHeader := parentBlock.Header()
shardState, err := node.Blockchain().ReadShardState(parentHeader.Epoch)
committee := shardState.FindCommitteeByID(parentHeader.ShardID)
// TODO: uncomment 4 lines after we finish staking mechanism
//err = node.validateNewShardState(newBlock, &node.CurrentStakes)
// if err != nil {
// return ctxerror.New("failed to verify sharding state").WithCause(err)
// }
if err != nil || committee == nil {
return ctxerror.New("[VerifyNewBlock] Failed to read shard state for cross link header", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
var committerKeys []*bls.PublicKey
parseKeysSuccess := true
for _, member := range committee.NodeList {
committerKey := new(bls.PublicKey)
err = member.BlsPublicKey.ToLibBLSPublicKey(committerKey)
if err != nil {
parseKeysSuccess = false
break
}
committerKeys = append(committerKeys, committerKey)
}
if !parseKeysSuccess {
return ctxerror.New("[VerifyNewBlock] cannot convert BLS public key", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
mask, err := bls_cosi.NewMask(committerKeys, nil)
if err != nil {
return ctxerror.New("[VerifyNewBlock] cannot create group sig mask", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
if err := mask.SetMask(header.LastCommitBitmap); err != nil {
return ctxerror.New("[VerifyNewBlock] cannot set group sig mask bits", "shardID", header.ShardID, "blockNum", header.Number).WithCause(err)
}
aggSig := bls.Sign{}
err = aggSig.Deserialize(header.LastCommitSignature[:])
if err != nil {
return ctxerror.New("[VerifyNewBlock] unable to deserialize multi-signature from payload").WithCause(err)
}
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, header.Number.Uint64()-1)
commitPayload := append(blockNumBytes, header.ParentHash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
return ctxerror.New("[VerifyNewBlock] Failed to verify the signature for last commit sig", "shardID", header.ShardID, "blockNum", header.Number)
}
return nil
}

@ -108,7 +108,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
newBlock, err = node.Worker.CommitWithCrossLinks(sig, mask, viewID, coinbase, data)
utils.Logger().Debug().
Uint64("blockNum", newBlock.NumberU64()).
Int("numCrossLinks", len(data)).
Int("numCrossLinks", len(crossLinksToPropose)).
Msg("Successfully added cross links into new block")
}
} else {

Loading…
Cancel
Save