@ -3,11 +3,8 @@ package node
import (
"bytes"
"context"
"encoding/gob"
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"math"
"math/big"
"os"
@ -17,7 +14,9 @@ import (
"syscall"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
pb "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
@ -179,7 +178,9 @@ func (node *Node) messageHandler(content []byte, sender string) {
case proto_node . PONG :
node . pongMessageHandler ( msgPayload )
case proto_node . ShardState :
node . epochShardStateMessageHandler ( msgPayload )
if err := node . epochShardStateMessageHandler ( msgPayload ) ; err != nil {
ctxerror . Log15 ( utils . GetLogger ( ) . Warn , err )
}
}
default :
utils . GetLogInstance ( ) . Error ( "Unknown" , "MsgCategory" , msgCategory )
@ -253,7 +254,7 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
func ( node * Node ) VerifyNewBlock ( newBlock * types . Block ) error {
// TODO ek – where do we verify parent-child invariants,
// e.g. "child.Number == child.IsGenesis() ? 0 : parent.Number+1"?
err := node . blockchain . ValidateNewBlock ( newBlock , pki . GetAddressFromPublicKey ( node . SelfPeer . ConsensusPubKey ) )
err := node . Blockchain ( ) . ValidateNewBlock ( newBlock , pki . GetAddressFromPublicKey ( node . SelfPeer . ConsensusPubKey ) )
if err != nil {
return ctxerror . New ( "failed to ValidateNewBlock" ,
"blockHash" , newBlock . Hash ( ) ,
@ -264,7 +265,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
// TODO: verify the vrf randomness
_ = newBlock . Header ( ) . RandPreimage
err = node . V alidateNewShardState( newBlock , & node . CurrentStakes )
err = node . v alidateNewShardState( newBlock , & node . CurrentStakes )
if err != nil {
return ctxerror . New ( "failed to verify sharding state" ) . WithCause ( err )
}
@ -276,25 +277,42 @@ var BigMaxUint64 = new(big.Int).SetBytes([]byte{
255 , 255 , 255 , 255 , 255 , 255 , 255 , 255 ,
} )
// ValidateNewShardState validate whether the new shard state root matches
func ( node * Node ) ValidateNewShardState ( block * types . Block , stakeInfo * map [ common . Address ] * structs . StakeInfo ) error {
// TODO ek – how does RLP handle nil versus zero-sized slices? same?
proposedShardState := block . Header ( ) . ShardState
if len ( proposedShardState ) == 0 {
// For now, beacon validators simply wait until the beacon leader
// proposes a new sharding state.
// TODO ek – invoke view change if leader continues epoch for too long
// validateNewShardState validate whether the new shard state root matches
func ( node * Node ) validateNewShardState ( block * types . Block , stakeInfo * map [ common . Address ] * structs . StakeInfo ) error {
// Common case first – blocks without resharding proposal
header := block . Header ( )
if header . ShardStateHash == ( common . Hash { } ) {
// No new shard state was proposed
if block . ShardID ( ) == 0 {
if core . IsEpochLastBlock ( block ) {
// TODO ek - invoke view change
return errors . New ( "beacon leader did not propose resharding" )
}
} else {
if node . nextShardState . master != nil &&
! time . Now ( ) . Before ( node . nextShardState . proposeTime ) {
// TODO ek – invoke view change
return errors . New ( "regular leader did not propose resharding" )
}
}
// We aren't expecting to reshard, so proceed to sign
return nil
}
proposed := header . ShardState
if block . ShardID ( ) == 0 {
// Beacon validators independently recalculate the master state and
// compare it against the proposed copy.
nextEpoch := core . GetEpochFromBlockNumber ( block . NumberU64 ( ) ) + 1
nextEpoch := new ( big . Int ) . Add ( block . Header ( ) . Epoch , common . Big1 )
// TODO ek – this may be called from regular shards,
// for vetting beacon chain blocks received during block syncing.
// DRand may or may not get in the way. Test this out.
expected := core . CalculateNewShardState ( node . blockchain , nextEpoch , stakeInfo )
if types . CompareShardState ( expected , proposedShardState ) != 0 {
expected , err := core . CalculateNewShardState (
node . Blockchain ( ) , nextEpoch , stakeInfo )
if err != nil {
return ctxerror . New ( "cannot calculate expected shard state" ) .
WithCause ( err )
}
if types . CompareShardState ( expected , proposed ) != 0 {
// TODO ek – log state proposal differences
// TODO ek – this error should trigger view change
return errors . New ( "shard state proposal is different from expected" )
@ -307,62 +325,45 @@ func (node *Node) ValidateNewShardState(block *types.Block, stakeInfo *map[commo
// The sanity check for the master proposal is done earlier,
// when the beacon block containing the master proposal is received
// and before it is admitted into the local beacon chain.
if len ( proposedShardState ) != 1 {
// TODO ek – this error should trigger view change
return ctxerror . New (
"regular resharding proposal has incorrect number of shards" ,
"numShards" , len ( proposedShardState ) )
}
proposed := & proposedShardState [ 0 ]
if proposed . ShardID != block . ShardID ( ) {
return ctxerror . New (
"regular resharding proposal has incorrect shard ID" ,
"blockShardID" , block . ShardID ( ) ,
"proposalShardID" , proposed . ShardID )
}
epoch := block . Header ( ) . Epoch
// TODO ek – this check is due to uint64-based block number
// processing and is only a temporary hack. Very unlikely to hit
// this in testnet, but still logically necessary.
if epoch . Cmp ( common . Big0 ) < 0 || epoch . Cmp ( BigMaxUint64 ) > 0 {
return ctxerror . New ( "block epoch out of range" ,
"epoch" , block . Header ( ) . Epoch )
}
epochLastBlockNum := core . GetLastBlockNumberFromEpoch ( epoch . Uint64 ( ) )
epochLastBlock := node . beaconChain . GetBlockByNumber ( epochLastBlockNum )
if epochLastBlock == nil {
// TODO ek - restore this check once the leader is made to delay
// proposal until it thinks that the quorum has synchronized
// through the end of beacon chain.
// See the corresponding to-do in proposeLocalShardState.
//return ctxerror.New("cannot find epoch-last block of beacon chain",
// "epoch", block.Header().Epoch,
// "epochLastBlockNum", epochLastBlockNum)
// For now just agree to the leader proposal if we aren't sure.
return nil
}
masterProposal := epochLastBlock . Header ( ) . ShardState
//
// TODO ek – fetch masterProposal from beaconchain instead
masterProposal := node . nextShardState . master . ShardState
expected := masterProposal . FindCommitteeByID ( block . ShardID ( ) )
if expected == nil {
// The beacon committee “disowned” our shard,
// which means that this is the last epoch for us for now.
// The local proposal should reflect this by having an empty
// table with no leader.
if len ( proposed . NodeList ) != 0 {
// TODO ek – this error should trigger view change
switch len ( proposed ) {
case 0 :
// Proposal to discontinue shard
if expected != nil {
// TODO ek – invoke view change
return errors . New (
"leader proposed to continue against beacon decision" )
"leader proposed to disband against beacon decision" )
}
case 1 :
// Proposal to continue shard
proposed := proposed [ 0 ]
// Sanity check: Shard ID should match
if proposed . ShardID != block . ShardID ( ) {
// TODO ek – invoke view change
return ctxerror . New ( "proposal has incorrect shard ID" ,
"proposedShard" , proposed . ShardID ,
"blockShard" , block . ShardID ( ) )
}
if types . CompareNodeID ( & proposed . Leader , & types . NodeID { } ) != 0 {
// TODO ek – this error should trigger view change
// Did beaconchain say we are no more?
if expected == nil {
// TODO ek – invoke view change
return errors . New (
"leader proposed empty committee with non-empty leader" )
"leader proposed to continue against beacon decision" )
}
} else if types . CompareCommittee ( expected , proposed ) != 0 {
// TODO ek – log differences
// TODO ek – this error should trigger view change
return errors . New ( "proposal differs from one in beacon chain" )
// Did beaconchain say the same proposal?
if types . CompareCommittee ( expected , & proposed ) != 0 {
// TODO ek – log differences
// TODO ek – invoke view change
return errors . New ( "proposal differs from one in beacon chain" )
}
default :
// TODO ek – invoke view change
return ctxerror . New (
"regular resharding proposal has incorrect number of shards" ,
"numShards" , len ( proposed ) )
}
}
return nil
@ -418,35 +419,37 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
// TODO: update staking information once per epoch.
node . UpdateStakingList ( node . QueryStakeInfo ( ) )
node . printStakingList ( )
// TODO ek – this is a temp hack until beacon chain sync is fixed
if len ( newBlock . Header ( ) . ShardState ) > 0 && node . Consensus . IsLeader {
}
newBlockHeader := newBlock . Header ( )
if newBlockHeader . ShardStateHash != ( common . Hash { } ) {
if node . Consensus . ShardID == 0 {
// TODO ek – this is a temp hack until beacon chain sync is fixed
// End-of-epoch block on beacon chain; block's EpochState is the
// master resharding table. Broadcast it to the network.
epochShardStateMessage := proto_node . ConstructEpochShardStateMessage (
types . EpochShardState {
Epoch : newBlock . Header ( ) . Epoch . Uint64 ( ) + 1 ,
ShardState : newBlock . Header ( ) . ShardState ,
} ,
)
err := node . host . SendMessageToGroups (
[ ] p2p . GroupID { node . NodeConfig . GetClientGroupID ( ) } ,
host . ConstructP2pMessage ( byte ( 0 ) , epochShardStateMessage ) )
if err != nil {
if err := node . broadcastEpochShardState ( newBlock ) ; err != nil {
e := ctxerror . New ( "cannot broadcast shard state" ) . WithCause ( err )
ctxerror . Log15 ( utils . GetLogInstance ( ) . Error , e )
}
}
}
if core . IsEpochLastBlock ( newBlock ) {
// TODO ek – wait for beacon chain's last block to be available
// TODO ek - retrieve the global resharding assignment
// TODO ek – if needed, (start to) move to another shard
node . transitionIntoNextEpoch ( newBlockHeader . ShardState )
}
}
// broadcastEpochShardState builds an epoch shard state message from the shard
// state found in newBlock's header, tagged with the block's epoch plus one,
// and sends it to the group returned by node.NodeConfig.GetClientGroupID().
// The error from SendMessageToGroups, if any, is returned unchanged.
func (node *Node) broadcastEpochShardState(newBlock *types.Block) error {
	nextState := types.EpochShardState{
		Epoch:      newBlock.Header().Epoch.Uint64() + 1,
		ShardState: newBlock.Header().ShardState,
	}
	payload := proto_node.ConstructEpochShardStateMessage(nextState)
	recipients := []p2p.GroupID{node.NodeConfig.GetClientGroupID()}
	return node.host.SendMessageToGroups(
		recipients, host.ConstructP2pMessage(byte(0), payload))
}
// AddNewBlock is used to add a new block into the blockchain.
func ( node * Node ) AddNewBlock ( newBlock * types . Block ) {
blockNum , err := node . blockchain . InsertChain ( [ ] * types . Block { newBlock } )
blockNum , err := node . Blockchain ( ) . InsertChain ( [ ] * types . Block { newBlock } )
if err != nil {
utils . GetLogInstance ( ) . Debug ( "Error adding new block to blockchain" , "blockNum" , blockNum , "hash" , newBlock . Header ( ) . Hash ( ) , "Error" , err )
} else {
@ -583,7 +586,10 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
}
if pong . ShardID != node . Consensus . ShardID {
utils . GetLogInstance ( ) . Error ( "Received Pong message for the wrong shard" , "receivedShardID" , pong . ShardID )
utils . GetLogInstance ( ) . Error (
"Received Pong message for the wrong shard" ,
"receivedShardID" , pong . ShardID ,
"expectedShardID" , node . Consensus . ShardID )
return 0
}
@ -657,89 +663,123 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
return 0
}
func ( node * Node ) epochShardStateMessageHandler ( msgPayload [ ] byte ) int {
utils . GetLogInstance ( ) . Error ( "[Received new shard state]" )
func ( node * Node ) epochShardStateMessageHandler ( msgPayload [ ] byte ) error {
logger := utils . GetLogInstance ( )
getLogger := func ( ) log . Logger { return utils . WithCallerSkip ( logger , 1 ) }
epochShardState , err := proto_node . DeserializeEpochShardStateFromMessage ( msgPayload )
if err != nil {
utils . GetLogInstance ( ) . Error ( "Can't get shard state Message" , "error" , err )
return - 1
return ctxerror . New ( "Can't get shard state message" ) . WithCause ( err )
}
if ( node . Consensus ! = nil && node . Consensus . ShardID != 0 ) || node . NodeConfig . Role ( ) = = nodeconfig . NewNode {
node . processEpochShardState ( epochShardState )
if node . Consensus = = nil && node . NodeConfig . Role ( ) ! = nodeconfig . NewNode {
return nil
}
return 0
// Remember the master sharding state if the epoch ID matches.
curEpoch := node . Blockchain ( ) . CurrentBlock ( ) . Header ( ) . Epoch
expectedEpoch := new ( big . Int ) . Add ( curEpoch , common . Big1 )
receivedEpoch := big . NewInt ( int64 ( epochShardState . Epoch ) )
if receivedEpoch . Cmp ( expectedEpoch ) != 0 {
return ctxerror . New ( "invalid epoch in epoch shard state message" ,
"receivedEpoch" , receivedEpoch ,
"expectedEpoch" , expectedEpoch )
}
getLogger ( ) . Info ( "received new shard state" , "epoch" , receivedEpoch )
node . nextShardState . master = epochShardState
if node . Consensus . IsLeader {
// Wait a bit to allow the master table to reach other validators.
node . nextShardState . proposeTime = time . Now ( ) . Add ( 5 * time . Second )
} else {
// Wait a bit to allow the master table to reach the leader,
// and to allow the leader to propose next shard state based upon it.
node . nextShardState . proposeTime = time . Now ( ) . Add ( 15 * time . Second )
}
// TODO ek – this should be done from replaying beaconchain once
// beaconchain sync is fixed
err = node . Beaconchain ( ) . WriteShardState (
receivedEpoch , epochShardState . ShardState )
if err != nil {
return ctxerror . New ( "cannot store shard state" , "epoch" , receivedEpoch ) .
WithCause ( err )
}
return nil
}
func ( node * Node ) processEpochShardState ( epochShardState * types . EpochShardState ) {
shardState := epochShardState . ShardState
epoch := epochShardState . Epoch
func ( node * Node ) transitionIntoNextEpoch ( shardState types . ShardState ) {
logger := utils . GetLogInstance ( )
getLogger := func ( ) log . Logger { return utils . WithCallerSkip ( logger , 1 ) }
logger = logger . New (
"blsPubKey" , hex . EncodeToString ( node . Consensus . PubKey . Serialize ( ) ) ,
"curShard" , node . Blockchain ( ) . ShardID ( ) ,
"curLeader" , node . Consensus . IsLeader )
for _ , c := range shardState {
utils . GetLogInstance ( ) . Debug ( "new shard information" , "shardID" , c . ShardID , "NodeList" , c . NodeList )
logger . Debug ( "new shard information" ,
"shardID" , c . ShardID ,
"nodeList" , c . NodeList )
}
myShardID , isNextLeader := findRoleInShardState (
node . Consensus . PubKey , shardState )
logger = logger . New (
"nextShard" , myShardID ,
"nextLeader" , isNextLeader )
if myShardID == math . MaxUint32 {
getLogger ( ) . Info ( "Somehow I got kicked out. Exiting" )
os . Exit ( 8 ) // 8 represents it's a loop and the program restart itself
}
myShardID := uint32 ( math . MaxUint32 )
isNextLeader := false
myBlsPubKey := node . Consensus . PubKey . Serialize ( )
myShardState := types . Committee { }
for _ , shard := range shardState {
for _ , nodeID := range shard . NodeList {
if bytes . Compare ( nodeID . BlsPublicKey [ : ] , myBlsPubKey ) == 0 {
myShardID = shard . ShardID
isNextLeader = shard . Leader == nodeID
myShardState = shard
}
myShardState := shardState [ myShardID ]
// Update public keys
var publicKeys [ ] * bls . PublicKey
for idx , nodeID := range myShardState . NodeList {
key := & bls . PublicKey { }
err := key . Deserializ e( nodeID . BlsPublicKey [ : ] )
if err != nil {
getLogger ( ) . Error ( "Failed to deserialize BLS public key in shard state" ,
"idx" , idx ,
"error" , err )
}
publicKeys = append ( publicKeys , key )
}
node . Consensus . UpdatePublicKeys ( publicKeys )
node . DRand . UpdatePublicKeys ( publicKeys )
if myShardID != uint32 ( math . MaxUint32 ) {
// Update public keys
ss := myShardState
publicKeys := [ ] * bls . PublicKey { }
for _ , nodeID := range ss . NodeList {
key := & bls . PublicKey { }
err := key . Deserialize ( nodeID . BlsPublicKey [ : ] )
if err != nil {
utils . GetLogInstance ( ) . Error ( "Failed to deserialize BLS public key in shard state" , "error" , err )
}
publicKeys = append ( publicKeys , key )
if node . Blockchain ( ) . ShardID ( ) == myShardID {
getLogger ( ) . Info ( "staying in the same shard" )
} else {
getLogger ( ) . Info ( "moving to another shard" )
if err := node . shardChains . Close ( ) ; err != nil {
getLogger ( ) . Error ( "cannot close shard chains" , "error" , err )
}
node . Consensus . UpdatePublicKeys ( publicKeys )
node . DRand . UpdatePublicKeys ( publicKeys )
restartProcess ( getRestartArguments ( myShardID ) )
}
}
aboutLeader := ""
if nodeconfig . GetDefaultConfig ( ) . IsLeader ( ) {
aboutLeader = "I am not leader anymore"
if isNextLeader {
aboutLeader = "I am still leader"
}
} else {
aboutLeader = "I am still validator"
if isNextLeader {
aboutLeader = "I become the leader"
func findRoleInShardState (
key * bls . PublicKey , state types . ShardState ,
) ( shardID uint32 , isLeader bool ) {
keyBytes := key . Serialize ( )
for idx , shard := range state {
for _ , nodeID := range shard . NodeList {
if bytes . Compare ( nodeID . BlsPublicKey [ : ] , keyBytes ) == 0 {
return uint32 ( idx ) , shard . Leader == nodeID
}
}
if node . blockchain . ShardID ( ) == myShardID {
utils . GetLogInstance ( ) . Info ( fmt . Sprintf ( "[Resharded][epoch:%d] I stay at shard %d, %s" , epoch , myShardID , aboutLeader ) , "BlsPubKey" , hex . EncodeToString ( myBlsPubKey ) )
} else {
utils . GetLogInstance ( ) . Info ( fmt . Sprintf ( "[Resharded][epoch:%d] I got resharded to shard %d from shard %d, %s" , epoch , myShardID , node . blockchain . ShardID ( ) , aboutLeader ) , "BlsPubKey" , hex . EncodeToString ( myBlsPubKey ) )
node . storeEpochShardState ( epochShardState )
}
return math . MaxUint32 , false
}
execFile , err := getBinaryPath ( )
if err != nil {
utils . GetLogInstance ( ) . Crit ( "Failed to get program path when restarting program" , "error" , err , "file" , execFile )
}
args := getRestartArguments ( myShardID )
utils . GetLogInstance ( ) . Info ( "Restarting program" , "args" , args , "env" , os . Environ ( ) )
err = syscall . Exec ( execFile , args , os . Environ ( ) )
if err != nil {
utils . GetLogInstance ( ) . Crit ( "Failed to restart program after resharding" , "error" , err )
}
}
} else {
utils . GetLogInstance ( ) . Info ( fmt . Sprintf ( "[Resharded][epoch:%d] Somehow I got kicked out. Exiting" , epoch ) , "BlsPubKey" , hex . EncodeToString ( myBlsPubKey ) )
os . Exit ( 8 ) // 8 represents it's a loop and the program restart itself
// restartProcess re-executes the current binary via syscall.Exec, passing
// args as the argument vector and the current environment. A failure to
// resolve the binary path or to exec is logged at Crit level. The function
// never returns normally: it panics if control comes back from syscall.Exec.
func restartProcess(args []string) {
	binary, pathErr := getBinaryPath()
	if pathErr != nil {
		utils.GetLogInstance().Crit("Failed to get program path when restarting program", "error", pathErr, "file", binary)
	}
	utils.GetLogInstance().Info("Restarting program", "args", args, "env", os.Environ())
	if execErr := syscall.Exec(binary, args, os.Environ()); execErr != nil {
		utils.GetLogInstance().Crit("Failed to restart program after resharding", "error", execErr)
	}
	panic("syscall.Exec() is not supposed to return")
}
func getRestartArguments ( myShardID uint32 ) [ ] string {
@ -780,38 +820,6 @@ func getBinaryPath() (argv0 string, err error) {
return
}
// storeEpochShardState gob-encodes epochShardState and writes the result to a
// local file named "./epoch_shard_state" followed by this node's IP and port.
// Failures are logged rather than returned. If encoding fails, nothing is
// written, so an existing file is not overwritten with a partial encoding.
// TODO: think about storing it into level db.
func (node *Node) storeEpochShardState(epochShardState *types.EpochShardState) {
	var encoded bytes.Buffer
	if err := gob.NewEncoder(&encoded).Encode(epochShardState); err != nil {
		utils.GetLogInstance().Error("[Resharded] Failed to encode epoch shard state", "error", err)
		// Do not persist a buffer that may hold only part of the state.
		return
	}
	path := "./epoch_shard_state" + node.SelfPeer.IP + node.SelfPeer.Port
	if err := ioutil.WriteFile(path, encoded.Bytes(), 0644); err != nil {
		utils.GetLogInstance().Error("[Resharded] Failed to store epoch shard state in local file", "error", err)
	}
}
// retrieveEpochShardState reads the local file named "./epoch_shard_state"
// followed by this node's IP and port, and gob-decodes its contents into a
// types.EpochShardState.
//
// It returns a nil state and a non-nil error if the file cannot be read or
// its contents cannot be decoded; the underlying cause is included in the
// error text. A read failure is also logged.
func (node *Node) retrieveEpochShardState() (*types.EpochShardState, error) {
	path := "./epoch_shard_state" + node.SelfPeer.IP + node.SelfPeer.Port
	b, err := ioutil.ReadFile(path)
	if err != nil {
		utils.GetLogInstance().Error("[Resharded] Failed to retrieve epoch shard state", "error", err)
		// Stop here: decoding an empty buffer would only hide the real cause.
		return nil, fmt.Errorf("Read local epoch shard state error: %v", err)
	}
	epochShardState := new(types.EpochShardState)
	if err := gob.NewDecoder(bytes.NewBuffer(b)).Decode(epochShardState); err != nil {
		return nil, fmt.Errorf("Decode local epoch shard state error: %v", err)
	}
	return epochShardState, nil
}
// ConsensusMessageHandler passes received message in node_handler to consensus
func ( node * Node ) ConsensusMessageHandler ( msgPayload [ ] byte ) {
if node . Consensus . ConsensusVersion == "v1" {