Resolve conflicts

pull/1785/head
flicker-harmony 5 years ago
commit f15db6be8c
  1. 328
      cmd/client/txgen/main.go
  2. 517
      cmd/harmony/main.go
  3. 685
      node/node.go

@ -0,0 +1,328 @@
package main
import (
"flag"
"fmt"
"math/big"
"math/rand"
"os"
"path"
"sync"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/client"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/genesis"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl"
"github.com/harmony-one/harmony/shard"
)
var (
version string
builtBy string
builtAt string
commit string
stateMutex sync.Mutex
)
const (
checkFrequency = 2 //checkfrequency checks whether the transaction generator is ready to send the next batch of transactions.
)
// Settings is the settings for TX generation. No Cross-Shard Support!
type Settings struct {
NumOfAddress int
MaxNumTxsPerBatch int
}
func printVersion(me string) {
fmt.Fprintf(os.Stderr, "Harmony (C) 2019. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt)
os.Exit(0)
}
// The main entrance for the transaction generator program which simulate transactions and send to the network for
// processing.
var (
ip = flag.String("ip", "127.0.0.1", "IP of the node")
port = flag.String("port", "9999", "port of the node.")
numTxns = flag.Int("numTxns", 100, "number of transactions to send per message")
logFolder = flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
duration = flag.Int("duration", 30, "duration of the tx generation in second. If it's negative, the experiment runs forever.")
versionFlag = flag.Bool("version", false, "Output version info")
crossShardRatio = flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") //Keeping this for backward compatibility
shardIDFlag = flag.Int("shardID", 0, "The shardID the node belongs to.")
// Key file to store the private key
keyFile = flag.String("key", "./.txgenkey", "the private key file of the txgen")
// logging verbosity
verbosity = flag.Int("verbosity", 5, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: 5)")
)
func setUpTXGen() *node.Node {
nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile)
if err != nil {
panic(err)
}
peerPubKey := bls.RandPrivateKey().GetPublicKey()
if peerPubKey == nil {
panic(fmt.Errorf("generate key error"))
}
shardID := *shardIDFlag
selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: peerPubKey}
// Nodes containing blockchain data to mirror the shards' data in the network
myhost, err := p2pimpl.NewHost(&selfPeer, nodePriKey)
if err != nil {
panic("unable to new host in txgen")
}
if err != nil {
fmt.Fprintf(os.Stderr, "Error :%v \n", err)
os.Exit(1)
}
decider := quorum.NewDecider(quorum.SuperMajorityVote)
consensusObj, err := consensus.New(myhost, uint32(shardID), p2p.Peer{}, nil, decider)
chainDBFactory := &shardchain.MemDBFactory{}
txGen := node.New(myhost, consensusObj, chainDBFactory, false) //Changed it : no longer archival node.
txGen.Client = client.NewClient(txGen.GetHost(), uint32(shardID))
consensusObj.ChainReader = txGen.Blockchain()
genesisShardingConfig := shard.Schedule.InstanceForEpoch(big.NewInt(core.GenesisEpoch))
startIdx := 0
endIdx := startIdx + genesisShardingConfig.NumNodesPerShard()
pubs := []*bls2.PublicKey{}
for _, acct := range genesis.HarmonyAccounts[startIdx:endIdx] {
pub := &bls2.PublicKey{}
if err := pub.DeserializeHexStr(acct.BlsPublicKey); err != nil {
fmt.Printf("Can not deserialize public key. err: %v", err)
os.Exit(1)
}
pubs = append(pubs, pub)
}
consensusObj.Decider.UpdateParticipants(pubs)
txGen.NodeConfig.SetRole(nodeconfig.ClientNode)
if shardID == 0 {
txGen.NodeConfig.SetShardGroupID(nodeconfig.GroupIDBeacon)
} else {
txGen.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(shardID)))
}
txGen.NodeConfig.SetIsClient(true)
return txGen
}
func main() {
flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddress")
flag.Parse()
if *versionFlag {
printVersion(os.Args[0])
}
// Logging setup
utils.SetLogContext(*port, *ip)
utils.SetLogVerbosity(log.Lvl(*verbosity))
if len(utils.BootNodes) == 0 {
bootNodeAddrs, err := utils.StringsToAddrs(utils.DefaultBootNodeAddrStrings)
if err != nil {
panic(err)
}
utils.BootNodes = bootNodeAddrs
}
// Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard
setting := Settings{
NumOfAddress: 10000,
MaxNumTxsPerBatch: *numTxns,
}
shardID := *shardIDFlag
utils.Logger().Debug().
Int("cx ratio", *crossShardRatio).
Msg("Cross Shard Ratio Is Set But not used")
// TODO(Richard): refactor this chuck to a single method
// Setup a logger to stdout and log file.
logFileName := fmt.Sprintf("./%v/txgen.log", *logFolder)
h := log.MultiHandler(
log.StreamHandler(os.Stdout, log.TerminalFormat(false)),
log.Must.FileHandler(logFileName, log.LogfmtFormat()), // Log to file
)
log.Root().SetHandler(h)
txGen := setUpTXGen()
txGen.ServiceManagerSetup()
txGen.RunServices()
start := time.Now()
totalTime := float64(*duration)
utils.Logger().Debug().
Float64("totalTime", totalTime).
Bool("RunForever", isDurationForever(totalTime)).
Msg("Total Duration")
ticker := time.NewTicker(checkFrequency * time.Second)
txGen.DoSyncWithoutConsensus()
syncLoop:
for {
t := time.Now()
if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
utils.Logger().Debug().
Int("duration", (int(t.Sub(start)))).
Time("startTime", start).
Float64("totalTime", totalTime).
Msg("Generator timer ended in syncLoop.")
break syncLoop
}
select {
case <-ticker.C:
if txGen.State.String() == "NodeReadyForConsensus" {
utils.Logger().Debug().
Str("txgen node", txGen.SelfPeer.String()).
Str("Node State", txGen.State.String()).
Msg("Generator is now in Sync.")
ticker.Stop()
break syncLoop
}
}
}
readySignal := make(chan uint32)
// This func is used to update the client's blockchain when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*types.Block) {
utils.Logger().Info().
Uint64("block num", blocks[0].NumberU64()).
Msg("[Txgen] Received new block")
for _, block := range blocks {
shardID := block.ShardID()
if txGen.Consensus.ShardID == shardID {
utils.Logger().Info().
Int("txNum", len(block.Transactions())).
Uint32("shardID", shardID).
Str("preHash", block.ParentHash().Hex()).
Uint64("currentBlock", txGen.Blockchain().CurrentBlock().NumberU64()).
Uint64("incoming block", block.NumberU64()).
Msg("Got block from leader")
if block.NumberU64()-txGen.Blockchain().CurrentBlock().NumberU64() == 1 {
if err := txGen.AddNewBlock(block); err != nil {
utils.Logger().Error().
Err(err).
Msg("Error when adding new block")
}
stateMutex.Lock()
if err := txGen.Worker.UpdateCurrent(block.Coinbase()); err != nil {
utils.Logger().Warn().Err(err).Msg("(*Worker).UpdateCurrent failed")
}
stateMutex.Unlock()
readySignal <- shardID
}
} else {
continue
}
}
}
txGen.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go func() {
// wait for 3 seconds for client to send ping message to leader
// FIXME (leo) the readySignal should be set once we really sent ping message to leader
time.Sleep(1 * time.Second) // wait for nodes to be ready
readySignal <- uint32(shardID)
}()
pushLoop:
for {
t := time.Now()
utils.Logger().Debug().
Float64("running time", t.Sub(start).Seconds()).
Float64("totalTime", totalTime).
Msg("Current running time")
if !isDurationForever(totalTime) && t.Sub(start).Seconds() >= totalTime {
utils.Logger().Debug().
Int("duration", (int(t.Sub(start)))).
Time("startTime", start).
Float64("totalTime", totalTime).
Msg("Generator timer ended.")
break pushLoop
}
if shardID != 0 {
if otherHeight, flag := txGen.IsSameHeight(); flag {
if otherHeight >= 1 {
go func() {
readySignal <- uint32(shardID)
utils.Logger().Debug().Msg("Same blockchain height so readySignal generated")
time.Sleep(3 * time.Second) // wait for nodes to be ready
}()
}
}
}
select {
case shardID := <-readySignal:
lock := sync.Mutex{}
txs, err := GenerateSimulatedTransactionsAccount(uint32(shardID), txGen, setting)
if err != nil {
utils.Logger().Debug().
Err(err).
Msg("Error in Generating Txns")
}
lock.Lock()
SendTxsToShard(txGen, txs, uint32(shardID))
lock.Unlock()
case <-time.After(10 * time.Second):
utils.Logger().Warn().Msg("No new block is received so far")
}
}
}
// SendTxsToShard sends txs to shard, currently just to beacon shard
func SendTxsToShard(clientNode *node.Node, txs types.Transactions, shardID uint32) {
msg := proto_node.ConstructTransactionListMessageAccount(txs)
var err error
if shardID == 0 {
err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
} else {
clientGroup := nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(shardID))
err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
}
if err != nil {
utils.Logger().Debug().
Err(err).
Msg("Error in Sending Txns")
}
}
// GenerateSimulatedTransactionsAccount generates simulated transaction for account model.
func GenerateSimulatedTransactionsAccount(shardID uint32, node *node.Node, setting Settings) (types.Transactions, error) {
TxnsToGenerate := setting.MaxNumTxsPerBatch // TODO: make use of settings
txs := make([]*types.Transaction, TxnsToGenerate)
rounds := (TxnsToGenerate / 100)
remainder := TxnsToGenerate % 100
for i := 0; i < 100; i++ {
baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey))
for j := 0; j < rounds; j++ {
randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
randAmount := rand.Float32()
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, shardID, big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[100*j+i] = tx
}
if i < remainder {
randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
randAmount := rand.Float32()
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(rounds), randomUserAddress, shardID, big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[100*rounds+i] = tx
}
}
return txs, nil
}
func isDurationForever(duration float64) bool {
return duration <= 0
}

@ -0,0 +1,517 @@
package main
import (
"encoding/hex"
"flag"
"fmt"
"math/big"
"math/rand"
"os"
"path"
"runtime"
"strconv"
"time"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/blsgen"
"github.com/harmony-one/harmony/internal/common"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
"github.com/harmony-one/harmony/internal/genesis"
hmykey "github.com/harmony-one/harmony/internal/keystore"
"github.com/harmony-one/harmony/internal/memprofiling"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
"github.com/harmony-one/harmony/shard"
)
// Version string variables
var (
version string
builtBy string
builtAt string
commit string
)
// Host
var (
myHost p2p.Host
)
// InitLDBDatabase initializes a LDBDatabase. isGenesis=true will return the beacon chain database for normal shard nodes
func InitLDBDatabase(ip string, port string, freshDB bool, isBeacon bool) (*ethdb.LDBDatabase, error) {
var dbFileName string
if isBeacon {
dbFileName = fmt.Sprintf("./db/harmony_beacon_%s_%s", ip, port)
} else {
dbFileName = fmt.Sprintf("./db/harmony_%s_%s", ip, port)
}
if freshDB {
var err = os.RemoveAll(dbFileName)
if err != nil {
fmt.Println(err.Error())
}
}
return ethdb.NewLDBDatabase(dbFileName, 0, 0)
}
func printVersion() {
fmt.Fprintln(os.Stderr, nodeconfig.GetVersion())
os.Exit(0)
}
var (
ip = flag.String("ip", "127.0.0.1", "ip of the node")
port = flag.String("port", "9000", "port of the node.")
logFolder = flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
logMaxSize = flag.Int("log_max_size", 100, "the max size in megabytes of the log file before it gets rotated")
freshDB = flag.Bool("fresh_db", false, "true means the existing disk based db will be removed")
profile = flag.Bool("profile", false, "Turn on profiling (CPU, Memory).")
metricsReportURL = flag.String("metrics_report_url", "", "If set, reports metrics to this URL.")
versionFlag = flag.Bool("version", false, "Output version info")
onlyLogTps = flag.Bool("only_log_tps", false, "Only log TPS if true")
dnsZone = flag.String("dns_zone", "", "if given and not empty, use peers from the zone (default: use libp2p peer discovery instead)")
dnsFlag = flag.Bool("dns", true, "[deprecated] equivalent to -dns_zone t.hmny.io")
//Leader needs to have a minimal number of peers to start consensus
minPeers = flag.Int("min_peers", 32, "Minimal number of Peers in shard")
// Key file to store the private key
keyFile = flag.String("key", "./.hmykey", "the p2p key file of the harmony node")
// isGenesis indicates this node is a genesis node
isGenesis = flag.Bool("is_genesis", true, "true means this node is a genesis node")
// isArchival indicates this node is an archival node that will save and archive current blockchain
isArchival = flag.Bool("is_archival", true, "false makes node faster by turning caching off")
// delayCommit is the commit-delay timer, used by Harmony nodes
delayCommit = flag.String("delay_commit", "0ms", "how long to delay sending commit messages in consensus, ex: 500ms, 1s")
// nodeType indicates the type of the node: validator, explorer
nodeType = flag.String("node_type", "validator", "node type: validator, explorer")
// networkType indicates the type of the network
networkType = flag.String("network_type", "mainnet", "type of the network: mainnet, testnet, devnet, localnet")
// syncFreq indicates sync frequency
syncFreq = flag.Int("sync_freq", 60, "unit in seconds")
// beaconSyncFreq indicates beaconchain sync frequency
beaconSyncFreq = flag.Int("beacon_sync_freq", 60, "unit in seconds")
// blockPeriod indicates the how long the leader waits to propose a new block.
blockPeriod = flag.Int("block_period", 8, "how long in second the leader waits to propose a new block.")
leaderOverride = flag.Bool("leader_override", false, "true means override the default leader role and acts as validator")
// shardID indicates the shard ID of this node
shardID = flag.Int("shard_id", -1, "the shard ID of this node")
enableMemProfiling = flag.Bool("enableMemProfiling", false, "Enable memsize logging.")
enableGC = flag.Bool("enableGC", true, "Enable calling garbage collector manually .")
blsKeyFile = flag.String("blskey_file", "", "The encrypted file of bls serialized private key by passphrase.")
blsPass = flag.String("blspass", "", "The file containing passphrase to decrypt the encrypted bls file.")
blsPassphrase string
// Sharding configuration parameters for devnet
devnetNumShards = flag.Uint("dn_num_shards", 2, "number of shards for -network_type=devnet (default: 2)")
devnetShardSize = flag.Int("dn_shard_size", 10, "number of nodes per shard for -network_type=devnet (default 10)")
devnetHarmonySize = flag.Int("dn_hmy_size", -1, "number of Harmony-operated nodes per shard for -network_type=devnet; negative (default) means equal to -dn_shard_size")
// logConn logs incoming/outgoing connections
logConn = flag.Bool("log_conn", false, "log incoming/outgoing connections")
keystoreDir = flag.String("keystore", hmykey.DefaultKeyStoreDir, "The default keystore directory")
initialAccount = &genesis.DeployAccount{}
// logging verbosity
verbosity = flag.Int("verbosity", 5, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: 5)")
// dbDir is the database directory.
dbDir = flag.String("db_dir", "", "blockchain database directory")
// Disable view change.
disableViewChange = flag.Bool("disable_view_change", false, "Do not propose view change (testing only)")
// metrics flag to collct meetrics or not, pushgateway ip and port for metrics
metricsFlag = flag.Bool("metrics", false, "Collect and upload node metrics")
pushgatewayIP = flag.String("pushgateway_ip", "grafana.harmony.one", "Metrics view ip")
pushgatewayPort = flag.String("pushgateway_port", "9091", "Metrics view port")
publicRPC = flag.Bool("public_rpc", false, "Enable Public RPC Access (default: false)")
)
func initSetup() {
// maybe request passphrase for bls key.
passphraseForBls()
// Configure log parameters
utils.SetLogContext(*port, *ip)
utils.SetLogVerbosity(log.Lvl(*verbosity))
utils.AddLogFile(fmt.Sprintf("%v/validator-%v-%v.log", *logFolder, *ip, *port), *logMaxSize)
if *onlyLogTps {
matchFilterHandler := log.MatchFilterHandler("msg", "TPS Report", utils.GetLogInstance().GetHandler())
utils.GetLogInstance().SetHandler(matchFilterHandler)
}
// Add GOMAXPROCS to achieve max performance.
runtime.GOMAXPROCS(runtime.NumCPU() * 4)
// Set port and ip to global config.
nodeconfig.GetDefaultConfig().Port = *port
nodeconfig.GetDefaultConfig().IP = *ip
// Setup mem profiling.
memprofiling.GetMemProfiling().Config()
// Set default keystore Dir
hmykey.DefaultKeyStoreDir = *keystoreDir
// Set up randomization seed.
rand.Seed(int64(time.Now().Nanosecond()))
if len(utils.BootNodes) == 0 {
bootNodeAddrs, err := utils.StringsToAddrs(utils.DefaultBootNodeAddrStrings)
if err != nil {
panic(err)
}
utils.BootNodes = bootNodeAddrs
}
}
func passphraseForBls() {
// If FN node running, they should either specify blsPrivateKey or the file with passphrase
// However, explorer or non-validator nodes need no blskey
if *nodeType != "validator" {
return
}
if *blsKeyFile == "" || *blsPass == "" {
fmt.Println("Internal nodes need to have pass to decrypt blskey")
os.Exit(101)
}
passphrase, err := utils.GetPassphraseFromSource(*blsPass)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR when reading passphrase file: %v\n", err)
os.Exit(100)
}
blsPassphrase = passphrase
}
func setupInitialAccount() (isLeader bool) {
genesisShardingConfig := shard.Schedule.InstanceForEpoch(big.NewInt(core.GenesisEpoch))
pubKey := setupConsensusKey(nodeconfig.GetDefaultConfig())
reshardingEpoch := genesisShardingConfig.ReshardingEpoch()
if reshardingEpoch != nil && len(reshardingEpoch) > 0 {
for _, epoch := range reshardingEpoch {
config := shard.Schedule.InstanceForEpoch(epoch)
isLeader, initialAccount = config.FindAccount(pubKey.SerializeToHexStr())
if initialAccount != nil {
break
}
}
} else {
isLeader, initialAccount = genesisShardingConfig.FindAccount(pubKey.SerializeToHexStr())
}
if initialAccount == nil {
fmt.Fprintf(os.Stderr, "ERROR cannot find your BLS key in the genesis/FN tables: %s\n", pubKey.SerializeToHexStr())
os.Exit(100)
}
fmt.Printf("My Genesis Account: %v\n", *initialAccount)
return isLeader
}
func setupConsensusKey(nodeConfig *nodeconfig.ConfigType) *bls.PublicKey {
consensusPriKey, err := blsgen.LoadBlsKeyWithPassPhrase(*blsKeyFile, blsPassphrase)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR when loading bls key, err :%v\n", err)
os.Exit(100)
}
pubKey := consensusPriKey.GetPublicKey()
// Consensus keys are the BLS12-381 keys used to sign consensus messages
nodeConfig.ConsensusPriKey, nodeConfig.ConsensusPubKey = consensusPriKey, consensusPriKey.GetPublicKey()
if nodeConfig.ConsensusPriKey == nil || nodeConfig.ConsensusPubKey == nil {
fmt.Println("error to get consensus keys.")
os.Exit(100)
}
return pubKey
}
func createGlobalConfig() *nodeconfig.ConfigType {
var err error
nodeConfig := nodeconfig.GetShardConfig(initialAccount.ShardID)
if *nodeType == "validator" {
// Set up consensus keys.
setupConsensusKey(nodeConfig)
} else {
nodeConfig.ConsensusPriKey = &bls.SecretKey{} // set dummy bls key for consensus object
}
// Set network type
netType := nodeconfig.NetworkType(*networkType)
switch netType {
case nodeconfig.Mainnet, nodeconfig.Testnet, nodeconfig.Pangaea, nodeconfig.Localnet, nodeconfig.Devnet:
nodeconfig.SetNetworkType(netType)
default:
panic(fmt.Sprintf("invalid network type: %s", *networkType))
}
nodeConfig.SetPushgatewayIP(*pushgatewayIP)
nodeConfig.SetPushgatewayPort(*pushgatewayPort)
nodeConfig.SetMetricsFlag(*metricsFlag)
// P2p private key is used for secure message transfer between p2p nodes.
nodeConfig.P2pPriKey, _, err = utils.LoadKeyFromFile(*keyFile)
if err != nil {
panic(err)
}
selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey}
myHost, err = p2pimpl.NewHost(&selfPeer, nodeConfig.P2pPriKey)
if *logConn && nodeConfig.GetNetworkType() != nodeconfig.Mainnet {
myHost.GetP2PHost().Network().Notify(utils.NewConnLogger(utils.GetLogger()))
}
if err != nil {
panic("unable to new host in harmony")
}
nodeConfig.DBDir = *dbDir
return nodeConfig
}
func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// Consensus object.
// TODO: consensus object shouldn't start here
// TODO(minhdoan): During refactoring, found out that the peers list is actually empty. Need to clean up the logic of consensus later.
decider := quorum.NewDecider(quorum.SuperMajorityVote)
currentConsensus, err := consensus.New(
myHost, nodeConfig.ShardID, p2p.Peer{}, nodeConfig.ConsensusPriKey, decider,
)
currentConsensus.Decider.SetShardIDProvider(func() (uint32, error) {
return currentConsensus.ShardID, nil
})
currentConsensus.SelfAddress = common.ParseAddr(initialAccount.Address)
if err != nil {
fmt.Fprintf(os.Stderr, "Error :%v \n", err)
os.Exit(1)
}
commitDelay, err := time.ParseDuration(*delayCommit)
if err != nil || commitDelay < 0 {
_, _ = fmt.Fprintf(os.Stderr, "ERROR invalid commit delay %#v", *delayCommit)
os.Exit(1)
}
currentConsensus.SetCommitDelay(commitDelay)
currentConsensus.MinPeers = *minPeers
if *disableViewChange {
currentConsensus.DisableViewChangeForTestingOnly()
}
// Current node.
chainDBFactory := &shardchain.LDBFactory{RootDir: nodeConfig.DBDir}
currentNode := node.New(myHost, currentConsensus, chainDBFactory, *isArchival)
switch {
case *networkType == nodeconfig.Localnet:
epochConfig := shard.Schedule.InstanceForEpoch(ethCommon.Big0)
selfPort, err := strconv.ParseUint(*port, 10, 16)
if err != nil {
utils.Logger().Fatal().
Err(err).
Str("self_port_string", *port).
Msg("cannot convert self port string into port number")
}
currentNode.SyncingPeerProvider = node.NewLocalSyncingPeerProvider(
6000, uint16(selfPort), epochConfig.NumShards(), uint32(epochConfig.NumNodesPerShard()))
case *dnsZone != "":
currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider(*dnsZone, syncing.GetSyncingPort(*port))
case *dnsFlag:
currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider("t.hmny.io", syncing.GetSyncingPort(*port))
default:
currentNode.SyncingPeerProvider = node.NewLegacySyncingPeerProvider(currentNode)
}
// TODO: refactor the creation of blockchain out of node.New()
currentConsensus.ChainReader = currentNode.Blockchain()
// Set up prometheus pushgateway for metrics monitoring serivce.
currentNode.NodeConfig.SetPushgatewayIP(nodeConfig.PushgatewayIP)
currentNode.NodeConfig.SetPushgatewayPort(nodeConfig.PushgatewayPort)
currentNode.NodeConfig.SetMetricsFlag(nodeConfig.MetricsFlag)
currentNode.NodeConfig.SetBeaconGroupID(nodeconfig.NewGroupIDByShardID(0))
switch *nodeType {
case "explorer":
currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode)
currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(*shardID)))
currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(*shardID)))
case "validator":
currentNode.NodeConfig.SetRole(nodeconfig.Validator)
if nodeConfig.ShardID == shard.BeaconChainShardID {
currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(shard.BeaconChainShardID))
currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(shard.BeaconChainShardID))
} else {
currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID)))
currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID)))
}
}
currentNode.NodeConfig.ConsensusPubKey = nodeConfig.ConsensusPubKey
currentNode.NodeConfig.ConsensusPriKey = nodeConfig.ConsensusPriKey
// Setup block period for currentNode.
currentNode.BlockPeriod = time.Duration(*blockPeriod) * time.Second
// TODO: Disable drand. Currently drand isn't functioning but we want to compeletely turn it off for full protection.
// Enable it back after mainnet.
// dRand := drand.New(nodeConfig.Host, nodeConfig.ShardID, []p2p.Peer{}, nodeConfig.Leader, currentNode.ConfirmedBlockChannel, nodeConfig.ConsensusPriKey)
// currentNode.Consensus.RegisterPRndChannel(dRand.PRndChannel)
// currentNode.Consensus.RegisterRndChannel(dRand.RndChannel)
// currentNode.DRand = dRand
// This needs to be executed after consensus and drand are setup
if err := currentNode.InitConsensusWithValidators(); err != nil {
utils.Logger().Warn().
Int("shardID", *shardID).
Err(err).
Msg("InitConsensusWithMembers failed")
}
// Set the consensus ID to be the current block number
viewID := currentNode.Blockchain().CurrentBlock().Header().ViewID().Uint64()
currentConsensus.SetViewID(viewID)
utils.Logger().Info().
Uint64("viewID", viewID).
Msg("Init Blockchain")
// Assign closure functions to the consensus object
currentConsensus.BlockVerifier = currentNode.VerifyNewBlock
currentConsensus.OnConsensusDone = currentNode.PostConsensusProcessing
currentNode.State = node.NodeWaitToJoin
// update consensus information based on the blockchain
mode := currentConsensus.UpdateConsensusInformation()
currentConsensus.SetMode(mode)
// Watching currentNode and currentConsensus.
memprofiling.GetMemProfiling().Add("currentNode", currentNode)
memprofiling.GetMemProfiling().Add("currentConsensus", currentConsensus)
return currentNode
}
func main() {
flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddress (delimited by ,)")
flag.Parse()
switch *nodeType {
case "validator":
case "explorer":
break
default:
fmt.Fprintf(os.Stderr, "Unknown node type: %s\n", *nodeType)
os.Exit(1)
}
nodeconfig.SetPublicRPC(*publicRPC)
nodeconfig.SetVersion(fmt.Sprintf("Harmony (C) 2019. %v, version %v-%v (%v %v)", path.Base(os.Args[0]), version, commit, builtBy, builtAt))
if *versionFlag {
printVersion()
}
switch *networkType {
case nodeconfig.Mainnet:
shard.Schedule = shardingconfig.MainnetSchedule
case nodeconfig.Testnet:
shard.Schedule = shardingconfig.TestnetSchedule
case nodeconfig.Pangaea:
shard.Schedule = shardingconfig.PangaeaSchedule
case nodeconfig.Localnet:
shard.Schedule = shardingconfig.LocalnetSchedule
case nodeconfig.Devnet:
if *devnetHarmonySize < 0 {
*devnetHarmonySize = *devnetShardSize
}
// TODO (leo): use a passing list of accounts here
devnetConfig, err := shardingconfig.NewInstance(
uint32(*devnetNumShards), *devnetShardSize, *devnetHarmonySize, genesis.HarmonyAccounts, genesis.FoundationalNodeAccounts, nil)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ERROR invalid devnet sharding config: %s",
err)
os.Exit(1)
}
shard.Schedule = shardingconfig.NewFixedSchedule(devnetConfig)
}
initSetup()
// Set up manual call for garbage collection.
if *enableGC {
memprofiling.MaybeCallGCPeriodically()
}
if *nodeType == "validator" {
setupInitialAccount()
}
if *shardID >= 0 {
utils.Logger().Info().
Uint32("original", initialAccount.ShardID).
Int("override", *shardID).
Msg("ShardID Override")
initialAccount.ShardID = uint32(*shardID)
}
nodeConfig := createGlobalConfig()
currentNode := setupConsensusAndNode(nodeConfig)
//setup state syncing and beacon syncing frequency
currentNode.SetSyncFreq(*syncFreq)
currentNode.SetBeaconSyncFreq(*beaconSyncFreq)
if nodeConfig.ShardID != shard.BeaconChainShardID && currentNode.NodeConfig.Role() != nodeconfig.ExplorerNode {
utils.Logger().Info().Uint32("shardID", currentNode.Blockchain().ShardID()).Uint32("shardID", nodeConfig.ShardID).Msg("SupportBeaconSyncing")
go currentNode.SupportBeaconSyncing()
}
startMsg := "==== New Harmony Node ===="
if *nodeType == "explorer" {
startMsg = "==== New Explorer Node ===="
}
utils.Logger().Info().
Str("BlsPubKey", hex.EncodeToString(nodeConfig.ConsensusPubKey.Serialize())).
Uint32("ShardID", nodeConfig.ShardID).
Str("ShardGroupID", nodeConfig.GetShardGroupID().String()).
Str("BeaconGroupID", nodeConfig.GetBeaconGroupID().String()).
Str("ClientGroupID", nodeConfig.GetClientGroupID().String()).
Str("Role", currentNode.NodeConfig.Role().String()).
Str("multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, myHost.GetID().Pretty())).
Msg(startMsg)
if *enableMemProfiling {
memprofiling.GetMemProfiling().Start()
}
go currentNode.SupportSyncing()
currentNode.ServiceManagerSetup()
currentNode.RunServices()
// RPC for SDK not supported for mainnet.
if err := currentNode.StartRPC(*port); err != nil {
utils.Logger().Warn().
Err(err).
Msg("StartRPC failed")
}
// Run additional node collectors
// Collect node metrics if metrics flag is set
if currentNode.NodeConfig.GetMetricsFlag() {
go currentNode.CollectMetrics()
}
// Commit committtee if node role is explorer
if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode {
go currentNode.CommitCommittee()
}
currentNode.StartServer()
}

@ -0,0 +1,685 @@
package node
import (
"crypto/ecdsa"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/accounts"
"github.com/harmony-one/harmony/api/client"
clientService "github.com/harmony-one/harmony/api/client/service"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/api/service/syncing/downloader"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/reward"
"github.com/harmony-one/harmony/contracts"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/drand"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/msgq"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/shard"
"github.com/harmony-one/harmony/shard/committee"
staking "github.com/harmony-one/harmony/staking/types"
)
// State is a state of a node.
type State byte
// All constants except the NodeLeader below are for validators only.
const (
NodeInit State = iota // Node just started, before contacting BeaconChain
NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard
NodeNotInSync // Node out of sync, might be just joined Shard or offline for a period of time
NodeOffline // Node is offline
NodeReadyForConsensus // Node is ready for doing consensus
NodeDoingConsensus // Node is already doing consensus
NodeLeader // Node is the leader of some shard.
)
const (
// TxPoolLimit is the limit of transaction pool.
TxPoolLimit = 20000
// NumTryBroadCast is the number of times trying to broadcast
NumTryBroadCast = 3
// ClientRxQueueSize is the number of client messages to queue before tail-dropping.
ClientRxQueueSize = 16384
// ShardRxQueueSize is the number of shard messages to queue before tail-dropping.
ShardRxQueueSize = 16384
// GlobalRxQueueSize is the number of global messages to queue before tail-dropping.
GlobalRxQueueSize = 16384
// ClientRxWorkers is the number of concurrent client message handlers.
ClientRxWorkers = 8
// ShardRxWorkers is the number of concurrent shard message handlers.
ShardRxWorkers = 32
// GlobalRxWorkers is the number of concurrent global message handlers.
GlobalRxWorkers = 32
)
func (state State) String() string {
switch state {
case NodeInit:
return "NodeInit"
case NodeWaitToJoin:
return "NodeWaitToJoin"
case NodeNotInSync:
return "NodeNotInSync"
case NodeOffline:
return "NodeOffline"
case NodeReadyForConsensus:
return "NodeReadyForConsensus"
case NodeDoingConsensus:
return "NodeDoingConsensus"
case NodeLeader:
return "NodeLeader"
}
return "Unknown"
}
const (
maxBroadcastNodes = 10 // broadcast at most maxBroadcastNodes peers that need in sync
broadcastTimeout int64 = 60 * 1000000000 // 1 mins
//SyncIDLength is the length of bytes for syncID
SyncIDLength = 20
)
// use to push new block to outofsync node
type syncConfig struct {
timestamp int64
client *downloader.Client
}
// Node represents a protocol-participating node in the network
type Node struct {
Consensus *consensus.Consensus // Consensus object containing all Consensus related data (e.g. committee members, signatures, commits)
BlockChannel chan *types.Block // The channel to send newly proposed blocks
ConfirmedBlockChannel chan *types.Block // The channel to send confirmed blocks
BeaconBlockChannel chan *types.Block // The channel to send beacon blocks for non-beaconchain nodes
DRand *drand.DRand // The instance for distributed randomness protocol
pendingCrossLinks []*block.Header
pendingClMutex sync.Mutex
pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus
pendingCXMutex sync.Mutex
// Shard databases
shardChains shardchain.Collection
Client *client.Client // The presence of a client object means this node will also act as a client
SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
BCPeers []p2p.Peer // list of Beacon Chain Peers. This is needed by all nodes.
// TODO: Neighbors should store only neighbor nodes in the same shard
Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
numPeers int // Number of Peers
State State // State of the Node
stateMutex sync.Mutex // mutex for change node state
// BeaconNeighbors store only neighbor nodes in the beacon chain shard
BeaconNeighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
TxPool *core.TxPool // TODO migrate to TxPool from pendingTransactions list below
CxPool *core.CxPool // pool for missing cross shard receipts resend
pendingTransactions map[common.Hash]*types.Transaction // All the transactions received but not yet processed for Consensus
pendingTxMutex sync.Mutex
recentTxsStats types.RecentTxsStats
pendingStakingTransactions map[common.Hash]*staking.StakingTransaction // All the staking transactions received but not yet processed for Consensus
pendingStakingTxMutex sync.Mutex
Worker *worker.Worker
BeaconWorker *worker.Worker // worker for beacon chain
// Client server (for wallet requests)
clientServer *clientService.Server
// Syncing component.
syncID [SyncIDLength]byte // a unique ID for the node during the state syncing process with peers
downloaderServer *downloader.Server
stateSync *syncing.StateSync
beaconSync *syncing.StateSync
peerRegistrationRecord map[string]*syncConfig // record registration time (unixtime) of peers begin in syncing
SyncingPeerProvider SyncingPeerProvider
// syncing frequency parameters
syncFreq int
beaconSyncFreq int
// The p2p host used to send/receive p2p messages
host p2p.Host
// Incoming messages to process.
clientRxQueue *msgq.Queue
shardRxQueue *msgq.Queue
globalRxQueue *msgq.Queue
// Service manager.
serviceManager *service.Manager
// Demo account.
DemoContractAddress common.Address
LotteryManagerPrivateKey *ecdsa.PrivateKey
// Puzzle account.
PuzzleContractAddress common.Address
PuzzleManagerPrivateKey *ecdsa.PrivateKey
// For test only; TODO ek – remove this
TestBankKeys []*ecdsa.PrivateKey
ContractDeployerKey *ecdsa.PrivateKey
ContractDeployerCurrentNonce uint64 // The nonce of the deployer contract at current block
ContractAddresses []common.Address
// For puzzle contracts
AddressNonce sync.Map
// Shard group Message Receiver
shardGroupReceiver p2p.GroupReceiver
// Global group Message Receiver, communicate with beacon chain, or cross-shard TX
globalGroupReceiver p2p.GroupReceiver
// Client Message Receiver to handle light client messages
// Beacon leader needs to use this receiver to talk to new node
clientReceiver p2p.GroupReceiver
// Duplicated Ping Message Received
duplicatedPing sync.Map
// Channel to notify consensus service to really start consensus
startConsensus chan struct{}
// node configuration, including group ID, shard ID, etc
NodeConfig *nodeconfig.ConfigType
// Chain configuration.
chainConfig params.ChainConfig
// map of service type to its message channel.
serviceMessageChan map[service.Type]chan *msg_pb.Message
// Used to call smart contract locally
ContractCaller *contracts.ContractCaller
accountManager *accounts.Manager
// Next shard state
nextShardState struct {
// The received master shard state
master *shard.EpochShardState
// When for a leader to propose the next shard state,
// or for a validator to wait for a proposal before view change.
// TODO ek – replace with retry-based logic instead of delay
proposeTime time.Time
}
isFirstTime bool // the node was started with a fresh database
// How long in second the leader needs to wait to propose a new block.
BlockPeriod time.Duration
// last time consensus reached for metrics
lastConsensusTime int64
}
// Blockchain returns the blockchain for the node's current shard.
func (node *Node) Blockchain() *core.BlockChain {
shardID := node.NodeConfig.ShardID
bc, err := node.shardChains.ShardChain(shardID)
if err != nil {
utils.Logger().Error().
Uint32("shardID", shardID).
Err(err).
Msg("cannot get shard chain")
}
return bc
}
// Beaconchain returns the beaconchain from node.
func (node *Node) Beaconchain() *core.BlockChain {
bc, err := node.shardChains.ShardChain(0)
if err != nil {
utils.Logger().Error().Err(err).Msg("cannot get beaconchain")
}
return bc
}
func (node *Node) tryBroadcast(tx *types.Transaction) {
msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})
shardGroupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(tx.ShardID()))
utils.Logger().Info().Str("shardGroupID", string(shardGroupID)).Msg("tryBroadcast")
for attempt := 0; attempt < NumTryBroadCast; attempt++ {
if err := node.host.SendMessageToGroups([]nodeconfig.GroupID{shardGroupID}, p2p_host.ConstructP2pMessage(byte(0), msg)); err != nil && attempt < NumTryBroadCast {
utils.Logger().Error().Int("attempt", attempt).Msg("Error when trying to broadcast tx")
} else {
break
}
}
}
// Add new transactions to the pending transaction list.
func (node *Node) addPendingTransactions(newTxs types.Transactions) {
txPoolLimit := shard.Schedule.MaxTxPoolSizeLimit()
node.pendingTxMutex.Lock()
for _, tx := range newTxs {
if _, ok := node.pendingTransactions[tx.Hash()]; !ok {
node.pendingTransactions[tx.Hash()] = tx
}
if len(node.pendingTransactions) > txPoolLimit {
break
}
}
node.pendingTxMutex.Unlock()
utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more transactions")
}
// Add new staking transactions to the pending staking transaction list.
func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTransactions) {
txPoolLimit := shard.Schedule.MaxTxPoolSizeLimit()
node.pendingStakingTxMutex.Lock()
for _, tx := range newStakingTxs {
if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok {
node.pendingStakingTransactions[tx.Hash()] = tx
}
if len(node.pendingStakingTransactions) > txPoolLimit {
break
}
}
node.pendingStakingTxMutex.Unlock()
utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more staking transactions")
}
// AddPendingStakingTransaction staking transactions
func (node *Node) AddPendingStakingTransaction(
newStakingTx *staking.StakingTransaction) {
node.addPendingStakingTransactions(staking.StakingTransactions{newStakingTx})
}
// AddPendingTransaction adds one new transaction to the pending transaction list.
// This is only called from SDK.
func (node *Node) AddPendingTransaction(newTx *types.Transaction) {
if node.Consensus.IsLeader() && newTx.ShardID() == node.NodeConfig.ShardID {
node.addPendingTransactions(types.Transactions{newTx})
} else {
utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx")
node.tryBroadcast(newTx)
}
utils.Logger().Debug().Int("totalPending", len(node.pendingTransactions)).Msg("Got ONE more transaction")
}
// AddPendingReceipts adds one receipt message to pending list.
func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) {
node.pendingCXMutex.Lock()
defer node.pendingCXMutex.Unlock()
if receipts.ContainsEmptyField() {
utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("CXReceiptsProof contains empty field")
return
}
blockNum := receipts.Header.Number().Uint64()
shardID := receipts.Header.ShardID()
key := utils.GetPendingCXKey(shardID, blockNum)
if _, ok := node.pendingCXReceipts[key]; ok {
utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Already Got Same Receipt message")
return
}
node.pendingCXReceipts[key] = receipts
utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Got ONE more receipt message")
}
// Take out a subset of valid transactions from the pending transaction list
// Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlock(coinbase common.Address) (types.Transactions, staking.StakingTransactions) {
txsThrottleConfig := shard.Schedule.TxsThrottleConfig()
// the next block number to be added in consensus protocol, which is always one more than current chain header block
newBlockNum := node.Blockchain().CurrentBlock().NumberU64() + 1
// remove old (> txsThrottleConfigRecentTxDuration) blockNum keys from recentTxsStats and initiailize for the new block
for blockNum := range node.recentTxsStats {
recentTxsBlockNumGap := uint64(txsThrottleConfig.RecentTxDuration / node.BlockPeriod)
if recentTxsBlockNumGap < newBlockNum-blockNum {
delete(node.recentTxsStats, blockNum)
}
}
node.recentTxsStats[newBlockNum] = make(types.BlockTxsCounts)
// Must update to the correct current state before processing potential txns
if err := node.Worker.UpdateCurrent(coinbase); err != nil {
utils.Logger().Error().
Err(err).
Msg("Failed updating worker's state before txn selection")
return types.Transactions{}, staking.StakingTransactions{}
}
node.pendingTxMutex.Lock()
defer node.pendingTxMutex.Unlock()
node.pendingStakingTxMutex.Lock()
defer node.pendingStakingTxMutex.Unlock()
pendingTransactions := types.Transactions{}
pendingStakingTransactions := staking.StakingTransactions{}
for _, tx := range node.pendingTransactions {
pendingTransactions = append(pendingTransactions, tx)
}
for _, tx := range node.pendingStakingTransactions {
pendingStakingTransactions = append(pendingStakingTransactions, tx)
}
selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)
selectedStaking, unselectedStaking, invalidStaking :=
node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, coinbase)
node.pendingTransactions = make(map[common.Hash]*types.Transaction)
for _, unselectedTx := range unselected {
node.pendingTransactions[unselectedTx.Hash()] = unselectedTx
}
utils.Logger().Info().
Int("remainPending", len(node.pendingTransactions)).
Int("selected", len(selected)).
Int("invalidDiscarded", len(invalid)).
Msg("Selecting Transactions")
node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction)
for _, unselectedStakingTx := range unselectedStaking {
node.pendingStakingTransactions[unselectedStakingTx.Hash()] = unselectedStakingTx
}
utils.Logger().Info().
Int("remainPending", len(node.pendingStakingTransactions)).
Int("selected", len(unselectedStaking)).
Int("invalidDiscarded", len(invalidStaking)).
Msg("Selecting Staking Transactions")
return selected, selectedStaking
}
func (node *Node) startRxPipeline(
receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int,
) {
// consumers
for i := 0; i < numWorkers; i++ {
go queue.HandleMessages(node)
}
// provider
go node.receiveGroupMessage(receiver, queue)
}
// StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() {
// client messages are sent by clients, like txgen, wallet
node.startRxPipeline(node.clientReceiver, node.clientRxQueue, ClientRxWorkers)
// start the goroutine to receive group message
node.startRxPipeline(node.shardGroupReceiver, node.shardRxQueue, ShardRxWorkers)
// start the goroutine to receive global message, used for cross-shard TX
// FIXME (leo): we use beacon client topic as the global topic for now
node.startRxPipeline(node.globalGroupReceiver, node.globalRxQueue, GlobalRxWorkers)
select {}
}
// Count the total number of transactions in the blockchain
// Currently used for stats reporting purpose
func (node *Node) countNumTransactionsInBlockchain() int {
count := 0
for block := node.Blockchain().CurrentBlock(); block != nil; block = node.Blockchain().GetBlockByHash(block.Header().ParentHash()) {
count += len(block.Transactions())
}
return count
}
// GetSyncID returns the syncID of this node
func (node *Node) GetSyncID() [SyncIDLength]byte {
return node.syncID
}
// New creates a new node.
func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardchain.DBFactory, isArchival bool) *Node {
node := Node{}
node.syncFreq = SyncFrequency
node.beaconSyncFreq = SyncFrequency
// Get the node config that's created in the harmony.go program.
if consensusObj != nil {
node.NodeConfig = nodeconfig.GetShardConfig(consensusObj.ShardID)
} else {
node.NodeConfig = nodeconfig.GetDefaultConfig()
}
copy(node.syncID[:], GenerateRandomString(SyncIDLength))
if host != nil {
node.host = host
node.SelfPeer = host.GetSelfPeer()
}
chainConfig := *params.TestnetChainConfig
switch node.NodeConfig.GetNetworkType() {
case nodeconfig.Mainnet:
chainConfig = *params.MainnetChainConfig
case nodeconfig.Pangaea:
chainConfig = *params.PangaeaChainConfig
}
node.chainConfig = chainConfig
collection := shardchain.NewCollection(
chainDBFactory, &genesisInitializer{&node}, chain.Engine, &chainConfig,
)
if isArchival {
collection.DisableCache()
}
node.shardChains = collection
if host != nil && consensusObj != nil {
// Consensus and associated channel to communicate blocks
node.Consensus = consensusObj
// Load the chains.
blockchain := node.Blockchain() // this also sets node.isFirstTime if the DB is fresh
beaconChain := node.Beaconchain()
node.BlockChannel = make(chan *types.Block)
node.ConfirmedBlockChannel = make(chan *types.Block)
node.BeaconBlockChannel = make(chan *types.Block)
node.recentTxsStats = make(types.RecentTxsStats)
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain)
node.CxPool = core.NewCxPool(core.CxPoolSize)
node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine)
if node.Blockchain().ShardID() != shard.BeaconChainShardID {
node.BeaconWorker = worker.New(node.Beaconchain().Config(), beaconChain, chain.Engine)
}
node.pendingCXReceipts = make(map[string]*types.CXReceiptsProof)
node.pendingTransactions = make(map[common.Hash]*types.Transaction)
node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction)
node.Consensus.VerifiedNewBlock = make(chan *types.Block)
chain.Engine.SetRewarder(node.Consensus.Decider.(reward.Distributor))
// the sequence number is the next block number to be added in consensus protocol, which is always one more than current chain header block
node.Consensus.SetBlockNum(blockchain.CurrentBlock().NumberU64() + 1)
// Add Faucet contract to all shards, so that on testnet, we can demo wallet in explorer
// TODO (leo): we need to have support of cross-shard tx later so that the token can be transferred from beacon chain shard to other tx shards.
if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet {
if node.isFirstTime {
// Setup one time smart contracts
node.AddFaucetContractToPendingTransactions()
} else {
node.AddContractKeyAndAddress(scFaucet)
}
node.ContractCaller = contracts.NewContractCaller(node.Blockchain(), node.Blockchain().Config())
// Create test keys. Genesis will later need this.
var err error
node.TestBankKeys, err = CreateTestBankKeys(TestAccountNumber)
if err != nil {
utils.Logger().Error().Err(err).Msg("Error while creating test keys")
}
}
}
utils.Logger().Info().
Interface("genesis block header", node.Blockchain().GetHeaderByNumber(0)).
Msg("Genesis block hash")
node.clientRxQueue = msgq.New(ClientRxQueueSize)
node.shardRxQueue = msgq.New(ShardRxQueueSize)
node.globalRxQueue = msgq.New(GlobalRxQueueSize)
// Setup initial state of syncing.
node.peerRegistrationRecord = make(map[string]*syncConfig)
node.startConsensus = make(chan struct{})
go node.bootstrapConsensus()
return &node
}
// InitConsensusWithValidators initialize shard state from latest epoch and update committee pub
// keys for consensus and drand
func (node *Node) InitConsensusWithValidators() (err error) {
if node.Consensus == nil {
utils.Logger().Error().Msg("[InitConsensusWithValidators] consenus is nil; Cannot figure out shardID")
return ctxerror.New("[InitConsensusWithValidators] consenus is nil; Cannot figure out shardID")
}
shardID := node.Consensus.ShardID
blockNum := node.Blockchain().CurrentBlock().NumberU64()
node.Consensus.SetMode(consensus.Listening)
epoch := shard.Schedule.CalcEpochNumber(blockNum)
utils.Logger().Info().
Uint64("blockNum", blockNum).
Uint32("shardID", shardID).
Uint64("epoch", epoch.Uint64()).
Msg("[InitConsensusWithValidators] Try To Get PublicKeys")
_, pubKeys := committee.WithStakingEnabled.ComputePublicKeys(
epoch, node.Consensus.ChainReader, int(shardID),
)
if len(pubKeys) == 0 {
utils.Logger().Error().
Uint32("shardID", shardID).
Uint64("blockNum", blockNum).
Msg("[InitConsensusWithValidators] PublicKeys is Empty, Cannot update public keys")
return ctxerror.New(
"[InitConsensusWithValidators] PublicKeys is Empty, Cannot update public keys",
"shardID", shardID,
"blockNum", blockNum)
}
for i := range pubKeys {
if pubKeys[i].IsEqual(node.Consensus.PubKey) {
utils.Logger().Info().
Uint64("blockNum", blockNum).
Int("numPubKeys", len(pubKeys)).
Msg("[InitConsensusWithValidators] Successfully updated public keys")
node.Consensus.UpdatePublicKeys(pubKeys)
node.Consensus.SetMode(consensus.Normal)
return nil
}
}
// TODO: Disable drand. Currently drand isn't functioning but we want to compeletely turn it off for full protection.
// node.DRand.UpdatePublicKeys(pubKeys)
return nil
}
// AddPeers adds neighbors nodes
func (node *Node) AddPeers(peers []*p2p.Peer) int {
count := 0
for _, p := range peers {
key := fmt.Sprintf("%s:%s:%s", p.IP, p.Port, p.PeerID)
_, ok := node.Neighbors.LoadOrStore(key, *p)
if !ok {
// !ok means new peer is stored
count++
node.host.AddPeer(p)
node.numPeers++
continue
}
}
return count
}
// AddBeaconPeer adds beacon chain neighbors nodes
// Return false means new neighbor peer was added
// Return true means redundant neighbor peer wasn't added
func (node *Node) AddBeaconPeer(p *p2p.Peer) bool {
key := fmt.Sprintf("%s:%s:%s", p.IP, p.Port, p.PeerID)
_, ok := node.BeaconNeighbors.LoadOrStore(key, *p)
return ok
}
// isBeacon = true if the node is beacon node
// isClient = true if the node light client(wallet)
func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
PushgatewayIP: node.NodeConfig.GetPushgatewayIP(),
PushgatewayPort: node.NodeConfig.GetPushgatewayPort(),
IsClient: node.NodeConfig.IsClient(),
Beacon: nodeconfig.NewGroupIDByShardID(0),
ShardGroupID: node.NodeConfig.GetShardGroupID(),
Actions: make(map[nodeconfig.GroupID]nodeconfig.ActionType),
}
if nodeConfig.IsClient {
nodeConfig.Actions[nodeconfig.NewClientGroupIDByShardID(0)] = nodeconfig.ActionStart
} else {
nodeConfig.Actions[node.NodeConfig.GetShardGroupID()] = nodeconfig.ActionStart
}
var err error
node.shardGroupReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetShardGroupID())
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to create shard receiver")
}
node.globalGroupReceiver, err = node.host.GroupReceiver(nodeconfig.NewClientGroupIDByShardID(0))
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to create global receiver")
}
node.clientReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetClientGroupID())
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to create client receiver")
}
return nodeConfig, chanPeer
}
// AccountManager ...
func (node *Node) AccountManager() *accounts.Manager {
return node.accountManager
}
// ServiceManager ...
func (node *Node) ServiceManager() *service.Manager {
return node.serviceManager
}
// SetSyncFreq sets the syncing frequency in the loop
func (node *Node) SetSyncFreq(syncFreq int) {
node.syncFreq = syncFreq
}
// SetBeaconSyncFreq sets the syncing frequency in the loop
func (node *Node) SetBeaconSyncFreq(syncFreq int) {
node.beaconSyncFreq = syncFreq
}
Loading…
Cancel
Save