diff --git a/api/service/blockproposal/service.go b/api/service/blockproposal/service.go index e2bc83959..14ef89238 100644 --- a/api/service/blockproposal/service.go +++ b/api/service/blockproposal/service.go @@ -1,6 +1,7 @@ package blockproposal import ( + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/internal/utils" ) @@ -54,3 +55,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/clientsupport/service.go b/api/service/clientsupport/service.go index 3cbdaa020..7913db674 100644 --- a/api/service/clientsupport/service.go +++ b/api/service/clientsupport/service.go @@ -4,6 +4,7 @@ import ( "strconv" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" clientService "github.com/harmony-one/harmony/api/client/service" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/core/state" @@ -55,3 +56,8 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { func (s *Service) NotifyService(params map[string]interface{}) { return } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/consensus/service.go b/api/service/consensus/service.go index 139ed55d3..9211f6bbc 100644 --- a/api/service/consensus/service.go +++ b/api/service/consensus/service.go @@ -1,6 +1,7 @@ package consensus import ( + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core/types" @@ -52,3 +53,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index b354169bf..204c19632 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -3,6 +3,7 @@ package discovery import ( "time" + "github.com/ethereum/go-ethereum/rpc" proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/api/service" @@ -147,3 +148,8 @@ func (s *Service) Init() { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go index ae615c9f1..754ba96ab 100644 --- a/api/service/explorer/service.go +++ b/api/service/explorer/service.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" "github.com/gorilla/mux" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/core/types" @@ -302,3 +303,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/manager.go b/api/service/manager.go index aacae3521..403b4863a 100644 --- a/api/service/manager.go +++ b/api/service/manager.go @@ -3,6 +3,7 @@ package service import ( "time" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/internal/utils" ) @@ -34,7 +35,6 @@ const ( PeerDiscovery Resharding Staking - RPC Test Done ) @@ -65,8 +65,6 @@ func (t Type) String() string { return "PeerDiscovery" case Resharding: return "Resharding" - case RPC: - return "RPC" case Test: return "Test" case Done: @@ -95,6 +93,9 @@ type Interface interface { SetMessageChan(msgChan chan *msg_pb.Message) StopService() NotifyService(map[string]interface{}) + + // APIs retrieves the list of RPC descriptors the service provides + APIs() []rpc.API } // Manager stores all services for service manager. diff --git a/api/service/manager_test.go b/api/service/manager_test.go index 346181756..0e147f78e 100644 --- a/api/service/manager_test.go +++ b/api/service/manager_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/p2p" @@ -40,6 +41,8 @@ func (s *SupportSyncingTest) SetMessageChan(msgChan chan *msg_pb.Message) { s.msgChan = msgChan } +func (s *SupportSyncingTest) APIs() []rpc.API { return nil } + // Test TakeAction. func TestTakeAction(t *testing.T) { m := &Manager{} diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 069f2bac5..7145399a5 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" @@ -226,3 +227,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/newclientsupport/service.go b/api/service/newclientsupport/service.go index ffff8a4db..a0858e628 100644 --- a/api/service/newclientsupport/service.go +++ b/api/service/newclientsupport/service.go @@ -3,6 +3,7 @@ package newclientsupport import ( "math/big" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" ) @@ -42,3 +43,8 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { func (s *Service) NotifyService(params map[string]interface{}) { return } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/randomness/service.go b/api/service/randomness/service.go index a32bdc926..8fb11513b 100644 --- a/api/service/randomness/service.go +++ b/api/service/randomness/service.go @@ -1,6 +1,7 @@ package randomness import ( + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/drand" "github.com/harmony-one/harmony/internal/utils" @@ -43,3 +44,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/resharding/service.go b/api/service/resharding/service.go index 9377ecf88..2cb04571d 100644 --- a/api/service/resharding/service.go +++ b/api/service/resharding/service.go @@ -3,6 +3,7 @@ package resharding import ( "time" + "github.com/ethereum/go-ethereum/rpc" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" @@ -89,3 +90,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/restclientsupport/service.go b/api/service/restclientsupport/service.go index 1f98095ff..0ca7affcc 100644 --- a/api/service/restclientsupport/service.go +++ b/api/service/restclientsupport/service.go @@ -11,6 +11,7 @@ import ( "strconv" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" "github.com/gorilla/mux" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/internal/utils" @@ -259,3 +260,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/api/service/rpc/service.go b/api/service/rpc/service.go deleted file mode 100644 index e3cf6def4..000000000 --- a/api/service/rpc/service.go +++ /dev/null @@ -1,128 +0,0 @@ -package rpcservice - -import ( - "context" - "fmt" - "net" - "net/http" - "strconv" - - msg_pb "github.com/harmony-one/harmony/api/proto/message" - "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/rpc" - "github.com/harmony-one/harmony/rpc/hmyapi" -) - -const ( - rpcPortDiff = 123 -) - -// Service is the struct for rpc service. -type Service struct { - messageChan chan *msg_pb.Message - server *http.Server - // Util - peer *p2p.Peer - blockchain *core.BlockChain - txPool *core.TxPool - // HTTP RPC - rpcAPIs []rpc.API // List of APIs currently provided by the node - httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled) - httpWhitelist []string // HTTP RPC modules to allow through this endpoint - httpListener net.Listener // HTTP RPC listener socket to server API requests - httpHandler *rpc.Server // HTTP RPC request handler to process the API requests -} - -// New returns RPC service. -func New(b *core.BlockChain, p *p2p.Peer, t *core.TxPool) *Service { - return &Service{ - blockchain: b, - peer: p, - txPool: t, - } -} - -// StartService starts RPC service. -func (s *Service) StartService() { - utils.GetLogInstance().Info("Starting RPC service.") - if err := s.startRPC(); err != nil { - utils.GetLogInstance().Error("Failed to start RPC service.") - } else { - utils.GetLogInstance().Info("Started RPC service.") - } -} - -// StopService shutdowns RPC service. -func (s *Service) StopService() { - utils.GetLogInstance().Info("Shutting down RPC service.") - if err := s.server.Shutdown(context.Background()); err != nil { - utils.GetLogInstance().Error("Error when shutting down RPC server", "error", err) - } else { - utils.GetLogInstance().Error("Shutting down RPC server successufully") - } -} - -// NotifyService notify service -func (s *Service) NotifyService(params map[string]interface{}) { - return -} - -// SetMessageChan sets up message channel to service. -func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { - s.messageChan = messageChan -} - -// startRPC is a helper method to start all the various RPC endpoint during node -// startup. It's not meant to be called at any time afterwards as it makes certain -// assumptions about the state of the node. -func (s *Service) startRPC() error { - apis := hmyapi.GetAPIs(rpc.NewBackend(s.blockchain, s.txPool)) - - port, _ := strconv.Atoi(s.peer.Port) - s.httpEndpoint = fmt.Sprintf("localhost:%v", port+rpcPortDiff) - if err := s.startHTTP(s.httpEndpoint, apis); err != nil { - utils.GetLogInstance().Debug("Failed to start RPC HTTP") - return err - } - utils.GetLogInstance().Debug("Started RPC HTTP") - - // All API endpoints started successfully - s.rpcAPIs = apis - return nil -} - -// startHTTP initializes and starts the HTTP RPC endpoint. -func (s *Service) startHTTP(endpoint string, apis []rpc.API) error { - utils.GetLogInstance().Debug("rpc startHTTP", "endpoint", endpoint, "apis", apis) - // Short circuit if the HTTP endpoint isn't being exposed - if endpoint == "" { - return nil - } - listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis) - if err != nil { - return err - } - utils.GetLogInstance().Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint)) - // All listeners booted successfully - s.httpEndpoint = endpoint - s.httpListener = listener - s.httpHandler = handler - - return nil -} - -// stopHTTP terminates the HTTP RPC endpoint. -func (s *Service) stopHTTP() { - if s.httpListener != nil { - s.httpListener.Close() - s.httpListener = nil - - utils.GetLogInstance().Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", s.httpEndpoint)) - } - if s.httpHandler != nil { - s.httpHandler.Stop() - s.httpHandler = nil - } -} diff --git a/api/service/staking/service.go b/api/service/staking/service.go index 896fe1ad6..a3342182a 100644 --- a/api/service/staking/service.go +++ b/api/service/staking/service.go @@ -16,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" protobuf "github.com/golang/protobuf/proto" proto "github.com/harmony-one/harmony/api/client/service/proto" proto_common "github.com/harmony-one/harmony/api/proto" @@ -250,3 +251,8 @@ func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { s.messageChan = messageChan } + +// APIs for the services. +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index c62a72778..f3280f86e 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -354,6 +354,7 @@ func main() { go currentNode.SupportSyncing() currentNode.ServiceManagerSetup() + currentNode.StartRPC(*port) currentNode.RunServices() currentNode.StartServer() } diff --git a/rpc/backend.go b/core/api_backend.go similarity index 78% rename from rpc/backend.go rename to core/api_backend.go index a90a0eed8..badcad21c 100644 --- a/rpc/backend.go +++ b/core/api_backend.go @@ -1,4 +1,4 @@ -package rpc +package core import ( "context" @@ -6,25 +6,25 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" - "github.com/harmony-one/harmony/core" + "github.com/ethereum/go-ethereum/rpc" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" ) // HmyAPIBackend ... type HmyAPIBackend struct { - blockchain *core.BlockChain - txPool *core.TxPool + blockchain *BlockChain + txPool *TxPool } // NewBackend ... -func NewBackend(blockchain *core.BlockChain, txPool *core.TxPool) *HmyAPIBackend { +func NewBackend(blockchain *BlockChain, txPool *TxPool) *HmyAPIBackend { return &HmyAPIBackend{blockchain, txPool} } // ChainDb ... func (b *HmyAPIBackend) ChainDb() ethdb.Database { - return b.blockchain.ChainDb() + return b.blockchain.db } // GetBlock ... @@ -38,22 +38,22 @@ func (b *HmyAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction } // BlockByNumber ... -func (b *HmyAPIBackend) BlockByNumber(ctx context.Context, blockNr BlockNumber) (*types.Block, error) { +func (b *HmyAPIBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) { // Pending block is only known by the miner - if blockNr == PendingBlockNumber { + if blockNr == rpc.PendingBlockNumber { return nil, errors.New("not implemented") } // Otherwise resolve and return the block - if blockNr == latestBlockNumber { + if blockNr == rpc.LatestBlockNumber { return b.blockchain.CurrentBlock(), nil } return b.blockchain.GetBlockByNumber(uint64(blockNr)), nil } // StateAndHeaderByNumber ... -func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr BlockNumber) (*state.DB, *types.Header, error) { +func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.DB, *types.Header, error) { // Pending state is only known by the miner - if blockNr == PendingBlockNumber { + if blockNr == rpc.PendingBlockNumber { return nil, nil, errors.New("not implemented") } // Otherwise resolve the block number and return its state @@ -66,13 +66,13 @@ func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr Bloc } // HeaderByNumber ... -func (b *HmyAPIBackend) HeaderByNumber(ctx context.Context, blockNr BlockNumber) (*types.Header, error) { +func (b *HmyAPIBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { // Pending block is only known by the miner - if blockNr == PendingBlockNumber { + if blockNr == rpc.PendingBlockNumber { return nil, errors.New("not implemented") } // Otherwise resolve and return the block - if blockNr == latestBlockNumber { + if blockNr == rpc.LatestBlockNumber { return b.blockchain.CurrentBlock().Header(), nil } return b.blockchain.GetHeaderByNumber(uint64(blockNr)), nil diff --git a/core/blockchain.go b/core/blockchain.go index b8245867f..5f1ce92cd 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1744,6 +1744,3 @@ func (bc *BlockChain) StoreNewShardState(block *types.Block, stakeInfo *map[comm } return shardState } - -// ChainDb returns the database -func (bc *BlockChain) ChainDb() ethdb.Database { return bc.db } diff --git a/rpc/hmyapi/addrlock.go b/internal/hmyapi/addrlock.go similarity index 100% rename from rpc/hmyapi/addrlock.go rename to internal/hmyapi/addrlock.go diff --git a/rpc/hmyapi/backend.go b/internal/hmyapi/backend.go similarity index 64% rename from rpc/hmyapi/backend.go rename to internal/hmyapi/backend.go index bc9950c3c..ae2571a77 100644 --- a/rpc/hmyapi/backend.go +++ b/internal/hmyapi/backend.go @@ -1,22 +1,21 @@ package hmyapi import ( - "github.com/harmony-one/harmony/rpc" + "github.com/ethereum/go-ethereum/rpc" + "github.com/harmony-one/harmony/core" ) -const namespace = "hmy" - // GetAPIs returns all the APIs. -func GetAPIs(b *rpc.HmyAPIBackend) []rpc.API { +func GetAPIs(b *core.HmyAPIBackend) []rpc.API { nonceLock := new(AddrLocker) return []rpc.API{ { - Namespace: namespace, + Namespace: "hmy", Version: "1.0", Service: NewPublicBlockChainAPI(b), Public: true, }, { - Namespace: namespace, + Namespace: "hmy", Version: "1.0", Service: NewPublicTransactionPoolAPI(b, nonceLock), Public: true, diff --git a/rpc/hmyapi/private.go b/internal/hmyapi/private.go similarity index 100% rename from rpc/hmyapi/private.go rename to internal/hmyapi/private.go diff --git a/rpc/hmyapi/public.go b/internal/hmyapi/public.go similarity index 96% rename from rpc/hmyapi/public.go rename to internal/hmyapi/public.go index 5502222bd..b40d618ee 100644 --- a/rpc/hmyapi/public.go +++ b/internal/hmyapi/public.go @@ -6,21 +6,21 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" "github.com/harmony-one/harmony/api/proto" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/rawdb" "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/rpc" ) // PublicBlockChainAPI provides an API to access the Harmony blockchain. // It offers only methods that operate on public data that is freely available to anyone. type PublicBlockChainAPI struct { - b *rpc.HmyAPIBackend + b *core.HmyAPIBackend } // NewPublicBlockChainAPI creates a new Harmony blockchain API. -func NewPublicBlockChainAPI(b *rpc.HmyAPIBackend) *PublicBlockChainAPI { +func NewPublicBlockChainAPI(b *core.HmyAPIBackend) *PublicBlockChainAPI { return &PublicBlockChainAPI{b} } @@ -124,12 +124,12 @@ func (s *PublicNetAPI) PeerCount() hexutil.Uint { // PublicTransactionPoolAPI exposes methods for the RPC interface type PublicTransactionPoolAPI struct { - b *rpc.HmyAPIBackend + b *core.HmyAPIBackend nonceLock *AddrLocker } // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. -func NewPublicTransactionPoolAPI(b *rpc.HmyAPIBackend, nonceLock *AddrLocker) *PublicTransactionPoolAPI { +func NewPublicTransactionPoolAPI(b *core.HmyAPIBackend, nonceLock *AddrLocker) *PublicTransactionPoolAPI { return &PublicTransactionPoolAPI{b, nonceLock} } diff --git a/rpc/hmyapi/types.go b/internal/hmyapi/types.go similarity index 100% rename from rpc/hmyapi/types.go rename to internal/hmyapi/types.go diff --git a/node/rpc.go b/node/rpc.go new file mode 100644 index 000000000..1e30f48c1 --- /dev/null +++ b/node/rpc.go @@ -0,0 +1,133 @@ +package node + +import ( + "fmt" + "net" + "strconv" + "strings" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/internal/hmyapi" +) + +const ( + rpcHTTPPortOffset = 10 + rpcWSPortOffset = 20 +) + +var ( + // HTTP RPC + rpcAPIs []rpc.API + httpListener net.Listener + httpHandler *rpc.Server + + wsListener net.Listener + wsHandler *rpc.Server + + httpEndpoint = "" + wsEndpoint = "" + + httpModules = []string{"hmy"} + httpVirtualHosts = []string{"*"} + httpTimeouts = rpc.DefaultHTTPTimeouts + + wsModules = []string{"net", "web3"} + wsOrigins = []string{"*"} + + apiBackend *core.HmyAPIBackend +) + +// StartRPC start RPC service +func (node *Node) StartRPC(nodePort string) error { + // Gather all the possible APIs to surface + apiBackend = core.NewBackend(node.blockchain, node.TxPool) + + apis := hmyapi.GetAPIs(apiBackend) + for _, service := range node.serviceManager.GetServices() { + apis = append(apis, service.APIs()...) + } + + port, _ := strconv.Atoi(nodePort) + + httpEndpoint = fmt.Sprintf(":%v", port+rpcHTTPPortOffset) + if err := node.startHTTP(httpEndpoint, apis, httpModules, nil, httpVirtualHosts, httpTimeouts); err != nil { + return err + } + + wsEndpoint = fmt.Sprintf(":%v", port+rpcWSPortOffset) + if err := node.startWS(wsEndpoint, apis, wsModules, wsOrigins, true); err != nil { + node.stopHTTP() + return err + } + + rpcAPIs = apis + return nil +} + +// startHTTP initializes and starts the HTTP RPC endpoint. +func (node *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) error { + // Short circuit if the HTTP endpoint isn't being exposed + if endpoint == "" { + return nil + } + + listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts, timeouts) + if err != nil { + return err + } + + log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ",")) + // All listeners booted successfully + httpListener = listener + httpHandler = handler + + return nil +} + +// stopHTTP terminates the HTTP RPC endpoint. +func (node *Node) stopHTTP() { + if httpListener != nil { + httpListener.Close() + httpListener = nil + + log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", httpEndpoint)) + } + if httpHandler != nil { + httpHandler.Stop() + httpHandler = nil + } +} + +// startWS initializes and starts the websocket RPC endpoint. +func (node *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error { + // Short circuit if the WS endpoint isn't being exposed + if endpoint == "" { + return nil + } + listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll) + if err != nil { + return err + } + log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr())) + // All listeners booted successfully + wsListener = listener + wsHandler = handler + + return nil +} + +// stopWS terminates the websocket RPC endpoint. +func (node *Node) stopWS() { + if wsListener != nil { + wsListener.Close() + wsListener = nil + + log.Info("WebSocket endpoint closed", "url", fmt.Sprintf("ws://%s", wsEndpoint)) + } + if wsHandler != nil { + wsHandler.Stop() + wsHandler = nil + } +} diff --git a/node/service.go b/node/service.go deleted file mode 100644 index f97acf88d..000000000 --- a/node/service.go +++ /dev/null @@ -1,32 +0,0 @@ -package node - -import ( - "github.com/ethereum/go-ethereum/p2p" - "github.com/harmony-one/harmony/rpc" -) - -// Service is an individual protocol that can be registered into a node. -// -// Notes: -// -// • Service life-cycle management is delegated to the node. The service is allowed to -// initialize itself upon creation, but no goroutines should be spun up outside of the -// Start method. -// -// • Restart logic is not required as the node will create a fresh instance -// every time a service is started. -type Service interface { - // Protocols retrieves the P2P protocols the service wishes to start. - Protocols() []p2p.Protocol - - // APIs retrieves the list of RPC descriptors the service provides - APIs() []rpc.API - - // Start is called after all services have been constructed and the networking - // layer was also initialized to spawn any goroutines required by the service. - Start(server *p2p.Server) error - - // Stop terminates all goroutines belonging to the service, blocking until they - // are all terminated. - Stop() error -} diff --git a/node/service_setup.go b/node/service_setup.go index 6ff593cea..21c10340a 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -11,7 +11,6 @@ import ( "github.com/harmony-one/harmony/api/service/networkinfo" "github.com/harmony-one/harmony/api/service/randomness" "github.com/harmony-one/harmony/api/service/restclientsupport" - rpcservice "github.com/harmony-one/harmony/api/service/rpc" "github.com/harmony-one/harmony/api/service/staking" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" @@ -39,9 +38,6 @@ func (node *Node) setupForShardLeader() { node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady)) // Register client support service. node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) - - // Register RPC service - node.serviceManager.RegisterService(service.RPC, rpcservice.New(node.blockchain, &node.SelfPeer, node.TxPool)) } func (node *Node) setupForShardValidator() { diff --git a/rpc/README.md b/rpc/README.md deleted file mode 100644 index b789cd8a1..000000000 --- a/rpc/README.md +++ /dev/null @@ -1,20 +0,0 @@ -Files that are mainly ported over from go-ethereum (might include tslint fix): -* rpc/client.go: the rpc client. -* rpc/endpoints.go: starting HTTP/WS/IPC endpoints. -* rpc/errors.go: error code handling. -* rpc/handler.go: handling requests, running methods. -* rpc/http.go: starting HTTP service. -* rpc/ipc_unix.go: util for ipc unix -* rpc/ipc.go: util for general ipc -* rpc/json.go: json de-/serialization -* rpc/server.go: the rpc server -* rpc/stdio.go: stdio for ipc. -* rpc/subscription.go: ws subscriptions. -* rpc/types.go: type declarations -* rpc/websocket.go: ws connection. -* rpc/hmyapi/addrlock.go - -Richard's changes: -* rpc/service.go: our service (http/ws/ipc) starter/stoper -* other files under `rpc/hmyapi/`. -* other files that are not in `rpc` folder. Added some util functions. \ No newline at end of file diff --git a/rpc/client.go b/rpc/client.go deleted file mode 100644 index 7995640d6..000000000 --- a/rpc/client.go +++ /dev/null @@ -1,623 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "net/url" - "reflect" - "strconv" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/log" -) - -var ( - errClientQuit = errors.New("client is closed") - errNoResult = errors.New("no result in JSON-RPC response") - errSubscriptionQueueOverflow = errors.New("subscription queue overflow") - errClientReconnected = errors.New("client reconnected") - errDead = errors.New("connection lost") -) - -const ( - // Timeouts - tcpKeepAliveInterval = 30 * time.Second - defaultDialTimeout = 10 * time.Second // used if context has no deadline - subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls -) - -const ( - // Subscriptions are removed when the subscriber cannot keep up. - // - // This can be worked around by supplying a channel with sufficiently sized buffer, - // but this can be inconvenient and hard to explain in the docs. Another issue with - // buffered channels is that the buffer is static even though it might not be needed - // most of the time. - // - // The approach taken here is to maintain a per-subscription linked list buffer - // shrinks on demand. If the buffer reaches the size below, the subscription is - // dropped. - maxClientSubscriptionBuffer = 20000 -) - -// BatchElem is an element in a batch request. -type BatchElem struct { - Method string - Args []interface{} - // The result is unmarshaled into this field. Result must be set to a - // non-nil pointer value of the desired type, otherwise the response will be - // discarded. - Result interface{} - // Error is set if the server returns an error for this request, or if - // unmarshaling into Result fails. It is not set for I/O errors. - Error error -} - -// Client represents a connection to an RPC server. -type Client struct { - idgen func() ID // for subscriptions - isHTTP bool - services *serviceRegistry - - idCounter uint32 - - // This function, if non-nil, is called when the connection is lost. - reconnectFunc reconnectFunc - - // writeConn is used for writing to the connection on the caller's goroutine. It should - // only be accessed outside of dispatch, with the write lock held. The write lock is - // taken by sending on requestOp and released by sending on sendDone. - writeConn jsonWriter - - // for dispatch - close chan struct{} - closing chan struct{} // closed when client is quitting - didClose chan struct{} // closed when client quits - reconnected chan ServerCodec // where write/reconnect sends the new connection - readOp chan readOp // read messages - readErr chan error // errors from read - reqInit chan *requestOp // register response IDs, takes write lock - reqSent chan error // signals write completion, releases write lock - reqTimeout chan *requestOp // removes response IDs when call timeout expires -} - -type reconnectFunc func(ctx context.Context) (ServerCodec, error) - -type clientContextKey struct{} - -type clientConn struct { - codec ServerCodec - handler *handler -} - -func (c *Client) newClientConn(conn ServerCodec) *clientConn { - ctx := context.WithValue(context.Background(), clientContextKey{}, c) - handler := newHandler(ctx, conn, c.idgen, c.services) - return &clientConn{conn, handler} -} - -func (cc *clientConn) close(err error, inflightReq *requestOp) { - cc.handler.close(err, inflightReq) - cc.codec.Close() -} - -type readOp struct { - msgs []*jsonrpcMessage - batch bool -} - -type requestOp struct { - ids []json.RawMessage - err error - resp chan *jsonrpcMessage // receives up to len(ids) responses - sub *ClientSubscription // only set for EthSubscribe requests -} - -func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) { - select { - case <-ctx.Done(): - // Send the timeout to dispatch so it can remove the request IDs. - select { - case c.reqTimeout <- op: - case <-c.closing: - } - return nil, ctx.Err() - case resp := <-op.resp: - return resp, op.err - } -} - -// Dial creates a new client for the given URL. -// -// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a -// file name with no URL scheme, a local socket connection is established using UNIX -// domain sockets on supported platforms and named pipes on Windows. If you want to -// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead. -// -// For websocket connections, the origin is set to the local host name. -// -// The client reconnects automatically if the connection is lost. -func Dial(rawurl string) (*Client, error) { - return DialContext(context.Background(), rawurl) -} - -// DialContext creates a new RPC client, just like Dial. -// -// The context is used to cancel or time out the initial connection establishment. It does -// not affect subsequent interactions with the client. -func DialContext(ctx context.Context, rawurl string) (*Client, error) { - u, err := url.Parse(rawurl) - if err != nil { - return nil, err - } - switch u.Scheme { - case "http", "https": - return DialHTTP(rawurl) - case "ws", "wss": - return DialWebsocket(ctx, rawurl, "") - case "stdio": - return DialStdIO(ctx) - case "": - return DialIPC(ctx, rawurl) - default: - return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) - } -} - -// ClientFromContext Client retrieves the client from the context, if any. This can be used to perform -// 'reverse calls' in a handler method. -func ClientFromContext(ctx context.Context) (*Client, bool) { - client, ok := ctx.Value(clientContextKey{}).(*Client) - return client, ok -} - -func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) { - conn, err := connect(initctx) - if err != nil { - return nil, err - } - c := initClient(conn, randomIDGenerator(), new(serviceRegistry)) - c.reconnectFunc = connect - return c, nil -} - -func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { - _, isHTTP := conn.(*httpConn) - c := &Client{ - idgen: idgen, - isHTTP: isHTTP, - services: services, - writeConn: conn, - close: make(chan struct{}), - closing: make(chan struct{}), - didClose: make(chan struct{}), - reconnected: make(chan ServerCodec), - readOp: make(chan readOp), - readErr: make(chan error), - reqInit: make(chan *requestOp), - reqSent: make(chan error, 1), - reqTimeout: make(chan *requestOp), - } - if !isHTTP { - go c.dispatch(conn) - } - return c -} - -// RegisterName creates a service for the given receiver type under the given name. When no -// methods on the given receiver match the criteria to be either a RPC method or a -// subscription an error is returned. Otherwise a new service is created and added to the -// service collection this client provides to the server. -func (c *Client) RegisterName(name string, receiver interface{}) error { - return c.services.registerName(name, receiver) -} - -func (c *Client) nextID() json.RawMessage { - id := atomic.AddUint32(&c.idCounter, 1) - return strconv.AppendUint(nil, uint64(id), 10) -} - -// SupportedModules calls the rpc_modules method, retrieving the list of -// APIs that are available on the server. -func (c *Client) SupportedModules() (map[string]string, error) { - var result map[string]string - ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) - defer cancel() - err := c.CallContext(ctx, &result, "rpc_modules") - return result, err -} - -// Close closes the client, aborting any in-flight requests. -func (c *Client) Close() { - if c.isHTTP { - return - } - select { - case c.close <- struct{}{}: - <-c.didClose - case <-c.didClose: - } -} - -// Call performs a JSON-RPC call with the given arguments and unmarshals into -// result if no error occurred. -// -// The result must be a pointer so that package json can unmarshal into it. You -// can also pass nil, in which case the result is ignored. -func (c *Client) Call(result interface{}, method string, args ...interface{}) error { - ctx := context.Background() - return c.CallContext(ctx, result, method, args...) -} - -// CallContext performs a JSON-RPC call with the given arguments. If the context is -// canceled before the call has successfully returned, CallContext returns immediately. -// -// The result must be a pointer so that package json can unmarshal into it. You -// can also pass nil, in which case the result is ignored. -func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { - msg, err := c.newMessage(method, args...) - if err != nil { - return err - } - op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} - - if c.isHTTP { - err = c.sendHTTP(ctx, op, msg) - } else { - err = c.send(ctx, op, msg) - } - if err != nil { - return err - } - - // dispatch has accepted the request and will close the channel when it quits. - switch resp, err := op.wait(ctx, c); { - case err != nil: - return err - case resp.Error != nil: - return resp.Error - case len(resp.Result) == 0: - return errNoResult - default: - return json.Unmarshal(resp.Result, &result) - } -} - -// BatchCall sends all given requests as a single batch and waits for the server -// to return a response for all of them. -// -// In contrast to Call, BatchCall only returns I/O errors. Any error specific to -// a request is reported through the Error field of the corresponding BatchElem. -// -// Note that batch calls may not be executed atomically on the server side. -func (c *Client) BatchCall(b []BatchElem) error { - ctx := context.Background() - return c.BatchCallContext(ctx, b) -} - -// BatchCallContext sends all given requests as a single batch and waits for the server -// to return a response for all of them. The wait duration is bounded by the -// context's deadline. -// -// In contrast to CallContext, BatchCallContext only returns errors that have occurred -// while sending the request. Any error specific to a request is reported through the -// Error field of the corresponding BatchElem. -// -// Note that batch calls may not be executed atomically on the server side. -func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { - msgs := make([]*jsonrpcMessage, len(b)) - op := &requestOp{ - ids: make([]json.RawMessage, len(b)), - resp: make(chan *jsonrpcMessage, len(b)), - } - for i, elem := range b { - msg, err := c.newMessage(elem.Method, elem.Args...) - if err != nil { - return err - } - msgs[i] = msg - op.ids[i] = msg.ID - } - - var err error - if c.isHTTP { - err = c.sendBatchHTTP(ctx, op, msgs) - } else { - err = c.send(ctx, op, msgs) - } - - // Wait for all responses to come back. - for n := 0; n < len(b) && err == nil; n++ { - var resp *jsonrpcMessage - resp, err = op.wait(ctx, c) - if err != nil { - break - } - // Find the element corresponding to this response. - // The element is guaranteed to be present because dispatch - // only sends valid IDs to our channel. - var elem *BatchElem - for i := range msgs { - if bytes.Equal(msgs[i].ID, resp.ID) { - elem = &b[i] - break - } - } - if resp.Error != nil { - elem.Error = resp.Error - continue - } - if len(resp.Result) == 0 { - elem.Error = errNoResult - continue - } - elem.Error = json.Unmarshal(resp.Result, elem.Result) - } - return err -} - -// Notify sends a notification, i.e. a method call that doesn't expect a response. -func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) error { - op := new(requestOp) - msg, err := c.newMessage(method, args...) - if err != nil { - return err - } - msg.ID = nil - - if c.isHTTP { - return c.sendHTTP(ctx, op, msg) - } - return c.send(ctx, op, msg) -} - -// EthSubscribe registers a subscripion under the "eth" namespace. -func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { - return c.Subscribe(ctx, "eth", channel, args...) -} - -// ShhSubscribe registers a subscripion under the "shh" namespace. -func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { - return c.Subscribe(ctx, "shh", channel, args...) -} - -// Subscribe calls the "_subscribe" method with the given arguments, -// registering a subscription. Server notifications for the subscription are -// sent to the given channel. The element type of the channel must match the -// expected type of content returned by the subscription. -// -// The context argument cancels the RPC request that sets up the subscription but has no -// effect on the subscription after Subscribe has returned. -// -// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications -// before considering the subscriber dead. The subscription Err channel will receive -// errSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure -// that the channel usually has at least one reader to prevent this issue. -func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) { - // Check type of channel first. - chanVal := reflect.ValueOf(channel) - if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { - panic("first argument to Subscribe must be a writable channel") - } - if chanVal.IsNil() { - panic("channel given to Subscribe must not be nil") - } - if c.isHTTP { - return nil, ErrNotificationsUnsupported - } - - msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...) - if err != nil { - return nil, err - } - op := &requestOp{ - ids: []json.RawMessage{msg.ID}, - resp: make(chan *jsonrpcMessage), - sub: newClientSubscription(c, namespace, chanVal), - } - - // Send the subscription request. - // The arrival and validity of the response is signaled on sub.quit. - if err := c.send(ctx, op, msg); err != nil { - return nil, err - } - if _, err := op.wait(ctx, c); err != nil { - return nil, err - } - return op.sub, nil -} - -func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) { - msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method} - if paramsIn != nil { // prevent sending "params":null - var err error - if msg.Params, err = json.Marshal(paramsIn); err != nil { - return nil, err - } - } - return msg, nil -} - -// send registers op with the dispatch loop, then sends msg on the connection. -// if sending fails, op is deregistered. -func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error { - select { - case c.reqInit <- op: - err := c.write(ctx, msg) - c.reqSent <- err - return err - case <-ctx.Done(): - // This can happen if the client is overloaded or unable to keep up with - // subscription notifications. - return ctx.Err() - case <-c.closing: - return errClientQuit - } -} - -func (c *Client) write(ctx context.Context, msg interface{}) error { - // The previous write failed. Try to establish a new connection. - if c.writeConn == nil { - if err := c.reconnect(ctx); err != nil { - return err - } - } - err := c.writeConn.Write(ctx, msg) - if err != nil { - c.writeConn = nil - } - return err -} - -func (c *Client) reconnect(ctx context.Context) error { - if c.reconnectFunc == nil { - return errDead - } - - if _, ok := ctx.Deadline(); !ok { - var cancel func() - ctx, cancel = context.WithTimeout(ctx, defaultDialTimeout) - defer cancel() - } - newconn, err := c.reconnectFunc(ctx) - if err != nil { - log.Trace("RPC client reconnect failed", "err", err) - return err - } - select { - case c.reconnected <- newconn: - c.writeConn = newconn - return nil - case <-c.didClose: - newconn.Close() - return errClientQuit - } -} - -// dispatch is the main loop of the client. -// It sends read messages to waiting calls to Call and BatchCall -// and subscription notifications to registered subscriptions. -func (c *Client) dispatch(codec ServerCodec) { - var ( - lastOp *requestOp // tracks last send operation - reqInitLock = c.reqInit // nil while the send lock is held - conn = c.newClientConn(codec) - reading = true - ) - defer func() { - close(c.closing) - if reading { - conn.close(errClientQuit, nil) - c.drainRead() - } - close(c.didClose) - }() - - // Spawn the initial read loop. - go c.read(codec) - - for { - select { - case <-c.close: - return - - // Read path: - case op := <-c.readOp: - if op.batch { - conn.handler.handleBatch(op.msgs) - } else { - conn.handler.handleMsg(op.msgs[0]) - } - - case err := <-c.readErr: - conn.handler.log.Debug("RPC connection read error", "err", err) - conn.close(err, lastOp) - reading = false - - // Reconnect: - case newcodec := <-c.reconnected: - log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr()) - if reading { - // Wait for the previous read loop to exit. This is a rare case which - // happens if this loop isn't notified in time after the connection breaks. - // In those cases the caller will notice first and reconnect. Closing the - // handler terminates all waiting requests (closing op.resp) except for - // lastOp, which will be transferred to the new handler. - conn.close(errClientReconnected, lastOp) - c.drainRead() - } - go c.read(newcodec) - reading = true - conn = c.newClientConn(newcodec) - // Re-register the in-flight request on the new handler - // because that's where it will be sent. - conn.handler.addRequestOp(lastOp) - - // Send path: - case op := <-reqInitLock: - // Stop listening for further requests until the current one has been sent. - reqInitLock = nil - lastOp = op - conn.handler.addRequestOp(op) - - case err := <-c.reqSent: - if err != nil { - // Remove response handlers for the last send. When the read loop - // goes down, it will signal all other current operations. - conn.handler.removeRequestOp(lastOp) - } - // Let the next request in. - reqInitLock = c.reqInit - lastOp = nil - - case op := <-c.reqTimeout: - conn.handler.removeRequestOp(op) - } - } -} - -// drainRead drops read messages until an error occurs. -func (c *Client) drainRead() { - for { - select { - case <-c.readOp: - case <-c.readErr: - return - } - } -} - -// read decodes RPC messages from a codec, feeding them into dispatch. -func (c *Client) read(codec ServerCodec) { - for { - msgs, batch, err := codec.Read() - if _, ok := err.(*json.SyntaxError); ok { - codec.Write(context.Background(), errorMessage(&parseError{err.Error()})) - } - if err != nil { - c.readErr <- err - return - } - c.readOp <- readOp{msgs, batch} - } -} diff --git a/rpc/endpoints.go b/rpc/endpoints.go deleted file mode 100644 index 3fe0f1767..000000000 --- a/rpc/endpoints.go +++ /dev/null @@ -1,79 +0,0 @@ -package rpc - -import ( - "net" - - "github.com/ethereum/go-ethereum/log" -) - -// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules -func StartHTTPEndpoint(endpoint string, apis []API) (net.Listener, *Server, error) { - // Register all the APIs exposed by the services - handler := NewServer() - for _, api := range apis { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return nil, nil, err - } - log.Debug("HTTP registered", "namespace", api.Namespace) - } - // All APIs registered, start the HTTP listener - var ( - listener net.Listener - err error - ) - if listener, err = net.Listen("tcp", endpoint); err != nil { - return nil, nil, err - } - go NewHTTPServer(handler).Serve(listener) - return listener, handler, err -} - -// StartWSEndpoint starts a websocket endpoint -func StartWSEndpoint(endpoint string, apis []API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *Server, error) { - - // Generate the whitelist based on the allowed modules - whitelist := make(map[string]bool) - for _, module := range modules { - whitelist[module] = true - } - // Register all the APIs exposed by the services - handler := NewServer() - for _, api := range apis { - if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return nil, nil, err - } - log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace) - } - } - // All APIs registered, start the HTTP listener - var ( - listener net.Listener - err error - ) - if listener, err = net.Listen("tcp", endpoint); err != nil { - return nil, nil, err - } - go NewWSServer(wsOrigins, handler).Serve(listener) - return listener, handler, err - -} - -// StartIPCEndpoint starts an IPC endpoint. -func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) { - // Register all the APIs exposed by the services. - handler := NewServer() - for _, api := range apis { - if err := handler.RegisterName(api.Namespace, api.Service); err != nil { - return nil, nil, err - } - log.Debug("IPC registered", "namespace", api.Namespace) - } - // All APIs registered, start the IPC listener. - listener, err := ipcListen(ipcEndpoint) - if err != nil { - return nil, nil, err - } - go handler.ServeListener(listener) - return listener, handler, nil -} diff --git a/rpc/errors.go b/rpc/errors.go deleted file mode 100644 index 50bb0692c..000000000 --- a/rpc/errors.go +++ /dev/null @@ -1,49 +0,0 @@ -package rpc - -import "fmt" - -const defaultErrorCode = -32000 - -type methodNotFoundError struct{ method string } - -func (e *methodNotFoundError) ErrorCode() int { return -32601 } - -func (e *methodNotFoundError) Error() string { - return fmt.Sprintf("the method %s does not exist/is not available", e.method) -} - -type subscriptionNotFoundError struct{ namespace, subscription string } - -func (e *subscriptionNotFoundError) ErrorCode() int { return -32601 } - -func (e *subscriptionNotFoundError) Error() string { - return fmt.Sprintf("no %q subscription in %s namespace", e.subscription, e.namespace) -} - -// Invalid JSON was received by the server. -type parseError struct{ message string } - -func (e *parseError) ErrorCode() int { return -32700 } - -func (e *parseError) Error() string { return e.message } - -// received message isn't a valid request -type invalidRequestError struct{ message string } - -func (e *invalidRequestError) ErrorCode() int { return -32600 } - -func (e *invalidRequestError) Error() string { return e.message } - -// received message is invalid -type invalidMessageError struct{ message string } - -func (e *invalidMessageError) ErrorCode() int { return -32700 } - -func (e *invalidMessageError) Error() string { return e.message } - -// unable to decode supplied params, or an invalid number of parameters -type invalidParamsError struct{ message string } - -func (e *invalidParamsError) ErrorCode() int { return -32602 } - -func (e *invalidParamsError) Error() string { return e.message } diff --git a/rpc/handler.go b/rpc/handler.go deleted file mode 100644 index ca0fb20df..000000000 --- a/rpc/handler.go +++ /dev/null @@ -1,381 +0,0 @@ -package rpc - -import ( - "context" - "encoding/json" - "reflect" - "strconv" - "strings" - "sync" - "time" - - "github.com/ethereum/go-ethereum/log" -) - -// handler handles JSON-RPC messages. There is one handler per connection. Note that -// handler is not safe for concurrent use. Message handling never blocks indefinitely -// because RPCs are processed on background goroutines launched by handler. -// -// The entry points for incoming messages are: -// -// h.handleMsg(message) -// h.handleBatch(message) -// -// Outgoing calls use the requestOp struct. Register the request before sending it -// on the connection: -// -// op := &requestOp{ids: ...} -// h.addRequestOp(op) -// -// Now send the request, then wait for the reply to be delivered through handleMsg: -// -// if err := op.wait(...); err != nil { -// h.removeRequestOp(op) // timeout, etc. -// } -// -type handler struct { - reg *serviceRegistry - unsubscribeCb *callback - idgen func() ID // subscription ID generator - respWait map[string]*requestOp // active client requests - clientSubs map[string]*ClientSubscription // active client subscriptions - callWG sync.WaitGroup // pending call goroutines - rootCtx context.Context // canceled by close() - cancelRoot func() // cancel function for rootCtx - conn jsonWriter // where responses will be sent - log log.Logger - allowSubscribe bool - - subLock sync.Mutex - serverSubs map[ID]*Subscription -} - -type callProc struct { - ctx context.Context - notifiers []*Notifier -} - -func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { - rootCtx, cancelRoot := context.WithCancel(connCtx) - h := &handler{ - reg: reg, - idgen: idgen, - conn: conn, - respWait: make(map[string]*requestOp), - clientSubs: make(map[string]*ClientSubscription), - rootCtx: rootCtx, - cancelRoot: cancelRoot, - allowSubscribe: true, - serverSubs: make(map[ID]*Subscription), - log: log.Root(), - } - if conn.RemoteAddr() != "" { - h.log = h.log.New("conn", conn.RemoteAddr()) - } - h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe)) - return h -} - -// handleBatch executes all messages in a batch and returns the responses. -func (h *handler) handleBatch(msgs []*jsonrpcMessage) { - // Emit error response for empty batches: - if len(msgs) == 0 { - h.startCallProc(func(cp *callProc) { - h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) - }) - return - } - - // Handle non-call messages first: - calls := make([]*jsonrpcMessage, 0, len(msgs)) - for _, msg := range msgs { - if handled := h.handleImmediate(msg); !handled { - calls = append(calls, msg) - } - } - if len(calls) == 0 { - return - } - // Process calls on a goroutine because they may block indefinitely: - h.startCallProc(func(cp *callProc) { - answers := make([]*jsonrpcMessage, 0, len(msgs)) - for _, msg := range calls { - if answer := h.handleCallMsg(cp, msg); answer != nil { - answers = append(answers, answer) - } - } - h.addSubscriptions(cp.notifiers) - if len(answers) > 0 { - h.conn.Write(cp.ctx, answers) - } - for _, n := range cp.notifiers { - n.activate() - } - }) -} - -// handleMsg handles a single message. -func (h *handler) handleMsg(msg *jsonrpcMessage) { - if ok := h.handleImmediate(msg); ok { - return - } - h.startCallProc(func(cp *callProc) { - answer := h.handleCallMsg(cp, msg) - h.addSubscriptions(cp.notifiers) - if answer != nil { - h.conn.Write(cp.ctx, answer) - } - for _, n := range cp.notifiers { - n.activate() - } - }) -} - -// close cancels all requests except for inflightReq and waits for -// call goroutines to shut down. -func (h *handler) close(err error, inflightReq *requestOp) { - h.cancelAllRequests(err, inflightReq) - h.cancelRoot() - h.callWG.Wait() - h.cancelServerSubscriptions(err) -} - -// addRequestOp registers a request operation. -func (h *handler) addRequestOp(op *requestOp) { - for _, id := range op.ids { - h.respWait[string(id)] = op - } -} - -// removeRequestOps stops waiting for the given request IDs. -func (h *handler) removeRequestOp(op *requestOp) { - for _, id := range op.ids { - delete(h.respWait, string(id)) - } -} - -// cancelAllRequests unblocks and removes pending requests and active subscriptions. -func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) { - didClose := make(map[*requestOp]bool) - if inflightReq != nil { - didClose[inflightReq] = true - } - - for id, op := range h.respWait { - // Remove the op so that later calls will not close op.resp again. - delete(h.respWait, id) - - if !didClose[op] { - op.err = err - close(op.resp) - didClose[op] = true - } - } - for id, sub := range h.clientSubs { - delete(h.clientSubs, id) - sub.quitWithError(false, err) - } -} - -func (h *handler) addSubscriptions(nn []*Notifier) { - h.subLock.Lock() - defer h.subLock.Unlock() - - for _, n := range nn { - if sub := n.takeSubscription(); sub != nil { - h.serverSubs[sub.ID] = sub - } - } -} - -// cancelServerSubscriptions removes all subscriptions and closes their error channels. -func (h *handler) cancelServerSubscriptions(err error) { - h.subLock.Lock() - defer h.subLock.Unlock() - - for id, s := range h.serverSubs { - s.err <- err - close(s.err) - delete(h.serverSubs, id) - } -} - -// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group. -func (h *handler) startCallProc(fn func(*callProc)) { - h.callWG.Add(1) - go func() { - ctx, cancel := context.WithCancel(h.rootCtx) - defer h.callWG.Done() - defer cancel() - fn(&callProc{ctx: ctx}) - }() -} - -// handleImmediate executes non-call messages. It returns false if the message is a -// call or requires a reply. -func (h *handler) handleImmediate(msg *jsonrpcMessage) bool { - start := time.Now() - switch { - case msg.isNotification(): - if strings.HasSuffix(msg.Method, notificationMethodSuffix) { - h.handleSubscriptionResult(msg) - return true - } - return false - case msg.isResponse(): - h.handleResponse(msg) - h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start)) - return true - default: - return false - } -} - -// handleSubscriptionResult processes subscription notifications. -func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { - var result subscriptionResult - if err := json.Unmarshal(msg.Params, &result); err != nil { - h.log.Debug("Dropping invalid subscription message") - return - } - if h.clientSubs[result.ID] != nil { - h.clientSubs[result.ID].deliver(result.Result) - } -} - -// handleResponse processes method call responses. -func (h *handler) handleResponse(msg *jsonrpcMessage) { - op := h.respWait[string(msg.ID)] - if op == nil { - h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) - return - } - delete(h.respWait, string(msg.ID)) - // For normal responses, just forward the reply to Call/BatchCall. - if op.sub == nil { - op.resp <- msg - return - } - // For subscription responses, start the subscription if the server - // indicates success. EthSubscribe gets unblocked in either case through - // the op.resp channel. - defer close(op.resp) - if msg.Error != nil { - op.err = msg.Error - return - } - if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { - go op.sub.start() - h.clientSubs[op.sub.subid] = op.sub - } -} - -// handleCallMsg executes a call message and returns the answer. -func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { - start := time.Now() - switch { - case msg.isNotification(): - h.handleCall(ctx, msg) - h.log.Debug("Served "+msg.Method, "t", time.Since(start)) - return nil - case msg.isCall(): - resp := h.handleCall(ctx, msg) - if resp.Error != nil { - h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message) - } else { - h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start)) - } - return resp - case msg.hasValidID(): - return msg.errorResponse(&invalidRequestError{"invalid request"}) - default: - return errorMessage(&invalidRequestError{"invalid request"}) - } -} - -// handleCall processes method calls. -func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { - if msg.isSubscribe() { - return h.handleSubscribe(cp, msg) - } - var callb *callback - if msg.isUnsubscribe() { - callb = h.unsubscribeCb - } else { - callb = h.reg.callback(msg.Method) - } - if callb == nil { - return msg.errorResponse(&methodNotFoundError{method: msg.Method}) - } - args, err := parsePositionalArguments(msg.Params, callb.argTypes) - if err != nil { - return msg.errorResponse(&invalidParamsError{err.Error()}) - } - - return h.runMethod(cp.ctx, msg, callb, args) -} - -// handleSubscribe processes *_subscribe method calls. -func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { - if !h.allowSubscribe { - return msg.errorResponse(ErrNotificationsUnsupported) - } - - // Subscription method name is first argument. - name, err := parseSubscriptionName(msg.Params) - if err != nil { - return msg.errorResponse(&invalidParamsError{err.Error()}) - } - namespace := msg.namespace() - callb := h.reg.subscription(namespace, name) - if callb == nil { - return msg.errorResponse(&subscriptionNotFoundError{namespace, name}) - } - - // Parse subscription name arg too, but remove it before calling the callback. - argTypes := append([]reflect.Type{stringType}, callb.argTypes...) - args, err := parsePositionalArguments(msg.Params, argTypes) - if err != nil { - return msg.errorResponse(&invalidParamsError{err.Error()}) - } - args = args[1:] - - // Install notifier in context so the subscription handler can find it. - n := &Notifier{h: h, namespace: namespace} - cp.notifiers = append(cp.notifiers, n) - ctx := context.WithValue(cp.ctx, notifierKey{}, n) - - return h.runMethod(ctx, msg, callb, args) -} - -// runMethod runs the Go callback for an RPC method. -func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage { - result, err := callb.call(ctx, msg.Method, args) - if err != nil { - return msg.errorResponse(err) - } - return msg.response(result) -} - -// unsubscribe is the callback function for all *_unsubscribe calls. -func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) { - h.subLock.Lock() - defer h.subLock.Unlock() - - s := h.serverSubs[id] - if s == nil { - return false, ErrSubscriptionNotFound - } - close(s.err) - delete(h.serverSubs, id) - return true, nil -} - -type idForLog struct{ json.RawMessage } - -func (id idForLog) String() string { - if s, err := strconv.Unquote(string(id.RawMessage)); err == nil { - return s - } - return string(id.RawMessage) -} diff --git a/rpc/http.go b/rpc/http.go deleted file mode 100644 index c6a0d4c36..000000000 --- a/rpc/http.go +++ /dev/null @@ -1,322 +0,0 @@ -package rpc - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "mime" - "net" - "net/http" - "sync" - "time" -) - -const ( - maxRequestContentLength = 1024 * 512 - contentType = "application/json" -) - -// https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13 -var acceptedContentTypes = []string{contentType, "application/json-rpc", "application/jsonrequest"} - -type httpConn struct { - client *http.Client - req *http.Request - closeOnce sync.Once - closed chan interface{} -} - -// httpConn is treated specially by Client. -func (hc *httpConn) Write(context.Context, interface{}) error { - panic("Write called on httpConn") -} - -func (hc *httpConn) RemoteAddr() string { - return hc.req.URL.String() -} - -func (hc *httpConn) Read() ([]*jsonrpcMessage, bool, error) { - <-hc.closed - return nil, false, io.EOF -} - -func (hc *httpConn) Close() { - hc.closeOnce.Do(func() { close(hc.closed) }) -} - -func (hc *httpConn) Closed() <-chan interface{} { - return hc.closed -} - -// HTTPTimeouts represents the configuration params for the HTTP RPC server. -type HTTPTimeouts struct { - // ReadTimeout is the maximum duration for reading the entire - // request, including the body. - // - // Because ReadTimeout does not let Handlers make per-request - // decisions on each request body's acceptable deadline or - // upload rate, most users will prefer to use - // ReadHeaderTimeout. It is valid to use them both. - ReadTimeout time.Duration - - // WriteTimeout is the maximum duration before timing out - // writes of the response. It is reset whenever a new - // request's header is read. Like ReadTimeout, it does not - // let Handlers make decisions on a per-request basis. - WriteTimeout time.Duration - - // IdleTimeout is the maximum amount of time to wait for the - // next request when keep-alives are enabled. If IdleTimeout - // is zero, the value of ReadTimeout is used. If both are - // zero, ReadHeaderTimeout is used. - IdleTimeout time.Duration -} - -// DefaultHTTPTimeouts represents the default timeout values used if further -// configuration is not provided. -var DefaultHTTPTimeouts = HTTPTimeouts{ - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 120 * time.Second, -} - -// DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP -// using the provided HTTP Client. -func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { - req, err := http.NewRequest(http.MethodPost, endpoint, nil) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", contentType) - req.Header.Set("Accept", contentType) - - initctx := context.Background() - return newClient(initctx, func(context.Context) (ServerCodec, error) { - return &httpConn{client: client, req: req, closed: make(chan interface{})}, nil - }) -} - -// DialHTTP creates a new RPC client that connects to an RPC server over HTTP. -func DialHTTP(endpoint string) (*Client, error) { - return DialHTTPWithClient(endpoint, new(http.Client)) -} - -func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error { - hc := c.writeConn.(*httpConn) - respBody, err := hc.doRequest(ctx, msg) - if respBody != nil { - defer respBody.Close() - } - - if err != nil { - if respBody != nil { - buf := new(bytes.Buffer) - if _, err2 := buf.ReadFrom(respBody); err2 == nil { - return fmt.Errorf("%v %v", err, buf.String()) - } - } - return err - } - var respmsg jsonrpcMessage - if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { - return err - } - op.resp <- &respmsg - return nil -} - -func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { - hc := c.writeConn.(*httpConn) - respBody, err := hc.doRequest(ctx, msgs) - if err != nil { - return err - } - defer respBody.Close() - var respmsgs []jsonrpcMessage - if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { - return err - } - for i := 0; i < len(respmsgs); i++ { - op.resp <- &respmsgs[i] - } - return nil -} - -func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) { - body, err := json.Marshal(msg) - if err != nil { - return nil, err - } - req := hc.req.WithContext(ctx) - req.Body = ioutil.NopCloser(bytes.NewReader(body)) - req.ContentLength = int64(len(body)) - - resp, err := hc.client.Do(req) - if err != nil { - return nil, err - } - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return resp.Body, errors.New(resp.Status) - } - return resp.Body, nil -} - -// httpServerConn turns a HTTP connection into a Conn. -type httpServerConn struct { - io.Reader - io.Writer - r *http.Request -} - -func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec { - body := io.LimitReader(r.Body, maxRequestContentLength) - conn := &httpServerConn{Reader: body, Writer: w, r: r} - return NewJSONCodec(conn) -} - -// Close does nothing and always returns nil. -func (t *httpServerConn) Close() error { return nil } - -// RemoteAddr returns the peer address of the underlying connection. -func (t *httpServerConn) RemoteAddr() string { - return t.r.RemoteAddr -} - -// SetWriteDeadline does nothing and always returns nil. -func (t *httpServerConn) SetWriteDeadline(time.Time) error { return nil } - -// NewHTTPServer creates a new HTTP RPC server around an API provider. -// -// Deprecated: Server implements http.Handler -func NewHTTPServer(srv http.Handler) *http.Server { - // TODO(ricl): eth wraps it in CORS-handler. Might need to port if we want to use cors. - handler := srv - - // TODO(ricl): port timeout handlers if necessary - // Bundle and start the HTTP server - return &http.Server{ - Handler: handler, - } -} - -// ServeHTTP serves JSON-RPC requests over HTTP. -func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Permit dumb empty requests for remote health-checks (AWS) - if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" { - return - } - if code, err := validateRequest(r); err != nil { - http.Error(w, err.Error(), code) - return - } - // All checks passed, create a codec that reads direct from the request body - // untilEOF and writes the response to w and order the server to process a - // single request. - ctx := r.Context() - ctx = context.WithValue(ctx, interface{}("remote"), r.RemoteAddr) - ctx = context.WithValue(ctx, interface{}("scheme"), r.Proto) - ctx = context.WithValue(ctx, interface{}("local"), r.Host) - if ua := r.Header.Get("User-Agent"); ua != "" { - ctx = context.WithValue(ctx, interface{}("User-Agent"), ua) - } - if origin := r.Header.Get("Origin"); origin != "" { - ctx = context.WithValue(ctx, interface{}("Origin"), origin) - } - - w.Header().Set("content-type", contentType) - codec := newHTTPServerConn(r, w) - defer codec.Close() - s.serveSingleRequest(ctx, codec) -} - -// validateRequest returns a non-zero response code and error message if the -// request is invalid. -func validateRequest(r *http.Request) (int, error) { - if r.Method == http.MethodPut || r.Method == http.MethodDelete { - return http.StatusMethodNotAllowed, errors.New("method not allowed") - } - if r.ContentLength > maxRequestContentLength { - err := fmt.Errorf("content length too large (%d>%d)", r.ContentLength, maxRequestContentLength) - return http.StatusRequestEntityTooLarge, err - } - // Allow OPTIONS (regardless of content-type) - if r.Method == http.MethodOptions { - return 0, nil - } - // Check content-type - if mt, _, err := mime.ParseMediaType(r.Header.Get("content-type")); err == nil { - for _, accepted := range acceptedContentTypes { - if accepted == mt { - return 0, nil - } - } - } - // Invalid content-type - err := fmt.Errorf("invalid content type, only %s is supported", contentType) - return http.StatusUnsupportedMediaType, err -} - -// func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler { -// // disable CORS support if user has not specified a custom CORS configuration -// if len(allowedOrigins) == 0 { -// return srv -// } -// c := cors.New(cors.Options{ -// AllowedOrigins: allowedOrigins, -// AllowedMethods: []string{http.MethodPost, http.MethodGet}, -// MaxAge: 600, -// AllowedHeaders: []string{"*"}, -// }) -// return c.Handler(srv) -// } - -// virtualHostHandler is a handler which validates the Host-header of incoming requests. -// The virtualHostHandler can prevent DNS rebinding attacks, which do not utilize CORS-headers, -// since they do in-domain requests against the RPC api. Instead, we can see on the Host-header -// which domain was used, and validate that against a whitelist. -type virtualHostHandler struct { - vhosts map[string]struct{} - next http.Handler -} - -// ServeHTTP serves JSON-RPC requests over HTTP, implements http.Handler -func (h *virtualHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // if r.Host is not set, we can continue serving since a browser would set the Host header - if r.Host == "" { - h.next.ServeHTTP(w, r) - return - } - host, _, err := net.SplitHostPort(r.Host) - if err != nil { - // Either invalid (too many colons) or no port specified - host = r.Host - } - if ipAddr := net.ParseIP(host); ipAddr != nil { - // It's an IP address, we can serve that - h.next.ServeHTTP(w, r) - return - - } - // Not an ip address, but a hostname. Need to validate - if _, exist := h.vhosts["*"]; exist { - h.next.ServeHTTP(w, r) - return - } - if _, exist := h.vhosts[host]; exist { - h.next.ServeHTTP(w, r) - return - } - http.Error(w, "invalid host specified", http.StatusForbidden) -} - -// func newVHostHandler(vhosts []string, next http.Handler) http.Handler { -// vhostMap := make(map[string]struct{}) -// for _, allowedHost := range vhosts { -// vhostMap[strings.ToLower(allowedHost)] = struct{}{} -// } -// return &virtualHostHandler{vhostMap, next} -// } diff --git a/rpc/ipc.go b/rpc/ipc.go deleted file mode 100644 index 9f7fee254..000000000 --- a/rpc/ipc.go +++ /dev/null @@ -1,40 +0,0 @@ -package rpc - -import ( - "context" - "net" - - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/p2p/netutil" -) - -// ServeListener accepts connections on l, serving JSON-RPC on them. -func (s *Server) ServeListener(l net.Listener) error { - for { - conn, err := l.Accept() - if netutil.IsTemporaryError(err) { - log.Warn("RPC accept error", "err", err) - continue - } else if err != nil { - return err - } - log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr()) - go s.ServeCodec(NewJSONCodec(conn)) - } -} - -// DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes -// the endpoint is the full path to a unix socket, and Windows the endpoint is an -// identifier for a named pipe. -// -// The context is used for the initial connection establishment. It does not -// affect subsequent interactions with the client. -func DialIPC(ctx context.Context, endpoint string) (*Client, error) { - return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { - conn, err := newIPCConnection(ctx, endpoint) - if err != nil { - return nil, err - } - return NewJSONCodec(conn), err - }) -} diff --git a/rpc/ipc_unix.go b/rpc/ipc_unix.go deleted file mode 100644 index fb957c0b5..000000000 --- a/rpc/ipc_unix.go +++ /dev/null @@ -1,46 +0,0 @@ -package rpc - -import ( - "context" - "fmt" - "net" - "os" - "path/filepath" - - "github.com/ethereum/go-ethereum/log" -) - -/* -#include - -int max_socket_path_size() { -struct sockaddr_un s; -return sizeof(s.sun_path); -} -*/ -import "C" - -// ipcListen will create a Unix socket on the given endpoint. -func ipcListen(endpoint string) (net.Listener, error) { - if len(endpoint) > int(C.max_socket_path_size()) { - log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", C.max_socket_path_size()), - "endpoint", endpoint) - } - - // Ensure the IPC path exists and remove any previous leftover - if err := os.MkdirAll(filepath.Dir(endpoint), 0751); err != nil { - return nil, err - } - os.Remove(endpoint) - l, err := net.Listen("unix", endpoint) - if err != nil { - return nil, err - } - os.Chmod(endpoint, 0600) - return l, nil -} - -// newIPCConnection will connect to a Unix socket on the given endpoint. -func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) { - return dialContext(ctx, "unix", endpoint) -} diff --git a/rpc/json.go b/rpc/json.go deleted file mode 100644 index 7cdcdbe4f..000000000 --- a/rpc/json.go +++ /dev/null @@ -1,314 +0,0 @@ -package rpc - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "io" - "reflect" - "strings" - "sync" - "time" -) - -const ( - vsn = "2.0" - serviceMethodSeparator = "_" - subscribeMethodSuffix = "_subscribe" - unsubscribeMethodSuffix = "_unsubscribe" - notificationMethodSuffix = "_subscription" - - defaultWriteTimeout = 10 * time.Second // used if context has no deadline -) - -var null = json.RawMessage("null") - -type subscriptionResult struct { - ID string `json:"subscription"` - Result json.RawMessage `json:"result,omitempty"` -} - -// A value of this type can a JSON-RPC request, notification, successful response or -// error response. Which one it is depends on the fields. -type jsonrpcMessage struct { - Version string `json:"jsonrpc,omitempty"` - ID json.RawMessage `json:"id,omitempty"` - Method string `json:"method,omitempty"` - Params json.RawMessage `json:"params,omitempty"` - Error *jsonError `json:"error,omitempty"` - Result json.RawMessage `json:"result,omitempty"` -} - -func (msg *jsonrpcMessage) isNotification() bool { - return msg.ID == nil && msg.Method != "" -} - -func (msg *jsonrpcMessage) isCall() bool { - return msg.hasValidID() && msg.Method != "" -} - -func (msg *jsonrpcMessage) isResponse() bool { - return msg.hasValidID() && msg.Method == "" && msg.Params == nil && (msg.Result != nil || msg.Error != nil) -} - -func (msg *jsonrpcMessage) hasValidID() bool { - return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '[' -} - -func (msg *jsonrpcMessage) isSubscribe() bool { - return strings.HasSuffix(msg.Method, subscribeMethodSuffix) -} - -func (msg *jsonrpcMessage) isUnsubscribe() bool { - return strings.HasSuffix(msg.Method, unsubscribeMethodSuffix) -} - -func (msg *jsonrpcMessage) namespace() string { - elem := strings.SplitN(msg.Method, serviceMethodSeparator, 2) - return elem[0] -} - -func (msg *jsonrpcMessage) String() string { - b, _ := json.Marshal(msg) - return string(b) -} - -func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage { - resp := errorMessage(err) - resp.ID = msg.ID - return resp -} - -func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage { - enc, err := json.Marshal(result) - if err != nil { - // TODO: wrap with 'internal server error' - return msg.errorResponse(err) - } - return &jsonrpcMessage{Version: vsn, ID: msg.ID, Result: enc} -} - -func errorMessage(err error) *jsonrpcMessage { - msg := &jsonrpcMessage{Version: vsn, ID: null, Error: &jsonError{ - Code: defaultErrorCode, - Message: err.Error(), - }} - ec, ok := err.(Error) - if ok { - msg.Error.Code = ec.ErrorCode() - } - return msg -} - -type jsonError struct { - Code int `json:"code"` - Message string `json:"message"` - Data interface{} `json:"data,omitempty"` -} - -func (err *jsonError) Error() string { - if err.Message == "" { - return fmt.Sprintf("json-rpc error %d", err.Code) - } - return err.Message -} - -func (err *jsonError) ErrorCode() int { - return err.Code -} - -// Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec. -type Conn interface { - io.ReadWriteCloser - SetWriteDeadline(time.Time) error -} - -// ConnRemoteAddr wraps the RemoteAddr operation, which returns a description -// of the peer address of a connection. If a Conn also implements ConnRemoteAddr, this -// description is used in log messages. -type ConnRemoteAddr interface { - RemoteAddr() string -} - -// connWithRemoteAddr overrides the remote address of a connection. -type connWithRemoteAddr struct { - Conn - addr string -} - -func (c connWithRemoteAddr) RemoteAddr() string { return c.addr } - -// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has -// support for parsing arguments and serializing (result) objects. -type jsonCodec struct { - remoteAddr string - closer sync.Once // close closed channel once - closed chan interface{} // closed on Close - decode func(v interface{}) error // decoder to allow multiple transports - encMu sync.Mutex // guards the encoder - encode func(v interface{}) error // encoder to allow multiple transports - conn Conn -} - -// NewCodec creates a new RPC server codec with support for JSON-RPC 2.0 based -// on explicitly given encoding and decoding methods. -func NewCodec(conn Conn, encode, decode func(v interface{}) error) ServerCodec { - codec := &jsonCodec{ - closed: make(chan interface{}), - encode: encode, - decode: decode, - conn: conn, - } - if ra, ok := conn.(ConnRemoteAddr); ok { - codec.remoteAddr = ra.RemoteAddr() - } - return codec -} - -// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0. -func NewJSONCodec(conn Conn) ServerCodec { - enc := json.NewEncoder(conn) - dec := json.NewDecoder(conn) - dec.UseNumber() - return NewCodec(conn, enc.Encode, dec.Decode) -} - -func (c *jsonCodec) RemoteAddr() string { - return c.remoteAddr -} - -func (c *jsonCodec) Read() (msg []*jsonrpcMessage, batch bool, err error) { - // Decode the next JSON object in the input stream. - // This verifies basic syntax, etc. - var rawmsg json.RawMessage - if err := c.decode(&rawmsg); err != nil { - return nil, false, err - } - msg, batch = parseMessage(rawmsg) - return msg, batch, nil -} - -// Write sends a message to client. -func (c *jsonCodec) Write(ctx context.Context, v interface{}) error { - c.encMu.Lock() - defer c.encMu.Unlock() - - deadline, ok := ctx.Deadline() - if !ok { - deadline = time.Now().Add(defaultWriteTimeout) - } - c.conn.SetWriteDeadline(deadline) - return c.encode(v) -} - -// Close the underlying connection -func (c *jsonCodec) Close() { - c.closer.Do(func() { - close(c.closed) - c.conn.Close() - }) -} - -// Closed returns a channel which will be closed when Close is called -func (c *jsonCodec) Closed() <-chan interface{} { - return c.closed -} - -// parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error -// checks in this function because the raw message has already been syntax-checked when it -// is called. Any non-JSON-RPC messages in the input return the zero value of -// jsonrpcMessage. -func parseMessage(raw json.RawMessage) ([]*jsonrpcMessage, bool) { - if !isBatch(raw) { - msgs := []*jsonrpcMessage{{}} - json.Unmarshal(raw, &msgs[0]) - return msgs, false - } - dec := json.NewDecoder(bytes.NewReader(raw)) - dec.Token() // skip '[' - var msgs []*jsonrpcMessage - for dec.More() { - msgs = append(msgs, new(jsonrpcMessage)) - dec.Decode(&msgs[len(msgs)-1]) - } - return msgs, true -} - -// isBatch returns true when the first non-whitespace characters is '[' -func isBatch(raw json.RawMessage) bool { - for _, c := range raw { - // skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt) - if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d { - continue - } - return c == '[' - } - return false -} - -// parsePositionalArguments tries to parse the given args to an array of values with the -// given types. It returns the parsed values or an error when the args could not be -// parsed. Missing optional arguments are returned as reflect.Zero values. -func parsePositionalArguments(rawArgs json.RawMessage, types []reflect.Type) ([]reflect.Value, error) { - dec := json.NewDecoder(bytes.NewReader(rawArgs)) - var args []reflect.Value - tok, err := dec.Token() - switch { - case err == io.EOF || tok == nil && err == nil: - // "params" is optional and may be empty. Also allow "params":null even though it's - // not in the spec because our own client used to send it. - case err != nil: - return nil, err - case tok == json.Delim('['): - // Read argument array. - if args, err = parseArgumentArray(dec, types); err != nil { - return nil, err - } - default: - return nil, errors.New("non-array args") - } - // Set any missing args to nil. - for i := len(args); i < len(types); i++ { - if types[i].Kind() != reflect.Ptr { - return nil, fmt.Errorf("missing value for required argument %d", i) - } - args = append(args, reflect.Zero(types[i])) - } - return args, nil -} - -func parseArgumentArray(dec *json.Decoder, types []reflect.Type) ([]reflect.Value, error) { - args := make([]reflect.Value, 0, len(types)) - for i := 0; dec.More(); i++ { - if i >= len(types) { - return args, fmt.Errorf("too many arguments, want at most %d", len(types)) - } - argval := reflect.New(types[i]) - if err := dec.Decode(argval.Interface()); err != nil { - return args, fmt.Errorf("invalid argument %d: %v", i, err) - } - if argval.IsNil() && types[i].Kind() != reflect.Ptr { - return args, fmt.Errorf("missing value for required argument %d", i) - } - args = append(args, argval.Elem()) - } - // Read end of args array. - _, err := dec.Token() - return args, err -} - -// parseSubscriptionName extracts the subscription name from an encoded argument array. -func parseSubscriptionName(rawArgs json.RawMessage) (string, error) { - dec := json.NewDecoder(bytes.NewReader(rawArgs)) - if tok, _ := dec.Token(); tok != json.Delim('[') { - return "", errors.New("non-array args") - } - v, _ := dec.Token() - method, ok := v.(string) - if !ok { - return "", errors.New("expected subscription name as first argument") - } - return method, nil -} diff --git a/rpc/server.go b/rpc/server.go deleted file mode 100644 index 416aff8e6..000000000 --- a/rpc/server.go +++ /dev/null @@ -1,116 +0,0 @@ -package rpc - -import ( - "context" - "io" - "sync/atomic" - - mapset "github.com/deckarep/golang-set" - "github.com/ethereum/go-ethereum/log" -) - -const metadataAPI = "rpc" - -// Server is an RPC server. -type Server struct { - services serviceRegistry - idgen func() ID - run int32 - codecs mapset.Set -} - -// NewServer creates a new server instance with no registered handlers. -func NewServer() *Server { - server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1} - // Register the default service providing meta information about the RPC service such - // as the services and methods it offers. - rpcService := &Service{server} - server.RegisterName(metadataAPI, rpcService) - return server -} - -// RegisterName creates a service for the given receiver type under the given name. When no -// methods on the given receiver match the criteria to be either a RPC method or a -// subscription an error is returned. Otherwise a new service is created and added to the -// service collection this server provides to clients. -func (s *Server) RegisterName(name string, receiver interface{}) error { - return s.services.registerName(name, receiver) -} - -// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes -// the response back using the given codec. It will block until the codec is closed or the -// server is stopped. In either case the codec is closed. -func (s *Server) ServeCodec(codec ServerCodec) { - defer codec.Close() - - // Don't serve if server is stopped. - if atomic.LoadInt32(&s.run) == 0 { - return - } - - // Add the codec to the set so it can be closed by Stop. - s.codecs.Add(codec) - defer s.codecs.Remove(codec) - - c := initClient(codec, s.idgen, &s.services) - <-codec.Closed() - c.Close() -} - -// serveSingleRequest reads and processes a single RPC request from the given codec. This -// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in -// this mode. -func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { - // Don't serve if server is stopped. - if atomic.LoadInt32(&s.run) == 0 { - return - } - - h := newHandler(ctx, codec, s.idgen, &s.services) - h.allowSubscribe = false - defer h.close(io.EOF, nil) - - reqs, batch, err := codec.Read() - if err != nil { - if err != io.EOF { - codec.Write(ctx, errorMessage(&invalidMessageError{"parse error"})) - } - return - } - if batch { - h.handleBatch(reqs) - } else { - h.handleMsg(reqs[0]) - } -} - -// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending -// requests to finish, then closes all codecs which will cancel pending requests and -// subscriptions. -func (s *Server) Stop() { - if atomic.CompareAndSwapInt32(&s.run, 1, 0) { - log.Debug("RPC server shutting down") - s.codecs.Each(func(c interface{}) bool { - c.(ServerCodec).Close() - return true - }) - } -} - -// Service gives meta information about the server. -// e.g. gives information about the loaded modules. -type Service struct { - server *Server -} - -// Modules returns the list of RPC services with their version number -func (s *Service) Modules() map[string]string { - s.server.services.mu.Lock() - defer s.server.services.mu.Unlock() - - modules := make(map[string]string) - for name := range s.server.services.services { - modules[name] = "1.0" - } - return modules -} diff --git a/rpc/service.go b/rpc/service.go deleted file mode 100644 index 34278f3e4..000000000 --- a/rpc/service.go +++ /dev/null @@ -1,269 +0,0 @@ -package rpc - -import ( - "context" - "errors" - "fmt" - "reflect" - "runtime" - "strings" - "sync" - "unicode" - "unicode/utf8" - - "github.com/ethereum/go-ethereum/log" -) - -var ( - contextType = reflect.TypeOf((*context.Context)(nil)).Elem() - errorType = reflect.TypeOf((*error)(nil)).Elem() - subscriptionType = reflect.TypeOf(Subscription{}) - stringType = reflect.TypeOf("") -) - -type serviceRegistry struct { - mu sync.Mutex - services map[string]service -} - -// service represents a registered object. -type service struct { - name string // name for service - callbacks map[string]*callback // registered handlers - subscriptions map[string]*callback // available subscriptions/notifications -} - -// callback is a method callback which was registered in the server -type callback struct { - fn reflect.Value // the function - rcvr reflect.Value // receiver object of method, set if fn is method - argTypes []reflect.Type // input argument types - hasCtx bool // method's first argument is a context (not included in argTypes) - errPos int // err return idx, of -1 when method cannot return error - isSubscribe bool // true if this is a subscription callback -} - -func (r *serviceRegistry) registerName(name string, rcvr interface{}) error { - rcvrVal := reflect.ValueOf(rcvr) - if name == "" { - return fmt.Errorf("no service name for type %s", rcvrVal.Type().String()) - } - callbacks := suitableCallbacks(rcvrVal) - if len(callbacks) == 0 { - return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr) - } - - r.mu.Lock() - defer r.mu.Unlock() - if r.services == nil { - r.services = make(map[string]service) - } - svc, ok := r.services[name] - if !ok { - svc = service{ - name: name, - callbacks: make(map[string]*callback), - subscriptions: make(map[string]*callback), - } - r.services[name] = svc - } - for name, cb := range callbacks { - if cb.isSubscribe { - svc.subscriptions[name] = cb - } else { - svc.callbacks[name] = cb - } - } - return nil -} - -// callback returns the callback corresponding to the given RPC method name. -func (r *serviceRegistry) callback(method string) *callback { - elem := strings.SplitN(method, serviceMethodSeparator, 2) - if len(elem) != 2 { - return nil - } - r.mu.Lock() - defer r.mu.Unlock() - return r.services[elem[0]].callbacks[elem[1]] -} - -// subscription returns a subscription callback in the given service. -func (r *serviceRegistry) subscription(service, name string) *callback { - r.mu.Lock() - defer r.mu.Unlock() - return r.services[service].subscriptions[name] -} - -// suitableCallbacks iterates over the methods of the given type. It determines if a method -// satisfies the criteria for a RPC callback or a subscription callback and adds it to the -// collection of callbacks. See server documentation for a summary of these criteria. -func suitableCallbacks(receiver reflect.Value) map[string]*callback { - typ := receiver.Type() - callbacks := make(map[string]*callback) - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - if method.PkgPath != "" { - continue // method not exported - } - cb := newCallback(receiver, method.Func) - if cb == nil { - continue // function invalid - } - name := formatName(method.Name) - callbacks[name] = cb - } - return callbacks -} - -// newCallback turns fn (a function) into a callback object. It returns nil if the function -// is unsuitable as an RPC callback. -func newCallback(receiver, fn reflect.Value) *callback { - fntype := fn.Type() - c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)} - // Determine parameter types. They must all be exported or builtin types. - c.makeArgTypes() - if !allExportedOrBuiltin(c.argTypes) { - return nil - } - // Verify return types. The function must return at most one error - // and/or one other non-error value. - outs := make([]reflect.Type, fntype.NumOut()) - for i := 0; i < fntype.NumOut(); i++ { - outs[i] = fntype.Out(i) - } - if len(outs) > 2 || !allExportedOrBuiltin(outs) { - return nil - } - // If an error is returned, it must be the last returned value. - switch { - case len(outs) == 1 && isErrorType(outs[0]): - c.errPos = 0 - case len(outs) == 2: - if isErrorType(outs[0]) || !isErrorType(outs[1]) { - return nil - } - c.errPos = 1 - } - return c -} - -// makeArgTypes composes the argTypes list. -func (c *callback) makeArgTypes() { - fntype := c.fn.Type() - // Skip receiver and context.Context parameter (if present). - firstArg := 0 - if c.rcvr.IsValid() { - firstArg++ - } - if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType { - c.hasCtx = true - firstArg++ - } - // Add all remaining parameters. - c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg) - for i := firstArg; i < fntype.NumIn(); i++ { - c.argTypes[i-firstArg] = fntype.In(i) - } -} - -// call invokes the callback. -func (c *callback) call(ctx context.Context, method string, args []reflect.Value) (res interface{}, errRes error) { - // Create the argument slice. - fullargs := make([]reflect.Value, 0, 2+len(args)) - if c.rcvr.IsValid() { - fullargs = append(fullargs, c.rcvr) - } - if c.hasCtx { - fullargs = append(fullargs, reflect.ValueOf(ctx)) - } - fullargs = append(fullargs, args...) - - // Catch panic while running the callback. - defer func() { - if err := recover(); err != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf)) - errRes = errors.New("method handler crashed") - } - }() - // Run the callback. - results := c.fn.Call(fullargs) - if len(results) == 0 { - return nil, nil - } - if c.errPos >= 0 && !results[c.errPos].IsNil() { - // Method has returned non-nil error value. - err := results[c.errPos].Interface().(error) - return reflect.Value{}, err - } - return results[0].Interface(), nil -} - -// Is this an exported - upper case - name? -func isExported(name string) bool { - rune, _ := utf8.DecodeRuneInString(name) - return unicode.IsUpper(rune) -} - -// Are all those types exported or built-in? -func allExportedOrBuiltin(types []reflect.Type) bool { - for _, typ := range types { - for typ.Kind() == reflect.Ptr { - typ = typ.Elem() - } - // PkgPath will be non-empty even for an exported type, - // so we need to check the type name as well. - if !isExported(typ.Name()) && typ.PkgPath() != "" { - return false - } - } - return true -} - -// Is t context.Context or *context.Context? -func isContextType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t == contextType -} - -// Does t satisfy the error interface? -func isErrorType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t.Implements(errorType) -} - -// Is t Subscription or *Subscription? -func isSubscriptionType(t reflect.Type) bool { - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t == subscriptionType -} - -// isPubSub tests whether the given method has as as first argument a context.Context and -// returns the pair (Subscription, error). -func isPubSub(methodType reflect.Type) bool { - // numIn(0) is the receiver type - if methodType.NumIn() < 2 || methodType.NumOut() != 2 { - return false - } - return isContextType(methodType.In(1)) && - isSubscriptionType(methodType.Out(0)) && - isErrorType(methodType.Out(1)) -} - -// formatName converts to first character of name to lowercase. -func formatName(name string) string { - ret := []rune(name) - if len(ret) > 0 { - ret[0] = unicode.ToLower(ret[0]) - } - return string(ret) -} diff --git a/rpc/stdio.go b/rpc/stdio.go deleted file mode 100644 index c7497e831..000000000 --- a/rpc/stdio.go +++ /dev/null @@ -1,50 +0,0 @@ -package rpc - -import ( - "context" - "errors" - "io" - "net" - "os" - "time" -) - -// DialStdIO creates a client on stdin/stdout. -func DialStdIO(ctx context.Context) (*Client, error) { - return DialIO(ctx, os.Stdin, os.Stdout) -} - -// DialIO creates a client which uses the given IO channels -func DialIO(ctx context.Context, in io.Reader, out io.Writer) (*Client, error) { - return newClient(ctx, func(_ context.Context) (ServerCodec, error) { - return NewJSONCodec(stdioConn{ - in: in, - out: out, - }), nil - }) -} - -type stdioConn struct { - in io.Reader - out io.Writer -} - -func (io stdioConn) Read(b []byte) (n int, err error) { - return io.in.Read(b) -} - -func (io stdioConn) Write(b []byte) (n int, err error) { - return io.out.Write(b) -} - -func (io stdioConn) Close() error { - return nil -} - -func (io stdioConn) RemoteAddr() string { - return "/dev/stdin" -} - -func (io stdioConn) SetWriteDeadline(t time.Time) error { - return &net.OpError{Op: "set", Net: "stdio", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} -} diff --git a/rpc/subscription.go b/rpc/subscription.go deleted file mode 100644 index a1e0a9435..000000000 --- a/rpc/subscription.go +++ /dev/null @@ -1,311 +0,0 @@ -package rpc - -import ( - "bufio" - "container/list" - "context" - crand "crypto/rand" - "encoding/binary" - "encoding/hex" - "encoding/json" - "errors" - "math/rand" - "reflect" - "strings" - "sync" - "time" -) - -var ( - // ErrNotificationsUnsupported is returned when the connection doesn't support notifications - ErrNotificationsUnsupported = errors.New("notifications not supported") - // ErrSubscriptionNotFound is returned when the subscription for the given id is not found - ErrSubscriptionNotFound = errors.New("subscription not found") -) - -var globalGen = randomIDGenerator() - -// ID defines a pseudo random number that is used to identify RPC subscriptions. -type ID string - -// NewID returns a new, random ID. -func NewID() ID { - return globalGen() -} - -// randomIDGenerator returns a function generates a random IDs. -func randomIDGenerator() func() ID { - seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)) - if err != nil { - seed = int64(time.Now().Nanosecond()) - } - var ( - mu sync.Mutex - rng = rand.New(rand.NewSource(seed)) - ) - return func() ID { - mu.Lock() - defer mu.Unlock() - id := make([]byte, 16) - rng.Read(id) - return encodeID(id) - } -} - -func encodeID(b []byte) ID { - id := hex.EncodeToString(b) - id = strings.TrimLeft(id, "0") - if id == "" { - id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0. - } - return ID("0x" + id) -} - -type notifierKey struct{} - -// NotifierFromContext returns the Notifier value stored in ctx, if any. -func NotifierFromContext(ctx context.Context) (*Notifier, bool) { - n, ok := ctx.Value(notifierKey{}).(*Notifier) - return n, ok -} - -// Notifier is tied to a RPC connection that supports subscriptions. -// Server callbacks use the notifier to send notifications. -type Notifier struct { - h *handler - namespace string - - mu sync.Mutex - sub *Subscription - buffer []json.RawMessage - callReturned bool - activated bool -} - -// CreateSubscription returns a new subscription that is coupled to the -// RPC connection. By default subscriptions are inactive and notifications -// are dropped until the subscription is marked as active. This is done -// by the RPC server after the subscription ID is send to the client. -func (n *Notifier) CreateSubscription() *Subscription { - n.mu.Lock() - defer n.mu.Unlock() - - if n.sub != nil { - panic("can't create multiple subscriptions with Notifier") - } else if n.callReturned { - panic("can't create subscription after subscribe call has returned") - } - n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)} - return n.sub -} - -// Notify sends a notification to the client with the given data as payload. -// If an error occurs the RPC connection is closed and the error is returned. -func (n *Notifier) Notify(id ID, data interface{}) error { - enc, err := json.Marshal(data) - if err != nil { - return err - } - - n.mu.Lock() - defer n.mu.Unlock() - - if n.sub == nil { - panic("can't Notify before subscription is created") - } else if n.sub.ID != id { - panic("Notify with wrong ID") - } - if n.activated { - return n.send(n.sub, enc) - } - n.buffer = append(n.buffer, enc) - return nil -} - -// Closed returns a channel that is closed when the RPC connection is closed. -// Deprecated: use subscription error channel -func (n *Notifier) Closed() <-chan interface{} { - return n.h.conn.Closed() -} - -// takeSubscription returns the subscription (if one has been created). No subscription can -// be created after this call. -func (n *Notifier) takeSubscription() *Subscription { - n.mu.Lock() - defer n.mu.Unlock() - n.callReturned = true - return n.sub -} - -// acticate is called after the subscription ID was sent to client. Notifications are -// buffered before activation. This prevents notifications being sent to the client before -// the subscription ID is sent to the client. -func (n *Notifier) activate() error { - n.mu.Lock() - defer n.mu.Unlock() - - for _, data := range n.buffer { - if err := n.send(n.sub, data); err != nil { - return err - } - } - n.activated = true - return nil -} - -func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { - params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) - ctx := context.Background() - return n.h.conn.Write(ctx, &jsonrpcMessage{ - Version: vsn, - Method: n.namespace + notificationMethodSuffix, - Params: params, - }) -} - -// A Subscription is created by a notifier and tight to that notifier. The client can use -// this subscription to wait for an unsubscribe request for the client, see Err(). -type Subscription struct { - ID ID - namespace string - err chan error // closed on unsubscribe -} - -// Err returns a channel that is closed when the client send an unsubscribe request. -func (s *Subscription) Err() <-chan error { - return s.err -} - -// MarshalJSON marshals a subscription as its ID. -func (s *Subscription) MarshalJSON() ([]byte, error) { - return json.Marshal(s.ID) -} - -// ClientSubscription is a subscription established through the Client's Subscribe or -// EthSubscribe methods. -type ClientSubscription struct { - client *Client - etype reflect.Type - channel reflect.Value - namespace string - subid string - in chan json.RawMessage - - quitOnce sync.Once // ensures quit is closed once - quit chan struct{} // quit is closed when the subscription exits - errOnce sync.Once // ensures err is closed once - err chan error -} - -func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { - sub := &ClientSubscription{ - client: c, - namespace: namespace, - etype: channel.Type().Elem(), - channel: channel, - quit: make(chan struct{}), - err: make(chan error, 1), - in: make(chan json.RawMessage), - } - return sub -} - -// Err returns the subscription error channel. The intended use of Err is to schedule -// resubscription when the client connection is closed unexpectedly. -// -// The error channel receives a value when the subscription has ended due -// to an error. The received error is nil if Close has been called -// on the underlying client and no other error has occurred. -// -// The error channel is closed when Unsubscribe is called on the subscription. -func (sub *ClientSubscription) Err() <-chan error { - return sub.err -} - -// Unsubscribe unsubscribes the notification and closes the error channel. -// It can safely be called more than once. -func (sub *ClientSubscription) Unsubscribe() { - sub.quitWithError(true, nil) - sub.errOnce.Do(func() { close(sub.err) }) -} - -func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) { - sub.quitOnce.Do(func() { - // The dispatch loop won't be able to execute the unsubscribe call - // if it is blocked on deliver. Close sub.quit first because it - // unblocks deliver. - close(sub.quit) - if unsubscribeServer { - sub.requestUnsubscribe() - } - if err != nil { - if err == errClientQuit { - err = nil // Adhere to subscription semantics. - } - sub.err <- err - } - }) -} - -func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { - select { - case sub.in <- result: - return true - case <-sub.quit: - return false - } -} - -func (sub *ClientSubscription) start() { - sub.quitWithError(sub.forward()) -} - -func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { - cases := []reflect.SelectCase{ - {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, - {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, - {Dir: reflect.SelectSend, Chan: sub.channel}, - } - buffer := list.New() - defer buffer.Init() - for { - var chosen int - var recv reflect.Value - if buffer.Len() == 0 { - // Idle, omit send case. - chosen, recv, _ = reflect.Select(cases[:2]) - } else { - // Non-empty buffer, send the first queued item. - cases[2].Send = reflect.ValueOf(buffer.Front().Value) - chosen, recv, _ = reflect.Select(cases) - } - - switch chosen { - case 0: // <-sub.quit - return false, nil - case 1: // <-sub.in - val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) - if err != nil { - return true, err - } - if buffer.Len() == maxClientSubscriptionBuffer { - return true, errSubscriptionQueueOverflow - } - buffer.PushBack(val) - case 2: // sub.channel<- - cases[2].Send = reflect.Value{} // Don't hold onto the value. - buffer.Remove(buffer.Front()) - } - } -} - -func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) { - val := reflect.New(sub.etype) - err := json.Unmarshal(result, val.Interface()) - return val.Elem().Interface(), err -} - -func (sub *ClientSubscription) requestUnsubscribe() error { - var result interface{} - return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid) -} diff --git a/rpc/types.go b/rpc/types.go deleted file mode 100644 index 1b0025fdf..000000000 --- a/rpc/types.go +++ /dev/null @@ -1,94 +0,0 @@ -package rpc - -import ( - "context" - "fmt" - "math" - "strings" - - "github.com/ethereum/go-ethereum/common/hexutil" -) - -// API describes the set of methods offered over the RPC interface -type API struct { - Namespace string // namespace under which the rpc methods of Service are exposed - Version string // api version for DApp's - Service interface{} // receiver instance which holds the methods - Public bool // indication if the methods must be considered safe for public use -} - -// Error wraps RPC errors, which contain an error code in addition to the message. -type Error interface { - Error() string // returns the message - ErrorCode() int // returns the code -} - -// ServerCodec implements reading, parsing and writing RPC messages for the server side of -// a RPC session. Implementations must be go-routine safe since the codec can be called in -// multiple go-routines concurrently. -type ServerCodec interface { - Read() (msgs []*jsonrpcMessage, isBatch bool, err error) - Close() - jsonWriter -} - -// jsonWriter can write JSON messages to its underlying connection. -// Implementations must be safe for concurrent use. -type jsonWriter interface { - Write(context.Context, interface{}) error - // Closed returns a channel which is closed when the connection is closed. - Closed() <-chan interface{} - // RemoteAddr returns the peer address of the connection. - RemoteAddr() string -} - -// BlockNumber ... -type BlockNumber int64 - -const ( - // PendingBlockNumber ... - PendingBlockNumber = BlockNumber(-2) - latestBlockNumber = BlockNumber(-1) - earliestBlockNumber = BlockNumber(0) -) - -// UnmarshalJSON parses the given JSON fragment into a BlockNumber. It supports: -// - "latest", "earliest" or "pending" as string arguments -// - the block number -// Returned errors: -// - an invalid block number error when the given argument isn't a known strings -// - an out of range error when the given block number is either too little or too large -func (bn *BlockNumber) UnmarshalJSON(data []byte) error { - input := strings.TrimSpace(string(data)) - if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' { - input = input[1 : len(input)-1] - } - - switch input { - case "earliest": - *bn = earliestBlockNumber - return nil - case "latest": - *bn = latestBlockNumber - return nil - case "pending": - *bn = PendingBlockNumber - return nil - } - - blckNum, err := hexutil.DecodeUint64(input) - if err != nil { - return err - } - if blckNum > math.MaxInt64 { - return fmt.Errorf("Blocknumber too high") - } - - *bn = BlockNumber(blckNum) - return nil -} - -// Int64 turns blockNumber to int64 -func (bn BlockNumber) Int64() int64 { - return (int64)(bn) -} diff --git a/rpc/websocket.go b/rpc/websocket.go deleted file mode 100644 index f0a820137..000000000 --- a/rpc/websocket.go +++ /dev/null @@ -1,222 +0,0 @@ -package rpc - -import ( - "bytes" - "context" - "crypto/tls" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - "net" - "net/http" - "net/url" - "os" - "strings" - "time" - - mapset "github.com/deckarep/golang-set" - "github.com/ethereum/go-ethereum/log" - "golang.org/x/net/websocket" -) - -// websocketJSONCodec is a custom JSON codec with payload size enforcement and -// special number parsing. -var websocketJSONCodec = websocket.Codec{ - // Marshal is the stock JSON marshaller used by the websocket library too. - Marshal: func(v interface{}) ([]byte, byte, error) { - msg, err := json.Marshal(v) - return msg, websocket.TextFrame, err - }, - // Unmarshal is a specialized unmarshaller to properly convert numbers. - Unmarshal: func(msg []byte, payloadType byte, v interface{}) error { - dec := json.NewDecoder(bytes.NewReader(msg)) - dec.UseNumber() - - return dec.Decode(v) - }, -} - -// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections. -// -// allowedOrigins should be a comma-separated list of allowed origin URLs. -// To allow connections with any origin, pass "*". -func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { - return websocket.Server{ - Handshake: wsHandshakeValidator(allowedOrigins), - Handler: func(conn *websocket.Conn) { - codec := newWebsocketCodec(conn) - s.ServeCodec(codec) - }, - } -} - -func newWebsocketCodec(conn *websocket.Conn) ServerCodec { - // Create a custom encode/decode pair to enforce payload size and number encoding - conn.MaxPayloadBytes = maxRequestContentLength - encoder := func(v interface{}) error { - return websocketJSONCodec.Send(conn, v) - } - decoder := func(v interface{}) error { - return websocketJSONCodec.Receive(conn, v) - } - rpcconn := Conn(conn) - if conn.IsServerConn() { - // Override remote address with the actual socket address because - // package websocket crashes if there is no request origin. - addr := conn.Request().RemoteAddr - if wsaddr := conn.RemoteAddr().(*websocket.Addr); wsaddr.URL != nil { - // Add origin if present. - addr += "(" + wsaddr.URL.String() + ")" - } - rpcconn = connWithRemoteAddr{conn, addr} - } - return NewCodec(rpcconn, encoder, decoder) -} - -// NewWSServer creates a new websocket RPC server around an API provider. -// -// Deprecated: use Server.WebsocketHandler -func NewWSServer(allowedOrigins []string, srv *Server) *http.Server { - return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)} -} - -// wsHandshakeValidator returns a handler that verifies the origin during the -// websocket upgrade process. When a '*' is specified as an allowed origins all -// connections are accepted. -func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http.Request) error { - origins := mapset.NewSet() - allowAllOrigins := false - - for _, origin := range allowedOrigins { - if origin == "*" { - allowAllOrigins = true - } - if origin != "" { - origins.Add(strings.ToLower(origin)) - } - } - - // allow localhost if no allowedOrigins are specified. - if len(origins.ToSlice()) == 0 { - origins.Add("http://localhost") - if hostname, err := os.Hostname(); err == nil { - origins.Add("http://" + strings.ToLower(hostname)) - } - } - - log.Debug(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v", origins.ToSlice())) - - f := func(cfg *websocket.Config, req *http.Request) error { - // Skip origin verification if no Origin header is present. The origin check - // is supposed to protect against browser based attacks. Browsers always set - // Origin. Non-browser software can put anything in origin and checking it doesn't - // provide additional security. - if _, ok := req.Header["Origin"]; !ok { - return nil - } - // Verify origin against whitelist. - origin := strings.ToLower(req.Header.Get("Origin")) - if allowAllOrigins || origins.Contains(origin) { - return nil - } - log.Warn("Rejected WebSocket connection", "origin", origin) - return errors.New("origin not allowed") - } - - return f -} - -func wsGetConfig(endpoint, origin string) (*websocket.Config, error) { - if origin == "" { - var err error - if origin, err = os.Hostname(); err != nil { - return nil, err - } - if strings.HasPrefix(endpoint, "wss") { - origin = "https://" + strings.ToLower(origin) - } else { - origin = "http://" + strings.ToLower(origin) - } - } - config, err := websocket.NewConfig(endpoint, origin) - if err != nil { - return nil, err - } - - if config.Location.User != nil { - b64auth := base64.StdEncoding.EncodeToString([]byte(config.Location.User.String())) - config.Header.Add("Authorization", "Basic "+b64auth) - config.Location.User = nil - } - return config, nil -} - -// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server -// that is listening on the given endpoint. -// -// The context is used for the initial connection establishment. It does not -// affect subsequent interactions with the client. -func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { - config, err := wsGetConfig(endpoint, origin) - if err != nil { - return nil, err - } - - return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { - conn, err := wsDialContext(ctx, config) - if err != nil { - return nil, err - } - return newWebsocketCodec(conn), nil - }) -} - -func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) { - var conn net.Conn - var err error - switch config.Location.Scheme { - case "ws": - conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location)) - case "wss": - dialer := contextDialer(ctx) - conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig) - default: - err = websocket.ErrBadScheme - } - if err != nil { - return nil, err - } - ws, err := websocket.NewClient(config, conn) - if err != nil { - conn.Close() - return nil, err - } - return ws, err -} - -var wsPortMap = map[string]string{"ws": "80", "wss": "443"} - -func wsDialAddress(location *url.URL) string { - if _, ok := wsPortMap[location.Scheme]; ok { - if _, _, err := net.SplitHostPort(location.Host); err != nil { - return net.JoinHostPort(location.Host, wsPortMap[location.Scheme]) - } - } - return location.Host -} - -func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { - d := &net.Dialer{KeepAlive: tcpKeepAliveInterval} - return d.DialContext(ctx, network, addr) -} - -func contextDialer(ctx context.Context) *net.Dialer { - dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval} - if deadline, ok := ctx.Deadline(); ok { - dialer.Deadline = deadline - } else { - dialer.Deadline = time.Now().Add(defaultDialTimeout) - } - return dialer -}