@ -3,11 +3,10 @@ package node
import (
import (
"bytes"
"bytes"
"context"
"context"
"encoding/gob"
"encoding/hex"
"encoding/hex"
"fmt"
"errors"
"io/ioutil"
"math"
"math"
"math/big"
"os"
"os"
"os/exec"
"os/exec"
"strconv"
"strconv"
@ -15,10 +14,9 @@ import (
"syscall"
"syscall"
"time"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/core"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rlp"
pb "github.com/golang/protobuf/proto"
pb "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/bls/ffi/go/bls"
@ -28,9 +26,12 @@ import (
"github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/api/proto/message"
proto_node "github.com/harmony-one/harmony/api/proto/node"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/contracts/structs"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/crypto/pki"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/host"
@ -177,7 +178,9 @@ func (node *Node) messageHandler(content []byte, sender string) {
case proto_node . PONG :
case proto_node . PONG :
node . pongMessageHandler ( msgPayload )
node . pongMessageHandler ( msgPayload )
case proto_node . ShardState :
case proto_node . ShardState :
node . epochShardStateMessageHandler ( msgPayload )
if err := node . epochShardStateMessageHandler ( msgPayload ) ; err != nil {
ctxerror . Log15 ( utils . GetLogger ( ) . Warn , err )
}
}
}
default :
default :
utils . GetLogInstance ( ) . Error ( "Unknown" , "MsgCategory" , msgCategory )
utils . GetLogInstance ( ) . Error ( "Unknown" , "MsgCategory" , msgCategory )
@ -248,21 +251,127 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
}
}
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on
func ( node * Node ) VerifyNewBlock ( newBlock * types . Block ) bool {
func ( node * Node ) VerifyNewBlock ( newBlock * types . Block ) error {
err := node . blockchain . ValidateNewBlock ( newBlock , pki . GetAddressFromPublicKey ( node . SelfPeer . ConsensusPubKey ) )
// TODO ek – where do we verify parent-child invariants,
// e.g. "child.Number == child.IsGenesis() ? 0 : parent.Number+1"?
err := node . Blockchain ( ) . ValidateNewBlock ( newBlock , pki . GetAddressFromPublicKey ( node . SelfPeer . ConsensusPubKey ) )
if err != nil {
if err != nil {
utils . GetLogInstance ( ) . Debug ( "Failed verifying new block" , "Error" , err , "tx" , newBlock . Transactions ( ) [ 0 ] )
return ctxerror . New ( "cannot ValidateNewBlock" ,
return false
"blockHash" , newBlock . Hash ( ) ,
"numTx" , len ( newBlock . Transactions ( ) ) ,
) . WithCause ( err )
}
}
// TODO: verify the vrf randomness
// TODO: verify the vrf randomness
_ = newBlock . Header ( ) . RandPreimage
_ = newBlock . Header ( ) . RandPreimage
err = node . blockchain . V alidateNewShardState( newBlock , & node . CurrentStakes )
err = node . v alidateNewShardState( newBlock , & node . CurrentStakes )
if err != nil {
if err != nil {
utils . GetLogInstance ( ) . Debug ( "Failed to verify new sharding state" , "err" , err )
return ctxerror . New ( "failed to verify sharding state" ) . WithCause ( err )
}
}
return true
return nil
}
// BigMaxUint64 is maximum possible uint64 value, that is, (1**64)-1.
var BigMaxUint64 = new ( big . Int ) . SetBytes ( [ ] byte {
255 , 255 , 255 , 255 , 255 , 255 , 255 , 255 ,
} )
// validateNewShardState validate whether the new shard state root matches
func ( node * Node ) validateNewShardState ( block * types . Block , stakeInfo * map [ common . Address ] * structs . StakeInfo ) error {
// Common case first – blocks without resharding proposal
header := block . Header ( )
if header . ShardStateHash == ( common . Hash { } ) {
// No new shard state was proposed
if block . ShardID ( ) == 0 {
if core . IsEpochLastBlock ( block ) {
// TODO ek - invoke view change
return errors . New ( "beacon leader did not propose resharding" )
}
} else {
if node . nextShardState . master != nil &&
! time . Now ( ) . Before ( node . nextShardState . proposeTime ) {
// TODO ek – invoke view change
return errors . New ( "regular leader did not propose resharding" )
}
}
// We aren't expecting to reshard, so proceed to sign
return nil
}
proposed := header . ShardState
if block . ShardID ( ) == 0 {
// Beacon validators independently recalculate the master state and
// compare it against the proposed copy.
nextEpoch := new ( big . Int ) . Add ( block . Header ( ) . Epoch , common . Big1 )
// TODO ek – this may be called from regular shards,
// for vetting beacon chain blocks received during block syncing.
// DRand may or or may not get in the way. Test this out.
expected , err := core . CalculateNewShardState (
node . Blockchain ( ) , nextEpoch , stakeInfo )
if err != nil {
return ctxerror . New ( "cannot calculate expected shard state" ) .
WithCause ( err )
}
if types . CompareShardState ( expected , proposed ) != 0 {
// TODO ek – log state proposal differences
// TODO ek – this error should trigger view change
err := errors . New ( "shard state proposal is different from expected" )
// TODO ek/chao – calculated shard state is different even with the
// same input, i.e. it is nondeterministic.
// Don't treat this as a blocker until we fix the nondeterminism.
//return err
ctxerror . Log15 ( utils . GetLogger ( ) . Warn , err )
}
} else {
// Regular validators fetch the local-shard copy on the beacon chain
// and compare it against the proposed copy.
//
// We trust the master proposal in our copy of beacon chain.
// The sanity check for the master proposal is done earlier,
// when the beacon block containing the master proposal is received
// and before it is admitted into the local beacon chain.
//
// TODO ek – fetch masterProposal from beaconchain instead
masterProposal := node . nextShardState . master . ShardState
expected := masterProposal . FindCommitteeByID ( block . ShardID ( ) )
switch len ( proposed ) {
case 0 :
// Proposal to discontinue shard
if expected != nil {
// TODO ek – invoke view change
return errors . New (
"leader proposed to disband against beacon decision" )
}
case 1 :
// Proposal to continue shard
proposed := proposed [ 0 ]
// Sanity check: Shard ID should match
if proposed . ShardID != block . ShardID ( ) {
// TODO ek – invoke view change
return ctxerror . New ( "proposal has incorrect shard ID" ,
"proposedShard" , proposed . ShardID ,
"blockShard" , block . ShardID ( ) )
}
// Did beaconchain say we are no more?
if expected == nil {
// TODO ek – invoke view change
return errors . New (
"leader proposed to continue against beacon decision" )
}
// Did beaconchain say the same proposal?
if types . CompareCommittee ( expected , & proposed ) != 0 {
// TODO ek – log differences
// TODO ek – invoke view change
return errors . New ( "proposal differs from one in beacon chain" )
}
default :
// TODO ek – invoke view change
return ctxerror . New (
"regular resharding proposal has incorrect number of shards" ,
"numShards" , len ( proposed ) )
}
}
return nil
}
}
// PostConsensusProcessing is called by consensus participants, after consensus is done, to:
// PostConsensusProcessing is called by consensus participants, after consensus is done, to:
@ -315,29 +424,37 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
// TODO: update staking information once per epoch.
// TODO: update staking information once per epoch.
node . UpdateStakingList ( node . QueryStakeInfo ( ) )
node . UpdateStakingList ( node . QueryStakeInfo ( ) )
node . printStakingList ( )
node . printStakingList ( )
if core . IsEpochBlock ( newBlock ) {
}
shardState := node . blockchain . StoreNewShardState ( newBlock , & node . CurrentStakes )
newBlockHeader := newBlock . Header ( )
if shardState != nil {
if newBlockHeader . ShardStateHash != ( common . Hash { } ) {
if nodeconfig . GetDefaultConfig ( ) . IsLeader ( ) {
if node . Consensus . ShardID == 0 {
epochShardState := types . EpochShardState { Epoch : core . GetEpochFromBlockNumber ( newBlock . NumberU64 ( ) ) , ShardState : shardState }
// TODO ek – this is a temp hack until beacon chain sync is fixed
epochShardStateMessage := proto_node . ConstructEpochShardStateMessage ( epochShardState )
// End-of-epoch block on beacon chain; block's EpochState is the
// Broadcast new shard state
// master resharding table. Broadcast it to the network.
err := node . host . SendMessageToGroups ( [ ] p2p . GroupID { node . NodeConfig . GetClientGroupID ( ) } , host . ConstructP2pMessage ( byte ( 0 ) , epochShardStateMessage ) )
if err := node . broadcastEpochShardState ( newBlock ) ; err != nil {
if err != nil {
e := ctxerror . New ( "cannot broadcast shard state" ) . WithCause ( err )
utils . GetLogInstance ( ) . Error ( "[Resharding] failed to broadcast shard state message" , "group" , node . NodeConfig . GetClientGroupID ( ) )
ctxerror . Log15 ( utils . GetLogInstance ( ) . Error , e )
} else {
utils . GetLogInstance ( ) . Info ( "[Resharding] broadcasted shard state message to" , "group" , node . NodeConfig . GetClientGroupID ( ) )
}
}
node . processEpochShardState ( & types . EpochShardState { Epoch : core . GetEpochFromBlockNumber ( newBlock . NumberU64 ( ) ) , ShardState : shardState } )
}
}
}
}
node . transitionIntoNextEpoch ( newBlockHeader . ShardState )
}
}
}
}
func ( node * Node ) broadcastEpochShardState ( newBlock * types . Block ) error {
epochShardStateMessage := proto_node . ConstructEpochShardStateMessage (
types . EpochShardState {
Epoch : newBlock . Header ( ) . Epoch . Uint64 ( ) + 1 ,
ShardState : newBlock . Header ( ) . ShardState ,
} ,
)
return node . host . SendMessageToGroups (
[ ] p2p . GroupID { node . NodeConfig . GetClientGroupID ( ) } ,
host . ConstructP2pMessage ( byte ( 0 ) , epochShardStateMessage ) )
}
// AddNewBlock is usedd to add new block into the blockchain.
// AddNewBlock is usedd to add new block into the blockchain.
func ( node * Node ) AddNewBlock ( newBlock * types . Block ) {
func ( node * Node ) AddNewBlock ( newBlock * types . Block ) {
blockNum , err := node . blockchain . InsertChain ( [ ] * types . Block { newBlock } )
blockNum , err := node . Blockchain ( ) . InsertChain ( [ ] * types . Block { newBlock } )
if err != nil {
if err != nil {
utils . GetLogInstance ( ) . Debug ( "Error adding new block to blockchain" , "blockNum" , blockNum , "hash" , newBlock . Header ( ) . Hash ( ) , "Error" , err )
utils . GetLogInstance ( ) . Debug ( "Error adding new block to blockchain" , "blockNum" , blockNum , "hash" , newBlock . Header ( ) . Hash ( ) , "Error" , err )
} else {
} else {
@ -473,7 +590,10 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
}
}
if pong . ShardID != node . Consensus . ShardID {
if pong . ShardID != node . Consensus . ShardID {
utils . GetLogInstance ( ) . Error ( "Received Pong message for the wrong shard" , "receivedShardID" , pong . ShardID )
utils . GetLogInstance ( ) . Error (
"Received Pong message for the wrong shard" ,
"receivedShardID" , pong . ShardID ,
"expectedShardID" , node . Consensus . ShardID )
return 0
return 0
}
}
@ -547,89 +667,115 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
return 0
return 0
}
}
func ( node * Node ) epochShardStateMessageHandler ( msgPayload [ ] byte ) int {
func ( node * Node ) epochShardStateMessageHandler ( msgPayload [ ] byte ) error {
utils . GetLogInstance ( ) . Error ( "[Received new shard state]" )
logger := utils . GetLogInstance ( )
getLogger := func ( ) log . Logger { return utils . WithCallerSkip ( logger , 1 ) }
epochShardState , err := proto_node . DeserializeEpochShardStateFromMessage ( msgPayload )
epochShardState , err := proto_node . DeserializeEpochShardStateFromMessage ( msgPayload )
if err != nil {
if err != nil {
utils . GetLogInstance ( ) . Error ( "Can't get shard state Message" , "error" , err )
return ctxerror . New ( "Can't get shard state message" ) . WithCause ( err )
return - 1
}
}
if ( node . Consensus ! = nil && node . Consensus . ShardID != 0 ) || node . NodeConfig . Role ( ) = = nodeconfig . NewNode {
if node . Consensus = = nil && node . NodeConfig . Role ( ) ! = nodeconfig . NewNode {
node . processEpochShardState ( epochShardState )
return nil
}
}
return 0
receivedEpoch := big . NewInt ( int64 ( epochShardState . Epoch ) )
getLogger ( ) . Info ( "received new shard state" , "epoch" , receivedEpoch )
node . nextShardState . master = epochShardState
if node . NodeConfig . IsLeader ( ) {
// Wait a bit to allow the master table to reach other validators.
node . nextShardState . proposeTime = time . Now ( ) . Add ( 5 * time . Second )
} else {
// Wait a bit to allow the master table to reach the leader,
// and to allow the leader to propose next shard state based upon it.
node . nextShardState . proposeTime = time . Now ( ) . Add ( 15 * time . Second )
}
// TODO ek – this should be done from replaying beaconchain once
// beaconchain sync is fixed
err = node . Beaconchain ( ) . WriteShardState (
receivedEpoch , epochShardState . ShardState )
if err != nil {
return ctxerror . New ( "cannot store shard state" , "epoch" , receivedEpoch ) .
WithCause ( err )
}
return nil
}
}
func ( node * Node ) processEpochShardState ( epochShardState * types . EpochShardState ) {
func ( node * Node ) transitionIntoNextEpoch ( shardState types . ShardState ) {
shardState := epochShardState . ShardState
logger := utils . GetLogInstance ( )
epoch := epochShardState . Epoch
getLogger := func ( ) log . Logger { return utils . WithCallerSkip ( logger , 1 ) }
logger = logger . New (
"blsPubKey" , hex . EncodeToString ( node . Consensus . PubKey . Serialize ( ) ) ,
"curShard" , node . Blockchain ( ) . ShardID ( ) ,
"curLeader" , node . NodeConfig . IsLeader ( ) )
for _ , c := range shardState {
for _ , c := range shardState {
utils . GetLogInstance ( ) . Debug ( "new shard information" , "shardID" , c . ShardID , "NodeList" , c . NodeList )
logger . Debug ( "new shard information" ,
}
"shardID" , c . ShardID ,
"nodeList" , c . NodeList )
myShardID := uint32 ( math . MaxUint32 )
}
isNextLeader := false
myShardID , isNextLeader := findRoleInShardState (
myBlsPubKey := node . Consensus . PubKey . Serialize ( )
node . Consensus . PubKey , shardState )
myShardState := types . Committee { }
logger = logger . New (
for _ , shard := range shardState {
"nextShard" , myShardID ,
for _ , nodeID := range shard . NodeList {
"nextLeader" , isNextLeader )
if bytes . Compare ( nodeID . BlsPublicKey [ : ] , myBlsPubKey ) == 0 {
myShardID = shard . ShardID
if myShardID == math . MaxUint32 {
isNextLeader = shard . Leader == nodeID
getLogger ( ) . Info ( "Somehow I got kicked out. Exiting" )
myShardState = shard
os . Exit ( 8 ) // 8 represents it's a loop and the program restart itself
}
}
}
}
if myShardID != uint32 ( math . MaxUint32 ) {
myShardState := shardState [ myShardID ]
// Update public keys
ss := myShardState
// Update public keys
publicKeys := [ ] * bls . PublicKey { }
var publicKeys [ ] * bls . PublicKey
for _ , nodeID := range ss . NodeList {
for idx , nodeID := range myShardState . NodeList {
key := & bls . PublicKey { }
key := & bls . PublicKey { }
err := key . Deserialize ( nodeID . BlsPublicKey [ : ] )
err := key . Deserialize ( nodeID . BlsPublicKey [ : ] )
if err != nil {
if err != nil {
utils . GetLogInstance ( ) . Error ( "Failed to deserialize BLS public key in shard state" , "error" , err )
getLogger ( ) . Error ( "Failed to deserialize BLS public key in shard state" ,
}
"idx" , idx ,
publicKeys = append ( publicKeys , key )
"error" , err )
}
}
node . Consensus . UpdatePublicKeys ( publicKeys )
publicKeys = append ( publicKeys , key )
node . DRand . UpdatePublicKeys ( publicKeys )
}
node . Consensus . UpdatePublicKeys ( publicKeys )
node . DRand . UpdatePublicKeys ( publicKeys )
aboutLeader := ""
if node . Blockchain ( ) . ShardID ( ) == myShardID {
if nodeconfig . GetDefaultConfig ( ) . IsLeader ( ) {
getLogger ( ) . Info ( "staying in the same shard" )
aboutLeader = "I am not leader anymore"
} else {
if isNextLeader {
getLogger ( ) . Info ( "moving to another shard" )
aboutLeader = "I am still leader"
if err := node . shardChains . Close ( ) ; err != nil {
}
getLogger ( ) . Error ( "cannot close shard chains" , "error" , err )
} else {
aboutLeader = "I am still validator"
if isNextLeader {
aboutLeader = "I become the leader"
}
}
}
if node . blockchain . ShardID ( ) == myShardID {
restartProcess ( getRestartArguments ( myShardID ) )
utils . GetLogInstance ( ) . Info ( fmt . Sprintf ( "[Resharded][epoch:%d] I stay at shard %d, %s" , epoch , myShardID , aboutLeader ) , "BlsPubKey" , hex . EncodeToString ( myBlsPubKey ) )
}
} else {
}
utils . GetLogInstance ( ) . Info ( fmt . Sprintf ( "[Resharded][epoch:%d] I got resharded to shard %d from shard %d, %s" , epoch , myShardID , node . blockchain . ShardID ( ) , aboutLeader ) , "BlsPubKey" , hex . EncodeToString ( myBlsPubKey ) )
node . storeEpochShardState ( epochShardState )
execFile , err := getBinaryPath ( )
func findRoleInShardState (
if err != nil {
key * bls . PublicKey , state types . ShardState ,
utils . GetLogInstance ( ) . Crit ( "Failed to get program path when restarting program" , "error" , err , "file" , execFile )
) ( shardID uint32 , isLeader bool ) {
}
keyBytes := key . Serialize ( )
args := getRestartArguments ( myShardID )
for idx , shard := range state {
utils . GetLogInstance ( ) . Info ( "Restarting program" , "args" , args , "env" , os . Environ ( ) )
for nodeIdx , nodeID := range shard . NodeList {
err = syscall . Exec ( execFile , args , os . Environ ( ) )
if bytes . Compare ( nodeID . BlsPublicKey [ : ] , keyBytes ) == 0 {
if err != nil {
return uint32 ( idx ) , nodeIdx == 0
utils . GetLogInstance ( ) . Crit ( "Failed to restart program after resharding" , "error" , err )
}
}
}
}
} else {
utils . GetLogInstance ( ) . Info ( fmt . Sprintf ( "[Resharded][epoch:%d] Somehow I got kicked out. Exiting" , epoch ) , "BlsPubKey" , hex . EncodeToString ( myBlsPubKey ) )
os . Exit ( 8 ) // 8 represents it's a loop and the program restart itself
}
}
return math . MaxUint32 , false
}
func restartProcess ( args [ ] string ) {
execFile , err := getBinaryPath ( )
if err != nil {
utils . GetLogInstance ( ) . Crit ( "Failed to get program path when restarting program" , "error" , err , "file" , execFile )
}
utils . GetLogInstance ( ) . Info ( "Restarting program" , "args" , args , "env" , os . Environ ( ) )
err = syscall . Exec ( execFile , args , os . Environ ( ) )
if err != nil {
utils . GetLogInstance ( ) . Crit ( "Failed to restart program after resharding" , "error" , err )
}
panic ( "syscall.Exec() is not supposed to return" )
}
}
func getRestartArguments ( myShardID uint32 ) [ ] string {
func getRestartArguments ( myShardID uint32 ) [ ] string {
@ -670,38 +816,6 @@ func getBinaryPath() (argv0 string, err error) {
return
return
}
}
// Stores the epoch shard state into local file
// TODO: think about storing it into level db.
func ( node * Node ) storeEpochShardState ( epochShardState * types . EpochShardState ) {
byteBuffer := bytes . NewBuffer ( [ ] byte { } )
encoder := gob . NewEncoder ( byteBuffer )
err := encoder . Encode ( epochShardState )
if err != nil {
utils . GetLogInstance ( ) . Error ( "[Resharded] Failed to encode epoch shard state" , "error" , err )
}
err = ioutil . WriteFile ( "./epoch_shard_state" + node . SelfPeer . IP + node . SelfPeer . Port , byteBuffer . Bytes ( ) , 0644 )
if err != nil {
utils . GetLogInstance ( ) . Error ( "[Resharded] Failed to store epoch shard state in local file" , "error" , err )
}
}
func ( node * Node ) retrieveEpochShardState ( ) ( * types . EpochShardState , error ) {
b , err := ioutil . ReadFile ( "./epoch_shard_state" + node . SelfPeer . IP + node . SelfPeer . Port )
if err != nil {
utils . GetLogInstance ( ) . Error ( "[Resharded] Failed to retrieve epoch shard state" , "error" , err )
}
epochShardState := new ( types . EpochShardState )
r := bytes . NewBuffer ( b )
decoder := gob . NewDecoder ( r )
err = decoder . Decode ( epochShardState )
if err != nil {
return nil , fmt . Errorf ( "Decode local epoch shard state error" )
}
return epochShardState , nil
}
// ConsensusMessageHandler passes received message in node_handler to consensus
// ConsensusMessageHandler passes received message in node_handler to consensus
func ( node * Node ) ConsensusMessageHandler ( msgPayload [ ] byte ) {
func ( node * Node ) ConsensusMessageHandler ( msgPayload [ ] byte ) {
if node . Consensus . ConsensusVersion == "v1" {
if node . Consensus . ConsensusVersion == "v1" {