Merge pull request #601 from alajko/copy_backup_node

Archival node setup
alajko committed 6 years ago (via GitHub)
commit b1065dfbf8
9 changed files:
1. cmd/harmony/ArchivalNode.md (0 changed lines)
2. cmd/harmony/main.go (39 changed lines)
3. internal/configs/node/config.go (17 changed lines)
4. node/node.go (7 changed lines)
5. node/node_handler.go (6 changed lines)
6. node/node_syncing.go (4 changed lines)
7. node/service_setup.go (8 changed lines)
8. test/configs/oneshard1.txt (2 changed lines)
9. test/deploy.sh (4 changed lines)

cmd/harmony/main.go

@@ -10,17 +10,16 @@ import (
 	"strconv"
 	"time"
-	"github.com/harmony-one/bls/ffi/go/bls"
-	"github.com/harmony-one/harmony/internal/utils/contract"
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/harmony-one/bls/ffi/go/bls"
 	"github.com/harmony-one/harmony/consensus"
 	"github.com/harmony-one/harmony/drand"
 	nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
 	"github.com/harmony-one/harmony/internal/profiler"
 	"github.com/harmony-one/harmony/internal/utils"
+	"github.com/harmony-one/harmony/internal/utils/contract"
 	"github.com/harmony-one/harmony/node"
 	"github.com/harmony-one/harmony/p2p"
 	"github.com/harmony-one/harmony/p2p/p2pimpl"

@@ -85,7 +84,9 @@ var (
 	keyFile = flag.String("key", "./.hmykey", "the private key file of the harmony node")
 	// isBeacon indicates this node is a beacon chain node
 	isBeacon = flag.Bool("is_beacon", false, "true means this node is a beacon chain node")
-	// isNewNode indicates this node is a new node
+	// isArchival indicates this node is an archival node that will save and archive current blockchain
+	isArchival = flag.Bool("is_archival", false, "true means this node is a archival node")
+	//isNewNode indicates this node is a new node
 	isNewNode = flag.Bool("is_newnode", false, "true means this node is a new node")
 	accountIndex = flag.Int("account_index", 0, "the index of the staking account to use")
 	// isLeader indicates this node is a beacon chain leader node during the bootstrap process

@@ -171,6 +172,8 @@ func createGlobalConfig() *nodeconfig.ConfigType {
 	if *isLeader {
 		nodeConfig.StringRole = "leader"
 		nodeConfig.Leader = nodeConfig.SelfPeer
+	} else if *isArchival {
+		nodeConfig.StringRole = "archival"
 	} else {
 		nodeConfig.StringRole = "validator"
 	}

@@ -188,6 +191,13 @@ func createGlobalConfig() *nodeconfig.ConfigType {
 	return nodeConfig
 }

+func setupArchivalNode(nodeConfig *nodeconfig.ConfigType) (*node.Node, *nodeconfig.ConfigType) {
+	currentNode := node.New(nodeConfig.Host, &consensus.Consensus{ShardID: uint32(0)}, nil)
+	currentNode.NodeConfig.SetRole(nodeconfig.ArchivalNode)
+	currentNode.AddBeaconChainDatabase(nodeConfig.BeaconDB)
+	return currentNode, nodeConfig
+}
+
 func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) (*consensus.Consensus, *node.Node) {
 	// Consensus object.
 	// TODO: consensus object shouldn't start here

@@ -251,11 +261,14 @@ func main() {
 	flag.Parse()
 	initSetup()
+	var currentNode *node.Node
+	var consensus *consensus.Consensus
 	nodeConfig := createGlobalConfig()
-	// Init logging.
-	loggingInit(*logFolder, nodeConfig.StringRole, *ip, *port, *onlyLogTps)
+	if *isArchival {
+		currentNode, nodeConfig = setupArchivalNode(nodeConfig)
+		loggingInit(*logFolder, nodeConfig.StringRole, *ip, *port, *onlyLogTps)
+		go currentNode.DoBeaconSyncing()
+	} else {
 	// Start Profiler for leader if profile argument is on
 	if nodeConfig.StringRole == "leader" && (*profile || *metricsReportURL != "") {
 		prof := profiler.GetProfiler()

@@ -264,15 +277,15 @@ func main() {
 			prof.Start()
 		}
 	}
-	consensus, currentNode := setUpConsensusAndNode(nodeConfig)
+	consensus, currentNode = setUpConsensusAndNode(nodeConfig)
 	if consensus.IsLeader {
 		go currentNode.SendPongMessage()
 	}
-	log.Info("New Harmony Node ====", "Role", currentNode.NodeConfig.Role(), "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty()))
+	// Init logging.
+	loggingInit(*logFolder, nodeConfig.StringRole, *ip, *port, *onlyLogTps)
 	go currentNode.SupportSyncing()
+	}
+	log.Info("New Harmony Node ====", "Role", currentNode.NodeConfig.Role(), "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty()))
 	currentNode.ServiceManagerSetup()
 	currentNode.RunServices()
 	currentNode.StartServer()

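Note on the main.go flow above: the change boils down to a boolean flag selecting between an archival startup path (beacon-chain syncing only) and the existing consensus path. Below is a minimal, self-contained sketch of that control-flow pattern, not the Harmony code itself; roleFromFlags, runArchival, and runConsensus are hypothetical placeholders standing in for createGlobalConfig, setupArchivalNode/DoBeaconSyncing, and setUpConsensusAndNode/SupportSyncing.

package main

import (
	"flag"
	"fmt"
)

var (
	isLeader   = flag.Bool("is_leader", false, "run as the bootstrap leader")
	isArchival = flag.Bool("is_archival", false, "run as an archival node")
)

// roleFromFlags mirrors the role selection in createGlobalConfig.
func roleFromFlags() string {
	switch {
	case *isLeader:
		return "leader"
	case *isArchival:
		return "archival"
	default:
		return "validator"
	}
}

// runArchival stands in for setupArchivalNode + go DoBeaconSyncing().
func runArchival() { fmt.Println("archival: sync the beacon chain, skip consensus") }

// runConsensus stands in for setUpConsensusAndNode + go SupportSyncing().
func runConsensus(role string) { fmt.Printf("%s: start consensus and block syncing\n", role) }

func main() {
	flag.Parse()
	if role := roleFromFlags(); role == "archival" {
		runArchival()
	} else {
		runConsensus(role)
	}
}

Running the sketch with -is_archival takes the archival branch; without it, the default validator branch runs, which matches the if/else added to main().
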
internal/configs/node/config.go

@@ -27,7 +27,7 @@ const (
 	NewNode
 	ClientNode
 	WalletNode
-	BackupNode
+	ArchivalNode
 )

 func (role Role) String() string {

@@ -48,8 +48,8 @@ func (role Role) String() string {
 		return "ClientNode"
 	case WalletNode:
 		return "WalletNode"
-	case BackupNode:
-		return "BackupNode"
+	case ArchivalNode:
+		return "ArchivalNode"
 	}
 	return "Unknown"
 }

@@ -69,6 +69,7 @@ type ConfigType struct {
 	isClient bool // whether this node is a client node, such as wallet/txgen
 	isBeacon bool // whether this node is a beacon node or not
 	isLeader bool // whether this node is a leader or not
+	isArchival bool // whether this node is a archival node. archival node backups all blockchain information.
 	shardID uint32 // shardID of this node
 	role Role // Role of the node

@@ -142,6 +143,11 @@ func (conf *ConfigType) SetIsLeader(b bool) {
 	conf.isLeader = b
 }

+// SetIsArchival set the isArchival configuration
+func (conf *ConfigType) SetIsArchival(b bool) {
+	conf.isArchival = b
+}
+
 // SetShardID set the shardID
 func (conf *ConfigType) SetShardID(s uint32) {
 	conf.shardID = s

@@ -182,6 +188,11 @@ func (conf *ConfigType) IsLeader() bool {
 	return conf.isLeader
 }

+// IsArchival returns the isArchival configuration
+func (conf *ConfigType) IsArchival() bool {
+	return conf.isArchival
+}
+
 // ShardID returns the shardID
 func (conf *ConfigType) ShardID() uint32 {
 	return conf.shardID

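The config.go changes follow a simple enum-plus-accessor pattern: the ArchivalNode constant replaces BackupNode in the iota-based Role type, and the new isArchival field gets a setter and getter. The sketch below is a trimmed, self-contained illustration of that pattern, keeping only the archival-related pieces; the real ConfigType carries many more fields and roles.

package main

import "fmt"

// Role is an iota-based enum, as in internal/configs/node.
type Role int

const (
	Unknown Role = iota
	Validator
	ArchivalNode // replaces the old BackupNode constant
)

func (r Role) String() string {
	switch r {
	case Validator:
		return "Validator"
	case ArchivalNode:
		return "ArchivalNode"
	}
	return "Unknown"
}

// ConfigType keeps only the archival-related fields from the real struct.
type ConfigType struct {
	isArchival bool
	role       Role
}

// SetIsArchival sets the isArchival configuration.
func (conf *ConfigType) SetIsArchival(b bool) { conf.isArchival = b }

// IsArchival returns the isArchival configuration.
func (conf *ConfigType) IsArchival() bool { return conf.isArchival }

func main() {
	conf := &ConfigType{role: ArchivalNode}
	conf.SetIsArchival(true)
	fmt.Println(conf.role, conf.IsArchival()) // prints: ArchivalNode true
}
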
node/node.go

@@ -247,7 +247,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *N
 		node.BlockChannel = make(chan *types.Block)
 		node.ConfirmedBlockChannel = make(chan *types.Block)
 		node.BeaconBlockChannel = make(chan *types.Block)
-
 		node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain)
 		node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.ConsensusPubKey), node.Consensus.ShardID)

@@ -431,12 +430,12 @@ func (node *Node) initNodeConfiguration(isBeacon bool, isClient bool) (service.N
 	nodeConfig.Actions[p2p.GroupIDBeaconClient] = p2p.ActionStart

 	var err error
-	if !isBeacon {
-		node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient)
-	} else {
+	if isBeacon {
 		node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
 		node.clientReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient)
 		node.NodeConfig.SetClientGroupID(p2p.GroupIDBeaconClient)
+	} else {
+		node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient)
 	}

 	if err != nil {

node/node_handler.go

@@ -142,12 +142,14 @@ func (node *Node) messageHandler(content []byte, sender string) {
 		} else {
 			// for non-beaconchain node, subscribe to beacon block broadcast
 			role := node.NodeConfig.Role()
-			if proto_node.BlockMessageType(msgPayload[0]) == proto_node.Sync && (role == nodeconfig.ShardValidator || role == nodeconfig.ShardLeader || role == nodeconfig.NewNode) {
+			if proto_node.BlockMessageType(msgPayload[0]) == proto_node.Sync && (role == nodeconfig.ShardValidator || role == nodeconfig.ShardLeader || role == nodeconfig.NewNode || role == nodeconfig.ArchivalNode) {
+				utils.GetLogInstance().Info("Block being handled by block channel", "self peer", node.SelfPeer)
 				for _, block := range blocks {
 					node.BeaconBlockChannel <- block
 				}
 			}
 			if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil {
+				utils.GetLogInstance().Info("Block being handled by client by", "self peer", node.SelfPeer)
 				node.Client.UpdateBlocks(blocks)
 			}
 		}

@@ -340,7 +342,7 @@ func (node *Node) AddNewBlock(newBlock *types.Block) {
 	if err != nil {
 		utils.GetLogInstance().Debug("Error adding new block to blockchain", "blockNum", blockNum, "Error", err)
 	} else {
-		utils.GetLogInstance().Info("adding new block to blockchain", "blockNum", blockNum)
+		utils.GetLogInstance().Info("adding new block to blockchain", "blockNum", blockNum, "by node", node.SelfPeer)
 	}
 }

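The messageHandler change above adds ArchivalNode to the set of roles that forward broadcast beacon blocks onto BeaconBlockChannel. The following is a simplified, self-contained sketch of that dispatch rule; Role, Block, wantsBeaconBlocks, and handleBeaconBroadcast are stand-ins for the real nodeconfig/proto_node types and the handler logic, not Harmony APIs.

package main

import "fmt"

// Role and Block are simplified stand-ins for nodeconfig.Role and types.Block.
type Role int

const (
	ShardValidator Role = iota
	ShardLeader
	NewNode
	ArchivalNode
	ClientNode
)

type Block struct{ Num uint64 }

// wantsBeaconBlocks mirrors the role check added in messageHandler.
func wantsBeaconBlocks(r Role) bool {
	return r == ShardValidator || r == ShardLeader || r == NewNode || r == ArchivalNode
}

// handleBeaconBroadcast pushes broadcast beacon blocks onto the channel only
// for roles that consume them (archival nodes now included).
func handleBeaconBroadcast(role Role, blocks []*Block, beaconBlockChannel chan<- *Block) {
	if !wantsBeaconBlocks(role) {
		return
	}
	for _, block := range blocks {
		beaconBlockChannel <- block
	}
}

func main() {
	ch := make(chan *Block, 2)
	handleBeaconBroadcast(ArchivalNode, []*Block{{Num: 1}, {Num: 2}}, ch)
	close(ch)
	for b := range ch {
		fmt.Println("queued beacon block", b.Num)
	}
}
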
node/node_syncing.go

@@ -41,7 +41,6 @@ func (node *Node) getNeighborPeers(neighbor *sync.Map) []p2p.Peer {
 		res = append(res, v.(p2p.Peer))
 		return true
 	})
-
 	removeID := -1
 	for i := range res {
 		if res[i].Port == node.SelfPeer.Port {

@@ -52,12 +51,12 @@ func (node *Node) getNeighborPeers(neighbor *sync.Map) []p2p.Peer {
 	if removeID != -1 {
 		res = append(res[:removeID], res[removeID+1:]...)
 	}
-
+	utils.GetLogInstance().Debug("GetSyncingPeers: ", "res", res, "self", node.SelfPeer)
 	return res
 }

 // GetBeaconSyncingPeers returns a list of peers for beaconchain syncing
 func (node *Node) GetBeaconSyncingPeers() []p2p.Peer {
 	return node.getNeighborPeers(&node.BeaconNeighbors)
 }

@@ -79,6 +78,7 @@ func (node *Node) DoBeaconSyncing() {
 			startHash := node.beaconChain.CurrentBlock().Hash()
 			node.beaconSync.AddLastMileBlock(beaconBlock)
 			node.beaconSync.StartStateSync(startHash[:], node.beaconChain, node.BeaconWorker, true)
+			utils.GetLogInstance().Debug("[SYNC] STARTING BEACON SYNC")
 		}
 	}
 }

node/service_setup.go

@@ -104,8 +104,8 @@ func (node *Node) setupForClientNode() {
 	node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil))
 }

-func (node *Node) setupForBackupNode(isBeacon bool) {
-	nodeConfig, chanPeer := node.initNodeConfiguration(isBeacon, false)
+func (node *Node) setupForArchivalNode() {
+	nodeConfig, chanPeer := node.initNodeConfiguration(false, false)
 	// Register peer discovery service.
 	node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer))
 	// Register networkinfo service. "0" is the beacon shard ID

@@ -130,8 +130,8 @@ func (node *Node) ServiceManagerSetup() {
 		node.setupForNewNode()
 	case nodeconfig.ClientNode:
 		node.setupForClientNode()
-	case nodeconfig.BackupNode:
-		node.setupForBackupNode(true)
+	case nodeconfig.ArchivalNode:
+		node.setupForArchivalNode()
 	}
 	node.serviceManager.SetupServiceMessageChan(node.serviceMessageChan)
 }

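ServiceManagerSetup now routes the ArchivalNode role to setupForArchivalNode, which registers peer discovery and network info against the beacon shard. Below is a compact sketch of that role switch; the Node struct and the setupFor* bodies are stubs standing in for the actual service registrations in the repo.

package main

import "fmt"

type Role int

const (
	ClientNode Role = iota
	ArchivalNode
)

// Node holds only the role here; the real Node also carries a service manager.
type Node struct{ role Role }

func (node *Node) setupForClientNode() {
	fmt.Println("register client services")
}

func (node *Node) setupForArchivalNode() {
	// In the repo this registers peer discovery and networkinfo for the beacon shard.
	fmt.Println("register archival services for the beacon shard")
}

// ServiceManagerSetup mirrors the role switch in node/service_setup.go.
func (node *Node) ServiceManagerSetup() {
	switch node.role {
	case ClientNode:
		node.setupForClientNode()
	case ArchivalNode:
		node.setupForArchivalNode()
	}
}

func main() {
	(&Node{role: ArchivalNode}).ServiceManagerSetup()
}
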
test/configs/oneshard1.txt

@@ -9,4 +9,6 @@
 127.0.0.1 9008 validator 0
 127.0.0.1 9009 validator 0
 127.0.0.1 9010 validator 0
+127.0.0.1 9011 archival 0
 127.0.0.1 19999 client 0

test/deploy.sh

@@ -148,6 +148,10 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
     echo "launching validator ..."
     $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE &
   fi
+  if [ "$mode" == "archival" ]; then
+    echo "launching archival node ... wait"
+    $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB $HMY_OPT2 -key /tmp/$ip-$port.key -is_archival 2>&1 | tee -a $LOG_FILE &
+  fi
   sleep 0.5
   if [[ "$mode" == "newnode" && "$SYNC" == "true" ]]; then
     (( NUM_NN += 10 ))
