@ -20,6 +20,7 @@ import (
"github.com/harmony-one/harmony/contracts"
"github.com/harmony-one/harmony/contracts"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/values"
"github.com/harmony-one/harmony/drand"
"github.com/harmony-one/harmony/drand"
"github.com/harmony-one/harmony/internal/chain"
"github.com/harmony-one/harmony/internal/chain"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
@ -27,6 +28,7 @@ import (
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/msgq"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
p2p_host "github.com/harmony-one/harmony/p2p/host"
@ -53,6 +55,18 @@ const (
TxPoolLimit = 20000
TxPoolLimit = 20000
// NumTryBroadCast is the number of times trying to broadcast
// NumTryBroadCast is the number of times trying to broadcast
NumTryBroadCast = 3
NumTryBroadCast = 3
// ClientRxQueueSize is the number of client messages to queue before tail-dropping.
ClientRxQueueSize = 16384
// ShardRxQueueSize is the number of shard messages to queue before tail-dropping.
ShardRxQueueSize = 16384
// GlobalRxQueueSize is the number of global messages to queue before tail-dropping.
GlobalRxQueueSize = 16384
// ClientRxWorkers is the number of concurrent client message handlers.
ClientRxWorkers = 8
// ShardRxWorkers is the number of concurrent shard message handlers.
ShardRxWorkers = 32
// GlobalRxWorkers is the number of concurrent global message handlers.
GlobalRxWorkers = 32
)
)
func ( state State ) String ( ) string {
func ( state State ) String ( ) string {
@ -145,6 +159,11 @@ type Node struct {
// The p2p host used to send/receive p2p messages
// The p2p host used to send/receive p2p messages
host p2p . Host
host p2p . Host
// Incoming messages to process.
clientRxQueue * msgq . Queue
shardRxQueue * msgq . Queue
globalRxQueue * msgq . Queue
// Service manager.
// Service manager.
serviceManager * service . Manager
serviceManager * service . Manager
@ -372,7 +391,8 @@ func (node *Node) getTransactionsForNewBlock(
}
}
selected , unselected , invalid := node . Worker . SelectTransactionsForNewBlock ( newBlockNum , pendingTransactions , node . recentTxsStats , txsThrottleConfig , coinbase )
selected , unselected , invalid := node . Worker . SelectTransactionsForNewBlock ( newBlockNum , pendingTransactions , node . recentTxsStats , txsThrottleConfig , coinbase )
selectedStaking , unselectedStaking , invalidStaking := node . Worker . SelectStakingTransactionsForNewBlock ( newBlockNum , pendingStakingTransactions , node . recentTxsStats , txsThrottleConfig , coinbase )
selectedStaking , unselectedStaking , invalidStaking :=
node . Worker . SelectStakingTransactionsForNewBlock ( newBlockNum , pendingStakingTransactions , coinbase )
node . pendingTransactions = make ( map [ common . Hash ] * types . Transaction )
node . pendingTransactions = make ( map [ common . Hash ] * types . Transaction )
for _ , unselectedTx := range unselected {
for _ , unselectedTx := range unselected {
@ -397,8 +417,30 @@ func (node *Node) getTransactionsForNewBlock(
return selected , selectedStaking
return selected , selectedStaking
}
}
func ( node * Node ) startRxPipeline (
receiver p2p . GroupReceiver , queue * msgq . Queue , numWorkers int ,
) {
// consumers
for i := 0 ; i < numWorkers ; i ++ {
go queue . HandleMessages ( node )
}
// provider
go node . receiveGroupMessage ( receiver , queue )
}
// StartServer starts a server and process the requests by a handler.
// StartServer starts a server and process the requests by a handler.
func ( node * Node ) StartServer ( ) {
func ( node * Node ) StartServer ( ) {
// start the goroutine to receive client message
// client messages are sent by clients, like txgen, wallet
node . startRxPipeline ( node . clientReceiver , node . clientRxQueue , ClientRxWorkers )
// start the goroutine to receive group message
node . startRxPipeline ( node . shardGroupReceiver , node . shardRxQueue , ShardRxWorkers )
// start the goroutine to receive global message, used for cross-shard TX
// FIXME (leo): we use beacon client topic as the global topic for now
node . startRxPipeline ( node . globalGroupReceiver , node . globalRxQueue , GlobalRxWorkers )
select { }
select { }
}
}
@ -465,14 +507,14 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
node . TxPool = core . NewTxPool ( core . DefaultTxPoolConfig , node . Blockchain ( ) . Config ( ) , blockchain )
node . TxPool = core . NewTxPool ( core . DefaultTxPoolConfig , node . Blockchain ( ) . Config ( ) , blockchain )
node . CxPool = core . NewCxPool ( core . CxPoolSize )
node . CxPool = core . NewCxPool ( core . CxPoolSize )
node . Worker = worker . New ( node . Blockchain ( ) . Config ( ) , blockchain , chain . Engine )
node . Worker = worker . New ( node . Blockchain ( ) . Config ( ) , blockchain , chain . Engine )
if node . Blockchain ( ) . ShardID ( ) != 0 {
if node . Blockchain ( ) . ShardID ( ) != values . BeaconChainShardID {
node . BeaconWorker = worker . New ( node . Beaconchain ( ) . Config ( ) , beaconChain , chain . Engine )
node . BeaconWorker = worker . New ( node . Beaconchain ( ) . Config ( ) , beaconChain , chain . Engine )
}
}
node . pendingCXReceipts = make ( map [ string ] * types . CXReceiptsProof )
node . pendingCXReceipts = make ( map [ string ] * types . CXReceiptsProof )
node . pendingTransactions = make ( map [ common . Hash ] * types . Transaction )
node . pendingTransactions = make ( map [ common . Hash ] * types . Transaction )
node . pendingStakingTransactions = make ( map [ common . Hash ] * staking . StakingTransaction )
node . pendingStakingTransactions = make ( map [ common . Hash ] * staking . StakingTransaction )
node . Consensus . VerifiedNewBlock = make ( chan * types . Block )
node . Consensus . VerifiedNewBlock = make ( chan * types . Block )
// the sequence number is the next block number to be added in consensus protocol, which is always one more than current chain header block
// the sequence number is the next block number to be added in consensus protocol, which is always one more than current chain header block
node . Consensus . SetBlockNum ( blockchain . CurrentBlock ( ) . NumberU64 ( ) + 1 )
node . Consensus . SetBlockNum ( blockchain . CurrentBlock ( ) . NumberU64 ( ) + 1 )
@ -486,20 +528,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
} else {
} else {
node . AddContractKeyAndAddress ( scFaucet )
node . AddContractKeyAndAddress ( scFaucet )
}
}
//if node.Consensus.ShardID == 0 {
// // Contracts only exist in beacon chain
// if node.isFirstTime {
// // Setup one time smart contracts
// node.CurrentStakes = make(map[common.Address]*structs.StakeInfo)
// node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked
// } else {
// node.AddContractKeyAndAddress(scStaking)
// }
//}
node . ContractCaller = contracts . NewContractCaller ( node . Blockchain ( ) , node . Blockchain ( ) . Config ( ) )
node . ContractCaller = contracts . NewContractCaller ( node . Blockchain ( ) , node . Blockchain ( ) . Config ( ) )
// Create test keys. Genesis will later need this.
// Create test keys. Genesis will later need this.
var err error
var err error
node . TestBankKeys , err = CreateTestBankKeys ( TestAccountNumber )
node . TestBankKeys , err = CreateTestBankKeys ( TestAccountNumber )
@ -513,16 +542,9 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
Interface ( "genesis block header" , node . Blockchain ( ) . GetHeaderByNumber ( 0 ) ) .
Interface ( "genesis block header" , node . Blockchain ( ) . GetHeaderByNumber ( 0 ) ) .
Msg ( "Genesis block hash" )
Msg ( "Genesis block hash" )
// start the goroutine to receive client message
node . clientRxQueue = msgq . New ( ClientRxQueueSize )
// client messages are sent by clients, like txgen, wallet
node . shardRxQueue = msgq . New ( ShardRxQueueSize )
go node . ReceiveClientGroupMessage ( )
node . globalRxQueue = msgq . New ( GlobalRxQueueSize )
// start the goroutine to receive group message
go node . ReceiveGroupMessage ( )
// start the goroutine to receive global message, used for cross-shard TX
// FIXME (leo): we use beacon client topic as the global topic for now
go node . ReceiveGlobalMessage ( )
// Setup initial state of syncing.
// Setup initial state of syncing.
node . peerRegistrationRecord = make ( map [ string ] * syncConfig )
node . peerRegistrationRecord = make ( map [ string ] * syncConfig )