From 89ad5ef458e4df61a436c2b1a5ce7117a3ae6b96 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Tue, 14 May 2019 04:07:44 +0000 Subject: [PATCH 1/2] fix concurrent map write error #837 Signed-off-by: Leo Chen --- node/node.go | 4 +--- node/node_handler.go | 4 ++-- node/puzzle_contract.go | 10 +++++++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/node/node.go b/node/node.go index f0fe3d234..f6b13e099 100644 --- a/node/node.go +++ b/node/node.go @@ -153,7 +153,7 @@ type Node struct { ContractAddresses []common.Address // For puzzle contracts - AddressNonce map[common.Address]*uint64 + AddressNonce sync.Map // Shard group Message Receiver shardGroupReceiver p2p.GroupReceiver @@ -325,8 +325,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is // Setup initial state of syncing. node.peerRegistrationRecord = make(map[string]*syncConfig) - node.AddressNonce = make(map[common.Address]*uint64) - node.startConsensus = make(chan struct{}) // Get the node config that's created in the harmony.go program. diff --git a/node/node_handler.go b/node/node_handler.go index f82f535e3..c17eea491 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -314,9 +314,9 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) { if err != nil { utils.GetLogInstance().Error("Error when parsing tx into message") } - if _, ok := node.AddressNonce[msg.From()]; ok { + if _, ok := node.AddressNonce.Load(msg.From()); ok { nonce := node.GetNonceOfAddress(msg.From()) - atomic.StoreUint64(node.AddressNonce[msg.From()], nonce) + node.AddressNonce.Store(msg.From(), nonce) } } diff --git a/node/puzzle_contract.go b/node/puzzle_contract.go index bdf8041e5..e20735a11 100644 --- a/node/puzzle_contract.go +++ b/node/puzzle_contract.go @@ -214,11 +214,15 @@ func (node *Node) CreateTransactionForEndMethod(priKey string) (string, error) { // GetAndIncreaseAddressNonce get and increase the address's nonce func (node *Node) GetAndIncreaseAddressNonce(address common.Address) uint64 { - if value, ok := node.AddressNonce[address]; ok { - nonce := atomic.AddUint64(value, 1) + if value, ok := node.AddressNonce.Load(address); ok { + n, ok := value.(uint64) + if !ok { + return 0 + } + nonce := atomic.AddUint64(&n, 1) return nonce - 1 } nonce := node.GetNonceOfAddress(address) + 1 - node.AddressNonce[address] = &nonce + node.AddressNonce.Store(address, nonce) return nonce - 1 } From 0d83532785eaa107dd68d9dff946d2ddf8074d32 Mon Sep 17 00:00:00 2001 From: Yelin Lu Date: Fri, 10 May 2019 16:09:20 +0000 Subject: [PATCH 2/2] new rpc framework 1. Remove eth rpc code and unnecessary changes from 7a0f18f92bb88baf2d343109ea68177ddabe946a. Import the package instead. 2. Move HmyAPIBackend into core package so it has more access to core. 3. Add API interface for services. 4. Start RPC for all roles, for both HTTP (baseport+10) and WS (baseport+20). 3. Keep implemented APIs but move to internal/hmyapi package. --- api/service/blockproposal/service.go | 6 + api/service/clientsupport/service.go | 6 + api/service/consensus/service.go | 6 + api/service/discovery/service.go | 6 + api/service/explorer/service.go | 6 + api/service/manager.go | 7 +- api/service/manager_test.go | 3 + api/service/networkinfo/service.go | 6 + api/service/newclientsupport/service.go | 6 + api/service/randomness/service.go | 6 + api/service/resharding/service.go | 6 + api/service/restclientsupport/service.go | 6 + api/service/rpc/service.go | 128 ----- api/service/staking/service.go | 6 + cmd/harmony/main.go | 1 + rpc/backend.go => core/api_backend.go | 28 +- core/blockchain.go | 3 - {rpc => internal}/hmyapi/addrlock.go | 0 {rpc => internal}/hmyapi/backend.go | 11 +- {rpc => internal}/hmyapi/private.go | 0 {rpc => internal}/hmyapi/public.go | 10 +- {rpc => internal}/hmyapi/types.go | 0 node/rpc.go | 133 +++++ node/service.go | 32 -- node/service_setup.go | 4 - rpc/README.md | 20 - rpc/client.go | 623 ----------------------- rpc/endpoints.go | 79 --- rpc/errors.go | 49 -- rpc/handler.go | 381 -------------- rpc/http.go | 322 ------------ rpc/ipc.go | 40 -- rpc/ipc_unix.go | 46 -- rpc/json.go | 314 ------------ rpc/server.go | 116 ----- rpc/service.go | 269 ---------- rpc/stdio.go | 50 -- rpc/subscription.go | 311 ----------- rpc/types.go | 94 ---- rpc/websocket.go | 222 -------- 40 files changed, 231 insertions(+), 3131 deletions(-) delete mode 100644 api/service/rpc/service.go rename rpc/backend.go => core/api_backend.go (78%) rename {rpc => internal}/hmyapi/addrlock.go (100%) rename {rpc => internal}/hmyapi/backend.go (64%) rename {rpc => internal}/hmyapi/private.go (100%) rename {rpc => internal}/hmyapi/public.go (96%) rename {rpc => internal}/hmyapi/types.go (100%) create mode 100644 node/rpc.go delete mode 100644 node/service.go delete mode 100644 rpc/README.md delete mode 100644 rpc/client.go delete mode 100644 rpc/endpoints.go delete mode 100644 rpc/errors.go delete mode 100644 rpc/handler.go delete mode 100644 rpc/http.go delete mode 100644 rpc/ipc.go delete mode 100644 rpc/ipc_unix.go delete mode 100644 rpc/json.go delete mode 100644 rpc/server.go delete mode 100644 rpc/service.go delete mode 100644 rpc/stdio.go delete mode 100644 rpc/subscription.go delete mode 100644 rpc/types.go delete mode 100644 rpc/websocket.go diff --git a/api/service/blockproposal/service.go b/api/service/blockproposal/service.go index e2bc83959..14ef89238 100644 --- a/api/service/blockproposal/service.go +++ b/api/service/blockproposal/service.go @@ -1,6 +1,7 @@ package blockproposal import ( + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/internal/utils" ) @@ -54,3 +55,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/clientsupport/service.go b/api/service/clientsupport/service.go index 3cbdaa020..7913db674 100644 --- a/api/service/clientsupport/service.go +++ b/api/service/clientsupport/service.go @@ -4,6 +4,7 @@ import ( "strconv" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" clientService "github.com/harmony-one/harmony/api/client/service" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/core/state" @@ -55,3 +56,8 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { func (s *Service) NotifyService(params map[string]interface{}) { return } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/consensus/service.go b/api/service/consensus/service.go index 139ed55d3..9211f6bbc 100644 --- a/api/service/consensus/service.go +++ b/api/service/consensus/service.go @@ -1,6 +1,7 @@ package consensus import ( + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core/types" @@ -52,3 +53,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index b354169bf..204c19632 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -3,6 +3,7 @@ package discovery import ( "time" + "github.com/ethereum/go-ethereum/rpc" proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/api/service" @@ -147,3 +148,8 @@ func (s *Service) Init() { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go index ae615c9f1..754ba96ab 100644 --- a/api/service/explorer/service.go +++ b/api/service/explorer/service.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" "github.com/gorilla/mux" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/core/types" @@ -302,3 +303,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/manager.go b/api/service/manager.go index aacae3521..403b4863a 100644 --- a/api/service/manager.go +++ b/api/service/manager.go @@ -3,6 +3,7 @@ package service import ( "time" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/internal/utils" ) @@ -34,7 +35,6 @@ const ( PeerDiscovery Resharding Staking - RPC Test Done ) @@ -65,8 +65,6 @@ func (t Type) String() string { return "PeerDiscovery" case Resharding: return "Resharding" - case RPC: - return "RPC" case Test: return "Test" case Done: @@ -95,6 +93,9 @@ type Interface interface { SetMessageChan(msgChan chan *msg_pb.Message) StopService() NotifyService(map[string]interface{}) + + // APIs retrieves the list of RPC descriptors the service provides + APIs() []rpc.API } // Manager stores all services for service manager. diff --git a/api/service/manager_test.go b/api/service/manager_test.go index 346181756..0e147f78e 100644 --- a/api/service/manager_test.go +++ b/api/service/manager_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/p2p" @@ -40,6 +41,8 @@ func (s *SupportSyncingTest) SetMessageChan(msgChan chan *msg_pb.Message) { s.msgChan = msgChan } +func (s *SupportSyncingTest) APIs() []rpc.API { return nil } + // Test TakeAction. func TestTakeAction(t *testing.T) { m := &Manager{} diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 069f2bac5..7145399a5 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" @@ -226,3 +227,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/newclientsupport/service.go b/api/service/newclientsupport/service.go index ffff8a4db..a0858e628 100644 --- a/api/service/newclientsupport/service.go +++ b/api/service/newclientsupport/service.go @@ -3,6 +3,7 @@ package newclientsupport import ( "math/big" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" ) @@ -42,3 +43,8 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { func (s *Service) NotifyService(params map[string]interface{}) { return } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/randomness/service.go b/api/service/randomness/service.go index a32bdc926..8fb11513b 100644 --- a/api/service/randomness/service.go +++ b/api/service/randomness/service.go @@ -1,6 +1,7 @@ package randomness import ( + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/drand" "github.com/harmony-one/harmony/internal/utils" @@ -43,3 +44,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/resharding/service.go b/api/service/resharding/service.go index 9377ecf88..2cb04571d 100644 --- a/api/service/resharding/service.go +++ b/api/service/resharding/service.go @@ -3,6 +3,7 @@ package resharding import ( "time" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" @@ -89,3 +90,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/restclientsupport/service.go b/api/service/restclientsupport/service.go index 1f98095ff..0ca7affcc 100644 --- a/api/service/restclientsupport/service.go +++ b/api/service/restclientsupport/service.go @@ -11,6 +11,7 @@ import ( "strconv" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" "github.com/gorilla/mux" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/internal/utils" @@ -259,3 +260,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/rpc/service.go b/api/service/rpc/service.go deleted file mode 100644 index e3cf6def4..000000000 --- a/api/service/rpc/service.go +++ /dev/null @@ -1,128 +0,0 @@ -package rpcservice - -import ( - "context" - "fmt" - "net" - "net/http" - "strconv" - - msg_pb "github.com/harmony-one/harmony/api/proto/message" - "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/rpc" - "github.com/harmony-one/harmony/rpc/hmyapi" -) - -const ( - rpcPortDiff = 123 -) - -// Service is the struct for rpc service. -type Service struct { - messageChan chan *msg_pb.Message - server *http.Server - // Util - peer *p2p.Peer - blockchain *core.BlockChain - txPool *core.TxPool - // HTTP RPC - rpcAPIs []rpc.API // List of APIs currently provided by the node - httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled) - httpWhitelist []string // HTTP RPC modules to allow through this endpoint - httpListener net.Listener // HTTP RPC listener socket to server API requests - httpHandler *rpc.Server // HTTP RPC request handler to process the API requests -} - -// New returns RPC service. -func New(b *core.BlockChain, p *p2p.Peer, t *core.TxPool) *Service { - return &Service{ - blockchain: b, - peer: p, - txPool: t, - } -} - -// StartService starts RPC service. -func (s *Service) StartService() { - utils.GetLogInstance().Info("Starting RPC service.") - if err := s.startRPC(); err != nil { - utils.GetLogInstance().Error("Failed to start RPC service.") - } else { - utils.GetLogInstance().Info("Started RPC service.") - } -} - -// StopService shutdowns RPC service. -func (s *Service) StopService() { - utils.GetLogInstance().Info("Shutting down RPC service.") - if err := s.server.Shutdown(context.Background()); err != nil { - utils.GetLogInstance().Error("Error when shutting down RPC server", "error", err) - } else { - utils.GetLogInstance().Error("Shutting down RPC server successufully") - } -} - -// NotifyService notify service -func (s *Service) NotifyService(params map[string]interface{}) { - return -} - -// SetMessageChan sets up message channel to service. -func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { - s.messageChan = messageChan -} - -// startRPC is a helper method to start all the various RPC endpoint during node -// startup. It's not meant to be called at any time afterwards as it makes certain -// assumptions about the state of the node. -func (s *Service) startRPC() error { - apis := hmyapi.GetAPIs(rpc.NewBackend(s.blockchain, s.txPool)) - - port, _ := strconv.Atoi(s.peer.Port) - s.httpEndpoint = fmt.Sprintf("localhost:%v", port+rpcPortDiff) - if err := s.startHTTP(s.httpEndpoint, apis); err != nil { - utils.GetLogInstance().Debug("Failed to start RPC HTTP") - return err - } - utils.GetLogInstance().Debug("Started RPC HTTP") - - // All API endpoints started successfully - s.rpcAPIs = apis - return nil -} - -// startHTTP initializes and starts the HTTP RPC endpoint. -func (s *Service) startHTTP(endpoint string, apis []rpc.API) error { - utils.GetLogInstance().Debug("rpc startHTTP", "endpoint", endpoint, "apis", apis) - // Short circuit if the HTTP endpoint isn't being exposed - if endpoint == "" { - return nil - } - listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis) - if err != nil { - return err - } - utils.GetLogInstance().Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint)) - // All listeners booted successfully - s.httpEndpoint = endpoint - s.httpListener = listener - s.httpHandler = handler - - return nil -} - -// stopHTTP terminates the HTTP RPC endpoint. -func (s *Service) stopHTTP() { - if s.httpListener != nil { - s.httpListener.Close() - s.httpListener = nil - - utils.GetLogInstance().Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", s.httpEndpoint)) - } - if s.httpHandler != nil { - s.httpHandler.Stop() - s.httpHandler = nil - } -} diff --git a/api/service/staking/service.go b/api/service/staking/service.go index 896fe1ad6..a3342182a 100644 --- a/api/service/staking/service.go +++ b/api/service/staking/service.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" protobuf "github.com/golang/protobuf/proto" proto "github.com/harmony-one/harmony/api/client/service/proto" proto_common "github.com/harmony-one/harmony/api/proto" @@ -250,3 +251,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index c62a72778..f3280f86e 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -354,6 +354,7 @@ func main() { go currentNode.SupportSyncing() currentNode.ServiceManagerSetup() + currentNode.StartRPC(*port) currentNode.RunServices() currentNode.StartServer() } diff --git a/rpc/backend.go b/core/api_backend.go similarity index 78% rename from rpc/backend.go rename to core/api_backend.go index a90a0eed8..badcad21c 100644 --- a/rpc/backend.go +++ b/core/api_backend.go @@ -1,4 +1,4 @@ -package rpc +package core import ( "context" @@ -6,25 +6,25 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" - "github.com/harmony-one/harmony/core" + "github.com/ethereum/go-ethereum/rpc" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" ) // HmyAPIBackend ... type HmyAPIBackend struct { - blockchain *core.BlockChain - txPool *core.TxPool + blockchain *BlockChain + txPool *TxPool } // NewBackend ... -func NewBackend(blockchain *core.BlockChain, txPool *core.TxPool) *HmyAPIBackend { +func NewBackend(blockchain *BlockChain, txPool *TxPool) *HmyAPIBackend { return &HmyAPIBackend{blockchain, txPool} } // ChainDb ... func (b *HmyAPIBackend) ChainDb() ethdb.Database { - return b.blockchain.ChainDb() + return b.blockchain.db } // GetBlock ... @@ -38,22 +38,22 @@ func (b *HmyAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction } // BlockByNumber ... -func (b *HmyAPIBackend) BlockByNumber(ctx context.Context, blockNr BlockNumber) (*types.Block, error) { +func (b *HmyAPIBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) { // Pending block is only known by the miner - if blockNr == PendingBlockNumber { + if blockNr == rpc.PendingBlockNumber { return nil, errors.New("not implemented") } // Otherwise resolve and return the block - if blockNr == latestBlockNumber { + if blockNr == rpc.LatestBlockNumber { return b.blockchain.CurrentBlock(), nil } return b.blockchain.GetBlockByNumber(uint64(blockNr)), nil } // StateAndHeaderByNumber ... -func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr BlockNumber) (*state.DB, *types.Header, error) { +func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.DB, *types.Header, error) { // Pending state is only known by the miner - if blockNr == PendingBlockNumber { + if blockNr == rpc.PendingBlockNumber { return nil, nil, errors.New("not implemented") } // Otherwise resolve the block number and return its state @@ -66,13 +66,13 @@ func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr Bloc } // HeaderByNumber ... -func (b *HmyAPIBackend) HeaderByNumber(ctx context.Context, blockNr BlockNumber) (*types.Header, error) { +func (b *HmyAPIBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { // Pending block is only known by the miner - if blockNr == PendingBlockNumber { + if blockNr == rpc.PendingBlockNumber { return nil, errors.New("not implemented") } // Otherwise resolve and return the block - if blockNr == latestBlockNumber { + if blockNr == rpc.LatestBlockNumber { return b.blockchain.CurrentBlock().Header(), nil } return b.blockchain.GetHeaderByNumber(uint64(blockNr)), nil diff --git a/core/blockchain.go b/core/blockchain.go index b8245867f..5f1ce92cd 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1744,6 +1744,3 @@ func (bc *BlockChain) StoreNewShardState(block *types.Block, stakeInfo *map[comm } return shardState } - -// ChainDb returns the database -func (bc *BlockChain) ChainDb() ethdb.Database { return bc.db } diff --git a/rpc/hmyapi/addrlock.go b/internal/hmyapi/addrlock.go similarity index 100% rename from rpc/hmyapi/addrlock.go rename to internal/hmyapi/addrlock.go diff --git a/rpc/hmyapi/backend.go b/internal/hmyapi/backend.go similarity index 64% rename from rpc/hmyapi/backend.go rename to internal/hmyapi/backend.go index bc9950c3c..ae2571a77 100644 --- a/rpc/hmyapi/backend.go +++ b/internal/hmyapi/backend.go @@ -1,22 +1,21 @@ package hmyapi import ( - "github.com/harmony-one/harmony/rpc" + "github.com/ethereum/go-ethereum/rpc" + "github.com/harmony-one/harmony/core" ) -const namespace = "hmy" - // GetAPIs returns all the APIs. -func GetAPIs(b *rpc.HmyAPIBackend) []rpc.API { +func GetAPIs(b *core.HmyAPIBackend) []rpc.API { nonceLock := new(AddrLocker) return []rpc.API{ { - Namespace: namespace, + Namespace: "hmy", Version: "1.0", Service: NewPublicBlockChainAPI(b), Public: true, }, { - Namespace: namespace, + Namespace: "hmy", Version: "1.0", Service: NewPublicTransactionPoolAPI(b, nonceLock), Public: true, diff --git a/rpc/hmyapi/private.go b/internal/hmyapi/private.go similarity index 100% rename from rpc/hmyapi/private.go rename to internal/hmyapi/private.go diff --git a/rpc/hmyapi/public.go b/internal/hmyapi/public.go similarity index 96% rename from rpc/hmyapi/public.go rename to internal/hmyapi/public.go index 5502222bd..b40d618ee 100644 --- a/rpc/hmyapi/public.go +++ b/internal/hmyapi/public.go @@ -6,21 +6,21 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" "github.com/harmony-one/harmony/api/proto" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/rawdb" "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/rpc" ) // PublicBlockChainAPI provides an API to access the Harmony blockchain. // It offers only methods that operate on public data that is freely available to anyone. type PublicBlockChainAPI struct { - b *rpc.HmyAPIBackend + b *core.HmyAPIBackend } // NewPublicBlockChainAPI creates a new Harmony blockchain API. -func NewPublicBlockChainAPI(b *rpc.HmyAPIBackend) *PublicBlockChainAPI { +func NewPublicBlockChainAPI(b *core.HmyAPIBackend) *PublicBlockChainAPI { return &PublicBlockChainAPI{b} } @@ -124,12 +124,12 @@ func (s *PublicNetAPI) PeerCount() hexutil.Uint { // PublicTransactionPoolAPI exposes methods for the RPC interface type PublicTransactionPoolAPI struct { - b *rpc.HmyAPIBackend + b *core.HmyAPIBackend nonceLock *AddrLocker } // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. -func NewPublicTransactionPoolAPI(b *rpc.HmyAPIBackend, nonceLock *AddrLocker) *PublicTransactionPoolAPI { +func NewPublicTransactionPoolAPI(b *core.HmyAPIBackend, nonceLock *AddrLocker) *PublicTransactionPoolAPI { return &PublicTransactionPoolAPI{b, nonceLock} } diff --git a/rpc/hmyapi/types.go b/internal/hmyapi/types.go similarity index 100% rename from rpc/hmyapi/types.go rename to internal/hmyapi/types.go diff --git a/node/rpc.go b/node/rpc.go new file mode 100644 index 000000000..1e30f48c1 --- /dev/null +++ b/node/rpc.go @@ -0,0 +1,133 @@ +package node + +import ( + "fmt" + "net" + "strconv" + "strings" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/internal/hmyapi" +) + +const ( + rpcHTTPPortOffset = 10 + rpcWSPortOffset = 20 +) + +var ( + // HTTP RPC + rpcAPIs []rpc.API + httpListener net.Listener + httpHandler *rpc.Server + + wsListener net.Listener + wsHandler *rpc.Server + + httpEndpoint = "" + wsEndpoint = "" + + httpModules = []string{"hmy"} + httpVirtualHosts = []string{"*"} + httpTimeouts = rpc.DefaultHTTPTimeouts + + wsModules = []string{"net", "web3"} + wsOrigins = []string{"*"} + + apiBackend *core.HmyAPIBackend +) + +// StartRPC start RPC service +func (node *Node) StartRPC(nodePort string) error { + // Gather all the possible APIs to surface + apiBackend = core.NewBackend(node.blockchain, node.TxPool) + + apis := hmyapi.GetAPIs(apiBackend) + for _, service := range node.serviceManager.GetServices() { + apis = append(apis, service.APIs()...) + } + + port, _ := strconv.Atoi(nodePort) + + httpEndpoint = fmt.Sprintf(":%v", port+rpcHTTPPortOffset) + if err := node.startHTTP(httpEndpoint, apis, httpModules, nil, httpVirtualHosts, httpTimeouts); err != nil { + return err + } + + wsEndpoint = fmt.Sprintf(":%v", port+rpcWSPortOffset) + if err := node.startWS(wsEndpoint, apis, wsModules, wsOrigins, true); err != nil { + node.stopHTTP() + return err + } + + rpcAPIs = apis + return nil +} + +// startHTTP initializes and starts the HTTP RPC endpoint. +func (node *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) error { + // Short circuit if the HTTP endpoint isn't being exposed + if endpoint == "" { + return nil + } + + listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts, timeouts) + if err != nil { + return err + } + + log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ",")) + // All listeners booted successfully + httpListener = listener + httpHandler = handler + + return nil +} + +// stopHTTP terminates the HTTP RPC endpoint. +func (node *Node) stopHTTP() { + if httpListener != nil { + httpListener.Close() + httpListener = nil + + log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", httpEndpoint)) + } + if httpHandler != nil { + httpHandler.Stop() + httpHandler = nil + } +} + +// startWS initializes and starts the websocket RPC endpoint. +func (node *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error { + // Short circuit if the WS endpoint isn't being exposed + if endpoint == "" { + return nil + } + listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll) + if err != nil { + return err + } + log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr())) + // All listeners booted successfully + wsListener = listener + wsHandler = handler + + return nil +} + +// stopWS terminates the websocket RPC endpoint. +func (node *Node) stopWS() { + if wsListener != nil { + wsListener.Close() + wsListener = nil + + log.Info("WebSocket endpoint closed", "url", fmt.Sprintf("ws://%s", wsEndpoint)) + } + if wsHandler != nil { + wsHandler.Stop() + wsHandler = nil + } +} diff --git a/node/service.go b/node/service.go deleted file mode 100644 index f97acf88d..000000000 --- a/node/service.go +++ /dev/null @@ -1,32 +0,0 @@ -package node - -import ( - "github.com/ethereum/go-ethereum/p2p" - "github.com/harmony-one/harmony/rpc" -) - -// Service is an individual protocol that can be registered into a node. -// -// Notes: -// -// • Service life-cycle management is delegated to the node. The service is allowed to -// initialize itself upon creation, but no goroutines should be spun up outside of the -// Start method. -// -// • Restart logic is not required as the node will create a fresh instance -// every time a service is started. -type Service interface { - // Protocols retrieves the P2P protocols the service wishes to start. - Protocols() []p2p.Protocol - - // APIs retrieves the list of RPC descriptors the service provides - APIs() []rpc.API - - // Start is called after all services have been constructed and the networking - // layer was also initialized to spawn any goroutines required by the service. - Start(server *p2p.Server) error - - // Stop terminates all goroutines belonging to the service, blocking until they - // are all terminated. - Stop() error -} diff --git a/node/service_setup.go b/node/service_setup.go index 6ff593cea..21c10340a 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -11,7 +11,6 @@ import ( "github.com/harmony-one/harmony/api/service/networkinfo" "github.com/harmony-one/harmony/api/service/randomness" "github.com/harmony-one/harmony/api/service/restclientsupport" - rpcservice "github.com/harmony-one/harmony/api/service/rpc" "github.com/harmony-one/harmony/api/service/staking" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" @@ -39,9 +38,6 @@ func (node *Node) setupForShardLeader() { node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady)) // Register client support service. node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) - - // Register RPC service - node.serviceManager.RegisterService(service.RPC, rpcservice.New(node.blockchain, &node.SelfPeer, node.TxPool)) } func (node *Node) setupForShardValidator() { diff --git a/rpc/README.md b/rpc/README.md deleted file mode 100644 index b789cd8a1..000000000 --- a/rpc/README.md +++ /dev/null @@ -1,20 +0,0 @@ -Files that are mainly ported over from go-ethereum (might include tslint fix): -* rpc/client.go: the rpc client. -* rpc/endpoints.go: starting HTTP/WS/IPC endpoints. -* rpc/errors.go: error code handling. -* rpc/handler.go: handling requests, running methods. -* rpc/http.go: starting HTTP service. -* rpc/ipc_unix.go: util for ipc unix -* rpc/ipc.go: util for general ipc -* rpc/json.go: json de-/serialization -* rpc/server.go: the rpc server -* rpc/stdio.go: stdio for ipc. -* rpc/subscription.go: ws subscriptions. -* rpc/types.go: type declarations -* rpc/websocket.go: ws connection. -* rpc/hmyapi/addrlock.go - -Richard's changes: -* rpc/service.go: our service (http/ws/ipc) starter/stoper -* other files under `rpc/hmyapi/`. -* other files that are not in `rpc` folder. Added some util functions. \ No newline at end of file diff --git a/rpc/client.go b/rpc/client.go deleted file mode 100644 index 7995640d6..000000000 --- a/rpc/client.go +++ /dev/null @@ -1,623 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "net/url" - "reflect" - "strconv" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/log" -) - -var ( - errClientQuit = errors.New("client is closed") - errNoResult = errors.New("no result in JSON-RPC response") - errSubscriptionQueueOverflow = errors.New("subscription queue overflow") - errClientReconnected = errors.New("client reconnected") - errDead = errors.New("connection lost") -) - -const ( - // Timeouts - tcpKeepAliveInterval = 30 * time.Second - defaultDialTimeout = 10 * time.Second // used if context has no deadline - subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls -) - -const ( - // Subscriptions are removed when the subscriber cannot keep up. - // - // This can be worked around by supplying a channel with sufficiently sized buffer, - // but this can be inconvenient and hard to explain in the docs. Another issue with - // buffered channels is that the buffer is static even though it might not be needed - // most of the time. - // - // The approach taken here is to maintain a per-subscription linked list buffer - // shrinks on demand. If the buffer reaches the size below, the subscription is - // dropped. - maxClientSubscriptionBuffer = 20000 -) - -// BatchElem is an element in a batch request. -type BatchElem struct { - Method string - Args []interface{} - // The result is unmarshaled into this field. Result must be set to a - // non-nil pointer value of the desired type, otherwise the response will be - // discarded. - Result interface{} - // Error is set if the server returns an error for this request, or if - // unmarshaling into Result fails. It is not set for I/O errors. - Error error -} - -// Client represents a connection to an RPC server. -type Client struct { - idgen func() ID // for subscriptions - isHTTP bool - services *serviceRegistry - - idCounter uint32 - - // This function, if non-nil, is called when the connection is lost. - reconnectFunc reconnectFunc - - // writeConn is used for writing to the connection on the caller's goroutine. It should - // only be accessed outside of dispatch, with the write lock held. The write lock is - // taken by sending on requestOp and released by sending on sendDone. - writeConn jsonWriter - - // for dispatch - close chan struct{} - closing chan struct{} // closed when client is quitting - didClose chan struct{} // closed when client quits - reconnected chan ServerCodec // where write/reconnect sends the new connection - readOp chan readOp // read messages - readErr chan error // errors from read - reqInit chan *requestOp // register response IDs, takes write lock - reqSent chan error // signals write completion, releases write lock - reqTimeout chan *requestOp // removes response IDs when call timeout expires -} - -type reconnectFunc func(ctx context.Context) (ServerCodec, error) - -type clientContextKey struct{} - -type clientConn struct { - codec ServerCodec - handler *handler -} - -func (c *Client) newClientConn(conn ServerCodec) *clientConn { - ctx := context.WithValue(context.Background(), clientContextKey{}, c) - handler := newHandler(ctx, conn, c.idgen, c.services) - return &clientConn{conn, handler} -} - -func (cc *clientConn) close(err error, inflightReq *requestOp) { - cc.handler.close(err, inflightReq) - cc.codec.Close() -} - -type readOp struct { - msgs []*jsonrpcMessage - batch bool -} - -type requestOp struct { - ids []json.RawMessage - err error - resp chan *jsonrpcMessage // receives up to len(ids) responses - sub *ClientSubscription // only set for EthSubscribe requests -} - -func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) { - select { - case <-ctx.Done(): - // Send the timeout to dispatch so it can remove the request IDs. - select { - case c.reqTimeout <- op: - case <-c.closing: - } - return nil, ctx.Err() - case resp := <-op.resp: - return resp, op.err - } -} - -// Dial creates a new client for the given URL. -// -// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a -// file name with no URL scheme, a local socket connection is established using UNIX -// domain sockets on supported platforms and named pipes on Windows. If you want to -// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead. -// -// For websocket connections, the origin is set to the local host name. -// -// The client reconnects automatically if the connection is lost. -func Dial(rawurl string) (*Client, error) { - return DialContext(context.Background(), rawurl) -} - -// DialContext creates a new RPC client, just like Dial. -// -// The context is used to cancel or time out the initial connection establishment. It does -// not affect subsequent interactions with the client. -func DialContext(ctx context.Context, rawurl string) (*Client, error) { - u, err := url.Parse(rawurl) - if err != nil { - return nil, err - } - switch u.Scheme { - case "http", "https": - return DialHTTP(rawurl) - case "ws", "wss": - return DialWebsocket(ctx, rawurl, "") - case "stdio": - return DialStdIO(ctx) - case "": - return DialIPC(ctx, rawurl) - default: - return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) - } -} - -// ClientFromContext Client retrieves the client from the context, if any. This can be used to perform -// 'reverse calls' in a handler method. -func ClientFromContext(ctx context.Context) (*Client, bool) { - client, ok := ctx.Value(clientContextKey{}).(*Client) - return client, ok -} - -func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) { - conn, err := connect(initctx) - if err != nil { - return nil, err - } - c := initClient(conn, randomIDGenerator(), new(serviceRegistry)) - c.reconnectFunc = connect - return c, nil -} - -func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { - _, isHTTP := conn.(*httpConn) - c := &Client{ - idgen: idgen, - isHTTP: isHTTP, - services: services, - writeConn: conn, - close: make(chan struct{}), - closing: make(chan struct{}), - didClose: make(chan struct{}), - reconnected: make(chan ServerCodec), - readOp: make(chan readOp), - readErr: make(chan error), - reqInit: make(chan *requestOp), - reqSent: make(chan error, 1), - reqTimeout: make(chan *requestOp), - } - if !isHTTP { - go c.dispatch(conn) - } - return c -} - -// RegisterName creates a service for the given receiver type under the given name. When no -// methods on the given receiver match the criteria to be either a RPC method or a -// subscription an error is returned. Otherwise a new service is created and added to the -// service collection this client provides to the server. -func (c *Client) RegisterName(name string, receiver interface{}) error { - return c.services.registerName(name, receiver) -} - -func (c *Client) nextID() json.RawMessage { - id := atomic.AddUint32(&c.idCounter, 1) - return strconv.AppendUint(nil, uint64(id), 10) -} - -// SupportedModules calls the rpc_modules method, retrieving the list of -// APIs that are available on the server. -func (c *Client) SupportedModules() (map[string]string, error) { - var result map[string]string - ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) - defer cancel() - err := c.CallContext(ctx, &result, "rpc_modules") - return result, err -} - -// Close closes the client, aborting any in-flight requests. -func (c *Client) Close() { - if c.isHTTP { - return - } - select { - case c.close <- struct{}{}: - <-c.didClose - case <-c.didClose: - } -} - -// Call performs a JSON-RPC call with the given arguments and unmarshals into -// result if no error occurred. -// -// The result must be a pointer so that package json can unmarshal into it. You -// can also pass nil, in which case the result is ignored. -func (c *Client) Call(result interface{}, method string, args ...interface{}) error { - ctx := context.Background() - return c.CallContext(ctx, result, method, args...) -} - -// CallContext performs a JSON-RPC call with the given arguments. If the context is -// canceled before the call has successfully returned, CallContext returns immediately. -// -// The result must be a pointer so that package json can unmarshal into it. You -// can also pass nil, in which case the result is ignored. -func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { - msg, err := c.newMessage(method, args...) - if err != nil { - return err - } - op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} - - if c.isHTTP { - err = c.sendHTTP(ctx, op, msg) - } else { - err = c.send(ctx, op, msg) - } - if err != nil { - return err - } - - // dispatch has accepted the request and will close the channel when it quits. - switch resp, err := op.wait(ctx, c); { - case err != nil: - return err - case resp.Error != nil: - return resp.Error - case len(resp.Result) == 0: - return errNoResult - default: - return json.Unmarshal(resp.Result, &result) - } -} - -// BatchCall sends all given requests as a single batch and waits for the server -// to return a response for all of them. -// -// In contrast to Call, BatchCall only returns I/O errors. Any error specific to -// a request is reported through the Error field of the corresponding BatchElem. -// -// Note that batch calls may not be executed atomically on the server side. -func (c *Client) BatchCall(b []BatchElem) error { - ctx := context.Background() - return c.BatchCallContext(ctx, b) -} - -// BatchCallContext sends all given requests as a single batch and waits for the server -// to return a response for all of them. The wait duration is bounded by the -// context's deadline. -// -// In contrast to CallContext, BatchCallContext only returns errors that have occurred -// while sending the request. Any error specific to a request is reported through the -// Error field of the corresponding BatchElem. -// -// Note that batch calls may not be executed atomically on the server side. -func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { - msgs := make([]*jsonrpcMessage, len(b)) - op := &requestOp{ - ids: make([]json.RawMessage, len(b)), - resp: make(chan *jsonrpcMessage, len(b)), - } - for i, elem := range b { - msg, err := c.newMessage(elem.Method, elem.Args...) - if err != nil { - return err - } - msgs[i] = msg - op.ids[i] = msg.ID - } - - var err error - if c.isHTTP { - err = c.sendBatchHTTP(ctx, op, msgs) - } else { - err = c.send(ctx, op, msgs) - } - - // Wait for all responses to come back. - for n := 0; n < len(b) && err == nil; n++ { - var resp *jsonrpcMessage - resp, err = op.wait(ctx, c) - if err != nil { - break - } - // Find the element corresponding to this response. - // The element is guaranteed to be present because dispatch - // only sends valid IDs to our channel. - var elem *BatchElem - for i := range msgs { - if bytes.Equal(msgs[i].ID, resp.ID) { - elem = &b[i] - break - } - } - if resp.Error != nil { - elem.Error = resp.Error - continue - } - if len(resp.Result) == 0 { - elem.Error = errNoResult - continue - } - elem.Error = json.Unmarshal(resp.Result, elem.Result) - } - return err -} - -// Notify sends a notification, i.e. a method call that doesn't expect a response. -func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) error { - op := new(requestOp) - msg, err := c.newMessage(method, args...) - if err != nil { - return err - } - msg.ID = nil - - if c.isHTTP { - return c.sendHTTP(ctx, op, msg) - } - return c.send(ctx, op, msg) -} - -// EthSubscribe registers a subscripion under the "eth" namespace. -func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { - return c.Subscribe(ctx, "eth", channel, args...) -} - -// ShhSubscribe registers a subscripion under the "shh" namespace. -func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { - return c.Subscribe(ctx, "shh", channel, args...) -} - -// Subscribe calls the "_subscribe" method with the given arguments, -// registering a subscription. Server notifications for the subscription are -// sent to the given channel. The element type of the channel must match the -// expected type of content returned by the subscription. -// -// The context argument cancels the RPC request that sets up the subscription but has no -// effect on the subscription after Subscribe has returned. -// -// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications -// before considering the subscriber dead. The subscription Err channel will receive -// errSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure -// that the channel usually has at least one reader to prevent this issue. -func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) { - // Check type of channel first. - chanVal := reflect.ValueOf(channel) - if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { - panic("first argument to Subscribe must be a writable channel") - } - if chanVal.IsNil() { - panic("channel given to Subscribe must not be nil") - } - if c.isHTTP { - return nil, ErrNotificationsUnsupported - } - - msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...) - if err != nil { - return nil, err - } - op := &requestOp{ - ids: []json.RawMessage{msg.ID}, - resp: make(chan *jsonrpcMessage), - sub: newClientSubscription(c, namespace, chanVal), - } - - // Send the subscription request. - // The arrival and validity of the response is signaled on sub.quit. - if err := c.send(ctx, op, msg); err != nil { - return nil, err - } - if _, err := op.wait(ctx, c); err != nil { - return nil, err - } - return op.sub, nil -} - -func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) { - msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method} - if paramsIn != nil { // prevent sending "params":null - var err error - if msg.Params, err = json.Marshal(paramsIn); err != nil { - return nil, err - } - } - return msg, nil -} - -// send registers op with the dispatch loop, then sends msg on the connection. -// if sending fails, op is deregistered. -func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error { - select { - case c.reqInit <- op: - err := c.write(ctx, msg) - c.reqSent <- err - return err - case <-ctx.Done(): - // This can happen if the client is overloaded or unable to keep up with - // subscription notifications. - return ctx.Err() - case <-c.closing: - return errClientQuit - } -} - -func (c *Client) write(ctx context.Context, msg interface{}) error { - // The previous write failed. Try to establish a new connection. - if c.writeConn == nil { - if err := c.reconnect(ctx); err != nil { - return err - } - } - err := c.writeConn.Write(ctx, msg) - if err != nil { - c.writeConn = nil - } - return err -} - -func (c *Client) reconnect(ctx context.Context) error { - if c.reconnectFunc == nil { - return errDead - } - - if _, ok := ctx.Deadline(); !ok { - var cancel func() - ctx, cancel = context.WithTimeout(ctx, defaultDialTimeout) - defer cancel() - } - newconn, err := c.reconnectFunc(ctx) - if err != nil { - log.Trace("RPC client reconnect failed", "err", err) - return err - } - select { - case c.reconnected <- newconn: - c.writeConn = newconn - return nil - case <-c.didClose: - newconn.Close() - return errClientQuit - } -} - -// dispatch is the main loop of the client. -// It sends read messages to waiting calls to Call and BatchCall -// and subscription notifications to registered subscriptions. -func (c *Client) dispatch(codec ServerCodec) { - var ( - lastOp *requestOp // tracks last send operation - reqInitLock = c.reqInit // nil while the send lock is held - conn = c.newClientConn(codec) - reading = true - ) - defer func() { - close(c.closing) - if reading { - conn.close(errClientQuit, nil) - c.drainRead() - } - close(c.didClose) - }() - - // Spawn the initial read loop. - go c.read(codec) - - for { - select { - case <-c.close: - return - - // Read path: - case op := <-c.readOp: - if op.batch { - conn.handler.handleBatch(op.msgs) - } else { - conn.handler.handleMsg(op.msgs[0]) - } - - case err := <-c.readErr: - conn.handler.log.Debug("RPC connection read error", "err", err) - conn.close(err, lastOp) - reading = false - - // Reconnect: - case newcodec := <-c.reconnected: - log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr()) - if reading { - // Wait for the previous read loop to exit. This is a rare case which - // happens if this loop isn't notified in time after the connection breaks. - // In those cases the caller will notice first and reconnect. Closing the - // handler terminates all waiting requests (closing op.resp) except for - // lastOp, which will be transferred to the new handler. - conn.close(errClientReconnected, lastOp) - c.drainRead() - } - go c.read(newcodec) - reading = true - conn = c.newClientConn(newcodec) - // Re-register the in-flight request on the new handler - // because that's where it will be sent. - conn.handler.addRequestOp(lastOp) - - // Send path: - case op := <-reqInitLock: - // Stop listening for further requests until the current one has been sent. - reqInitLock = nil - lastOp = op - conn.handler.addRequestOp(op) - - case err := <-c.reqSent: - if err != nil { - // Remove response handlers for the last send. When the read loop - // goes down, it will signal all other current operations. - conn.handler.removeRequestOp(lastOp) - } - // Let the next request in. - reqInitLock = c.reqInit - lastOp = nil - - case op := <-c.reqTimeout: - conn.handler.removeRequestOp(op) - } - } -} - -// drainRead drops read messages until an error occurs. -func (c *Client) drainRead() { - for { - select { - case <-c.readOp: - case <-c.readErr: - return - } - } -} - -// read decodes RPC messages from a codec, feeding them into dispatch. -func (c *Client) read(codec ServerCodec) { - for { - msgs, batch, err := codec.Read() - if _, ok := err.(*json.SyntaxError); ok { - codec.Write(context.Background(), errorMessage(&parseError{err.Error()})) - } - if err != nil { - c.readErr <- err - return - } - c.readOp <- readOp{msgs, batch} - } -} diff --git a/rpc/endpoints.go b/rpc/endpoints.go deleted file mode 100644 index 3fe0f1767..000000000 --- a/rpc/endpoints.go +++ /dev/null @@ -1,79 +0,0 @@ -package rpc - -import ( - "net" - - "github.com/ethereum/go-ethereum/log" -) - -// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules -func StartHTTPEndpoint(endpoint string, apis []API) (net.Listener, *Server, error) { - // Register all the APIs exposed by the services - handler := NewServer() - for _, api := range apis { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return nil, nil, err - } - log.Debug("HTTP registered", "namespace", api.Namespace) - } - // All APIs registered, start the HTTP listener - var ( - listener net.Listener - err error - ) - if listener, err = net.Listen("tcp", endpoint); err != nil { - return nil, nil, err - } - go NewHTTPServer(handler).Serve(listener) - return listener, handler, err -} - -// StartWSEndpoint starts a websocket endpoint -func StartWSEndpoint(endpoint string, apis []API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *Server, error) { - - // Generate the whitelist based on the allowed modules - whitelist := make(map[string]bool) - for _, module := range modules { - whitelist[module] = true - } - // Register all the APIs exposed by the services - handler := NewServer() - for _, api := range apis { - if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return nil, nil, err - } - log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace) - } - } - // All APIs registered, start the HTTP listener - var ( - listener net.Listener - err error - ) - if listener, err = net.Listen("tcp", endpoint); err != nil { - return nil, nil, err - } - go NewWSServer(wsOrigins, handler).Serve(listener) - return listener, handler, err - -} - -// StartIPCEndpoint starts an IPC endpoint. -func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) { - // Register all the APIs exposed by the services. - handler := NewServer() - for _, api := range apis { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return nil, nil, err - } - log.Debug("IPC registered", "namespace", api.Namespace) - } - // All APIs registered, start the IPC listener. - listener, err := ipcListen(ipcEndpoint) - if err != nil { - return nil, nil, err - } - go handler.ServeListener(listener) - return listener, handler, nil -} diff --git a/rpc/errors.go b/rpc/errors.go deleted file mode 100644 index 50bb0692c..000000000 --- a/rpc/errors.go +++ /dev/null @@ -1,49 +0,0 @@ -package rpc - -import "fmt" - -const defaultErrorCode = -32000 - -type methodNotFoundError struct{ method string } - -func (e *methodNotFoundError) ErrorCode() int { return -32601 } - -func (e *methodNotFoundError) Error() string { - return fmt.Sprintf("the method %s does not exist/is not available", e.method) -} - -type subscriptionNotFoundError struct{ namespace, subscription string } - -func (e *subscriptionNotFoundError) ErrorCode() int { return -32601 } - -func (e *subscriptionNotFoundError) Error() string { - return fmt.Sprintf("no %q subscription in %s namespace", e.subscription, e.namespace) -} - -// Invalid JSON was received by the server. -type parseError struct{ message string } - -func (e *parseError) ErrorCode() int { return -32700 } - -func (e *parseError) Error() string { return e.message } - -// received message isn't a valid request -type invalidRequestError struct{ message string } - -func (e *invalidRequestError) ErrorCode() int { return -32600 } - -func (e *invalidRequestError) Error() string { return e.message } - -// received message is invalid -type invalidMessageError struct{ message string } - -func (e *invalidMessageError) ErrorCode() int { return -32700 } - -func (e *invalidMessageError) Error() string { return e.message } - -// unable to decode supplied params, or an invalid number of parameters -type invalidParamsError struct{ message string } - -func (e *invalidParamsError) ErrorCode() int { return -32602 } - -func (e *invalidParamsError) Error() string { return e.message } diff --git a/rpc/handler.go b/rpc/handler.go deleted file mode 100644 index ca0fb20df..000000000 --- a/rpc/handler.go +++ /dev/null @@ -1,381 +0,0 @@ -package rpc - -import ( - "context" - "encoding/json" - "reflect" - "strconv" - "strings" - "sync" - "time" - - "github.com/ethereum/go-ethereum/log" -) - -// handler handles JSON-RPC messages. There is one handler per connection. Note that -// handler is not safe for concurrent use. Message handling never blocks indefinitely -// because RPCs are processed on background goroutines launched by handler. -// -// The entry points for incoming messages are: -// -// h.handleMsg(message) -// h.handleBatch(message) -// -// Outgoing calls use the requestOp struct. Register the request before sending it -// on the connection: -// -// op := &requestOp{ids: ...} -// h.addRequestOp(op) -// -// Now send the request, then wait for the reply to be delivered through handleMsg: -// -// if err := op.wait(...); err != nil { -// h.removeRequestOp(op) // timeout, etc. -// } -// -type handler struct { - reg *serviceRegistry - unsubscribeCb *callback - idgen func() ID // subscription ID generator - respWait map[string]*requestOp // active client requests - clientSubs map[string]*ClientSubscription // active client subscriptions - callWG sync.WaitGroup // pending call goroutines - rootCtx context.Context // canceled by close() - cancelRoot func() // cancel function for rootCtx - conn jsonWriter // where responses will be sent - log log.Logger - allowSubscribe bool - - subLock sync.Mutex - serverSubs map[ID]*Subscription -} - -type callProc struct { - ctx context.Context - notifiers []*Notifier -} - -func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { - rootCtx, cancelRoot := context.WithCancel(connCtx) - h := &handler{ - reg: reg, - idgen: idgen, - conn: conn, - respWait: make(map[string]*requestOp), - clientSubs: make(map[string]*ClientSubscription), - rootCtx: rootCtx, - cancelRoot: cancelRoot, - allowSubscribe: true, - serverSubs: make(map[ID]*Subscription), - log: log.Root(), - } - if conn.RemoteAddr() != "" { - h.log = h.log.New("conn", conn.RemoteAddr()) - } - h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe)) - return h -} - -// handleBatch executes all messages in a batch and returns the responses. -func (h *handler) handleBatch(msgs []*jsonrpcMessage) { - // Emit error response for empty batches: - if len(msgs) == 0 { - h.startCallProc(func(cp *callProc) { - h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) - }) - return - } - - // Handle non-call messages first: - calls := make([]*jsonrpcMessage, 0, len(msgs)) - for _, msg := range msgs { - if handled := h.handleImmediate(msg); !handled { - calls = append(calls, msg) - } - } - if len(calls) == 0 { - return - } - // Process calls on a goroutine because they may block indefinitely: - h.startCallProc(func(cp *callProc) { - answers := make([]*jsonrpcMessage, 0, len(msgs)) - for _, msg := range calls { - if answer := h.handleCallMsg(cp, msg); answer != nil { - answers = append(answers, answer) - } - } - h.addSubscriptions(cp.notifiers) - if len(answers) > 0 { - h.conn.Write(cp.ctx, answers) - } - for _, n := range cp.notifiers { - n.activate() - } - }) -} - -// handleMsg handles a single message. -func (h *handler) handleMsg(msg *jsonrpcMessage) { - if ok := h.handleImmediate(msg); ok { - return - } - h.startCallProc(func(cp *callProc) { - answer := h.handleCallMsg(cp, msg) - h.addSubscriptions(cp.notifiers) - if answer != nil { - h.conn.Write(cp.ctx, answer) - } - for _, n := range cp.notifiers { - n.activate() - } - }) -} - -// close cancels all requests except for inflightReq and waits for -// call goroutines to shut down. -func (h *handler) close(err error, inflightReq *requestOp) { - h.cancelAllRequests(err, inflightReq) - h.cancelRoot() - h.callWG.Wait() - h.cancelServerSubscriptions(err) -} - -// addRequestOp registers a request operation. -func (h *handler) addRequestOp(op *requestOp) { - for _, id := range op.ids { - h.respWait[string(id)] = op - } -} - -// removeRequestOps stops waiting for the given request IDs. -func (h *handler) removeRequestOp(op *requestOp) { - for _, id := range op.ids { - delete(h.respWait, string(id)) - } -} - -// cancelAllRequests unblocks and removes pending requests and active subscriptions. -func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) { - didClose := make(map[*requestOp]bool) - if inflightReq != nil { - didClose[inflightReq] = true - } - - for id, op := range h.respWait { - // Remove the op so that later calls will not close op.resp again. - delete(h.respWait, id) - - if !didClose[op] { - op.err = err - close(op.resp) - didClose[op] = true - } - } - for id, sub := range h.clientSubs { - delete(h.clientSubs, id) - sub.quitWithError(false, err) - } -} - -func (h *handler) addSubscriptions(nn []*Notifier) { - h.subLock.Lock() - defer h.subLock.Unlock() - - for _, n := range nn { - if sub := n.takeSubscription(); sub != nil { - h.serverSubs[sub.ID] = sub - } - } -} - -// cancelServerSubscriptions removes all subscriptions and closes their error channels. -func (h *handler) cancelServerSubscriptions(err error) { - h.subLock.Lock() - defer h.subLock.Unlock() - - for id, s := range h.serverSubs { - s.err <- err - close(s.err) - delete(h.serverSubs, id) - } -} - -// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group. -func (h *handler) startCallProc(fn func(*callProc)) { - h.callWG.Add(1) - go func() { - ctx, cancel := context.WithCancel(h.rootCtx) - defer h.callWG.Done() - defer cancel() - fn(&callProc{ctx: ctx}) - }() -} - -// handleImmediate executes non-call messages. It returns false if the message is a -// call or requires a reply. -func (h *handler) handleImmediate(msg *jsonrpcMessage) bool { - start := time.Now() - switch { - case msg.isNotification(): - if strings.HasSuffix(msg.Method, notificationMethodSuffix) { - h.handleSubscriptionResult(msg) - return true - } - return false - case msg.isResponse(): - h.handleResponse(msg) - h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start)) - return true - default: - return false - } -} - -// handleSubscriptionResult processes subscription notifications. -func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { - var result subscriptionResult - if err := json.Unmarshal(msg.Params, &result); err != nil { - h.log.Debug("Dropping invalid subscription message") - return - } - if h.clientSubs[result.ID] != nil { - h.clientSubs[result.ID].deliver(result.Result) - } -} - -// handleResponse processes method call responses. -func (h *handler) handleResponse(msg *jsonrpcMessage) { - op := h.respWait[string(msg.ID)] - if op == nil { - h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) - return - } - delete(h.respWait, string(msg.ID)) - // For normal responses, just forward the reply to Call/BatchCall. - if op.sub == nil { - op.resp <- msg - return - } - // For subscription responses, start the subscription if the server - // indicates success. EthSubscribe gets unblocked in either case through - // the op.resp channel. - defer close(op.resp) - if msg.Error != nil { - op.err = msg.Error - return - } - if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { - go op.sub.start() - h.clientSubs[op.sub.subid] = op.sub - } -} - -// handleCallMsg executes a call message and returns the answer. -func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { - start := time.Now() - switch { - case msg.isNotification(): - h.handleCall(ctx, msg) - h.log.Debug("Served "+msg.Method, "t", time.Since(start)) - return nil - case msg.isCall(): - resp := h.handleCall(ctx, msg) - if resp.Error != nil { - h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message) - } else { - h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start)) - } - return resp - case msg.hasValidID(): - return msg.errorResponse(&invalidRequestError{"invalid request"}) - default: - return errorMessage(&invalidRequestError{"invalid request"}) - } -} - -// handleCall processes method calls. -func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { - if msg.isSubscribe() { - return h.handleSubscribe(cp, msg) - } - var callb *callback - if msg.isUnsubscribe() { - callb = h.unsubscribeCb - } else { - callb = h.reg.callback(msg.Method) - } - if callb == nil { - return msg.errorResponse(&methodNotFoundError{method: msg.Method}) - } - args, err := parsePositionalArguments(msg.Params, callb.argTypes) - if err != nil { - return msg.errorResponse(&invalidParamsError{err.Error()}) - } - - return h.runMethod(cp.ctx, msg, callb, args) -} - -// handleSubscribe processes *_subscribe method calls. -func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { - if !h.allowSubscribe { - return msg.errorResponse(ErrNotificationsUnsupported) - } - - // Subscription method name is first argument. - name, err := parseSubscriptionName(msg.Params) - if err != nil { - return msg.errorResponse(&invalidParamsError{err.Error()}) - } - namespace := msg.namespace() - callb := h.reg.subscription(namespace, name) - if callb == nil { - return msg.errorResponse(&subscriptionNotFoundError{namespace, name}) - } - - // Parse subscription name arg too, but remove it before calling the callback. - argTypes := append([]reflect.Type{stringType}, callb.argTypes...) - args, err := parsePositionalArguments(msg.Params, argTypes) - if err != nil { - return msg.errorResponse(&invalidParamsError{err.Error()}) - } - args = args[1:] - - // Install notifier in context so the subscription handler can find it. - n := &Notifier{h: h, namespace: namespace} - cp.notifiers = append(cp.notifiers, n) - ctx := context.WithValue(cp.ctx, notifierKey{}, n) - - return h.runMethod(ctx, msg, callb, args) -} - -// runMethod runs the Go callback for an RPC method. -func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage { - result, err := callb.call(ctx, msg.Method, args) - if err != nil { - return msg.errorResponse(err) - } - return msg.response(result) -} - -// unsubscribe is the callback function for all *_unsubscribe calls. -func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) { - h.subLock.Lock() - defer h.subLock.Unlock() - - s := h.serverSubs[id] - if s == nil { - return false, ErrSubscriptionNotFound - } - close(s.err) - delete(h.serverSubs, id) - return true, nil -} - -type idForLog struct{ json.RawMessage } - -func (id idForLog) String() string { - if s, err := strconv.Unquote(string(id.RawMessage)); err == nil { - return s - } - return string(id.RawMessage) -} diff --git a/rpc/http.go b/rpc/http.go deleted file mode 100644 index c6a0d4c36..000000000 --- a/rpc/http.go +++ /dev/null @@ -1,322 +0,0 @@ -package rpc - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "mime" - "net" - "net/http" - "sync" - "time" -) - -const ( - maxRequestContentLength = 1024 * 512 - contentType = "application/json" -) - -// https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13 -var acceptedContentTypes = []string{contentType, "application/json-rpc", "application/jsonrequest"} - -type httpConn struct { - client *http.Client - req *http.Request - closeOnce sync.Once - closed chan interface{} -} - -// httpConn is treated specially by Client. -func (hc *httpConn) Write(context.Context, interface{}) error { - panic("Write called on httpConn") -} - -func (hc *httpConn) RemoteAddr() string { - return hc.req.URL.String() -} - -func (hc *httpConn) Read() ([]*jsonrpcMessage, bool, error) { - <-hc.closed - return nil, false, io.EOF -} - -func (hc *httpConn) Close() { - hc.closeOnce.Do(func() { close(hc.closed) }) -} - -func (hc *httpConn) Closed() <-chan interface{} { - return hc.closed -} - -// HTTPTimeouts represents the configuration params for the HTTP RPC server. -type HTTPTimeouts struct { - // ReadTimeout is the maximum duration for reading the entire - // request, including the body. - // - // Because ReadTimeout does not let Handlers make per-request - // decisions on each request body's acceptable deadline or - // upload rate, most users will prefer to use - // ReadHeaderTimeout. It is valid to use them both. - ReadTimeout time.Duration - - // WriteTimeout is the maximum duration before timing out - // writes of the response. It is reset whenever a new - // request's header is read. Like ReadTimeout, it does not - // let Handlers make decisions on a per-request basis. - WriteTimeout time.Duration - - // IdleTimeout is the maximum amount of time to wait for the - // next request when keep-alives are enabled. If IdleTimeout - // is zero, the value of ReadTimeout is used. If both are - // zero, ReadHeaderTimeout is used. - IdleTimeout time.Duration -} - -// DefaultHTTPTimeouts represents the default timeout values used if further -// configuration is not provided. -var DefaultHTTPTimeouts = HTTPTimeouts{ - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 120 * time.Second, -} - -// DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP -// using the provided HTTP Client. -func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { - req, err := http.NewRequest(http.MethodPost, endpoint, nil) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", contentType) - req.Header.Set("Accept", contentType) - - initctx := context.Background() - return newClient(initctx, func(context.Context) (ServerCodec, error) { - return &httpConn{client: client, req: req, closed: make(chan interface{})}, nil - }) -} - -// DialHTTP creates a new RPC client that connects to an RPC server over HTTP. -func DialHTTP(endpoint string) (*Client, error) { - return DialHTTPWithClient(endpoint, new(http.Client)) -} - -func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error { - hc := c.writeConn.(*httpConn) - respBody, err := hc.doRequest(ctx, msg) - if respBody != nil { - defer respBody.Close() - } - - if err != nil { - if respBody != nil { - buf := new(bytes.Buffer) - if _, err2 := buf.ReadFrom(respBody); err2 == nil { - return fmt.Errorf("%v %v", err, buf.String()) - } - } - return err - } - var respmsg jsonrpcMessage - if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { - return err - } - op.resp <- &respmsg - return nil -} - -func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { - hc := c.writeConn.(*httpConn) - respBody, err := hc.doRequest(ctx, msgs) - if err != nil { - return err - } - defer respBody.Close() - var respmsgs []jsonrpcMessage - if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { - return err - } - for i := 0; i < len(respmsgs); i++ { - op.resp <- &respmsgs[i] - } - return nil -} - -func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) { - body, err := json.Marshal(msg) - if err != nil { - return nil, err - } - req := hc.req.WithContext(ctx) - req.Body = ioutil.NopCloser(bytes.NewReader(body)) - req.ContentLength = int64(len(body)) - - resp, err := hc.client.Do(req) - if err != nil { - return nil, err - } - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return resp.Body, errors.New(resp.Status) - } - return resp.Body, nil -} - -// httpServerConn turns a HTTP connection into a Conn. -type httpServerConn struct { - io.Reader - io.Writer - r *http.Request -} - -func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec { - body := io.LimitReader(r.Body, maxRequestContentLength) - conn := &httpServerConn{Reader: body, Writer: w, r: r} - return NewJSONCodec(conn) -} - -// Close does nothing and always returns nil. -func (t *httpServerConn) Close() error { return nil } - -// RemoteAddr returns the peer address of the underlying connection. -func (t *httpServerConn) RemoteAddr() string { - return t.r.RemoteAddr -} - -// SetWriteDeadline does nothing and always returns nil. -func (t *httpServerConn) SetWriteDeadline(time.Time) error { return nil } - -// NewHTTPServer creates a new HTTP RPC server around an API provider. -// -// Deprecated: Server implements http.Handler -func NewHTTPServer(srv http.Handler) *http.Server { - // TODO(ricl): eth wraps it in CORS-handler. Might need to port if we want to use cors. - handler := srv - - // TODO(ricl): port timeout handlers if necessary - // Bundle and start the HTTP server - return &http.Server{ - Handler: handler, - } -} - -// ServeHTTP serves JSON-RPC requests over HTTP. -func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Permit dumb empty requests for remote health-checks (AWS) - if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" { - return - } - if code, err := validateRequest(r); err != nil { - http.Error(w, err.Error(), code) - return - } - // All checks passed, create a codec that reads direct from the request body - // untilEOF and writes the response to w and order the server to process a - // single request. - ctx := r.Context() - ctx = context.WithValue(ctx, interface{}("remote"), r.RemoteAddr) - ctx = context.WithValue(ctx, interface{}("scheme"), r.Proto) - ctx = context.WithValue(ctx, interface{}("local"), r.Host) - if ua := r.Header.Get("User-Agent"); ua != "" { - ctx = context.WithValue(ctx, interface{}("User-Agent"), ua) - } - if origin := r.Header.Get("Origin"); origin != "" { - ctx = context.WithValue(ctx, interface{}("Origin"), origin) - } - - w.Header().Set("content-type", contentType) - codec := newHTTPServerConn(r, w) - defer codec.Close() - s.serveSingleRequest(ctx, codec) -} - -// validateRequest returns a non-zero response code and error message if the -// request is invalid. -func validateRequest(r *http.Request) (int, error) { - if r.Method == http.MethodPut || r.Method == http.MethodDelete { - return http.StatusMethodNotAllowed, errors.New("method not allowed") - } - if r.ContentLength > maxRequestContentLength { - err := fmt.Errorf("content length too large (%d>%d)", r.ContentLength, maxRequestContentLength) - return http.StatusRequestEntityTooLarge, err - } - // Allow OPTIONS (regardless of content-type) - if r.Method == http.MethodOptions { - return 0, nil - } - // Check content-type - if mt, _, err := mime.ParseMediaType(r.Header.Get("content-type")); err == nil { - for _, accepted := range acceptedContentTypes { - if accepted == mt { - return 0, nil - } - } - } - // Invalid content-type - err := fmt.Errorf("invalid content type, only %s is supported", contentType) - return http.StatusUnsupportedMediaType, err -} - -// func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler { -// // disable CORS support if user has not specified a custom CORS configuration -// if len(allowedOrigins) == 0 { -// return srv -// } -// c := cors.New(cors.Options{ -// AllowedOrigins: allowedOrigins, -// AllowedMethods: []string{http.MethodPost, http.MethodGet}, -// MaxAge: 600, -// AllowedHeaders: []string{"*"}, -// }) -// return c.Handler(srv) -// } - -// virtualHostHandler is a handler which validates the Host-header of incoming requests. -// The virtualHostHandler can prevent DNS rebinding attacks, which do not utilize CORS-headers, -// since they do in-domain requests against the RPC api. Instead, we can see on the Host-header -// which domain was used, and validate that against a whitelist. -type virtualHostHandler struct { - vhosts map[string]struct{} - next http.Handler -} - -// ServeHTTP serves JSON-RPC requests over HTTP, implements http.Handler -func (h *virtualHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // if r.Host is not set, we can continue serving since a browser would set the Host header - if r.Host == "" { - h.next.ServeHTTP(w, r) - return - } - host, _, err := net.SplitHostPort(r.Host) - if err != nil { - // Either invalid (too many colons) or no port specified - host = r.Host - } - if ipAddr := net.ParseIP(host); ipAddr != nil { - // It's an IP address, we can serve that - h.next.ServeHTTP(w, r) - return - - } - // Not an ip address, but a hostname. Need to validate - if _, exist := h.vhosts["*"]; exist { - h.next.ServeHTTP(w, r) - return - } - if _, exist := h.vhosts[host]; exist { - h.next.ServeHTTP(w, r) - return - } - http.Error(w, "invalid host specified", http.StatusForbidden) -} - -// func newVHostHandler(vhosts []string, next http.Handler) http.Handler { -// vhostMap := make(map[string]struct{}) -// for _, allowedHost := range vhosts { -// vhostMap[strings.ToLower(allowedHost)] = struct{}{} -// } -// return &virtualHostHandler{vhostMap, next} -// } diff --git a/rpc/ipc.go b/rpc/ipc.go deleted file mode 100644 index 9f7fee254..000000000 --- a/rpc/ipc.go +++ /dev/null @@ -1,40 +0,0 @@ -package rpc - -import ( - "context" - "net" - - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/p2p/netutil" -) - -// ServeListener accepts connections on l, serving JSON-RPC on them. -func (s *Server) ServeListener(l net.Listener) error { - for { - conn, err := l.Accept() - if netutil.IsTemporaryError(err) { - log.Warn("RPC accept error", "err", err) - continue - } else if err != nil { - return err - } - log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr()) - go s.ServeCodec(NewJSONCodec(conn)) - } -} - -// DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes -// the endpoint is the full path to a unix socket, and Windows the endpoint is an -// identifier for a named pipe. -// -// The context is used for the initial connection establishment. It does not -// affect subsequent interactions with the client. -func DialIPC(ctx context.Context, endpoint string) (*Client, error) { - return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { - conn, err := newIPCConnection(ctx, endpoint) - if err != nil { - return nil, err - } - return NewJSONCodec(conn), err - }) -} diff --git a/rpc/ipc_unix.go b/rpc/ipc_unix.go deleted file mode 100644 index fb957c0b5..000000000 --- a/rpc/ipc_unix.go +++ /dev/null @@ -1,46 +0,0 @@ -package rpc - -import ( - "context" - "fmt" - "net" - "os" - "path/filepath" - - "github.com/ethereum/go-ethereum/log" -) - -/* -#include - -int max_socket_path_size() { -struct sockaddr_un s; -return sizeof(s.sun_path); -} -*/ -import "C" - -// ipcListen will create a Unix socket on the given endpoint. -func ipcListen(endpoint string) (net.Listener, error) { - if len(endpoint) > int(C.max_socket_path_size()) { - log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", C.max_socket_path_size()), - "endpoint", endpoint) - } - - // Ensure the IPC path exists and remove any previous leftover - if err := os.MkdirAll(filepath.Dir(endpoint), 0751); err != nil { - return nil, err - } - os.Remove(endpoint) - l, err := net.Listen("unix", endpoint) - if err != nil { - return nil, err - } - os.Chmod(endpoint, 0600) - return l, nil -} - -// newIPCConnection will connect to a Unix socket on the given endpoint. -func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) { - return dialContext(ctx, "unix", endpoint) -} diff --git a/rpc/json.go b/rpc/json.go deleted file mode 100644 index 7cdcdbe4f..000000000 --- a/rpc/json.go +++ /dev/null @@ -1,314 +0,0 @@ -package rpc - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "reflect" - "strings" - "sync" - "time" -) - -const ( - vsn = "2.0" - serviceMethodSeparator = "_" - subscribeMethodSuffix = "_subscribe" - unsubscribeMethodSuffix = "_unsubscribe" - notificationMethodSuffix = "_subscription" - - defaultWriteTimeout = 10 * time.Second // used if context has no deadline -) - -var null = json.RawMessage("null") - -type subscriptionResult struct { - ID string `json:"subscription"` - Result json.RawMessage `json:"result,omitempty"` -} - -// A value of this type can a JSON-RPC request, notification, successful response or -// error response. Which one it is depends on the fields. -type jsonrpcMessage struct { - Version string `json:"jsonrpc,omitempty"` - ID json.RawMessage `json:"id,omitempty"` - Method string `json:"method,omitempty"` - Params json.RawMessage `json:"params,omitempty"` - Error *jsonError `json:"error,omitempty"` - Result json.RawMessage `json:"result,omitempty"` -} - -func (msg *jsonrpcMessage) isNotification() bool { - return msg.ID == nil && msg.Method != "" -} - -func (msg *jsonrpcMessage) isCall() bool { - return msg.hasValidID() && msg.Method != "" -} - -func (msg *jsonrpcMessage) isResponse() bool { - return msg.hasValidID() && msg.Method == "" && msg.Params == nil && (msg.Result != nil || msg.Error != nil) -} - -func (msg *jsonrpcMessage) hasValidID() bool { - return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '[' -} - -func (msg *jsonrpcMessage) isSubscribe() bool { - return strings.HasSuffix(msg.Method, subscribeMethodSuffix) -} - -func (msg *jsonrpcMessage) isUnsubscribe() bool { - return strings.HasSuffix(msg.Method, unsubscribeMethodSuffix) -} - -func (msg *jsonrpcMessage) namespace() string { - elem := strings.SplitN(msg.Method, serviceMethodSeparator, 2) - return elem[0] -} - -func (msg *jsonrpcMessage) String() string { - b, _ := json.Marshal(msg) - return string(b) -} - -func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage { - resp := errorMessage(err) - resp.ID = msg.ID - return resp -} - -func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage { - enc, err := json.Marshal(result) - if err != nil { - // TODO: wrap with 'internal server error' - return msg.errorResponse(err) - } - return &jsonrpcMessage{Version: vsn, ID: msg.ID, Result: enc} -} - -func errorMessage(err error) *jsonrpcMessage { - msg := &jsonrpcMessage{Version: vsn, ID: null, Error: &jsonError{ - Code: defaultErrorCode, - Message: err.Error(), - }} - ec, ok := err.(Error) - if ok { - msg.Error.Code = ec.ErrorCode() - } - return msg -} - -type jsonError struct { - Code int `json:"code"` - Message string `json:"message"` - Data interface{} `json:"data,omitempty"` -} - -func (err *jsonError) Error() string { - if err.Message == "" { - return fmt.Sprintf("json-rpc error %d", err.Code) - } - return err.Message -} - -func (err *jsonError) ErrorCode() int { - return err.Code -} - -// Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec. -type Conn interface { - io.ReadWriteCloser - SetWriteDeadline(time.Time) error -} - -// ConnRemoteAddr wraps the RemoteAddr operation, which returns a description -// of the peer address of a connection. If a Conn also implements ConnRemoteAddr, this -// description is used in log messages. -type ConnRemoteAddr interface { - RemoteAddr() string -} - -// connWithRemoteAddr overrides the remote address of a connection. -type connWithRemoteAddr struct { - Conn - addr string -} - -func (c connWithRemoteAddr) RemoteAddr() string { return c.addr } - -// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has -// support for parsing arguments and serializing (result) objects. -type jsonCodec struct { - remoteAddr string - closer sync.Once // close closed channel once - closed chan interface{} // closed on Close - decode func(v interface{}) error // decoder to allow multiple transports - encMu sync.Mutex // guards the encoder - encode func(v interface{}) error // encoder to allow multiple transports - conn Conn -} - -// NewCodec creates a new RPC server codec with support for JSON-RPC 2.0 based -// on explicitly given encoding and decoding methods. -func NewCodec(conn Conn, encode, decode func(v interface{}) error) ServerCodec { - codec := &jsonCodec{ - closed: make(chan interface{}), - encode: encode, - decode: decode, - conn: conn, - } - if ra, ok := conn.(ConnRemoteAddr); ok { - codec.remoteAddr = ra.RemoteAddr() - } - return codec -} - -// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0. -func NewJSONCodec(conn Conn) ServerCodec { - enc := json.NewEncoder(conn) - dec := json.NewDecoder(conn) - dec.UseNumber() - return NewCodec(conn, enc.Encode, dec.Decode) -} - -func (c *jsonCodec) RemoteAddr() string { - return c.remoteAddr -} - -func (c *jsonCodec) Read() (msg []*jsonrpcMessage, batch bool, err error) { - // Decode the next JSON object in the input stream. - // This verifies basic syntax, etc. - var rawmsg json.RawMessage - if err := c.decode(&rawmsg); err != nil { - return nil, false, err - } - msg, batch = parseMessage(rawmsg) - return msg, batch, nil -} - -// Write sends a message to client. -func (c *jsonCodec) Write(ctx context.Context, v interface{}) error { - c.encMu.Lock() - defer c.encMu.Unlock() - - deadline, ok := ctx.Deadline() - if !ok { - deadline = time.Now().Add(defaultWriteTimeout) - } - c.conn.SetWriteDeadline(deadline) - return c.encode(v) -} - -// Close the underlying connection -func (c *jsonCodec) Close() { - c.closer.Do(func() { - close(c.closed) - c.conn.Close() - }) -} - -// Closed returns a channel which will be closed when Close is called -func (c *jsonCodec) Closed() <-chan interface{} { - return c.closed -} - -// parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error -// checks in this function because the raw message has already been syntax-checked when it -// is called. Any non-JSON-RPC messages in the input return the zero value of -// jsonrpcMessage. -func parseMessage(raw json.RawMessage) ([]*jsonrpcMessage, bool) { - if !isBatch(raw) { - msgs := []*jsonrpcMessage{{}} - json.Unmarshal(raw, &msgs[0]) - return msgs, false - } - dec := json.NewDecoder(bytes.NewReader(raw)) - dec.Token() // skip '[' - var msgs []*jsonrpcMessage - for dec.More() { - msgs = append(msgs, new(jsonrpcMessage)) - dec.Decode(&msgs[len(msgs)-1]) - } - return msgs, true -} - -// isBatch returns true when the first non-whitespace characters is '[' -func isBatch(raw json.RawMessage) bool { - for _, c := range raw { - // skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt) - if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d { - continue - } - return c == '[' - } - return false -} - -// parsePositionalArguments tries to parse the given args to an array of values with the -// given types. It returns the parsed values or an error when the args could not be -// parsed. Missing optional arguments are returned as reflect.Zero values. -func parsePositionalArguments(rawArgs json.RawMessage, types []reflect.Type) ([]reflect.Value, error) { - dec := json.NewDecoder(bytes.NewReader(rawArgs)) - var args []reflect.Value - tok, err := dec.Token() - switch { - case err == io.EOF || tok == nil && err == nil: - // "params" is optional and may be empty. Also allow "params":null even though it's - // not in the spec because our own client used to send it. - case err != nil: - return nil, err - case tok == json.Delim('['): - // Read argument array. - if args, err = parseArgumentArray(dec, types); err != nil { - return nil, err - } - default: - return nil, errors.New("non-array args") - } - // Set any missing args to nil. - for i := len(args); i < len(types); i++ { - if types[i].Kind() != reflect.Ptr { - return nil, fmt.Errorf("missing value for required argument %d", i) - } - args = append(args, reflect.Zero(types[i])) - } - return args, nil -} - -func parseArgumentArray(dec *json.Decoder, types []reflect.Type) ([]reflect.Value, error) { - args := make([]reflect.Value, 0, len(types)) - for i := 0; dec.More(); i++ { - if i >= len(types) { - return args, fmt.Errorf("too many arguments, want at most %d", len(types)) - } - argval := reflect.New(types[i]) - if err := dec.Decode(argval.Interface()); err != nil { - return args, fmt.Errorf("invalid argument %d: %v", i, err) - } - if argval.IsNil() && types[i].Kind() != reflect.Ptr { - return args, fmt.Errorf("missing value for required argument %d", i) - } - args = append(args, argval.Elem()) - } - // Read end of args array. - _, err := dec.Token() - return args, err -} - -// parseSubscriptionName extracts the subscription name from an encoded argument array. -func parseSubscriptionName(rawArgs json.RawMessage) (string, error) { - dec := json.NewDecoder(bytes.NewReader(rawArgs)) - if tok, _ := dec.Token(); tok != json.Delim('[') { - return "", errors.New("non-array args") - } - v, _ := dec.Token() - method, ok := v.(string) - if !ok { - return "", errors.New("expected subscription name as first argument") - } - return method, nil -} diff --git a/rpc/server.go b/rpc/server.go deleted file mode 100644 index 416aff8e6..000000000 --- a/rpc/server.go +++ /dev/null @@ -1,116 +0,0 @@ -package rpc - -import ( - "context" - "io" - "sync/atomic" - - mapset "github.com/deckarep/golang-set" - "github.com/ethereum/go-ethereum/log" -) - -const metadataAPI = "rpc" - -// Server is an RPC server. -type Server struct { - services serviceRegistry - idgen func() ID - run int32 - codecs mapset.Set -} - -// NewServer creates a new server instance with no registered handlers. -func NewServer() *Server { - server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1} - // Register the default service providing meta information about the RPC service such - // as the services and methods it offers. - rpcService := &Service{server} - server.RegisterName(metadataAPI, rpcService) - return server -} - -// RegisterName creates a service for the given receiver type under the given name. When no -// methods on the given receiver match the criteria to be either a RPC method or a -// subscription an error is returned. Otherwise a new service is created and added to the -// service collection this server provides to clients. -func (s *Server) RegisterName(name string, receiver interface{}) error { - return s.services.registerName(name, receiver) -} - -// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes -// the response back using the given codec. It will block until the codec is closed or the -// server is stopped. In either case the codec is closed. -func (s *Server) ServeCodec(codec ServerCodec) { - defer codec.Close() - - // Don't serve if server is stopped. - if atomic.LoadInt32(&s.run) == 0 { - return - } - - // Add the codec to the set so it can be closed by Stop. - s.codecs.Add(codec) - defer s.codecs.Remove(codec) - - c := initClient(codec, s.idgen, &s.services) - <-codec.Closed() - c.Close() -} - -// serveSingleRequest reads and processes a single RPC request from the given codec. This -// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in -// this mode. -func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { - // Don't serve if server is stopped. - if atomic.LoadInt32(&s.run) == 0 { - return - } - - h := newHandler(ctx, codec, s.idgen, &s.services) - h.allowSubscribe = false - defer h.close(io.EOF, nil) - - reqs, batch, err := codec.Read() - if err != nil { - if err != io.EOF { - codec.Write(ctx, errorMessage(&invalidMessageError{"parse error"})) - } - return - } - if batch { - h.handleBatch(reqs) - } else { - h.handleMsg(reqs[0]) - } -} - -// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending -// requests to finish, then closes all codecs which will cancel pending requests and -// subscriptions. -func (s *Server) Stop() { - if atomic.CompareAndSwapInt32(&s.run, 1, 0) { - log.Debug("RPC server shutting down") - s.codecs.Each(func(c interface{}) bool { - c.(ServerCodec).Close() - return true - }) - } -} - -// Service gives meta information about the server. -// e.g. gives information about the loaded modules. -type Service struct { - server *Server -} - -// Modules returns the list of RPC services with their version number -func (s *Service) Modules() map[string]string { - s.server.services.mu.Lock() - defer s.server.services.mu.Unlock() - - modules := make(map[string]string) - for name := range s.server.services.services { - modules[name] = "1.0" - } - return modules -} diff --git a/rpc/service.go b/rpc/service.go deleted file mode 100644 index 34278f3e4..000000000 --- a/rpc/service.go +++ /dev/null @@ -1,269 +0,0 @@ -package rpc - -import ( - "context" - "errors" - "fmt" - "reflect" - "runtime" - "strings" - "sync" - "unicode" - "unicode/utf8" - - "github.com/ethereum/go-ethereum/log" -) - -var ( - contextType = reflect.TypeOf((*context.Context)(nil)).Elem() - errorType = reflect.TypeOf((*error)(nil)).Elem() - subscriptionType = reflect.TypeOf(Subscription{}) - stringType = reflect.TypeOf("") -) - -type serviceRegistry struct { - mu sync.Mutex - services map[string]service -} - -// service represents a registered object. -type service struct { - name string // name for service - callbacks map[string]*callback // registered handlers - subscriptions map[string]*callback // available subscriptions/notifications -} - -// callback is a method callback which was registered in the server -type callback struct { - fn reflect.Value // the function - rcvr reflect.Value // receiver object of method, set if fn is method - argTypes []reflect.Type // input argument types - hasCtx bool // method's first argument is a context (not included in argTypes) - errPos int // err return idx, of -1 when method cannot return error - isSubscribe bool // true if this is a subscription callback -} - -func (r *serviceRegistry) registerName(name string, rcvr interface{}) error { - rcvrVal := reflect.ValueOf(rcvr) - if name == "" { - return fmt.Errorf("no service name for type %s", rcvrVal.Type().String()) - } - callbacks := suitableCallbacks(rcvrVal) - if len(callbacks) == 0 { - return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr) - } - - r.mu.Lock() - defer r.mu.Unlock() - if r.services == nil { - r.services = make(map[string]service) - } - svc, ok := r.services[name] - if !ok { - svc = service{ - name: name, - callbacks: make(map[string]*callback), - subscriptions: make(map[string]*callback), - } - r.services[name] = svc - } - for name, cb := range callbacks { - if cb.isSubscribe { - svc.subscriptions[name] = cb - } else { - svc.callbacks[name] = cb - } - } - return nil -} - -// callback returns the callback corresponding to the given RPC method name. -func (r *serviceRegistry) callback(method string) *callback { - elem := strings.SplitN(method, serviceMethodSeparator, 2) - if len(elem) != 2 { - return nil - } - r.mu.Lock() - defer r.mu.Unlock() - return r.services[elem[0]].callbacks[elem[1]] -} - -// subscription returns a subscription callback in the given service. -func (r *serviceRegistry) subscription(service, name string) *callback { - r.mu.Lock() - defer r.mu.Unlock() - return r.services[service].subscriptions[name] -} - -// suitableCallbacks iterates over the methods of the given type. It determines if a method -// satisfies the criteria for a RPC callback or a subscription callback and adds it to the -// collection of callbacks. See server documentation for a summary of these criteria. -func suitableCallbacks(receiver reflect.Value) map[string]*callback { - typ := receiver.Type() - callbacks := make(map[string]*callback) - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - if method.PkgPath != "" { - continue // method not exported - } - cb := newCallback(receiver, method.Func) - if cb == nil { - continue // function invalid - } - name := formatName(method.Name) - callbacks[name] = cb - } - return callbacks -} - -// newCallback turns fn (a function) into a callback object. It returns nil if the function -// is unsuitable as an RPC callback. -func newCallback(receiver, fn reflect.Value) *callback { - fntype := fn.Type() - c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)} - // Determine parameter types. They must all be exported or builtin types. - c.makeArgTypes() - if !allExportedOrBuiltin(c.argTypes) { - return nil - } - // Verify return types. The function must return at most one error - // and/or one other non-error value. - outs := make([]reflect.Type, fntype.NumOut()) - for i := 0; i < fntype.NumOut(); i++ { - outs[i] = fntype.Out(i) - } - if len(outs) > 2 || !allExportedOrBuiltin(outs) { - return nil - } - // If an error is returned, it must be the last returned value. - switch { - case len(outs) == 1 && isErrorType(outs[0]): - c.errPos = 0 - case len(outs) == 2: - if isErrorType(outs[0]) || !isErrorType(outs[1]) { - return nil - } - c.errPos = 1 - } - return c -} - -// makeArgTypes composes the argTypes list. -func (c *callback) makeArgTypes() { - fntype := c.fn.Type() - // Skip receiver and context.Context parameter (if present). - firstArg := 0 - if c.rcvr.IsValid() { - firstArg++ - } - if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType { - c.hasCtx = true - firstArg++ - } - // Add all remaining parameters. - c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg) - for i := firstArg; i < fntype.NumIn(); i++ { - c.argTypes[i-firstArg] = fntype.In(i) - } -} - -// call invokes the callback. -func (c *callback) call(ctx context.Context, method string, args []reflect.Value) (res interface{}, errRes error) { - // Create the argument slice. - fullargs := make([]reflect.Value, 0, 2+len(args)) - if c.rcvr.IsValid() { - fullargs = append(fullargs, c.rcvr) - } - if c.hasCtx { - fullargs = append(fullargs, reflect.ValueOf(ctx)) - } - fullargs = append(fullargs, args...) - - // Catch panic while running the callback. - defer func() { - if err := recover(); err != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf)) - errRes = errors.New("method handler crashed") - } - }() - // Run the callback. - results := c.fn.Call(fullargs) - if len(results) == 0 { - return nil, nil - } - if c.errPos >= 0 && !results[c.errPos].IsNil() { - // Method has returned non-nil error value. - err := results[c.errPos].Interface().(error) - return reflect.Value{}, err - } - return results[0].Interface(), nil -} - -// Is this an exported - upper case - name? -func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) -} - -// Are all those types exported or built-in? -func allExportedOrBuiltin(types []reflect.Type) bool { - for _, typ := range types { - for typ.Kind() == reflect.Ptr { - typ = typ.Elem() - } - // PkgPath will be non-empty even for an exported type, - // so we need to check the type name as well. - if !isExported(typ.Name()) && typ.PkgPath() != "" { - return false - } - } - return true -} - -// Is t context.Context or *context.Context? -func isContextType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t == contextType -} - -// Does t satisfy the error interface? -func isErrorType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t.Implements(errorType) -} - -// Is t Subscription or *Subscription? -func isSubscriptionType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t == subscriptionType -} - -// isPubSub tests whether the given method has as as first argument a context.Context and -// returns the pair (Subscription, error). -func isPubSub(methodType reflect.Type) bool { - // numIn(0) is the receiver type - if methodType.NumIn() < 2 || methodType.NumOut() != 2 { - return false - } - return isContextType(methodType.In(1)) && - isSubscriptionType(methodType.Out(0)) && - isErrorType(methodType.Out(1)) -} - -// formatName converts to first character of name to lowercase. -func formatName(name string) string { - ret := []rune(name) - if len(ret) > 0 { - ret[0] = unicode.ToLower(ret[0]) - } - return string(ret) -} diff --git a/rpc/stdio.go b/rpc/stdio.go deleted file mode 100644 index c7497e831..000000000 --- a/rpc/stdio.go +++ /dev/null @@ -1,50 +0,0 @@ -package rpc - -import ( - "context" - "errors" - "io" - "net" - "os" - "time" -) - -// DialStdIO creates a client on stdin/stdout. -func DialStdIO(ctx context.Context) (*Client, error) { - return DialIO(ctx, os.Stdin, os.Stdout) -} - -// DialIO creates a client which uses the given IO channels -func DialIO(ctx context.Context, in io.Reader, out io.Writer) (*Client, error) { - return newClient(ctx, func(_ context.Context) (ServerCodec, error) { - return NewJSONCodec(stdioConn{ - in: in, - out: out, - }), nil - }) -} - -type stdioConn struct { - in io.Reader - out io.Writer -} - -func (io stdioConn) Read(b []byte) (n int, err error) { - return io.in.Read(b) -} - -func (io stdioConn) Write(b []byte) (n int, err error) { - return io.out.Write(b) -} - -func (io stdioConn) Close() error { - return nil -} - -func (io stdioConn) RemoteAddr() string { - return "/dev/stdin" -} - -func (io stdioConn) SetWriteDeadline(t time.Time) error { - return &net.OpError{Op: "set", Net: "stdio", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} -} diff --git a/rpc/subscription.go b/rpc/subscription.go deleted file mode 100644 index a1e0a9435..000000000 --- a/rpc/subscription.go +++ /dev/null @@ -1,311 +0,0 @@ -package rpc - -import ( - "bufio" - "container/list" - "context" - crand "crypto/rand" - "encoding/binary" - "encoding/hex" - "encoding/json" - "errors" - "math/rand" - "reflect" - "strings" - "sync" - "time" -) - -var ( - // ErrNotificationsUnsupported is returned when the connection doesn't support notifications - ErrNotificationsUnsupported = errors.New("notifications not supported") - // ErrSubscriptionNotFound is returned when the subscription for the given id is not found - ErrSubscriptionNotFound = errors.New("subscription not found") -) - -var globalGen = randomIDGenerator() - -// ID defines a pseudo random number that is used to identify RPC subscriptions. -type ID string - -// NewID returns a new, random ID. -func NewID() ID { - return globalGen() -} - -// randomIDGenerator returns a function generates a random IDs. -func randomIDGenerator() func() ID { - seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)) - if err != nil { - seed = int64(time.Now().Nanosecond()) - } - var ( - mu sync.Mutex - rng = rand.New(rand.NewSource(seed)) - ) - return func() ID { - mu.Lock() - defer mu.Unlock() - id := make([]byte, 16) - rng.Read(id) - return encodeID(id) - } -} - -func encodeID(b []byte) ID { - id := hex.EncodeToString(b) - id = strings.TrimLeft(id, "0") - if id == "" { - id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0. - } - return ID("0x" + id) -} - -type notifierKey struct{} - -// NotifierFromContext returns the Notifier value stored in ctx, if any. -func NotifierFromContext(ctx context.Context) (*Notifier, bool) { - n, ok := ctx.Value(notifierKey{}).(*Notifier) - return n, ok -} - -// Notifier is tied to a RPC connection that supports subscriptions. -// Server callbacks use the notifier to send notifications. -type Notifier struct { - h *handler - namespace string - - mu sync.Mutex - sub *Subscription - buffer []json.RawMessage - callReturned bool - activated bool -} - -// CreateSubscription returns a new subscription that is coupled to the -// RPC connection. By default subscriptions are inactive and notifications -// are dropped until the subscription is marked as active. This is done -// by the RPC server after the subscription ID is send to the client. -func (n *Notifier) CreateSubscription() *Subscription { - n.mu.Lock() - defer n.mu.Unlock() - - if n.sub != nil { - panic("can't create multiple subscriptions with Notifier") - } else if n.callReturned { - panic("can't create subscription after subscribe call has returned") - } - n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)} - return n.sub -} - -// Notify sends a notification to the client with the given data as payload. -// If an error occurs the RPC connection is closed and the error is returned. -func (n *Notifier) Notify(id ID, data interface{}) error { - enc, err := json.Marshal(data) - if err != nil { - return err - } - - n.mu.Lock() - defer n.mu.Unlock() - - if n.sub == nil { - panic("can't Notify before subscription is created") - } else if n.sub.ID != id { - panic("Notify with wrong ID") - } - if n.activated { - return n.send(n.sub, enc) - } - n.buffer = append(n.buffer, enc) - return nil -} - -// Closed returns a channel that is closed when the RPC connection is closed. -// Deprecated: use subscription error channel -func (n *Notifier) Closed() <-chan interface{} { - return n.h.conn.Closed() -} - -// takeSubscription returns the subscription (if one has been created). No subscription can -// be created after this call. -func (n *Notifier) takeSubscription() *Subscription { - n.mu.Lock() - defer n.mu.Unlock() - n.callReturned = true - return n.sub -} - -// acticate is called after the subscription ID was sent to client. Notifications are -// buffered before activation. This prevents notifications being sent to the client before -// the subscription ID is sent to the client. -func (n *Notifier) activate() error { - n.mu.Lock() - defer n.mu.Unlock() - - for _, data := range n.buffer { - if err := n.send(n.sub, data); err != nil { - return err - } - } - n.activated = true - return nil -} - -func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { - params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) - ctx := context.Background() - return n.h.conn.Write(ctx, &jsonrpcMessage{ - Version: vsn, - Method: n.namespace + notificationMethodSuffix, - Params: params, - }) -} - -// A Subscription is created by a notifier and tight to that notifier. The client can use -// this subscription to wait for an unsubscribe request for the client, see Err(). -type Subscription struct { - ID ID - namespace string - err chan error // closed on unsubscribe -} - -// Err returns a channel that is closed when the client send an unsubscribe request. -func (s *Subscription) Err() <-chan error { - return s.err -} - -// MarshalJSON marshals a subscription as its ID. -func (s *Subscription) MarshalJSON() ([]byte, error) { - return json.Marshal(s.ID) -} - -// ClientSubscription is a subscription established through the Client's Subscribe or -// EthSubscribe methods. -type ClientSubscription struct { - client *Client - etype reflect.Type - channel reflect.Value - namespace string - subid string - in chan json.RawMessage - - quitOnce sync.Once // ensures quit is closed once - quit chan struct{} // quit is closed when the subscription exits - errOnce sync.Once // ensures err is closed once - err chan error -} - -func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { - sub := &ClientSubscription{ - client: c, - namespace: namespace, - etype: channel.Type().Elem(), - channel: channel, - quit: make(chan struct{}), - err: make(chan error, 1), - in: make(chan json.RawMessage), - } - return sub -} - -// Err returns the subscription error channel. The intended use of Err is to schedule -// resubscription when the client connection is closed unexpectedly. -// -// The error channel receives a value when the subscription has ended due -// to an error. The received error is nil if Close has been called -// on the underlying client and no other error has occurred. -// -// The error channel is closed when Unsubscribe is called on the subscription. -func (sub *ClientSubscription) Err() <-chan error { - return sub.err -} - -// Unsubscribe unsubscribes the notification and closes the error channel. -// It can safely be called more than once. -func (sub *ClientSubscription) Unsubscribe() { - sub.quitWithError(true, nil) - sub.errOnce.Do(func() { close(sub.err) }) -} - -func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) { - sub.quitOnce.Do(func() { - // The dispatch loop won't be able to execute the unsubscribe call - // if it is blocked on deliver. Close sub.quit first because it - // unblocks deliver. - close(sub.quit) - if unsubscribeServer { - sub.requestUnsubscribe() - } - if err != nil { - if err == errClientQuit { - err = nil // Adhere to subscription semantics. - } - sub.err <- err - } - }) -} - -func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { - select { - case sub.in <- result: - return true - case <-sub.quit: - return false - } -} - -func (sub *ClientSubscription) start() { - sub.quitWithError(sub.forward()) -} - -func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { - cases := []reflect.SelectCase{ - {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, - {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, - {Dir: reflect.SelectSend, Chan: sub.channel}, - } - buffer := list.New() - defer buffer.Init() - for { - var chosen int - var recv reflect.Value - if buffer.Len() == 0 { - // Idle, omit send case. - chosen, recv, _ = reflect.Select(cases[:2]) - } else { - // Non-empty buffer, send the first queued item. - cases[2].Send = reflect.ValueOf(buffer.Front().Value) - chosen, recv, _ = reflect.Select(cases) - } - - switch chosen { - case 0: // <-sub.quit - return false, nil - case 1: // <-sub.in - val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) - if err != nil { - return true, err - } - if buffer.Len() == maxClientSubscriptionBuffer { - return true, errSubscriptionQueueOverflow - } - buffer.PushBack(val) - case 2: // sub.channel<- - cases[2].Send = reflect.Value{} // Don't hold onto the value. - buffer.Remove(buffer.Front()) - } - } -} - -func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) { - val := reflect.New(sub.etype) - err := json.Unmarshal(result, val.Interface()) - return val.Elem().Interface(), err -} - -func (sub *ClientSubscription) requestUnsubscribe() error { - var result interface{} - return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid) -} diff --git a/rpc/types.go b/rpc/types.go deleted file mode 100644 index 1b0025fdf..000000000 --- a/rpc/types.go +++ /dev/null @@ -1,94 +0,0 @@ -package rpc - -import ( - "context" - "fmt" - "math" - "strings" - - "github.com/ethereum/go-ethereum/common/hexutil" -) - -// API describes the set of methods offered over the RPC interface -type API struct { - Namespace string // namespace under which the rpc methods of Service are exposed - Version string // api version for DApp's - Service interface{} // receiver instance which holds the methods - Public bool // indication if the methods must be considered safe for public use -} - -// Error wraps RPC errors, which contain an error code in addition to the message. -type Error interface { - Error() string // returns the message - ErrorCode() int // returns the code -} - -// ServerCodec implements reading, parsing and writing RPC messages for the server side of -// a RPC session. Implementations must be go-routine safe since the codec can be called in -// multiple go-routines concurrently. -type ServerCodec interface { - Read() (msgs []*jsonrpcMessage, isBatch bool, err error) - Close() - jsonWriter -} - -// jsonWriter can write JSON messages to its underlying connection. -// Implementations must be safe for concurrent use. -type jsonWriter interface { - Write(context.Context, interface{}) error - // Closed returns a channel which is closed when the connection is closed. - Closed() <-chan interface{} - // RemoteAddr returns the peer address of the connection. - RemoteAddr() string -} - -// BlockNumber ... -type BlockNumber int64 - -const ( - // PendingBlockNumber ... - PendingBlockNumber = BlockNumber(-2) - latestBlockNumber = BlockNumber(-1) - earliestBlockNumber = BlockNumber(0) -) - -// UnmarshalJSON parses the given JSON fragment into a BlockNumber. It supports: -// - "latest", "earliest" or "pending" as string arguments -// - the block number -// Returned errors: -// - an invalid block number error when the given argument isn't a known strings -// - an out of range error when the given block number is either too little or too large -func (bn *BlockNumber) UnmarshalJSON(data []byte) error { - input := strings.TrimSpace(string(data)) - if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' { - input = input[1 : len(input)-1] - } - - switch input { - case "earliest": - *bn = earliestBlockNumber - return nil - case "latest": - *bn = latestBlockNumber - return nil - case "pending": - *bn = PendingBlockNumber - return nil - } - - blckNum, err := hexutil.DecodeUint64(input) - if err != nil { - return err - } - if blckNum > math.MaxInt64 { - return fmt.Errorf("Blocknumber too high") - } - - *bn = BlockNumber(blckNum) - return nil -} - -// Int64 turns blockNumber to int64 -func (bn BlockNumber) Int64() int64 { - return (int64)(bn) -} diff --git a/rpc/websocket.go b/rpc/websocket.go deleted file mode 100644 index f0a820137..000000000 --- a/rpc/websocket.go +++ /dev/null @@ -1,222 +0,0 @@ -package rpc - -import ( - "bytes" - "context" - "crypto/tls" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - "net" - "net/http" - "net/url" - "os" - "strings" - "time" - - mapset "github.com/deckarep/golang-set" - "github.com/ethereum/go-ethereum/log" - "golang.org/x/net/websocket" -) - -// websocketJSONCodec is a custom JSON codec with payload size enforcement and -// special number parsing. -var websocketJSONCodec = websocket.Codec{ - // Marshal is the stock JSON marshaller used by the websocket library too. - Marshal: func(v interface{}) ([]byte, byte, error) { - msg, err := json.Marshal(v) - return msg, websocket.TextFrame, err - }, - // Unmarshal is a specialized unmarshaller to properly convert numbers. - Unmarshal: func(msg []byte, payloadType byte, v interface{}) error { - dec := json.NewDecoder(bytes.NewReader(msg)) - dec.UseNumber() - - return dec.Decode(v) - }, -} - -// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections. -// -// allowedOrigins should be a comma-separated list of allowed origin URLs. -// To allow connections with any origin, pass "*". -func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { - return websocket.Server{ - Handshake: wsHandshakeValidator(allowedOrigins), - Handler: func(conn *websocket.Conn) { - codec := newWebsocketCodec(conn) - s.ServeCodec(codec) - }, - } -} - -func newWebsocketCodec(conn *websocket.Conn) ServerCodec { - // Create a custom encode/decode pair to enforce payload size and number encoding - conn.MaxPayloadBytes = maxRequestContentLength - encoder := func(v interface{}) error { - return websocketJSONCodec.Send(conn, v) - } - decoder := func(v interface{}) error { - return websocketJSONCodec.Receive(conn, v) - } - rpcconn := Conn(conn) - if conn.IsServerConn() { - // Override remote address with the actual socket address because - // package websocket crashes if there is no request origin. - addr := conn.Request().RemoteAddr - if wsaddr := conn.RemoteAddr().(*websocket.Addr); wsaddr.URL != nil { - // Add origin if present. - addr += "(" + wsaddr.URL.String() + ")" - } - rpcconn = connWithRemoteAddr{conn, addr} - } - return NewCodec(rpcconn, encoder, decoder) -} - -// NewWSServer creates a new websocket RPC server around an API provider. -// -// Deprecated: use Server.WebsocketHandler -func NewWSServer(allowedOrigins []string, srv *Server) *http.Server { - return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)} -} - -// wsHandshakeValidator returns a handler that verifies the origin during the -// websocket upgrade process. When a '*' is specified as an allowed origins all -// connections are accepted. -func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http.Request) error { - origins := mapset.NewSet() - allowAllOrigins := false - - for _, origin := range allowedOrigins { - if origin == "*" { - allowAllOrigins = true - } - if origin != "" { - origins.Add(strings.ToLower(origin)) - } - } - - // allow localhost if no allowedOrigins are specified. - if len(origins.ToSlice()) == 0 { - origins.Add("http://localhost") - if hostname, err := os.Hostname(); err == nil { - origins.Add("http://" + strings.ToLower(hostname)) - } - } - - log.Debug(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v", origins.ToSlice())) - - f := func(cfg *websocket.Config, req *http.Request) error { - // Skip origin verification if no Origin header is present. The origin check - // is supposed to protect against browser based attacks. Browsers always set - // Origin. Non-browser software can put anything in origin and checking it doesn't - // provide additional security. - if _, ok := req.Header["Origin"]; !ok { - return nil - } - // Verify origin against whitelist. - origin := strings.ToLower(req.Header.Get("Origin")) - if allowAllOrigins || origins.Contains(origin) { - return nil - } - log.Warn("Rejected WebSocket connection", "origin", origin) - return errors.New("origin not allowed") - } - - return f -} - -func wsGetConfig(endpoint, origin string) (*websocket.Config, error) { - if origin == "" { - var err error - if origin, err = os.Hostname(); err != nil { - return nil, err - } - if strings.HasPrefix(endpoint, "wss") { - origin = "https://" + strings.ToLower(origin) - } else { - origin = "http://" + strings.ToLower(origin) - } - } - config, err := websocket.NewConfig(endpoint, origin) - if err != nil { - return nil, err - } - - if config.Location.User != nil { - b64auth := base64.StdEncoding.EncodeToString([]byte(config.Location.User.String())) - config.Header.Add("Authorization", "Basic "+b64auth) - config.Location.User = nil - } - return config, nil -} - -// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server -// that is listening on the given endpoint. -// -// The context is used for the initial connection establishment. It does not -// affect subsequent interactions with the client. -func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { - config, err := wsGetConfig(endpoint, origin) - if err != nil { - return nil, err - } - - return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { - conn, err := wsDialContext(ctx, config) - if err != nil { - return nil, err - } - return newWebsocketCodec(conn), nil - }) -} - -func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) { - var conn net.Conn - var err error - switch config.Location.Scheme { - case "ws": - conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location)) - case "wss": - dialer := contextDialer(ctx) - conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig) - default: - err = websocket.ErrBadScheme - } - if err != nil { - return nil, err - } - ws, err := websocket.NewClient(config, conn) - if err != nil { - conn.Close() - return nil, err - } - return ws, err -} - -var wsPortMap = map[string]string{"ws": "80", "wss": "443"} - -func wsDialAddress(location *url.URL) string { - if _, ok := wsPortMap[location.Scheme]; ok { - if _, _, err := net.SplitHostPort(location.Host); err != nil { - return net.JoinHostPort(location.Host, wsPortMap[location.Scheme]) - } - } - return location.Host -} - -func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { - d := &net.Dialer{KeepAlive: tcpKeepAliveInterval} - return d.DialContext(ctx, network, addr) -} - -func contextDialer(ctx context.Context) *net.Dialer { - dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval} - if deadline, ok := ctx.Deadline(); ok { - dialer.Deadline = deadline - } else { - dialer.Deadline = time.Now().Add(defaultDialTimeout) - } - return dialer -}