Merge pull request #1439 from mikedoan/broadcast

Broadcast tx from non-leader
pull/1452/head
Minh Doan 5 years ago committed by GitHub
commit 6e96f215ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      node/node.go

@ -14,6 +14,7 @@ import (
"github.com/harmony-one/harmony/api/client"
clientService "github.com/harmony-one/harmony/api/client/service"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/api/service/syncing/downloader"
@ -30,6 +31,7 @@ import (
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
)
// State is a state of a node.
@ -49,6 +51,8 @@ const (
const (
// TxPoolLimit is the limit of transaction pool.
TxPoolLimit = 20000
// NumTryBroadCast is the number of times trying to broadcast
NumTryBroadCast = 3
)
func (state State) String() string {
@ -246,6 +250,21 @@ func (node *Node) reducePendingTransactions() {
}
}
func (node *Node) tryBroadcast(tx *types.Transaction) {
msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})
shardGroupID := p2p.NewGroupIDByShardID(p2p.ShardID(tx.ShardID()))
utils.Logger().Info().Str("shardGroupID", string(shardGroupID)).Msg("tryBroadcast")
for attempt := 0; attempt < NumTryBroadCast; attempt++ {
if err := node.host.SendMessageToGroups([]p2p.GroupID{shardGroupID}, p2p_host.ConstructP2pMessage(byte(0), msg)); err != nil && attempt < NumTryBroadCast {
utils.Logger().Error().Int("attempt", attempt).Msg("Error when trying to broadcast tx")
} else {
break
}
}
}
// Add new transactions to the pending transaction list.
func (node *Node) addPendingTransactions(newTxs types.Transactions) {
node.pendingTxMutex.Lock()
@ -256,8 +275,14 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) {
}
// AddPendingTransaction adds one new transaction to the pending transaction list.
// This is only called from SDK.
func (node *Node) AddPendingTransaction(newTx *types.Transaction) {
if node.Consensus.IsLeader() && newTx.ShardID() == node.NodeConfig.ShardID {
node.addPendingTransactions(types.Transactions{newTx})
} else {
utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx")
node.tryBroadcast(newTx)
}
utils.Logger().Debug().Int("totalPending", len(node.pendingTransactions)).Msg("Got ONE more transaction")
}

Loading…
Cancel
Save