Extend failure sink (#2045)

* [project] Rename specs to specifications b/c gcc thinks specs is a special file

* [rpc][node] Refactor failed stking txn buffer as container/ring, track failed plain txn as well

* [rpc] Expose RPC for staking, plain txn error sink
pull/2052/head
Edgar Aroutiounian 5 years ago committed by GitHub
parent de34b1753c
commit 868c4c896c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      core/types/transaction.go
  2. 9
      hmy/api_backend.go
  3. 15
      hmy/backend.go
  4. 6
      internal/hmyapi/backend.go
  5. 10
      internal/hmyapi/harmony.go
  6. 14
      node/node.go
  7. 12
      node/node_handler_test.go
  8. 18
      node/node_newblock.go
  9. 52
      node/rpc.go
  10. 33
      node/worker/worker.go
  11. 6
      node/worker/worker_test.go
  12. 0
      specifications/p2p/nat-hole-punching-spike.md
  13. 0
      specifications/p2p/peerdiscovery.md
  14. 0
      specifications/test/testplan.md
  15. 12
      test/chain/main.go

@ -70,6 +70,13 @@ type Transaction struct {
from atomic.Value
}
// RPCTransactionError ..
type RPCTransactionError struct {
TxHashID string `json:"tx-hash-id"`
TimestampOfRejection int64 `json:"time-at-rejection"`
ErrMessage string `json:"error-message"`
}
//String print mode string
func (txType TransactionType) String() string {
if txType == SameShardTx {

@ -409,11 +409,16 @@ func (b *APIBackend) GetShardState() (*shard.State, error) {
return b.hmy.BlockChain().ReadShardState(b.hmy.BlockChain().CurrentHeader().Epoch())
}
// GetCurrentStakingTransactionErrorSink ..
func (b *APIBackend) GetCurrentStakingTransactionErrorSink() []staking.RPCTransactionError {
// GetCurrentStakingErrorSink ..
func (b *APIBackend) GetCurrentStakingErrorSink() []staking.RPCTransactionError {
return b.hmy.nodeAPI.ErroredStakingTransactionSink()
}
// GetCurrentTransactionErrorSink ..
func (b *APIBackend) GetCurrentTransactionErrorSink() []types.RPCTransactionError {
return b.hmy.nodeAPI.ErroredTransactionSink()
}
// IsBeaconChainExplorerNode ..
func (b *APIBackend) IsBeaconChainExplorerNode() bool {
return b.hmy.nodeAPI.IsBeaconChainExplorerNode()

@ -17,22 +17,18 @@ import (
// Harmony implements the Harmony full node service.
type Harmony struct {
// Channel for shutting down the service
shutdownChan chan bool // Channel for shutting down the Harmony
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
shutdownChan chan bool // Channel for shutting down the Harmony
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
blockchain *core.BlockChain
txPool *core.TxPool
cxPool *core.CxPool
accountManager *accounts.Manager
eventMux *event.TypeMux
// DB interfaces
chainDb ethdb.Database // Block chain database
chainDb ethdb.Database // Block chain database
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
APIBackend *APIBackend
nodeAPI NodeAPI
nodeAPI NodeAPI
// aka network version, which is used to identify which network we are using
networkID uint64
// TODO(ricl): put this into config object
@ -53,6 +49,7 @@ type NodeAPI interface {
GetTransactionsHistory(address, txType, order string) ([]common.Hash, error)
IsCurrentlyLeader() bool
ErroredStakingTransactionSink() []staking.RPCTransactionError
ErroredTransactionSink() []types.RPCTransactionError
IsBeaconChainExplorerNode() bool
}
@ -77,9 +74,7 @@ func New(
networkID: 1, // TODO(ricl): this should be from config
shardID: shardID,
}
hmy.APIBackend = &APIBackend{hmy}
return hmy, nil
}

@ -36,7 +36,6 @@ type Backend interface {
AccountManager() *accounts.Manager
// ExtRPCEnabled() bool
RPCGasCap() *big.Int // global gas cap for hmy_call over rpc: DoS protection
// BlockChain API
// SetHead(number uint64)
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*block.Header, error)
@ -49,7 +48,6 @@ type Backend interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription
// TxPool API
SendTx(ctx context.Context, signedTx *types.Transaction) error
// GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
@ -59,7 +57,6 @@ type Backend interface {
// Stats() (pending int, queued int)
// TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions)
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
ChainConfig() *params.ChainConfig
CurrentBlock() *types.Block
// Get balance
@ -81,7 +78,8 @@ type Backend interface {
GetDelegationsByDelegator(delegator common.Address) ([]common.Address, []*staking.Delegation)
GetValidatorSelfDelegation(addr common.Address) *big.Int
GetShardState() (*shard.State, error)
GetCurrentStakingTransactionErrorSink() []staking.RPCTransactionError
GetCurrentStakingErrorSink() []staking.RPCTransactionError
GetCurrentTransactionErrorSink() []types.RPCTransactionError
IsBeaconChainExplorerNode() bool
GetMedianRawStakeSnapshot() *big.Int
}

@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/harmony-one/harmony/api/proto"
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
staking "github.com/harmony-one/harmony/staking/types"
)
@ -68,6 +69,11 @@ func (s *PublicHarmonyAPI) GetNodeMetadata() NodeMetadata {
}
// GetCurrentTransactionErrorSink ..
func (s *PublicHarmonyAPI) GetCurrentTransactionErrorSink() []staking.RPCTransactionError {
return s.b.GetCurrentStakingTransactionErrorSink()
func (s *PublicHarmonyAPI) GetCurrentTransactionErrorSink() []types.RPCTransactionError {
return s.b.GetCurrentTransactionErrorSink()
}
// GetCurrentStakingErrorSink ..
func (s *PublicHarmonyAPI) GetCurrentStakingErrorSink() []staking.RPCTransactionError {
return s.b.GetCurrentStakingErrorSink()
}

@ -1,6 +1,7 @@
package node
import (
"container/ring"
"crypto/ecdsa"
"fmt"
"os"
@ -220,7 +221,8 @@ type Node struct {
// Last 1024 staking transaction error, only in memory
errorSink struct {
sync.Mutex
failedTxns []staking.RPCTransactionError
failedStakingTxns *ring.Ring
failedTxns *ring.Ring
}
}
@ -289,7 +291,8 @@ func (node *Node) addPendingTransactions(newTxs types.Transactions) {
func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTransactions) {
txPoolLimit := 1000 // TODO: incorporate staking txn into TxPool
if node.NodeConfig.ShardID == 0 && node.Blockchain().Config().IsPreStaking(node.Blockchain().CurrentHeader().Epoch()) {
if node.NodeConfig.ShardID == shard.BeaconChainShardID &&
node.Blockchain().Config().IsPreStaking(node.Blockchain().CurrentHeader().Epoch()) {
node.pendingStakingTxMutex.Lock()
for _, tx := range newStakingTxs {
if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok {
@ -396,11 +399,12 @@ func (node *Node) GetSyncID() [SyncIDLength]byte {
func New(host p2p.Host, consensusObj *consensus.Consensus,
chainDBFactory shardchain.DBFactory, isArchival bool) *Node {
node := Node{}
const sinkSize = 1024
node.errorSink = struct {
sync.Mutex
failedTxns []staking.RPCTransactionError
}{}
failedStakingTxns *ring.Ring
failedTxns *ring.Ring
}{sync.Mutex{}, ring.New(sinkSize), ring.New(sinkSize)}
node.syncFreq = SyncFrequency
node.beaconSyncFreq = SyncFrequency

@ -37,7 +37,11 @@ func TestAddNewBlock(t *testing.T) {
txs := make(map[common.Address]types.Transactions)
stks := staking.StakingTransactions{}
node.Worker.CommitTransactions(txs, stks, common.Address{}, func(staking.RPCTransactionError) {})
node.Worker.CommitTransactions(
txs, stks, common.Address{},
func([]staking.RPCTransactionError) {},
func([]types.RPCTransactionError) {},
)
block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
_, err = node.Blockchain().InsertChain([]*types.Block{block}, true)
@ -70,7 +74,11 @@ func TestVerifyNewBlock(t *testing.T) {
txs := make(map[common.Address]types.Transactions)
stks := staking.StakingTransactions{}
node.Worker.CommitTransactions(txs, stks, common.Address{}, func(staking.RPCTransactionError) {})
node.Worker.CommitTransactions(
txs, stks, common.Address{},
func([]staking.RPCTransactionError) {},
func([]types.RPCTransactionError) {},
)
block, _ := node.Worker.FinalizeNewBlock([]byte{}, []byte{}, 0, common.Address{}, nil, nil)
if err := node.VerifyNewBlock(block); err != nil {

@ -123,13 +123,19 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
if err := node.Worker.CommitTransactions(
pending, pendingStakingTransactions, beneficiary,
func(payload staking.RPCTransactionError) {
const maxSize = 1024
func(payload []staking.RPCTransactionError) {
node.errorSink.Lock()
if l := len(node.errorSink.failedTxns); l >= maxSize {
node.errorSink.failedTxns = append(node.errorSink.failedTxns[1:], payload)
} else {
node.errorSink.failedTxns = append(node.errorSink.failedTxns, payload)
for i := range payload {
node.errorSink.failedStakingTxns.Value = payload[i]
node.errorSink.failedStakingTxns = node.errorSink.failedStakingTxns.Next()
}
node.errorSink.Unlock()
},
func(payload []types.RPCTransactionError) {
node.errorSink.Lock()
for i := range payload {
node.errorSink.failedTxns.Value = payload[i]
node.errorSink.failedTxns = node.errorSink.failedTxns.Next()
}
node.errorSink.Unlock()
},

@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/hmy"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/hmyapi"
@ -24,25 +25,20 @@ const (
var (
// HTTP RPC
rpcAPIs []rpc.API
httpListener net.Listener
httpHandler *rpc.Server
wsListener net.Listener
wsHandler *rpc.Server
httpEndpoint = ""
wsEndpoint = ""
rpcAPIs []rpc.API
httpListener net.Listener
httpHandler *rpc.Server
wsListener net.Listener
wsHandler *rpc.Server
httpEndpoint = ""
wsEndpoint = ""
httpModules = []string{"hmy", "net", "explorer"}
httpVirtualHosts = []string{"*"}
httpTimeouts = rpc.DefaultHTTPTimeouts
httpOrigins = []string{"*"}
wsModules = []string{"net", "web3"}
wsOrigins = []string{"*"}
harmony *hmy.Harmony
wsModules = []string{"net", "web3"}
wsOrigins = []string{"*"}
harmony *hmy.Harmony
)
// IsCurrentlyLeader exposes if node is currently the leader node
@ -51,10 +47,27 @@ func (node *Node) IsCurrentlyLeader() bool {
}
// ErroredStakingTransactionSink is the inmemory failed staking transactions this node has
func (node *Node) ErroredStakingTransactionSink() []staking.RPCTransactionError {
func (node *Node) ErroredStakingTransactionSink() (result []staking.RPCTransactionError) {
node.errorSink.Lock()
defer node.errorSink.Unlock()
node.errorSink.failedStakingTxns.Do(func(d interface{}) {
if d != nil {
result = append(result, d.(staking.RPCTransactionError))
}
})
return result
}
// ErroredTransactionSink is the inmemory failed transactions this node has
func (node *Node) ErroredTransactionSink() (result []types.RPCTransactionError) {
node.errorSink.Lock()
defer node.errorSink.Unlock()
return node.errorSink.failedTxns
node.errorSink.failedTxns.Do(func(d interface{}) {
if d != nil {
result = append(result, d.(types.RPCTransactionError))
}
})
return result
}
// IsBeaconChainExplorerNode ..
@ -117,7 +130,6 @@ func (node *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, c
// All listeners booted successfully
httpListener = listener
httpHandler = handler
return nil
}
@ -126,7 +138,6 @@ func (node *Node) stopHTTP() {
if httpListener != nil {
httpListener.Close()
httpListener = nil
utils.Logger().Info().Str("url", fmt.Sprintf("http://%s", httpEndpoint)).Msg("HTTP endpoint closed")
}
if httpHandler != nil {
@ -149,7 +160,6 @@ func (node *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsO
// All listeners booted successfully
wsListener = listener
wsHandler = handler
return nil
}
@ -158,7 +168,6 @@ func (node *Node) stopWS() {
if wsListener != nil {
wsListener.Close()
wsListener = nil
utils.Logger().Info().Str("url", fmt.Sprintf("ws://%s", wsEndpoint)).Msg("WebSocket endpoint closed")
}
if wsHandler != nil {
@ -172,7 +181,6 @@ func (node *Node) stopWS() {
func (node *Node) APIs() []rpc.API {
// Gather all the possible APIs to surface
apis := hmyapi.GetAPIs(harmony.APIBackend)
// Append all the local APIs and return
return append(apis, []rpc.API{
{

@ -52,7 +52,8 @@ type Worker struct {
func (w *Worker) CommitTransactions(
pendingNormal map[common.Address]types.Transactions,
pendingStaking staking.StakingTransactions, coinbase common.Address,
stkingTxErrorSink func(staking.RPCTransactionError),
stkingTxErrorSink func([]staking.RPCTransactionError),
txnErrorSink func([]types.RPCTransactionError),
) error {
if w.current.gasPool == nil {
@ -60,9 +61,9 @@ func (w *Worker) CommitTransactions(
}
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pendingNormal)
var coalescedLogs []*types.Log
coalescedLogs := []*types.Log{}
erroredTxns := []types.RPCTransactionError{}
erroredStakingTxns := []staking.RPCTransactionError{}
// NORMAL
for {
// If we don't have enough gas for any further transactions then we're done
@ -77,14 +78,12 @@ func (w *Worker) CommitTransactions(
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(w.current.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.config.IsEIP155(w.current.header.Number()) {
utils.Logger().Info().Str("hash", tx.Hash().Hex()).Str("eip155Epoch", w.config.EIP155Epoch.String()).Msg("Ignoring reply protected transaction")
txs.Pop()
continue
}
@ -97,6 +96,11 @@ func (w *Worker) CommitTransactions(
}
logs, err := w.commitTransaction(tx, coinbase)
if err != nil {
erroredTxns = append(erroredTxns, types.RPCTransactionError{
tx.Hash().Hex(), time.Now().Unix(), err.Error(),
})
}
sender, _ := common2.AddressToBech32(from)
switch err {
case core.ErrGasLimitReached:
@ -131,8 +135,9 @@ func (w *Worker) CommitTransactions(
if w.chain.ShardID() == shard.BeaconChainShardID {
for _, tx := range pendingStaking {
logs, err := w.commitStakingTransaction(tx, coinbase)
if txID := tx.Hash().Hex(); err != nil {
stkingTxErrorSink(staking.RPCTransactionError{
if err != nil {
txID := tx.Hash().Hex()
erroredStakingTxns = append(erroredStakingTxns, staking.RPCTransactionError{
TxHashID: txID,
StakingDirective: tx.StakingType().String(),
TimestampOfRejection: time.Now().Unix(),
@ -149,9 +154,15 @@ func (w *Worker) CommitTransactions(
}
}
}
utils.Logger().Info().Int("newTxns", len(w.current.txs)).Uint64("blockGasLimit", w.current.header.GasLimit()).Uint64("blockGasUsed", w.current.header.GasUsed()).Msg("Block gas limit and usage info")
// Here call the error functions
stkingTxErrorSink(erroredStakingTxns)
txnErrorSink(erroredTxns)
utils.Logger().Info().
Int("newTxns", len(w.current.txs)).
Uint64("blockGasLimit", w.current.header.GasLimit()).
Uint64("blockGasUsed", w.current.header.GasUsed()).
Msg("Block gas limit and usage info")
return nil
}

@ -78,7 +78,11 @@ func TestCommitTransactions(t *testing.T) {
// Commit the tx to the worker
txs := make(map[common.Address]types.Transactions)
txs[testBankAddress] = types.Transactions{tx}
err := worker.CommitTransactions(txs, nil, testBankAddress, func(staking.RPCTransactionError) {})
err := worker.CommitTransactions(
txs, nil, testBankAddress,
func([]staking.RPCTransactionError) {},
func([]types.RPCTransactionError) {},
)
if err != nil {
t.Error(err)
}

@ -127,7 +127,11 @@ func fundFaucetContract(chain *core.BlockChain) {
txmap := make(map[common.Address]types.Transactions)
txmap[FaucetAddress] = txs
err := contractworker.CommitTransactions(txmap, nil, testUserAddress, func(staking.RPCTransactionError) {})
err := contractworker.CommitTransactions(
txmap, nil, testUserAddress,
func([]staking.RPCTransactionError) {},
func([]types.RPCTransactionError) {},
)
if err != nil {
fmt.Println(err)
}
@ -166,7 +170,11 @@ func callFaucetContractToFundAnAddress(chain *core.BlockChain) {
txmap := make(map[common.Address]types.Transactions)
txmap[FaucetAddress] = types.Transactions{callfaucettx}
err = contractworker.CommitTransactions(txmap, nil, testUserAddress, func(staking.RPCTransactionError) {})
err = contractworker.CommitTransactions(
txmap, nil, testUserAddress,
func([]staking.RPCTransactionError) {},
func([]types.RPCTransactionError) {},
)
if err != nil {
fmt.Println(err)
}

Loading…
Cancel
Save