new rpc framework

1. Remove eth rpc code and unnecessary changes from
7a0f18f92b. Import the package instead.
2. Move HmyAPIBackend into core package so it has more access to core.
3. Add API interface for services.
4. Start RPC for all roles, for both HTTP (baseport+10) and WS (baseport+20).
3. Keep implemented APIs but move to internal/hmyapi package.
pull/849/head
Yelin Lu 6 years ago committed by Leo Chen
parent 89ad5ef458
commit 0d83532785
  1. 6
      api/service/blockproposal/service.go
  2. 6
      api/service/clientsupport/service.go
  3. 6
      api/service/consensus/service.go
  4. 6
      api/service/discovery/service.go
  5. 6
      api/service/explorer/service.go
  6. 7
      api/service/manager.go
  7. 3
      api/service/manager_test.go
  8. 6
      api/service/networkinfo/service.go
  9. 6
      api/service/newclientsupport/service.go
  10. 6
      api/service/randomness/service.go
  11. 6
      api/service/resharding/service.go
  12. 6
      api/service/restclientsupport/service.go
  13. 128
      api/service/rpc/service.go
  14. 6
      api/service/staking/service.go
  15. 1
      cmd/harmony/main.go
  16. 28
      core/api_backend.go
  17. 3
      core/blockchain.go
  18. 0
      internal/hmyapi/addrlock.go
  19. 11
      internal/hmyapi/backend.go
  20. 0
      internal/hmyapi/private.go
  21. 10
      internal/hmyapi/public.go
  22. 0
      internal/hmyapi/types.go
  23. 133
      node/rpc.go
  24. 32
      node/service.go
  25. 4
      node/service_setup.go
  26. 20
      rpc/README.md
  27. 623
      rpc/client.go
  28. 79
      rpc/endpoints.go
  29. 49
      rpc/errors.go
  30. 381
      rpc/handler.go
  31. 322
      rpc/http.go
  32. 40
      rpc/ipc.go
  33. 46
      rpc/ipc_unix.go
  34. 314
      rpc/json.go
  35. 116
      rpc/server.go
  36. 269
      rpc/service.go
  37. 50
      rpc/stdio.go
  38. 311
      rpc/subscription.go
  39. 94
      rpc/types.go
  40. 222
      rpc/websocket.go

@ -1,6 +1,7 @@
package blockproposal
import (
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils"
)
@ -54,3 +55,8 @@ func (s *Service) NotifyService(params map[string]interface{}) {
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -4,6 +4,7 @@ import (
"strconv"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
clientService "github.com/harmony-one/harmony/api/client/service"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/state"
@ -55,3 +56,8 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
func (s *Service) NotifyService(params map[string]interface{}) {
return
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -1,6 +1,7 @@
package consensus
import (
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core/types"
@ -52,3 +53,8 @@ func (s *Service) NotifyService(params map[string]interface{}) {
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -3,6 +3,7 @@ package discovery
import (
"time"
"github.com/ethereum/go-ethereum/rpc"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/api/service"
@ -147,3 +148,8 @@ func (s *Service) Init() {
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -13,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/mux"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types"
@ -302,3 +303,8 @@ func (s *Service) NotifyService(params map[string]interface{}) {
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -3,6 +3,7 @@ package service
import (
"time"
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils"
)
@ -34,7 +35,6 @@ const (
PeerDiscovery
Resharding
Staking
RPC
Test
Done
)
@ -65,8 +65,6 @@ func (t Type) String() string {
return "PeerDiscovery"
case Resharding:
return "Resharding"
case RPC:
return "RPC"
case Test:
return "Test"
case Done:
@ -95,6 +93,9 @@ type Interface interface {
SetMessageChan(msgChan chan *msg_pb.Message)
StopService()
NotifyService(map[string]interface{})
// APIs retrieves the list of RPC descriptors the service provides
APIs() []rpc.API
}
// Manager stores all services for service manager.

@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/p2p"
@ -40,6 +41,8 @@ func (s *SupportSyncingTest) SetMessageChan(msgChan chan *msg_pb.Message) {
s.msgChan = msgChan
}
func (s *SupportSyncingTest) APIs() []rpc.API { return nil }
// Test TakeAction.
func TestTakeAction(t *testing.T) {
m := &Manager{}

@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
@ -226,3 +227,8 @@ func (s *Service) NotifyService(params map[string]interface{}) {
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -3,6 +3,7 @@ package newclientsupport
import (
"math/big"
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
)
@ -42,3 +43,8 @@ func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
func (s *Service) NotifyService(params map[string]interface{}) {
return
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -1,6 +1,7 @@
package randomness
import (
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/drand"
"github.com/harmony-one/harmony/internal/utils"
@ -43,3 +44,8 @@ func (s *Service) NotifyService(params map[string]interface{}) {
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -3,6 +3,7 @@ package resharding
import (
"time"
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
@ -89,3 +90,8 @@ func (s *Service) NotifyService(params map[string]interface{}) {
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -11,6 +11,7 @@ import (
"strconv"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gorilla/mux"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils"
@ -259,3 +260,8 @@ func (s *Service) NotifyService(params map[string]interface{}) {
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -1,128 +0,0 @@
package rpcservice
import (
"context"
"fmt"
"net"
"net/http"
"strconv"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/rpc"
"github.com/harmony-one/harmony/rpc/hmyapi"
)
const (
rpcPortDiff = 123
)
// Service is the struct for rpc service.
type Service struct {
messageChan chan *msg_pb.Message
server *http.Server
// Util
peer *p2p.Peer
blockchain *core.BlockChain
txPool *core.TxPool
// HTTP RPC
rpcAPIs []rpc.API // List of APIs currently provided by the node
httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled)
httpWhitelist []string // HTTP RPC modules to allow through this endpoint
httpListener net.Listener // HTTP RPC listener socket to server API requests
httpHandler *rpc.Server // HTTP RPC request handler to process the API requests
}
// New returns RPC service.
func New(b *core.BlockChain, p *p2p.Peer, t *core.TxPool) *Service {
return &Service{
blockchain: b,
peer: p,
txPool: t,
}
}
// StartService starts RPC service.
func (s *Service) StartService() {
utils.GetLogInstance().Info("Starting RPC service.")
if err := s.startRPC(); err != nil {
utils.GetLogInstance().Error("Failed to start RPC service.")
} else {
utils.GetLogInstance().Info("Started RPC service.")
}
}
// StopService shutdowns RPC service.
func (s *Service) StopService() {
utils.GetLogInstance().Info("Shutting down RPC service.")
if err := s.server.Shutdown(context.Background()); err != nil {
utils.GetLogInstance().Error("Error when shutting down RPC server", "error", err)
} else {
utils.GetLogInstance().Error("Shutting down RPC server successufully")
}
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {
return
}
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// startRPC is a helper method to start all the various RPC endpoint during node
// startup. It's not meant to be called at any time afterwards as it makes certain
// assumptions about the state of the node.
func (s *Service) startRPC() error {
apis := hmyapi.GetAPIs(rpc.NewBackend(s.blockchain, s.txPool))
port, _ := strconv.Atoi(s.peer.Port)
s.httpEndpoint = fmt.Sprintf("localhost:%v", port+rpcPortDiff)
if err := s.startHTTP(s.httpEndpoint, apis); err != nil {
utils.GetLogInstance().Debug("Failed to start RPC HTTP")
return err
}
utils.GetLogInstance().Debug("Started RPC HTTP")
// All API endpoints started successfully
s.rpcAPIs = apis
return nil
}
// startHTTP initializes and starts the HTTP RPC endpoint.
func (s *Service) startHTTP(endpoint string, apis []rpc.API) error {
utils.GetLogInstance().Debug("rpc startHTTP", "endpoint", endpoint, "apis", apis)
// Short circuit if the HTTP endpoint isn't being exposed
if endpoint == "" {
return nil
}
listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis)
if err != nil {
return err
}
utils.GetLogInstance().Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint))
// All listeners booted successfully
s.httpEndpoint = endpoint
s.httpListener = listener
s.httpHandler = handler
return nil
}
// stopHTTP terminates the HTTP RPC endpoint.
func (s *Service) stopHTTP() {
if s.httpListener != nil {
s.httpListener.Close()
s.httpListener = nil
utils.GetLogInstance().Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", s.httpEndpoint))
}
if s.httpHandler != nil {
s.httpHandler.Stop()
s.httpHandler = nil
}
}

@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
protobuf "github.com/golang/protobuf/proto"
proto "github.com/harmony-one/harmony/api/client/service/proto"
proto_common "github.com/harmony-one/harmony/api/proto"
@ -250,3 +251,8 @@ func (s *Service) NotifyService(params map[string]interface{}) {
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}
// APIs for the services.
func (s *Service) APIs() []rpc.API {
return nil
}

@ -354,6 +354,7 @@ func main() {
go currentNode.SupportSyncing()
currentNode.ServiceManagerSetup()
currentNode.StartRPC(*port)
currentNode.RunServices()
currentNode.StartServer()
}

@ -1,4 +1,4 @@
package rpc
package core
import (
"context"
@ -6,25 +6,25 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/harmony-one/harmony/core"
"github.com/ethereum/go-ethereum/rpc"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
)
// HmyAPIBackend ...
type HmyAPIBackend struct {
blockchain *core.BlockChain
txPool *core.TxPool
blockchain *BlockChain
txPool *TxPool
}
// NewBackend ...
func NewBackend(blockchain *core.BlockChain, txPool *core.TxPool) *HmyAPIBackend {
func NewBackend(blockchain *BlockChain, txPool *TxPool) *HmyAPIBackend {
return &HmyAPIBackend{blockchain, txPool}
}
// ChainDb ...
func (b *HmyAPIBackend) ChainDb() ethdb.Database {
return b.blockchain.ChainDb()
return b.blockchain.db
}
// GetBlock ...
@ -38,22 +38,22 @@ func (b *HmyAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction
}
// BlockByNumber ...
func (b *HmyAPIBackend) BlockByNumber(ctx context.Context, blockNr BlockNumber) (*types.Block, error) {
func (b *HmyAPIBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) {
// Pending block is only known by the miner
if blockNr == PendingBlockNumber {
if blockNr == rpc.PendingBlockNumber {
return nil, errors.New("not implemented")
}
// Otherwise resolve and return the block
if blockNr == latestBlockNumber {
if blockNr == rpc.LatestBlockNumber {
return b.blockchain.CurrentBlock(), nil
}
return b.blockchain.GetBlockByNumber(uint64(blockNr)), nil
}
// StateAndHeaderByNumber ...
func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr BlockNumber) (*state.DB, *types.Header, error) {
func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.DB, *types.Header, error) {
// Pending state is only known by the miner
if blockNr == PendingBlockNumber {
if blockNr == rpc.PendingBlockNumber {
return nil, nil, errors.New("not implemented")
}
// Otherwise resolve the block number and return its state
@ -66,13 +66,13 @@ func (b *HmyAPIBackend) StateAndHeaderByNumber(ctx context.Context, blockNr Bloc
}
// HeaderByNumber ...
func (b *HmyAPIBackend) HeaderByNumber(ctx context.Context, blockNr BlockNumber) (*types.Header, error) {
func (b *HmyAPIBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
// Pending block is only known by the miner
if blockNr == PendingBlockNumber {
if blockNr == rpc.PendingBlockNumber {
return nil, errors.New("not implemented")
}
// Otherwise resolve and return the block
if blockNr == latestBlockNumber {
if blockNr == rpc.LatestBlockNumber {
return b.blockchain.CurrentBlock().Header(), nil
}
return b.blockchain.GetHeaderByNumber(uint64(blockNr)), nil

@ -1744,6 +1744,3 @@ func (bc *BlockChain) StoreNewShardState(block *types.Block, stakeInfo *map[comm
}
return shardState
}
// ChainDb returns the database
func (bc *BlockChain) ChainDb() ethdb.Database { return bc.db }

@ -1,22 +1,21 @@
package hmyapi
import (
"github.com/harmony-one/harmony/rpc"
"github.com/ethereum/go-ethereum/rpc"
"github.com/harmony-one/harmony/core"
)
const namespace = "hmy"
// GetAPIs returns all the APIs.
func GetAPIs(b *rpc.HmyAPIBackend) []rpc.API {
func GetAPIs(b *core.HmyAPIBackend) []rpc.API {
nonceLock := new(AddrLocker)
return []rpc.API{
{
Namespace: namespace,
Namespace: "hmy",
Version: "1.0",
Service: NewPublicBlockChainAPI(b),
Public: true,
}, {
Namespace: namespace,
Namespace: "hmy",
Version: "1.0",
Service: NewPublicTransactionPoolAPI(b, nonceLock),
Public: true,

@ -6,21 +6,21 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/harmony-one/harmony/api/proto"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/rpc"
)
// PublicBlockChainAPI provides an API to access the Harmony blockchain.
// It offers only methods that operate on public data that is freely available to anyone.
type PublicBlockChainAPI struct {
b *rpc.HmyAPIBackend
b *core.HmyAPIBackend
}
// NewPublicBlockChainAPI creates a new Harmony blockchain API.
func NewPublicBlockChainAPI(b *rpc.HmyAPIBackend) *PublicBlockChainAPI {
func NewPublicBlockChainAPI(b *core.HmyAPIBackend) *PublicBlockChainAPI {
return &PublicBlockChainAPI{b}
}
@ -124,12 +124,12 @@ func (s *PublicNetAPI) PeerCount() hexutil.Uint {
// PublicTransactionPoolAPI exposes methods for the RPC interface
type PublicTransactionPoolAPI struct {
b *rpc.HmyAPIBackend
b *core.HmyAPIBackend
nonceLock *AddrLocker
}
// NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool.
func NewPublicTransactionPoolAPI(b *rpc.HmyAPIBackend, nonceLock *AddrLocker) *PublicTransactionPoolAPI {
func NewPublicTransactionPoolAPI(b *core.HmyAPIBackend, nonceLock *AddrLocker) *PublicTransactionPoolAPI {
return &PublicTransactionPoolAPI{b, nonceLock}
}

@ -0,0 +1,133 @@
package node
import (
"fmt"
"net"
"strconv"
"strings"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/hmyapi"
)
const (
rpcHTTPPortOffset = 10
rpcWSPortOffset = 20
)
var (
// HTTP RPC
rpcAPIs []rpc.API
httpListener net.Listener
httpHandler *rpc.Server
wsListener net.Listener
wsHandler *rpc.Server
httpEndpoint = ""
wsEndpoint = ""
httpModules = []string{"hmy"}
httpVirtualHosts = []string{"*"}
httpTimeouts = rpc.DefaultHTTPTimeouts
wsModules = []string{"net", "web3"}
wsOrigins = []string{"*"}
apiBackend *core.HmyAPIBackend
)
// StartRPC start RPC service
func (node *Node) StartRPC(nodePort string) error {
// Gather all the possible APIs to surface
apiBackend = core.NewBackend(node.blockchain, node.TxPool)
apis := hmyapi.GetAPIs(apiBackend)
for _, service := range node.serviceManager.GetServices() {
apis = append(apis, service.APIs()...)
}
port, _ := strconv.Atoi(nodePort)
httpEndpoint = fmt.Sprintf(":%v", port+rpcHTTPPortOffset)
if err := node.startHTTP(httpEndpoint, apis, httpModules, nil, httpVirtualHosts, httpTimeouts); err != nil {
return err
}
wsEndpoint = fmt.Sprintf(":%v", port+rpcWSPortOffset)
if err := node.startWS(wsEndpoint, apis, wsModules, wsOrigins, true); err != nil {
node.stopHTTP()
return err
}
rpcAPIs = apis
return nil
}
// startHTTP initializes and starts the HTTP RPC endpoint.
func (node *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) error {
// Short circuit if the HTTP endpoint isn't being exposed
if endpoint == "" {
return nil
}
listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts, timeouts)
if err != nil {
return err
}
log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ","))
// All listeners booted successfully
httpListener = listener
httpHandler = handler
return nil
}
// stopHTTP terminates the HTTP RPC endpoint.
func (node *Node) stopHTTP() {
if httpListener != nil {
httpListener.Close()
httpListener = nil
log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", httpEndpoint))
}
if httpHandler != nil {
httpHandler.Stop()
httpHandler = nil
}
}
// startWS initializes and starts the websocket RPC endpoint.
func (node *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error {
// Short circuit if the WS endpoint isn't being exposed
if endpoint == "" {
return nil
}
listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll)
if err != nil {
return err
}
log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr()))
// All listeners booted successfully
wsListener = listener
wsHandler = handler
return nil
}
// stopWS terminates the websocket RPC endpoint.
func (node *Node) stopWS() {
if wsListener != nil {
wsListener.Close()
wsListener = nil
log.Info("WebSocket endpoint closed", "url", fmt.Sprintf("ws://%s", wsEndpoint))
}
if wsHandler != nil {
wsHandler.Stop()
wsHandler = nil
}
}

@ -1,32 +0,0 @@
package node
import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/harmony-one/harmony/rpc"
)
// Service is an individual protocol that can be registered into a node.
//
// Notes:
//
// • Service life-cycle management is delegated to the node. The service is allowed to
// initialize itself upon creation, but no goroutines should be spun up outside of the
// Start method.
//
// • Restart logic is not required as the node will create a fresh instance
// every time a service is started.
type Service interface {
// Protocols retrieves the P2P protocols the service wishes to start.
Protocols() []p2p.Protocol
// APIs retrieves the list of RPC descriptors the service provides
APIs() []rpc.API
// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
Start(server *p2p.Server) error
// Stop terminates all goroutines belonging to the service, blocking until they
// are all terminated.
Stop() error
}

@ -11,7 +11,6 @@ import (
"github.com/harmony-one/harmony/api/service/networkinfo"
"github.com/harmony-one/harmony/api/service/randomness"
"github.com/harmony-one/harmony/api/service/restclientsupport"
rpcservice "github.com/harmony-one/harmony/api/service/rpc"
"github.com/harmony-one/harmony/api/service/staking"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
@ -39,9 +38,6 @@ func (node *Node) setupForShardLeader() {
node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady))
// Register client support service.
node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port))
// Register RPC service
node.serviceManager.RegisterService(service.RPC, rpcservice.New(node.blockchain, &node.SelfPeer, node.TxPool))
}
func (node *Node) setupForShardValidator() {

@ -1,20 +0,0 @@
Files that are mainly ported over from go-ethereum (might include tslint fix):
* rpc/client.go: the rpc client.
* rpc/endpoints.go: starting HTTP/WS/IPC endpoints.
* rpc/errors.go: error code handling.
* rpc/handler.go: handling requests, running methods.
* rpc/http.go: starting HTTP service.
* rpc/ipc_unix.go: util for ipc unix
* rpc/ipc.go: util for general ipc
* rpc/json.go: json de-/serialization
* rpc/server.go: the rpc server
* rpc/stdio.go: stdio for ipc.
* rpc/subscription.go: ws subscriptions.
* rpc/types.go: type declarations
* rpc/websocket.go: ws connection.
* rpc/hmyapi/addrlock.go
Richard's changes:
* rpc/service.go: our service (http/ws/ipc) starter/stoper
* other files under `rpc/hmyapi/`.
* other files that are not in `rpc` folder. Added some util functions.

@ -1,623 +0,0 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"reflect"
"strconv"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/log"
)
var (
errClientQuit = errors.New("client is closed")
errNoResult = errors.New("no result in JSON-RPC response")
errSubscriptionQueueOverflow = errors.New("subscription queue overflow")
errClientReconnected = errors.New("client reconnected")
errDead = errors.New("connection lost")
)
const (
// Timeouts
tcpKeepAliveInterval = 30 * time.Second
defaultDialTimeout = 10 * time.Second // used if context has no deadline
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
)
const (
// Subscriptions are removed when the subscriber cannot keep up.
//
// This can be worked around by supplying a channel with sufficiently sized buffer,
// but this can be inconvenient and hard to explain in the docs. Another issue with
// buffered channels is that the buffer is static even though it might not be needed
// most of the time.
//
// The approach taken here is to maintain a per-subscription linked list buffer
// shrinks on demand. If the buffer reaches the size below, the subscription is
// dropped.
maxClientSubscriptionBuffer = 20000
)
// BatchElem is an element in a batch request.
type BatchElem struct {
Method string
Args []interface{}
// The result is unmarshaled into this field. Result must be set to a
// non-nil pointer value of the desired type, otherwise the response will be
// discarded.
Result interface{}
// Error is set if the server returns an error for this request, or if
// unmarshaling into Result fails. It is not set for I/O errors.
Error error
}
// Client represents a connection to an RPC server.
type Client struct {
idgen func() ID // for subscriptions
isHTTP bool
services *serviceRegistry
idCounter uint32
// This function, if non-nil, is called when the connection is lost.
reconnectFunc reconnectFunc
// writeConn is used for writing to the connection on the caller's goroutine. It should
// only be accessed outside of dispatch, with the write lock held. The write lock is
// taken by sending on requestOp and released by sending on sendDone.
writeConn jsonWriter
// for dispatch
close chan struct{}
closing chan struct{} // closed when client is quitting
didClose chan struct{} // closed when client quits
reconnected chan ServerCodec // where write/reconnect sends the new connection
readOp chan readOp // read messages
readErr chan error // errors from read
reqInit chan *requestOp // register response IDs, takes write lock
reqSent chan error // signals write completion, releases write lock
reqTimeout chan *requestOp // removes response IDs when call timeout expires
}
type reconnectFunc func(ctx context.Context) (ServerCodec, error)
type clientContextKey struct{}
type clientConn struct {
codec ServerCodec
handler *handler
}
func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.WithValue(context.Background(), clientContextKey{}, c)
handler := newHandler(ctx, conn, c.idgen, c.services)
return &clientConn{conn, handler}
}
func (cc *clientConn) close(err error, inflightReq *requestOp) {
cc.handler.close(err, inflightReq)
cc.codec.Close()
}
type readOp struct {
msgs []*jsonrpcMessage
batch bool
}
type requestOp struct {
ids []json.RawMessage
err error
resp chan *jsonrpcMessage // receives up to len(ids) responses
sub *ClientSubscription // only set for EthSubscribe requests
}
func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) {
select {
case <-ctx.Done():
// Send the timeout to dispatch so it can remove the request IDs.
select {
case c.reqTimeout <- op:
case <-c.closing:
}
return nil, ctx.Err()
case resp := <-op.resp:
return resp, op.err
}
}
// Dial creates a new client for the given URL.
//
// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a
// file name with no URL scheme, a local socket connection is established using UNIX
// domain sockets on supported platforms and named pipes on Windows. If you want to
// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead.
//
// For websocket connections, the origin is set to the local host name.
//
// The client reconnects automatically if the connection is lost.
func Dial(rawurl string) (*Client, error) {
return DialContext(context.Background(), rawurl)
}
// DialContext creates a new RPC client, just like Dial.
//
// The context is used to cancel or time out the initial connection establishment. It does
// not affect subsequent interactions with the client.
func DialContext(ctx context.Context, rawurl string) (*Client, error) {
u, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
switch u.Scheme {
case "http", "https":
return DialHTTP(rawurl)
case "ws", "wss":
return DialWebsocket(ctx, rawurl, "")
case "stdio":
return DialStdIO(ctx)
case "":
return DialIPC(ctx, rawurl)
default:
return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
}
}
// ClientFromContext Client retrieves the client from the context, if any. This can be used to perform
// 'reverse calls' in a handler method.
func ClientFromContext(ctx context.Context) (*Client, bool) {
client, ok := ctx.Value(clientContextKey{}).(*Client)
return client, ok
}
func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) {
conn, err := connect(initctx)
if err != nil {
return nil, err
}
c := initClient(conn, randomIDGenerator(), new(serviceRegistry))
c.reconnectFunc = connect
return c, nil
}
func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client {
_, isHTTP := conn.(*httpConn)
c := &Client{
idgen: idgen,
isHTTP: isHTTP,
services: services,
writeConn: conn,
close: make(chan struct{}),
closing: make(chan struct{}),
didClose: make(chan struct{}),
reconnected: make(chan ServerCodec),
readOp: make(chan readOp),
readErr: make(chan error),
reqInit: make(chan *requestOp),
reqSent: make(chan error, 1),
reqTimeout: make(chan *requestOp),
}
if !isHTTP {
go c.dispatch(conn)
}
return c
}
// RegisterName creates a service for the given receiver type under the given name. When no
// methods on the given receiver match the criteria to be either a RPC method or a
// subscription an error is returned. Otherwise a new service is created and added to the
// service collection this client provides to the server.
func (c *Client) RegisterName(name string, receiver interface{}) error {
return c.services.registerName(name, receiver)
}
func (c *Client) nextID() json.RawMessage {
id := atomic.AddUint32(&c.idCounter, 1)
return strconv.AppendUint(nil, uint64(id), 10)
}
// SupportedModules calls the rpc_modules method, retrieving the list of
// APIs that are available on the server.
func (c *Client) SupportedModules() (map[string]string, error) {
var result map[string]string
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
err := c.CallContext(ctx, &result, "rpc_modules")
return result, err
}
// Close closes the client, aborting any in-flight requests.
func (c *Client) Close() {
if c.isHTTP {
return
}
select {
case c.close <- struct{}{}:
<-c.didClose
case <-c.didClose:
}
}
// Call performs a JSON-RPC call with the given arguments and unmarshals into
// result if no error occurred.
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
ctx := context.Background()
return c.CallContext(ctx, result, method, args...)
}
// CallContext performs a JSON-RPC call with the given arguments. If the context is
// canceled before the call has successfully returned, CallContext returns immediately.
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
msg, err := c.newMessage(method, args...)
if err != nil {
return err
}
op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
if c.isHTTP {
err = c.sendHTTP(ctx, op, msg)
} else {
err = c.send(ctx, op, msg)
}
if err != nil {
return err
}
// dispatch has accepted the request and will close the channel when it quits.
switch resp, err := op.wait(ctx, c); {
case err != nil:
return err
case resp.Error != nil:
return resp.Error
case len(resp.Result) == 0:
return errNoResult
default:
return json.Unmarshal(resp.Result, &result)
}
}
// BatchCall sends all given requests as a single batch and waits for the server
// to return a response for all of them.
//
// In contrast to Call, BatchCall only returns I/O errors. Any error specific to
// a request is reported through the Error field of the corresponding BatchElem.
//
// Note that batch calls may not be executed atomically on the server side.
func (c *Client) BatchCall(b []BatchElem) error {
ctx := context.Background()
return c.BatchCallContext(ctx, b)
}
// BatchCallContext sends all given requests as a single batch and waits for the server
// to return a response for all of them. The wait duration is bounded by the
// context's deadline.
//
// In contrast to CallContext, BatchCallContext only returns errors that have occurred
// while sending the request. Any error specific to a request is reported through the
// Error field of the corresponding BatchElem.
//
// Note that batch calls may not be executed atomically on the server side.
func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
msgs := make([]*jsonrpcMessage, len(b))
op := &requestOp{
ids: make([]json.RawMessage, len(b)),
resp: make(chan *jsonrpcMessage, len(b)),
}
for i, elem := range b {
msg, err := c.newMessage(elem.Method, elem.Args...)
if err != nil {
return err
}
msgs[i] = msg
op.ids[i] = msg.ID
}
var err error
if c.isHTTP {
err = c.sendBatchHTTP(ctx, op, msgs)
} else {
err = c.send(ctx, op, msgs)
}
// Wait for all responses to come back.
for n := 0; n < len(b) && err == nil; n++ {
var resp *jsonrpcMessage
resp, err = op.wait(ctx, c)
if err != nil {
break
}
// Find the element corresponding to this response.
// The element is guaranteed to be present because dispatch
// only sends valid IDs to our channel.
var elem *BatchElem
for i := range msgs {
if bytes.Equal(msgs[i].ID, resp.ID) {
elem = &b[i]
break
}
}
if resp.Error != nil {
elem.Error = resp.Error
continue
}
if len(resp.Result) == 0 {
elem.Error = errNoResult
continue
}
elem.Error = json.Unmarshal(resp.Result, elem.Result)
}
return err
}
// Notify sends a notification, i.e. a method call that doesn't expect a response.
func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) error {
op := new(requestOp)
msg, err := c.newMessage(method, args...)
if err != nil {
return err
}
msg.ID = nil
if c.isHTTP {
return c.sendHTTP(ctx, op, msg)
}
return c.send(ctx, op, msg)
}
// EthSubscribe registers a subscripion under the "eth" namespace.
func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
return c.Subscribe(ctx, "eth", channel, args...)
}
// ShhSubscribe registers a subscripion under the "shh" namespace.
func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
return c.Subscribe(ctx, "shh", channel, args...)
}
// Subscribe calls the "<namespace>_subscribe" method with the given arguments,
// registering a subscription. Server notifications for the subscription are
// sent to the given channel. The element type of the channel must match the
// expected type of content returned by the subscription.
//
// The context argument cancels the RPC request that sets up the subscription but has no
// effect on the subscription after Subscribe has returned.
//
// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
// before considering the subscriber dead. The subscription Err channel will receive
// errSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
// that the channel usually has at least one reader to prevent this issue.
func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
panic("first argument to Subscribe must be a writable channel")
}
if chanVal.IsNil() {
panic("channel given to Subscribe must not be nil")
}
if c.isHTTP {
return nil, ErrNotificationsUnsupported
}
msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...)
if err != nil {
return nil, err
}
op := &requestOp{
ids: []json.RawMessage{msg.ID},
resp: make(chan *jsonrpcMessage),
sub: newClientSubscription(c, namespace, chanVal),
}
// Send the subscription request.
// The arrival and validity of the response is signaled on sub.quit.
if err := c.send(ctx, op, msg); err != nil {
return nil, err
}
if _, err := op.wait(ctx, c); err != nil {
return nil, err
}
return op.sub, nil
}
func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method}
if paramsIn != nil { // prevent sending "params":null
var err error
if msg.Params, err = json.Marshal(paramsIn); err != nil {
return nil, err
}
}
return msg, nil
}
// send registers op with the dispatch loop, then sends msg on the connection.
// if sending fails, op is deregistered.
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
select {
case c.reqInit <- op:
err := c.write(ctx, msg)
c.reqSent <- err
return err
case <-ctx.Done():
// This can happen if the client is overloaded or unable to keep up with
// subscription notifications.
return ctx.Err()
case <-c.closing:
return errClientQuit
}
}
func (c *Client) write(ctx context.Context, msg interface{}) error {
// The previous write failed. Try to establish a new connection.
if c.writeConn == nil {
if err := c.reconnect(ctx); err != nil {
return err
}
}
err := c.writeConn.Write(ctx, msg)
if err != nil {
c.writeConn = nil
}
return err
}
func (c *Client) reconnect(ctx context.Context) error {
if c.reconnectFunc == nil {
return errDead
}
if _, ok := ctx.Deadline(); !ok {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
}
newconn, err := c.reconnectFunc(ctx)
if err != nil {
log.Trace("RPC client reconnect failed", "err", err)
return err
}
select {
case c.reconnected <- newconn:
c.writeConn = newconn
return nil
case <-c.didClose:
newconn.Close()
return errClientQuit
}
}
// dispatch is the main loop of the client.
// It sends read messages to waiting calls to Call and BatchCall
// and subscription notifications to registered subscriptions.
func (c *Client) dispatch(codec ServerCodec) {
var (
lastOp *requestOp // tracks last send operation
reqInitLock = c.reqInit // nil while the send lock is held
conn = c.newClientConn(codec)
reading = true
)
defer func() {
close(c.closing)
if reading {
conn.close(errClientQuit, nil)
c.drainRead()
}
close(c.didClose)
}()
// Spawn the initial read loop.
go c.read(codec)
for {
select {
case <-c.close:
return
// Read path:
case op := <-c.readOp:
if op.batch {
conn.handler.handleBatch(op.msgs)
} else {
conn.handler.handleMsg(op.msgs[0])
}
case err := <-c.readErr:
conn.handler.log.Debug("RPC connection read error", "err", err)
conn.close(err, lastOp)
reading = false
// Reconnect:
case newcodec := <-c.reconnected:
log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr())
if reading {
// Wait for the previous read loop to exit. This is a rare case which
// happens if this loop isn't notified in time after the connection breaks.
// In those cases the caller will notice first and reconnect. Closing the
// handler terminates all waiting requests (closing op.resp) except for
// lastOp, which will be transferred to the new handler.
conn.close(errClientReconnected, lastOp)
c.drainRead()
}
go c.read(newcodec)
reading = true
conn = c.newClientConn(newcodec)
// Re-register the in-flight request on the new handler
// because that's where it will be sent.
conn.handler.addRequestOp(lastOp)
// Send path:
case op := <-reqInitLock:
// Stop listening for further requests until the current one has been sent.
reqInitLock = nil
lastOp = op
conn.handler.addRequestOp(op)
case err := <-c.reqSent:
if err != nil {
// Remove response handlers for the last send. When the read loop
// goes down, it will signal all other current operations.
conn.handler.removeRequestOp(lastOp)
}
// Let the next request in.
reqInitLock = c.reqInit
lastOp = nil
case op := <-c.reqTimeout:
conn.handler.removeRequestOp(op)
}
}
}
// drainRead drops read messages until an error occurs.
func (c *Client) drainRead() {
for {
select {
case <-c.readOp:
case <-c.readErr:
return
}
}
}
// read decodes RPC messages from a codec, feeding them into dispatch.
func (c *Client) read(codec ServerCodec) {
for {
msgs, batch, err := codec.Read()
if _, ok := err.(*json.SyntaxError); ok {
codec.Write(context.Background(), errorMessage(&parseError{err.Error()}))
}
if err != nil {
c.readErr <- err
return
}
c.readOp <- readOp{msgs, batch}
}
}

@ -1,79 +0,0 @@
package rpc
import (
"net"
"github.com/ethereum/go-ethereum/log"
)
// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules
func StartHTTPEndpoint(endpoint string, apis []API) (net.Listener, *Server, error) {
// Register all the APIs exposed by the services
handler := NewServer()
for _, api := range apis {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("HTTP registered", "namespace", api.Namespace)
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, nil, err
}
go NewHTTPServer(handler).Serve(listener)
return listener, handler, err
}
// StartWSEndpoint starts a websocket endpoint
func StartWSEndpoint(endpoint string, apis []API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *Server, error) {
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := NewServer()
for _, api := range apis {
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, nil, err
}
go NewWSServer(wsOrigins, handler).Serve(listener)
return listener, handler, err
}
// StartIPCEndpoint starts an IPC endpoint.
func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) {
// Register all the APIs exposed by the services.
handler := NewServer()
for _, api := range apis {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("IPC registered", "namespace", api.Namespace)
}
// All APIs registered, start the IPC listener.
listener, err := ipcListen(ipcEndpoint)
if err != nil {
return nil, nil, err
}
go handler.ServeListener(listener)
return listener, handler, nil
}

@ -1,49 +0,0 @@
package rpc
import "fmt"
const defaultErrorCode = -32000
type methodNotFoundError struct{ method string }
func (e *methodNotFoundError) ErrorCode() int { return -32601 }
func (e *methodNotFoundError) Error() string {
return fmt.Sprintf("the method %s does not exist/is not available", e.method)
}
type subscriptionNotFoundError struct{ namespace, subscription string }
func (e *subscriptionNotFoundError) ErrorCode() int { return -32601 }
func (e *subscriptionNotFoundError) Error() string {
return fmt.Sprintf("no %q subscription in %s namespace", e.subscription, e.namespace)
}
// Invalid JSON was received by the server.
type parseError struct{ message string }
func (e *parseError) ErrorCode() int { return -32700 }
func (e *parseError) Error() string { return e.message }
// received message isn't a valid request
type invalidRequestError struct{ message string }
func (e *invalidRequestError) ErrorCode() int { return -32600 }
func (e *invalidRequestError) Error() string { return e.message }
// received message is invalid
type invalidMessageError struct{ message string }
func (e *invalidMessageError) ErrorCode() int { return -32700 }
func (e *invalidMessageError) Error() string { return e.message }
// unable to decode supplied params, or an invalid number of parameters
type invalidParamsError struct{ message string }
func (e *invalidParamsError) ErrorCode() int { return -32602 }
func (e *invalidParamsError) Error() string { return e.message }

@ -1,381 +0,0 @@
package rpc
import (
"context"
"encoding/json"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
)
// handler handles JSON-RPC messages. There is one handler per connection. Note that
// handler is not safe for concurrent use. Message handling never blocks indefinitely
// because RPCs are processed on background goroutines launched by handler.
//
// The entry points for incoming messages are:
//
// h.handleMsg(message)
// h.handleBatch(message)
//
// Outgoing calls use the requestOp struct. Register the request before sending it
// on the connection:
//
// op := &requestOp{ids: ...}
// h.addRequestOp(op)
//
// Now send the request, then wait for the reply to be delivered through handleMsg:
//
// if err := op.wait(...); err != nil {
// h.removeRequestOp(op) // timeout, etc.
// }
//
type handler struct {
reg *serviceRegistry
unsubscribeCb *callback
idgen func() ID // subscription ID generator
respWait map[string]*requestOp // active client requests
clientSubs map[string]*ClientSubscription // active client subscriptions
callWG sync.WaitGroup // pending call goroutines
rootCtx context.Context // canceled by close()
cancelRoot func() // cancel function for rootCtx
conn jsonWriter // where responses will be sent
log log.Logger
allowSubscribe bool
subLock sync.Mutex
serverSubs map[ID]*Subscription
}
type callProc struct {
ctx context.Context
notifiers []*Notifier
}
func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
rootCtx, cancelRoot := context.WithCancel(connCtx)
h := &handler{
reg: reg,
idgen: idgen,
conn: conn,
respWait: make(map[string]*requestOp),
clientSubs: make(map[string]*ClientSubscription),
rootCtx: rootCtx,
cancelRoot: cancelRoot,
allowSubscribe: true,
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
}
if conn.RemoteAddr() != "" {
h.log = h.log.New("conn", conn.RemoteAddr())
}
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
return h
}
// handleBatch executes all messages in a batch and returns the responses.
func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
// Emit error response for empty batches:
if len(msgs) == 0 {
h.startCallProc(func(cp *callProc) {
h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
})
return
}
// Handle non-call messages first:
calls := make([]*jsonrpcMessage, 0, len(msgs))
for _, msg := range msgs {
if handled := h.handleImmediate(msg); !handled {
calls = append(calls, msg)
}
}
if len(calls) == 0 {
return
}
// Process calls on a goroutine because they may block indefinitely:
h.startCallProc(func(cp *callProc) {
answers := make([]*jsonrpcMessage, 0, len(msgs))
for _, msg := range calls {
if answer := h.handleCallMsg(cp, msg); answer != nil {
answers = append(answers, answer)
}
}
h.addSubscriptions(cp.notifiers)
if len(answers) > 0 {
h.conn.Write(cp.ctx, answers)
}
for _, n := range cp.notifiers {
n.activate()
}
})
}
// handleMsg handles a single message.
func (h *handler) handleMsg(msg *jsonrpcMessage) {
if ok := h.handleImmediate(msg); ok {
return
}
h.startCallProc(func(cp *callProc) {
answer := h.handleCallMsg(cp, msg)
h.addSubscriptions(cp.notifiers)
if answer != nil {
h.conn.Write(cp.ctx, answer)
}
for _, n := range cp.notifiers {
n.activate()
}
})
}
// close cancels all requests except for inflightReq and waits for
// call goroutines to shut down.
func (h *handler) close(err error, inflightReq *requestOp) {
h.cancelAllRequests(err, inflightReq)
h.cancelRoot()
h.callWG.Wait()
h.cancelServerSubscriptions(err)
}
// addRequestOp registers a request operation.
func (h *handler) addRequestOp(op *requestOp) {
for _, id := range op.ids {
h.respWait[string(id)] = op
}
}
// removeRequestOps stops waiting for the given request IDs.
func (h *handler) removeRequestOp(op *requestOp) {
for _, id := range op.ids {
delete(h.respWait, string(id))
}
}
// cancelAllRequests unblocks and removes pending requests and active subscriptions.
func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) {
didClose := make(map[*requestOp]bool)
if inflightReq != nil {
didClose[inflightReq] = true
}
for id, op := range h.respWait {
// Remove the op so that later calls will not close op.resp again.
delete(h.respWait, id)
if !didClose[op] {
op.err = err
close(op.resp)
didClose[op] = true
}
}
for id, sub := range h.clientSubs {
delete(h.clientSubs, id)
sub.quitWithError(false, err)
}
}
func (h *handler) addSubscriptions(nn []*Notifier) {
h.subLock.Lock()
defer h.subLock.Unlock()
for _, n := range nn {
if sub := n.takeSubscription(); sub != nil {
h.serverSubs[sub.ID] = sub
}
}
}
// cancelServerSubscriptions removes all subscriptions and closes their error channels.
func (h *handler) cancelServerSubscriptions(err error) {
h.subLock.Lock()
defer h.subLock.Unlock()
for id, s := range h.serverSubs {
s.err <- err
close(s.err)
delete(h.serverSubs, id)
}
}
// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group.
func (h *handler) startCallProc(fn func(*callProc)) {
h.callWG.Add(1)
go func() {
ctx, cancel := context.WithCancel(h.rootCtx)
defer h.callWG.Done()
defer cancel()
fn(&callProc{ctx: ctx})
}()
}
// handleImmediate executes non-call messages. It returns false if the message is a
// call or requires a reply.
func (h *handler) handleImmediate(msg *jsonrpcMessage) bool {
start := time.Now()
switch {
case msg.isNotification():
if strings.HasSuffix(msg.Method, notificationMethodSuffix) {
h.handleSubscriptionResult(msg)
return true
}
return false
case msg.isResponse():
h.handleResponse(msg)
h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start))
return true
default:
return false
}
}
// handleSubscriptionResult processes subscription notifications.
func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) {
var result subscriptionResult
if err := json.Unmarshal(msg.Params, &result); err != nil {
h.log.Debug("Dropping invalid subscription message")
return
}
if h.clientSubs[result.ID] != nil {
h.clientSubs[result.ID].deliver(result.Result)
}
}
// handleResponse processes method call responses.
func (h *handler) handleResponse(msg *jsonrpcMessage) {
op := h.respWait[string(msg.ID)]
if op == nil {
h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID})
return
}
delete(h.respWait, string(msg.ID))
// For normal responses, just forward the reply to Call/BatchCall.
if op.sub == nil {
op.resp <- msg
return
}
// For subscription responses, start the subscription if the server
// indicates success. EthSubscribe gets unblocked in either case through
// the op.resp channel.
defer close(op.resp)
if msg.Error != nil {
op.err = msg.Error
return
}
if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
go op.sub.start()
h.clientSubs[op.sub.subid] = op.sub
}
}
// handleCallMsg executes a call message and returns the answer.
func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
start := time.Now()
switch {
case msg.isNotification():
h.handleCall(ctx, msg)
h.log.Debug("Served "+msg.Method, "t", time.Since(start))
return nil
case msg.isCall():
resp := h.handleCall(ctx, msg)
if resp.Error != nil {
h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message)
} else {
h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start))
}
return resp
case msg.hasValidID():
return msg.errorResponse(&invalidRequestError{"invalid request"})
default:
return errorMessage(&invalidRequestError{"invalid request"})
}
}
// handleCall processes method calls.
func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
if msg.isSubscribe() {
return h.handleSubscribe(cp, msg)
}
var callb *callback
if msg.isUnsubscribe() {
callb = h.unsubscribeCb
} else {
callb = h.reg.callback(msg.Method)
}
if callb == nil {
return msg.errorResponse(&methodNotFoundError{method: msg.Method})
}
args, err := parsePositionalArguments(msg.Params, callb.argTypes)
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
}
return h.runMethod(cp.ctx, msg, callb, args)
}
// handleSubscribe processes *_subscribe method calls.
func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
if !h.allowSubscribe {
return msg.errorResponse(ErrNotificationsUnsupported)
}
// Subscription method name is first argument.
name, err := parseSubscriptionName(msg.Params)
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
}
namespace := msg.namespace()
callb := h.reg.subscription(namespace, name)
if callb == nil {
return msg.errorResponse(&subscriptionNotFoundError{namespace, name})
}
// Parse subscription name arg too, but remove it before calling the callback.
argTypes := append([]reflect.Type{stringType}, callb.argTypes...)
args, err := parsePositionalArguments(msg.Params, argTypes)
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
}
args = args[1:]
// Install notifier in context so the subscription handler can find it.
n := &Notifier{h: h, namespace: namespace}
cp.notifiers = append(cp.notifiers, n)
ctx := context.WithValue(cp.ctx, notifierKey{}, n)
return h.runMethod(ctx, msg, callb, args)
}
// runMethod runs the Go callback for an RPC method.
func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage {
result, err := callb.call(ctx, msg.Method, args)
if err != nil {
return msg.errorResponse(err)
}
return msg.response(result)
}
// unsubscribe is the callback function for all *_unsubscribe calls.
func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) {
h.subLock.Lock()
defer h.subLock.Unlock()
s := h.serverSubs[id]
if s == nil {
return false, ErrSubscriptionNotFound
}
close(s.err)
delete(h.serverSubs, id)
return true, nil
}
type idForLog struct{ json.RawMessage }
func (id idForLog) String() string {
if s, err := strconv.Unquote(string(id.RawMessage)); err == nil {
return s
}
return string(id.RawMessage)
}

@ -1,322 +0,0 @@
package rpc
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"mime"
"net"
"net/http"
"sync"
"time"
)
const (
maxRequestContentLength = 1024 * 512
contentType = "application/json"
)
// https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13
var acceptedContentTypes = []string{contentType, "application/json-rpc", "application/jsonrequest"}
type httpConn struct {
client *http.Client
req *http.Request
closeOnce sync.Once
closed chan interface{}
}
// httpConn is treated specially by Client.
func (hc *httpConn) Write(context.Context, interface{}) error {
panic("Write called on httpConn")
}
func (hc *httpConn) RemoteAddr() string {
return hc.req.URL.String()
}
func (hc *httpConn) Read() ([]*jsonrpcMessage, bool, error) {
<-hc.closed
return nil, false, io.EOF
}
func (hc *httpConn) Close() {
hc.closeOnce.Do(func() { close(hc.closed) })
}
func (hc *httpConn) Closed() <-chan interface{} {
return hc.closed
}
// HTTPTimeouts represents the configuration params for the HTTP RPC server.
type HTTPTimeouts struct {
// ReadTimeout is the maximum duration for reading the entire
// request, including the body.
//
// Because ReadTimeout does not let Handlers make per-request
// decisions on each request body's acceptable deadline or
// upload rate, most users will prefer to use
// ReadHeaderTimeout. It is valid to use them both.
ReadTimeout time.Duration
// WriteTimeout is the maximum duration before timing out
// writes of the response. It is reset whenever a new
// request's header is read. Like ReadTimeout, it does not
// let Handlers make decisions on a per-request basis.
WriteTimeout time.Duration
// IdleTimeout is the maximum amount of time to wait for the
// next request when keep-alives are enabled. If IdleTimeout
// is zero, the value of ReadTimeout is used. If both are
// zero, ReadHeaderTimeout is used.
IdleTimeout time.Duration
}
// DefaultHTTPTimeouts represents the default timeout values used if further
// configuration is not provided.
var DefaultHTTPTimeouts = HTTPTimeouts{
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 120 * time.Second,
}
// DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP
// using the provided HTTP Client.
func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) {
req, err := http.NewRequest(http.MethodPost, endpoint, nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", contentType)
req.Header.Set("Accept", contentType)
initctx := context.Background()
return newClient(initctx, func(context.Context) (ServerCodec, error) {
return &httpConn{client: client, req: req, closed: make(chan interface{})}, nil
})
}
// DialHTTP creates a new RPC client that connects to an RPC server over HTTP.
func DialHTTP(endpoint string) (*Client, error) {
return DialHTTPWithClient(endpoint, new(http.Client))
}
func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
hc := c.writeConn.(*httpConn)
respBody, err := hc.doRequest(ctx, msg)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
if respBody != nil {
buf := new(bytes.Buffer)
if _, err2 := buf.ReadFrom(respBody); err2 == nil {
return fmt.Errorf("%v %v", err, buf.String())
}
}
return err
}
var respmsg jsonrpcMessage
if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
return err
}
op.resp <- &respmsg
return nil
}
func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
hc := c.writeConn.(*httpConn)
respBody, err := hc.doRequest(ctx, msgs)
if err != nil {
return err
}
defer respBody.Close()
var respmsgs []jsonrpcMessage
if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
return err
}
for i := 0; i < len(respmsgs); i++ {
op.resp <- &respmsgs[i]
}
return nil
}
func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
body, err := json.Marshal(msg)
if err != nil {
return nil, err
}
req := hc.req.WithContext(ctx)
req.Body = ioutil.NopCloser(bytes.NewReader(body))
req.ContentLength = int64(len(body))
resp, err := hc.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return resp.Body, errors.New(resp.Status)
}
return resp.Body, nil
}
// httpServerConn turns a HTTP connection into a Conn.
type httpServerConn struct {
io.Reader
io.Writer
r *http.Request
}
func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec {
body := io.LimitReader(r.Body, maxRequestContentLength)
conn := &httpServerConn{Reader: body, Writer: w, r: r}
return NewJSONCodec(conn)
}
// Close does nothing and always returns nil.
func (t *httpServerConn) Close() error { return nil }
// RemoteAddr returns the peer address of the underlying connection.
func (t *httpServerConn) RemoteAddr() string {
return t.r.RemoteAddr
}
// SetWriteDeadline does nothing and always returns nil.
func (t *httpServerConn) SetWriteDeadline(time.Time) error { return nil }
// NewHTTPServer creates a new HTTP RPC server around an API provider.
//
// Deprecated: Server implements http.Handler
func NewHTTPServer(srv http.Handler) *http.Server {
// TODO(ricl): eth wraps it in CORS-handler. Might need to port if we want to use cors.
handler := srv
// TODO(ricl): port timeout handlers if necessary
// Bundle and start the HTTP server
return &http.Server{
Handler: handler,
}
}
// ServeHTTP serves JSON-RPC requests over HTTP.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Permit dumb empty requests for remote health-checks (AWS)
if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {
return
}
if code, err := validateRequest(r); err != nil {
http.Error(w, err.Error(), code)
return
}
// All checks passed, create a codec that reads direct from the request body
// untilEOF and writes the response to w and order the server to process a
// single request.
ctx := r.Context()
ctx = context.WithValue(ctx, interface{}("remote"), r.RemoteAddr)
ctx = context.WithValue(ctx, interface{}("scheme"), r.Proto)
ctx = context.WithValue(ctx, interface{}("local"), r.Host)
if ua := r.Header.Get("User-Agent"); ua != "" {
ctx = context.WithValue(ctx, interface{}("User-Agent"), ua)
}
if origin := r.Header.Get("Origin"); origin != "" {
ctx = context.WithValue(ctx, interface{}("Origin"), origin)
}
w.Header().Set("content-type", contentType)
codec := newHTTPServerConn(r, w)
defer codec.Close()
s.serveSingleRequest(ctx, codec)
}
// validateRequest returns a non-zero response code and error message if the
// request is invalid.
func validateRequest(r *http.Request) (int, error) {
if r.Method == http.MethodPut || r.Method == http.MethodDelete {
return http.StatusMethodNotAllowed, errors.New("method not allowed")
}
if r.ContentLength > maxRequestContentLength {
err := fmt.Errorf("content length too large (%d>%d)", r.ContentLength, maxRequestContentLength)
return http.StatusRequestEntityTooLarge, err
}
// Allow OPTIONS (regardless of content-type)
if r.Method == http.MethodOptions {
return 0, nil
}
// Check content-type
if mt, _, err := mime.ParseMediaType(r.Header.Get("content-type")); err == nil {
for _, accepted := range acceptedContentTypes {
if accepted == mt {
return 0, nil
}
}
}
// Invalid content-type
err := fmt.Errorf("invalid content type, only %s is supported", contentType)
return http.StatusUnsupportedMediaType, err
}
// func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler {
// // disable CORS support if user has not specified a custom CORS configuration
// if len(allowedOrigins) == 0 {
// return srv
// }
// c := cors.New(cors.Options{
// AllowedOrigins: allowedOrigins,
// AllowedMethods: []string{http.MethodPost, http.MethodGet},
// MaxAge: 600,
// AllowedHeaders: []string{"*"},
// })
// return c.Handler(srv)
// }
// virtualHostHandler is a handler which validates the Host-header of incoming requests.
// The virtualHostHandler can prevent DNS rebinding attacks, which do not utilize CORS-headers,
// since they do in-domain requests against the RPC api. Instead, we can see on the Host-header
// which domain was used, and validate that against a whitelist.
type virtualHostHandler struct {
vhosts map[string]struct{}
next http.Handler
}
// ServeHTTP serves JSON-RPC requests over HTTP, implements http.Handler
func (h *virtualHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// if r.Host is not set, we can continue serving since a browser would set the Host header
if r.Host == "" {
h.next.ServeHTTP(w, r)
return
}
host, _, err := net.SplitHostPort(r.Host)
if err != nil {
// Either invalid (too many colons) or no port specified
host = r.Host
}
if ipAddr := net.ParseIP(host); ipAddr != nil {
// It's an IP address, we can serve that
h.next.ServeHTTP(w, r)
return
}
// Not an ip address, but a hostname. Need to validate
if _, exist := h.vhosts["*"]; exist {
h.next.ServeHTTP(w, r)
return
}
if _, exist := h.vhosts[host]; exist {
h.next.ServeHTTP(w, r)
return
}
http.Error(w, "invalid host specified", http.StatusForbidden)
}
// func newVHostHandler(vhosts []string, next http.Handler) http.Handler {
// vhostMap := make(map[string]struct{})
// for _, allowedHost := range vhosts {
// vhostMap[strings.ToLower(allowedHost)] = struct{}{}
// }
// return &virtualHostHandler{vhostMap, next}
// }

@ -1,40 +0,0 @@
package rpc
import (
"context"
"net"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/netutil"
)
// ServeListener accepts connections on l, serving JSON-RPC on them.
func (s *Server) ServeListener(l net.Listener) error {
for {
conn, err := l.Accept()
if netutil.IsTemporaryError(err) {
log.Warn("RPC accept error", "err", err)
continue
} else if err != nil {
return err
}
log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr())
go s.ServeCodec(NewJSONCodec(conn))
}
}
// DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes
// the endpoint is the full path to a unix socket, and Windows the endpoint is an
// identifier for a named pipe.
//
// The context is used for the initial connection establishment. It does not
// affect subsequent interactions with the client.
func DialIPC(ctx context.Context, endpoint string) (*Client, error) {
return newClient(ctx, func(ctx context.Context) (ServerCodec, error) {
conn, err := newIPCConnection(ctx, endpoint)
if err != nil {
return nil, err
}
return NewJSONCodec(conn), err
})
}

@ -1,46 +0,0 @@
package rpc
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"github.com/ethereum/go-ethereum/log"
)
/*
#include <sys/un.h>
int max_socket_path_size() {
struct sockaddr_un s;
return sizeof(s.sun_path);
}
*/
import "C"
// ipcListen will create a Unix socket on the given endpoint.
func ipcListen(endpoint string) (net.Listener, error) {
if len(endpoint) > int(C.max_socket_path_size()) {
log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", C.max_socket_path_size()),
"endpoint", endpoint)
}
// Ensure the IPC path exists and remove any previous leftover
if err := os.MkdirAll(filepath.Dir(endpoint), 0751); err != nil {
return nil, err
}
os.Remove(endpoint)
l, err := net.Listen("unix", endpoint)
if err != nil {
return nil, err
}
os.Chmod(endpoint, 0600)
return l, nil
}
// newIPCConnection will connect to a Unix socket on the given endpoint.
func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
return dialContext(ctx, "unix", endpoint)
}

@ -1,314 +0,0 @@
package rpc
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"reflect"
"strings"
"sync"
"time"
)
const (
vsn = "2.0"
serviceMethodSeparator = "_"
subscribeMethodSuffix = "_subscribe"
unsubscribeMethodSuffix = "_unsubscribe"
notificationMethodSuffix = "_subscription"
defaultWriteTimeout = 10 * time.Second // used if context has no deadline
)
var null = json.RawMessage("null")
type subscriptionResult struct {
ID string `json:"subscription"`
Result json.RawMessage `json:"result,omitempty"`
}
// A value of this type can a JSON-RPC request, notification, successful response or
// error response. Which one it is depends on the fields.
type jsonrpcMessage struct {
Version string `json:"jsonrpc,omitempty"`
ID json.RawMessage `json:"id,omitempty"`
Method string `json:"method,omitempty"`
Params json.RawMessage `json:"params,omitempty"`
Error *jsonError `json:"error,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
}
func (msg *jsonrpcMessage) isNotification() bool {
return msg.ID == nil && msg.Method != ""
}
func (msg *jsonrpcMessage) isCall() bool {
return msg.hasValidID() && msg.Method != ""
}
func (msg *jsonrpcMessage) isResponse() bool {
return msg.hasValidID() && msg.Method == "" && msg.Params == nil && (msg.Result != nil || msg.Error != nil)
}
func (msg *jsonrpcMessage) hasValidID() bool {
return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '['
}
func (msg *jsonrpcMessage) isSubscribe() bool {
return strings.HasSuffix(msg.Method, subscribeMethodSuffix)
}
func (msg *jsonrpcMessage) isUnsubscribe() bool {
return strings.HasSuffix(msg.Method, unsubscribeMethodSuffix)
}
func (msg *jsonrpcMessage) namespace() string {
elem := strings.SplitN(msg.Method, serviceMethodSeparator, 2)
return elem[0]
}
func (msg *jsonrpcMessage) String() string {
b, _ := json.Marshal(msg)
return string(b)
}
func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage {
resp := errorMessage(err)
resp.ID = msg.ID
return resp
}
func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage {
enc, err := json.Marshal(result)
if err != nil {
// TODO: wrap with 'internal server error'
return msg.errorResponse(err)
}
return &jsonrpcMessage{Version: vsn, ID: msg.ID, Result: enc}
}
func errorMessage(err error) *jsonrpcMessage {
msg := &jsonrpcMessage{Version: vsn, ID: null, Error: &jsonError{
Code: defaultErrorCode,
Message: err.Error(),
}}
ec, ok := err.(Error)
if ok {
msg.Error.Code = ec.ErrorCode()
}
return msg
}
type jsonError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
func (err *jsonError) Error() string {
if err.Message == "" {
return fmt.Sprintf("json-rpc error %d", err.Code)
}
return err.Message
}
func (err *jsonError) ErrorCode() int {
return err.Code
}
// Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec.
type Conn interface {
io.ReadWriteCloser
SetWriteDeadline(time.Time) error
}
// ConnRemoteAddr wraps the RemoteAddr operation, which returns a description
// of the peer address of a connection. If a Conn also implements ConnRemoteAddr, this
// description is used in log messages.
type ConnRemoteAddr interface {
RemoteAddr() string
}
// connWithRemoteAddr overrides the remote address of a connection.
type connWithRemoteAddr struct {
Conn
addr string
}
func (c connWithRemoteAddr) RemoteAddr() string { return c.addr }
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has
// support for parsing arguments and serializing (result) objects.
type jsonCodec struct {
remoteAddr string
closer sync.Once // close closed channel once
closed chan interface{} // closed on Close
decode func(v interface{}) error // decoder to allow multiple transports
encMu sync.Mutex // guards the encoder
encode func(v interface{}) error // encoder to allow multiple transports
conn Conn
}
// NewCodec creates a new RPC server codec with support for JSON-RPC 2.0 based
// on explicitly given encoding and decoding methods.
func NewCodec(conn Conn, encode, decode func(v interface{}) error) ServerCodec {
codec := &jsonCodec{
closed: make(chan interface{}),
encode: encode,
decode: decode,
conn: conn,
}
if ra, ok := conn.(ConnRemoteAddr); ok {
codec.remoteAddr = ra.RemoteAddr()
}
return codec
}
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0.
func NewJSONCodec(conn Conn) ServerCodec {
enc := json.NewEncoder(conn)
dec := json.NewDecoder(conn)
dec.UseNumber()
return NewCodec(conn, enc.Encode, dec.Decode)
}
func (c *jsonCodec) RemoteAddr() string {
return c.remoteAddr
}
func (c *jsonCodec) Read() (msg []*jsonrpcMessage, batch bool, err error) {
// Decode the next JSON object in the input stream.
// This verifies basic syntax, etc.
var rawmsg json.RawMessage
if err := c.decode(&rawmsg); err != nil {
return nil, false, err
}
msg, batch = parseMessage(rawmsg)
return msg, batch, nil
}
// Write sends a message to client.
func (c *jsonCodec) Write(ctx context.Context, v interface{}) error {
c.encMu.Lock()
defer c.encMu.Unlock()
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(defaultWriteTimeout)
}
c.conn.SetWriteDeadline(deadline)
return c.encode(v)
}
// Close the underlying connection
func (c *jsonCodec) Close() {
c.closer.Do(func() {
close(c.closed)
c.conn.Close()
})
}
// Closed returns a channel which will be closed when Close is called
func (c *jsonCodec) Closed() <-chan interface{} {
return c.closed
}
// parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error
// checks in this function because the raw message has already been syntax-checked when it
// is called. Any non-JSON-RPC messages in the input return the zero value of
// jsonrpcMessage.
func parseMessage(raw json.RawMessage) ([]*jsonrpcMessage, bool) {
if !isBatch(raw) {
msgs := []*jsonrpcMessage{{}}
json.Unmarshal(raw, &msgs[0])
return msgs, false
}
dec := json.NewDecoder(bytes.NewReader(raw))
dec.Token() // skip '['
var msgs []*jsonrpcMessage
for dec.More() {
msgs = append(msgs, new(jsonrpcMessage))
dec.Decode(&msgs[len(msgs)-1])
}
return msgs, true
}
// isBatch returns true when the first non-whitespace characters is '['
func isBatch(raw json.RawMessage) bool {
for _, c := range raw {
// skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt)
if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d {
continue
}
return c == '['
}
return false
}
// parsePositionalArguments tries to parse the given args to an array of values with the
// given types. It returns the parsed values or an error when the args could not be
// parsed. Missing optional arguments are returned as reflect.Zero values.
func parsePositionalArguments(rawArgs json.RawMessage, types []reflect.Type) ([]reflect.Value, error) {
dec := json.NewDecoder(bytes.NewReader(rawArgs))
var args []reflect.Value
tok, err := dec.Token()
switch {
case err == io.EOF || tok == nil && err == nil:
// "params" is optional and may be empty. Also allow "params":null even though it's
// not in the spec because our own client used to send it.
case err != nil:
return nil, err
case tok == json.Delim('['):
// Read argument array.
if args, err = parseArgumentArray(dec, types); err != nil {
return nil, err
}
default:
return nil, errors.New("non-array args")
}
// Set any missing args to nil.
for i := len(args); i < len(types); i++ {
if types[i].Kind() != reflect.Ptr {
return nil, fmt.Errorf("missing value for required argument %d", i)
}
args = append(args, reflect.Zero(types[i]))
}
return args, nil
}
func parseArgumentArray(dec *json.Decoder, types []reflect.Type) ([]reflect.Value, error) {
args := make([]reflect.Value, 0, len(types))
for i := 0; dec.More(); i++ {
if i >= len(types) {
return args, fmt.Errorf("too many arguments, want at most %d", len(types))
}
argval := reflect.New(types[i])
if err := dec.Decode(argval.Interface()); err != nil {
return args, fmt.Errorf("invalid argument %d: %v", i, err)
}
if argval.IsNil() && types[i].Kind() != reflect.Ptr {
return args, fmt.Errorf("missing value for required argument %d", i)
}
args = append(args, argval.Elem())
}
// Read end of args array.
_, err := dec.Token()
return args, err
}
// parseSubscriptionName extracts the subscription name from an encoded argument array.
func parseSubscriptionName(rawArgs json.RawMessage) (string, error) {
dec := json.NewDecoder(bytes.NewReader(rawArgs))
if tok, _ := dec.Token(); tok != json.Delim('[') {
return "", errors.New("non-array args")
}
v, _ := dec.Token()
method, ok := v.(string)
if !ok {
return "", errors.New("expected subscription name as first argument")
}
return method, nil
}

@ -1,116 +0,0 @@
package rpc
import (
"context"
"io"
"sync/atomic"
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/log"
)
const metadataAPI = "rpc"
// Server is an RPC server.
type Server struct {
services serviceRegistry
idgen func() ID
run int32
codecs mapset.Set
}
// NewServer creates a new server instance with no registered handlers.
func NewServer() *Server {
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1}
// Register the default service providing meta information about the RPC service such
// as the services and methods it offers.
rpcService := &Service{server}
server.RegisterName(metadataAPI, rpcService)
return server
}
// RegisterName creates a service for the given receiver type under the given name. When no
// methods on the given receiver match the criteria to be either a RPC method or a
// subscription an error is returned. Otherwise a new service is created and added to the
// service collection this server provides to clients.
func (s *Server) RegisterName(name string, receiver interface{}) error {
return s.services.registerName(name, receiver)
}
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
// the response back using the given codec. It will block until the codec is closed or the
// server is stopped. In either case the codec is closed.
func (s *Server) ServeCodec(codec ServerCodec) {
defer codec.Close()
// Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 {
return
}
// Add the codec to the set so it can be closed by Stop.
s.codecs.Add(codec)
defer s.codecs.Remove(codec)
c := initClient(codec, s.idgen, &s.services)
<-codec.Closed()
c.Close()
}
// serveSingleRequest reads and processes a single RPC request from the given codec. This
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
// this mode.
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
// Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 {
return
}
h := newHandler(ctx, codec, s.idgen, &s.services)
h.allowSubscribe = false
defer h.close(io.EOF, nil)
reqs, batch, err := codec.Read()
if err != nil {
if err != io.EOF {
codec.Write(ctx, errorMessage(&invalidMessageError{"parse error"}))
}
return
}
if batch {
h.handleBatch(reqs)
} else {
h.handleMsg(reqs[0])
}
}
// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending
// requests to finish, then closes all codecs which will cancel pending requests and
// subscriptions.
func (s *Server) Stop() {
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
log.Debug("RPC server shutting down")
s.codecs.Each(func(c interface{}) bool {
c.(ServerCodec).Close()
return true
})
}
}
// Service gives meta information about the server.
// e.g. gives information about the loaded modules.
type Service struct {
server *Server
}
// Modules returns the list of RPC services with their version number
func (s *Service) Modules() map[string]string {
s.server.services.mu.Lock()
defer s.server.services.mu.Unlock()
modules := make(map[string]string)
for name := range s.server.services.services {
modules[name] = "1.0"
}
return modules
}

@ -1,269 +0,0 @@
package rpc
import (
"context"
"errors"
"fmt"
"reflect"
"runtime"
"strings"
"sync"
"unicode"
"unicode/utf8"
"github.com/ethereum/go-ethereum/log"
)
var (
contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
errorType = reflect.TypeOf((*error)(nil)).Elem()
subscriptionType = reflect.TypeOf(Subscription{})
stringType = reflect.TypeOf("")
)
type serviceRegistry struct {
mu sync.Mutex
services map[string]service
}
// service represents a registered object.
type service struct {
name string // name for service
callbacks map[string]*callback // registered handlers
subscriptions map[string]*callback // available subscriptions/notifications
}
// callback is a method callback which was registered in the server
type callback struct {
fn reflect.Value // the function
rcvr reflect.Value // receiver object of method, set if fn is method
argTypes []reflect.Type // input argument types
hasCtx bool // method's first argument is a context (not included in argTypes)
errPos int // err return idx, of -1 when method cannot return error
isSubscribe bool // true if this is a subscription callback
}
func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
rcvrVal := reflect.ValueOf(rcvr)
if name == "" {
return fmt.Errorf("no service name for type %s", rcvrVal.Type().String())
}
callbacks := suitableCallbacks(rcvrVal)
if len(callbacks) == 0 {
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
r.mu.Lock()
defer r.mu.Unlock()
if r.services == nil {
r.services = make(map[string]service)
}
svc, ok := r.services[name]
if !ok {
svc = service{
name: name,
callbacks: make(map[string]*callback),
subscriptions: make(map[string]*callback),
}
r.services[name] = svc
}
for name, cb := range callbacks {
if cb.isSubscribe {
svc.subscriptions[name] = cb
} else {
svc.callbacks[name] = cb
}
}
return nil
}
// callback returns the callback corresponding to the given RPC method name.
func (r *serviceRegistry) callback(method string) *callback {
elem := strings.SplitN(method, serviceMethodSeparator, 2)
if len(elem) != 2 {
return nil
}
r.mu.Lock()
defer r.mu.Unlock()
return r.services[elem[0]].callbacks[elem[1]]
}
// subscription returns a subscription callback in the given service.
func (r *serviceRegistry) subscription(service, name string) *callback {
r.mu.Lock()
defer r.mu.Unlock()
return r.services[service].subscriptions[name]
}
// suitableCallbacks iterates over the methods of the given type. It determines if a method
// satisfies the criteria for a RPC callback or a subscription callback and adds it to the
// collection of callbacks. See server documentation for a summary of these criteria.
func suitableCallbacks(receiver reflect.Value) map[string]*callback {
typ := receiver.Type()
callbacks := make(map[string]*callback)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
if method.PkgPath != "" {
continue // method not exported
}
cb := newCallback(receiver, method.Func)
if cb == nil {
continue // function invalid
}
name := formatName(method.Name)
callbacks[name] = cb
}
return callbacks
}
// newCallback turns fn (a function) into a callback object. It returns nil if the function
// is unsuitable as an RPC callback.
func newCallback(receiver, fn reflect.Value) *callback {
fntype := fn.Type()
c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)}
// Determine parameter types. They must all be exported or builtin types.
c.makeArgTypes()
if !allExportedOrBuiltin(c.argTypes) {
return nil
}
// Verify return types. The function must return at most one error
// and/or one other non-error value.
outs := make([]reflect.Type, fntype.NumOut())
for i := 0; i < fntype.NumOut(); i++ {
outs[i] = fntype.Out(i)
}
if len(outs) > 2 || !allExportedOrBuiltin(outs) {
return nil
}
// If an error is returned, it must be the last returned value.
switch {
case len(outs) == 1 && isErrorType(outs[0]):
c.errPos = 0
case len(outs) == 2:
if isErrorType(outs[0]) || !isErrorType(outs[1]) {
return nil
}
c.errPos = 1
}
return c
}
// makeArgTypes composes the argTypes list.
func (c *callback) makeArgTypes() {
fntype := c.fn.Type()
// Skip receiver and context.Context parameter (if present).
firstArg := 0
if c.rcvr.IsValid() {
firstArg++
}
if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType {
c.hasCtx = true
firstArg++
}
// Add all remaining parameters.
c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg)
for i := firstArg; i < fntype.NumIn(); i++ {
c.argTypes[i-firstArg] = fntype.In(i)
}
}
// call invokes the callback.
func (c *callback) call(ctx context.Context, method string, args []reflect.Value) (res interface{}, errRes error) {
// Create the argument slice.
fullargs := make([]reflect.Value, 0, 2+len(args))
if c.rcvr.IsValid() {
fullargs = append(fullargs, c.rcvr)
}
if c.hasCtx {
fullargs = append(fullargs, reflect.ValueOf(ctx))
}
fullargs = append(fullargs, args...)
// Catch panic while running the callback.
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf))
errRes = errors.New("method handler crashed")
}
}()
// Run the callback.
results := c.fn.Call(fullargs)
if len(results) == 0 {
return nil, nil
}
if c.errPos >= 0 && !results[c.errPos].IsNil() {
// Method has returned non-nil error value.
err := results[c.errPos].Interface().(error)
return reflect.Value{}, err
}
return results[0].Interface(), nil
}
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Are all those types exported or built-in?
func allExportedOrBuiltin(types []reflect.Type) bool {
for _, typ := range types {
for typ.Kind() == reflect.Ptr {
typ = typ.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
if !isExported(typ.Name()) && typ.PkgPath() != "" {
return false
}
}
return true
}
// Is t context.Context or *context.Context?
func isContextType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t == contextType
}
// Does t satisfy the error interface?
func isErrorType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t.Implements(errorType)
}
// Is t Subscription or *Subscription?
func isSubscriptionType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t == subscriptionType
}
// isPubSub tests whether the given method has as as first argument a context.Context and
// returns the pair (Subscription, error).
func isPubSub(methodType reflect.Type) bool {
// numIn(0) is the receiver type
if methodType.NumIn() < 2 || methodType.NumOut() != 2 {
return false
}
return isContextType(methodType.In(1)) &&
isSubscriptionType(methodType.Out(0)) &&
isErrorType(methodType.Out(1))
}
// formatName converts to first character of name to lowercase.
func formatName(name string) string {
ret := []rune(name)
if len(ret) > 0 {
ret[0] = unicode.ToLower(ret[0])
}
return string(ret)
}

@ -1,50 +0,0 @@
package rpc
import (
"context"
"errors"
"io"
"net"
"os"
"time"
)
// DialStdIO creates a client on stdin/stdout.
func DialStdIO(ctx context.Context) (*Client, error) {
return DialIO(ctx, os.Stdin, os.Stdout)
}
// DialIO creates a client which uses the given IO channels
func DialIO(ctx context.Context, in io.Reader, out io.Writer) (*Client, error) {
return newClient(ctx, func(_ context.Context) (ServerCodec, error) {
return NewJSONCodec(stdioConn{
in: in,
out: out,
}), nil
})
}
type stdioConn struct {
in io.Reader
out io.Writer
}
func (io stdioConn) Read(b []byte) (n int, err error) {
return io.in.Read(b)
}
func (io stdioConn) Write(b []byte) (n int, err error) {
return io.out.Write(b)
}
func (io stdioConn) Close() error {
return nil
}
func (io stdioConn) RemoteAddr() string {
return "/dev/stdin"
}
func (io stdioConn) SetWriteDeadline(t time.Time) error {
return &net.OpError{Op: "set", Net: "stdio", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
}

@ -1,311 +0,0 @@
package rpc
import (
"bufio"
"container/list"
"context"
crand "crypto/rand"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"math/rand"
"reflect"
"strings"
"sync"
"time"
)
var (
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
ErrNotificationsUnsupported = errors.New("notifications not supported")
// ErrSubscriptionNotFound is returned when the subscription for the given id is not found
ErrSubscriptionNotFound = errors.New("subscription not found")
)
var globalGen = randomIDGenerator()
// ID defines a pseudo random number that is used to identify RPC subscriptions.
type ID string
// NewID returns a new, random ID.
func NewID() ID {
return globalGen()
}
// randomIDGenerator returns a function generates a random IDs.
func randomIDGenerator() func() ID {
seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader))
if err != nil {
seed = int64(time.Now().Nanosecond())
}
var (
mu sync.Mutex
rng = rand.New(rand.NewSource(seed))
)
return func() ID {
mu.Lock()
defer mu.Unlock()
id := make([]byte, 16)
rng.Read(id)
return encodeID(id)
}
}
func encodeID(b []byte) ID {
id := hex.EncodeToString(b)
id = strings.TrimLeft(id, "0")
if id == "" {
id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0.
}
return ID("0x" + id)
}
type notifierKey struct{}
// NotifierFromContext returns the Notifier value stored in ctx, if any.
func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
n, ok := ctx.Value(notifierKey{}).(*Notifier)
return n, ok
}
// Notifier is tied to a RPC connection that supports subscriptions.
// Server callbacks use the notifier to send notifications.
type Notifier struct {
h *handler
namespace string
mu sync.Mutex
sub *Subscription
buffer []json.RawMessage
callReturned bool
activated bool
}
// CreateSubscription returns a new subscription that is coupled to the
// RPC connection. By default subscriptions are inactive and notifications
// are dropped until the subscription is marked as active. This is done
// by the RPC server after the subscription ID is send to the client.
func (n *Notifier) CreateSubscription() *Subscription {
n.mu.Lock()
defer n.mu.Unlock()
if n.sub != nil {
panic("can't create multiple subscriptions with Notifier")
} else if n.callReturned {
panic("can't create subscription after subscribe call has returned")
}
n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)}
return n.sub
}
// Notify sends a notification to the client with the given data as payload.
// If an error occurs the RPC connection is closed and the error is returned.
func (n *Notifier) Notify(id ID, data interface{}) error {
enc, err := json.Marshal(data)
if err != nil {
return err
}
n.mu.Lock()
defer n.mu.Unlock()
if n.sub == nil {
panic("can't Notify before subscription is created")
} else if n.sub.ID != id {
panic("Notify with wrong ID")
}
if n.activated {
return n.send(n.sub, enc)
}
n.buffer = append(n.buffer, enc)
return nil
}
// Closed returns a channel that is closed when the RPC connection is closed.
// Deprecated: use subscription error channel
func (n *Notifier) Closed() <-chan interface{} {
return n.h.conn.Closed()
}
// takeSubscription returns the subscription (if one has been created). No subscription can
// be created after this call.
func (n *Notifier) takeSubscription() *Subscription {
n.mu.Lock()
defer n.mu.Unlock()
n.callReturned = true
return n.sub
}
// acticate is called after the subscription ID was sent to client. Notifications are
// buffered before activation. This prevents notifications being sent to the client before
// the subscription ID is sent to the client.
func (n *Notifier) activate() error {
n.mu.Lock()
defer n.mu.Unlock()
for _, data := range n.buffer {
if err := n.send(n.sub, data); err != nil {
return err
}
}
n.activated = true
return nil
}
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data})
ctx := context.Background()
return n.h.conn.Write(ctx, &jsonrpcMessage{
Version: vsn,
Method: n.namespace + notificationMethodSuffix,
Params: params,
})
}
// A Subscription is created by a notifier and tight to that notifier. The client can use
// this subscription to wait for an unsubscribe request for the client, see Err().
type Subscription struct {
ID ID
namespace string
err chan error // closed on unsubscribe
}
// Err returns a channel that is closed when the client send an unsubscribe request.
func (s *Subscription) Err() <-chan error {
return s.err
}
// MarshalJSON marshals a subscription as its ID.
func (s *Subscription) MarshalJSON() ([]byte, error) {
return json.Marshal(s.ID)
}
// ClientSubscription is a subscription established through the Client's Subscribe or
// EthSubscribe methods.
type ClientSubscription struct {
client *Client
etype reflect.Type
channel reflect.Value
namespace string
subid string
in chan json.RawMessage
quitOnce sync.Once // ensures quit is closed once
quit chan struct{} // quit is closed when the subscription exits
errOnce sync.Once // ensures err is closed once
err chan error
}
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
sub := &ClientSubscription{
client: c,
namespace: namespace,
etype: channel.Type().Elem(),
channel: channel,
quit: make(chan struct{}),
err: make(chan error, 1),
in: make(chan json.RawMessage),
}
return sub
}
// Err returns the subscription error channel. The intended use of Err is to schedule
// resubscription when the client connection is closed unexpectedly.
//
// The error channel receives a value when the subscription has ended due
// to an error. The received error is nil if Close has been called
// on the underlying client and no other error has occurred.
//
// The error channel is closed when Unsubscribe is called on the subscription.
func (sub *ClientSubscription) Err() <-chan error {
return sub.err
}
// Unsubscribe unsubscribes the notification and closes the error channel.
// It can safely be called more than once.
func (sub *ClientSubscription) Unsubscribe() {
sub.quitWithError(true, nil)
sub.errOnce.Do(func() { close(sub.err) })
}
func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) {
sub.quitOnce.Do(func() {
// The dispatch loop won't be able to execute the unsubscribe call
// if it is blocked on deliver. Close sub.quit first because it
// unblocks deliver.
close(sub.quit)
if unsubscribeServer {
sub.requestUnsubscribe()
}
if err != nil {
if err == errClientQuit {
err = nil // Adhere to subscription semantics.
}
sub.err <- err
}
})
}
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
select {
case sub.in <- result:
return true
case <-sub.quit:
return false
}
}
func (sub *ClientSubscription) start() {
sub.quitWithError(sub.forward())
}
func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
cases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)},
{Dir: reflect.SelectSend, Chan: sub.channel},
}
buffer := list.New()
defer buffer.Init()
for {
var chosen int
var recv reflect.Value
if buffer.Len() == 0 {
// Idle, omit send case.
chosen, recv, _ = reflect.Select(cases[:2])
} else {
// Non-empty buffer, send the first queued item.
cases[2].Send = reflect.ValueOf(buffer.Front().Value)
chosen, recv, _ = reflect.Select(cases)
}
switch chosen {
case 0: // <-sub.quit
return false, nil
case 1: // <-sub.in
val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
if err != nil {
return true, err
}
if buffer.Len() == maxClientSubscriptionBuffer {
return true, errSubscriptionQueueOverflow
}
buffer.PushBack(val)
case 2: // sub.channel<-
cases[2].Send = reflect.Value{} // Don't hold onto the value.
buffer.Remove(buffer.Front())
}
}
}
func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) {
val := reflect.New(sub.etype)
err := json.Unmarshal(result, val.Interface())
return val.Elem().Interface(), err
}
func (sub *ClientSubscription) requestUnsubscribe() error {
var result interface{}
return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
}

@ -1,94 +0,0 @@
package rpc
import (
"context"
"fmt"
"math"
"strings"
"github.com/ethereum/go-ethereum/common/hexutil"
)
// API describes the set of methods offered over the RPC interface
type API struct {
Namespace string // namespace under which the rpc methods of Service are exposed
Version string // api version for DApp's
Service interface{} // receiver instance which holds the methods
Public bool // indication if the methods must be considered safe for public use
}
// Error wraps RPC errors, which contain an error code in addition to the message.
type Error interface {
Error() string // returns the message
ErrorCode() int // returns the code
}
// ServerCodec implements reading, parsing and writing RPC messages for the server side of
// a RPC session. Implementations must be go-routine safe since the codec can be called in
// multiple go-routines concurrently.
type ServerCodec interface {
Read() (msgs []*jsonrpcMessage, isBatch bool, err error)
Close()
jsonWriter
}
// jsonWriter can write JSON messages to its underlying connection.
// Implementations must be safe for concurrent use.
type jsonWriter interface {
Write(context.Context, interface{}) error
// Closed returns a channel which is closed when the connection is closed.
Closed() <-chan interface{}
// RemoteAddr returns the peer address of the connection.
RemoteAddr() string
}
// BlockNumber ...
type BlockNumber int64
const (
// PendingBlockNumber ...
PendingBlockNumber = BlockNumber(-2)
latestBlockNumber = BlockNumber(-1)
earliestBlockNumber = BlockNumber(0)
)
// UnmarshalJSON parses the given JSON fragment into a BlockNumber. It supports:
// - "latest", "earliest" or "pending" as string arguments
// - the block number
// Returned errors:
// - an invalid block number error when the given argument isn't a known strings
// - an out of range error when the given block number is either too little or too large
func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
input := strings.TrimSpace(string(data))
if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' {
input = input[1 : len(input)-1]
}
switch input {
case "earliest":
*bn = earliestBlockNumber
return nil
case "latest":
*bn = latestBlockNumber
return nil
case "pending":
*bn = PendingBlockNumber
return nil
}
blckNum, err := hexutil.DecodeUint64(input)
if err != nil {
return err
}
if blckNum > math.MaxInt64 {
return fmt.Errorf("Blocknumber too high")
}
*bn = BlockNumber(blckNum)
return nil
}
// Int64 turns blockNumber to int64
func (bn BlockNumber) Int64() int64 {
return (int64)(bn)
}

@ -1,222 +0,0 @@
package rpc
import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/log"
"golang.org/x/net/websocket"
)
// websocketJSONCodec is a custom JSON codec with payload size enforcement and
// special number parsing.
var websocketJSONCodec = websocket.Codec{
// Marshal is the stock JSON marshaller used by the websocket library too.
Marshal: func(v interface{}) ([]byte, byte, error) {
msg, err := json.Marshal(v)
return msg, websocket.TextFrame, err
},
// Unmarshal is a specialized unmarshaller to properly convert numbers.
Unmarshal: func(msg []byte, payloadType byte, v interface{}) error {
dec := json.NewDecoder(bytes.NewReader(msg))
dec.UseNumber()
return dec.Decode(v)
},
}
// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
//
// allowedOrigins should be a comma-separated list of allowed origin URLs.
// To allow connections with any origin, pass "*".
func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler {
return websocket.Server{
Handshake: wsHandshakeValidator(allowedOrigins),
Handler: func(conn *websocket.Conn) {
codec := newWebsocketCodec(conn)
s.ServeCodec(codec)
},
}
}
func newWebsocketCodec(conn *websocket.Conn) ServerCodec {
// Create a custom encode/decode pair to enforce payload size and number encoding
conn.MaxPayloadBytes = maxRequestContentLength
encoder := func(v interface{}) error {
return websocketJSONCodec.Send(conn, v)
}
decoder := func(v interface{}) error {
return websocketJSONCodec.Receive(conn, v)
}
rpcconn := Conn(conn)
if conn.IsServerConn() {
// Override remote address with the actual socket address because
// package websocket crashes if there is no request origin.
addr := conn.Request().RemoteAddr
if wsaddr := conn.RemoteAddr().(*websocket.Addr); wsaddr.URL != nil {
// Add origin if present.
addr += "(" + wsaddr.URL.String() + ")"
}
rpcconn = connWithRemoteAddr{conn, addr}
}
return NewCodec(rpcconn, encoder, decoder)
}
// NewWSServer creates a new websocket RPC server around an API provider.
//
// Deprecated: use Server.WebsocketHandler
func NewWSServer(allowedOrigins []string, srv *Server) *http.Server {
return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)}
}
// wsHandshakeValidator returns a handler that verifies the origin during the
// websocket upgrade process. When a '*' is specified as an allowed origins all
// connections are accepted.
func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http.Request) error {
origins := mapset.NewSet()
allowAllOrigins := false
for _, origin := range allowedOrigins {
if origin == "*" {
allowAllOrigins = true
}
if origin != "" {
origins.Add(strings.ToLower(origin))
}
}
// allow localhost if no allowedOrigins are specified.
if len(origins.ToSlice()) == 0 {
origins.Add("http://localhost")
if hostname, err := os.Hostname(); err == nil {
origins.Add("http://" + strings.ToLower(hostname))
}
}
log.Debug(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v", origins.ToSlice()))
f := func(cfg *websocket.Config, req *http.Request) error {
// Skip origin verification if no Origin header is present. The origin check
// is supposed to protect against browser based attacks. Browsers always set
// Origin. Non-browser software can put anything in origin and checking it doesn't
// provide additional security.
if _, ok := req.Header["Origin"]; !ok {
return nil
}
// Verify origin against whitelist.
origin := strings.ToLower(req.Header.Get("Origin"))
if allowAllOrigins || origins.Contains(origin) {
return nil
}
log.Warn("Rejected WebSocket connection", "origin", origin)
return errors.New("origin not allowed")
}
return f
}
func wsGetConfig(endpoint, origin string) (*websocket.Config, error) {
if origin == "" {
var err error
if origin, err = os.Hostname(); err != nil {
return nil, err
}
if strings.HasPrefix(endpoint, "wss") {
origin = "https://" + strings.ToLower(origin)
} else {
origin = "http://" + strings.ToLower(origin)
}
}
config, err := websocket.NewConfig(endpoint, origin)
if err != nil {
return nil, err
}
if config.Location.User != nil {
b64auth := base64.StdEncoding.EncodeToString([]byte(config.Location.User.String()))
config.Header.Add("Authorization", "Basic "+b64auth)
config.Location.User = nil
}
return config, nil
}
// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
// that is listening on the given endpoint.
//
// The context is used for the initial connection establishment. It does not
// affect subsequent interactions with the client.
func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
config, err := wsGetConfig(endpoint, origin)
if err != nil {
return nil, err
}
return newClient(ctx, func(ctx context.Context) (ServerCodec, error) {
conn, err := wsDialContext(ctx, config)
if err != nil {
return nil, err
}
return newWebsocketCodec(conn), nil
})
}
func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) {
var conn net.Conn
var err error
switch config.Location.Scheme {
case "ws":
conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location))
case "wss":
dialer := contextDialer(ctx)
conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig)
default:
err = websocket.ErrBadScheme
}
if err != nil {
return nil, err
}
ws, err := websocket.NewClient(config, conn)
if err != nil {
conn.Close()
return nil, err
}
return ws, err
}
var wsPortMap = map[string]string{"ws": "80", "wss": "443"}
func wsDialAddress(location *url.URL) string {
if _, ok := wsPortMap[location.Scheme]; ok {
if _, _, err := net.SplitHostPort(location.Host); err != nil {
return net.JoinHostPort(location.Host, wsPortMap[location.Scheme])
}
}
return location.Host
}
func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
d := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
return d.DialContext(ctx, network, addr)
}
func contextDialer(ctx context.Context) *net.Dialer {
dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
} else {
dialer.Deadline = time.Now().Add(defaultDialTimeout)
}
return dialer
}
Loading…
Cancel
Save