commit
4d8779a2c3
@ -1,128 +0,0 @@ |
|||||||
package rpcservice |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"fmt" |
|
||||||
"net" |
|
||||||
"net/http" |
|
||||||
"strconv" |
|
||||||
|
|
||||||
msg_pb "github.com/harmony-one/harmony/api/proto/message" |
|
||||||
"github.com/harmony-one/harmony/core" |
|
||||||
"github.com/harmony-one/harmony/internal/utils" |
|
||||||
"github.com/harmony-one/harmony/p2p" |
|
||||||
"github.com/harmony-one/harmony/rpc" |
|
||||||
"github.com/harmony-one/harmony/rpc/hmyapi" |
|
||||||
) |
|
||||||
|
|
||||||
const ( |
|
||||||
rpcPortDiff = 123 |
|
||||||
) |
|
||||||
|
|
||||||
// Service is the struct for rpc service.
|
|
||||||
type Service struct { |
|
||||||
messageChan chan *msg_pb.Message |
|
||||||
server *http.Server |
|
||||||
// Util
|
|
||||||
peer *p2p.Peer |
|
||||||
blockchain *core.BlockChain |
|
||||||
txPool *core.TxPool |
|
||||||
// HTTP RPC
|
|
||||||
rpcAPIs []rpc.API // List of APIs currently provided by the node
|
|
||||||
httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled)
|
|
||||||
httpWhitelist []string // HTTP RPC modules to allow through this endpoint
|
|
||||||
httpListener net.Listener // HTTP RPC listener socket to server API requests
|
|
||||||
httpHandler *rpc.Server // HTTP RPC request handler to process the API requests
|
|
||||||
} |
|
||||||
|
|
||||||
// New returns RPC service.
|
|
||||||
func New(b *core.BlockChain, p *p2p.Peer, t *core.TxPool) *Service { |
|
||||||
return &Service{ |
|
||||||
blockchain: b, |
|
||||||
peer: p, |
|
||||||
txPool: t, |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// StartService starts RPC service.
|
|
||||||
func (s *Service) StartService() { |
|
||||||
utils.GetLogInstance().Info("Starting RPC service.") |
|
||||||
if err := s.startRPC(); err != nil { |
|
||||||
utils.GetLogInstance().Error("Failed to start RPC service.") |
|
||||||
} else { |
|
||||||
utils.GetLogInstance().Info("Started RPC service.") |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// StopService shutdowns RPC service.
|
|
||||||
func (s *Service) StopService() { |
|
||||||
utils.GetLogInstance().Info("Shutting down RPC service.") |
|
||||||
if err := s.server.Shutdown(context.Background()); err != nil { |
|
||||||
utils.GetLogInstance().Error("Error when shutting down RPC server", "error", err) |
|
||||||
} else { |
|
||||||
utils.GetLogInstance().Error("Shutting down RPC server successufully") |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// NotifyService notify service
|
|
||||||
func (s *Service) NotifyService(params map[string]interface{}) { |
|
||||||
return |
|
||||||
} |
|
||||||
|
|
||||||
// SetMessageChan sets up message channel to service.
|
|
||||||
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { |
|
||||||
s.messageChan = messageChan |
|
||||||
} |
|
||||||
|
|
||||||
// startRPC is a helper method to start all the various RPC endpoint during node
|
|
||||||
// startup. It's not meant to be called at any time afterwards as it makes certain
|
|
||||||
// assumptions about the state of the node.
|
|
||||||
func (s *Service) startRPC() error { |
|
||||||
apis := hmyapi.GetAPIs(rpc.NewBackend(s.blockchain, s.txPool)) |
|
||||||
|
|
||||||
port, _ := strconv.Atoi(s.peer.Port) |
|
||||||
s.httpEndpoint = fmt.Sprintf("localhost:%v", port+rpcPortDiff) |
|
||||||
if err := s.startHTTP(s.httpEndpoint, apis); err != nil { |
|
||||||
utils.GetLogInstance().Debug("Failed to start RPC HTTP") |
|
||||||
return err |
|
||||||
} |
|
||||||
utils.GetLogInstance().Debug("Started RPC HTTP") |
|
||||||
|
|
||||||
// All API endpoints started successfully
|
|
||||||
s.rpcAPIs = apis |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
// startHTTP initializes and starts the HTTP RPC endpoint.
|
|
||||||
func (s *Service) startHTTP(endpoint string, apis []rpc.API) error { |
|
||||||
utils.GetLogInstance().Debug("rpc startHTTP", "endpoint", endpoint, "apis", apis) |
|
||||||
// Short circuit if the HTTP endpoint isn't being exposed
|
|
||||||
if endpoint == "" { |
|
||||||
return nil |
|
||||||
} |
|
||||||
listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis) |
|
||||||
if err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
utils.GetLogInstance().Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint)) |
|
||||||
// All listeners booted successfully
|
|
||||||
s.httpEndpoint = endpoint |
|
||||||
s.httpListener = listener |
|
||||||
s.httpHandler = handler |
|
||||||
|
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
// stopHTTP terminates the HTTP RPC endpoint.
|
|
||||||
func (s *Service) stopHTTP() { |
|
||||||
if s.httpListener != nil { |
|
||||||
s.httpListener.Close() |
|
||||||
s.httpListener = nil |
|
||||||
|
|
||||||
utils.GetLogInstance().Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", s.httpEndpoint)) |
|
||||||
} |
|
||||||
if s.httpHandler != nil { |
|
||||||
s.httpHandler.Stop() |
|
||||||
s.httpHandler = nil |
|
||||||
} |
|
||||||
} |
|
@ -1,22 +1,21 @@ |
|||||||
package hmyapi |
package hmyapi |
||||||
|
|
||||||
import ( |
import ( |
||||||
"github.com/harmony-one/harmony/rpc" |
"github.com/ethereum/go-ethereum/rpc" |
||||||
|
"github.com/harmony-one/harmony/core" |
||||||
) |
) |
||||||
|
|
||||||
const namespace = "hmy" |
|
||||||
|
|
||||||
// GetAPIs returns all the APIs.
|
// GetAPIs returns all the APIs.
|
||||||
func GetAPIs(b *rpc.HmyAPIBackend) []rpc.API { |
func GetAPIs(b *core.HmyAPIBackend) []rpc.API { |
||||||
nonceLock := new(AddrLocker) |
nonceLock := new(AddrLocker) |
||||||
return []rpc.API{ |
return []rpc.API{ |
||||||
{ |
{ |
||||||
Namespace: namespace, |
Namespace: "hmy", |
||||||
Version: "1.0", |
Version: "1.0", |
||||||
Service: NewPublicBlockChainAPI(b), |
Service: NewPublicBlockChainAPI(b), |
||||||
Public: true, |
Public: true, |
||||||
}, { |
}, { |
||||||
Namespace: namespace, |
Namespace: "hmy", |
||||||
Version: "1.0", |
Version: "1.0", |
||||||
Service: NewPublicTransactionPoolAPI(b, nonceLock), |
Service: NewPublicTransactionPoolAPI(b, nonceLock), |
||||||
Public: true, |
Public: true, |
@ -0,0 +1,133 @@ |
|||||||
|
package node |
||||||
|
|
||||||
|
import ( |
||||||
|
"fmt" |
||||||
|
"net" |
||||||
|
"strconv" |
||||||
|
"strings" |
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/log" |
||||||
|
"github.com/ethereum/go-ethereum/rpc" |
||||||
|
"github.com/harmony-one/harmony/core" |
||||||
|
"github.com/harmony-one/harmony/internal/hmyapi" |
||||||
|
) |
||||||
|
|
||||||
|
const ( |
||||||
|
rpcHTTPPortOffset = 10 |
||||||
|
rpcWSPortOffset = 20 |
||||||
|
) |
||||||
|
|
||||||
|
var ( |
||||||
|
// HTTP RPC
|
||||||
|
rpcAPIs []rpc.API |
||||||
|
httpListener net.Listener |
||||||
|
httpHandler *rpc.Server |
||||||
|
|
||||||
|
wsListener net.Listener |
||||||
|
wsHandler *rpc.Server |
||||||
|
|
||||||
|
httpEndpoint = "" |
||||||
|
wsEndpoint = "" |
||||||
|
|
||||||
|
httpModules = []string{"hmy"} |
||||||
|
httpVirtualHosts = []string{"*"} |
||||||
|
httpTimeouts = rpc.DefaultHTTPTimeouts |
||||||
|
|
||||||
|
wsModules = []string{"net", "web3"} |
||||||
|
wsOrigins = []string{"*"} |
||||||
|
|
||||||
|
apiBackend *core.HmyAPIBackend |
||||||
|
) |
||||||
|
|
||||||
|
// StartRPC start RPC service
|
||||||
|
func (node *Node) StartRPC(nodePort string) error { |
||||||
|
// Gather all the possible APIs to surface
|
||||||
|
apiBackend = core.NewBackend(node.blockchain, node.TxPool) |
||||||
|
|
||||||
|
apis := hmyapi.GetAPIs(apiBackend) |
||||||
|
for _, service := range node.serviceManager.GetServices() { |
||||||
|
apis = append(apis, service.APIs()...) |
||||||
|
} |
||||||
|
|
||||||
|
port, _ := strconv.Atoi(nodePort) |
||||||
|
|
||||||
|
httpEndpoint = fmt.Sprintf(":%v", port+rpcHTTPPortOffset) |
||||||
|
if err := node.startHTTP(httpEndpoint, apis, httpModules, nil, httpVirtualHosts, httpTimeouts); err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
wsEndpoint = fmt.Sprintf(":%v", port+rpcWSPortOffset) |
||||||
|
if err := node.startWS(wsEndpoint, apis, wsModules, wsOrigins, true); err != nil { |
||||||
|
node.stopHTTP() |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
rpcAPIs = apis |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// startHTTP initializes and starts the HTTP RPC endpoint.
|
||||||
|
func (node *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) error { |
||||||
|
// Short circuit if the HTTP endpoint isn't being exposed
|
||||||
|
if endpoint == "" { |
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts, timeouts) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
|
||||||
|
log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ",")) |
||||||
|
// All listeners booted successfully
|
||||||
|
httpListener = listener |
||||||
|
httpHandler = handler |
||||||
|
|
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// stopHTTP terminates the HTTP RPC endpoint.
|
||||||
|
func (node *Node) stopHTTP() { |
||||||
|
if httpListener != nil { |
||||||
|
httpListener.Close() |
||||||
|
httpListener = nil |
||||||
|
|
||||||
|
log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", httpEndpoint)) |
||||||
|
} |
||||||
|
if httpHandler != nil { |
||||||
|
httpHandler.Stop() |
||||||
|
httpHandler = nil |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// startWS initializes and starts the websocket RPC endpoint.
|
||||||
|
func (node *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error { |
||||||
|
// Short circuit if the WS endpoint isn't being exposed
|
||||||
|
if endpoint == "" { |
||||||
|
return nil |
||||||
|
} |
||||||
|
listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll) |
||||||
|
if err != nil { |
||||||
|
return err |
||||||
|
} |
||||||
|
log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr())) |
||||||
|
// All listeners booted successfully
|
||||||
|
wsListener = listener |
||||||
|
wsHandler = handler |
||||||
|
|
||||||
|
return nil |
||||||
|
} |
||||||
|
|
||||||
|
// stopWS terminates the websocket RPC endpoint.
|
||||||
|
func (node *Node) stopWS() { |
||||||
|
if wsListener != nil { |
||||||
|
wsListener.Close() |
||||||
|
wsListener = nil |
||||||
|
|
||||||
|
log.Info("WebSocket endpoint closed", "url", fmt.Sprintf("ws://%s", wsEndpoint)) |
||||||
|
} |
||||||
|
if wsHandler != nil { |
||||||
|
wsHandler.Stop() |
||||||
|
wsHandler = nil |
||||||
|
} |
||||||
|
} |
@ -1,32 +0,0 @@ |
|||||||
package node |
|
||||||
|
|
||||||
import ( |
|
||||||
"github.com/ethereum/go-ethereum/p2p" |
|
||||||
"github.com/harmony-one/harmony/rpc" |
|
||||||
) |
|
||||||
|
|
||||||
// Service is an individual protocol that can be registered into a node.
|
|
||||||
//
|
|
||||||
// Notes:
|
|
||||||
//
|
|
||||||
// • Service life-cycle management is delegated to the node. The service is allowed to
|
|
||||||
// initialize itself upon creation, but no goroutines should be spun up outside of the
|
|
||||||
// Start method.
|
|
||||||
//
|
|
||||||
// • Restart logic is not required as the node will create a fresh instance
|
|
||||||
// every time a service is started.
|
|
||||||
type Service interface { |
|
||||||
// Protocols retrieves the P2P protocols the service wishes to start.
|
|
||||||
Protocols() []p2p.Protocol |
|
||||||
|
|
||||||
// APIs retrieves the list of RPC descriptors the service provides
|
|
||||||
APIs() []rpc.API |
|
||||||
|
|
||||||
// Start is called after all services have been constructed and the networking
|
|
||||||
// layer was also initialized to spawn any goroutines required by the service.
|
|
||||||
Start(server *p2p.Server) error |
|
||||||
|
|
||||||
// Stop terminates all goroutines belonging to the service, blocking until they
|
|
||||||
// are all terminated.
|
|
||||||
Stop() error |
|
||||||
} |
|
@ -1,20 +0,0 @@ |
|||||||
Files that are mainly ported over from go-ethereum (might include tslint fix): |
|
||||||
* rpc/client.go: the rpc client. |
|
||||||
* rpc/endpoints.go: starting HTTP/WS/IPC endpoints. |
|
||||||
* rpc/errors.go: error code handling. |
|
||||||
* rpc/handler.go: handling requests, running methods. |
|
||||||
* rpc/http.go: starting HTTP service. |
|
||||||
* rpc/ipc_unix.go: util for ipc unix |
|
||||||
* rpc/ipc.go: util for general ipc |
|
||||||
* rpc/json.go: json de-/serialization |
|
||||||
* rpc/server.go: the rpc server |
|
||||||
* rpc/stdio.go: stdio for ipc. |
|
||||||
* rpc/subscription.go: ws subscriptions. |
|
||||||
* rpc/types.go: type declarations |
|
||||||
* rpc/websocket.go: ws connection. |
|
||||||
* rpc/hmyapi/addrlock.go |
|
||||||
|
|
||||||
Richard's changes: |
|
||||||
* rpc/service.go: our service (http/ws/ipc) starter/stoper |
|
||||||
* other files under `rpc/hmyapi/`. |
|
||||||
* other files that are not in `rpc` folder. Added some util functions. |
|
@ -1,623 +0,0 @@ |
|||||||
// Copyright 2016 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"bytes" |
|
||||||
"context" |
|
||||||
"encoding/json" |
|
||||||
"errors" |
|
||||||
"fmt" |
|
||||||
"net/url" |
|
||||||
"reflect" |
|
||||||
"strconv" |
|
||||||
"sync/atomic" |
|
||||||
"time" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log" |
|
||||||
) |
|
||||||
|
|
||||||
var ( |
|
||||||
errClientQuit = errors.New("client is closed") |
|
||||||
errNoResult = errors.New("no result in JSON-RPC response") |
|
||||||
errSubscriptionQueueOverflow = errors.New("subscription queue overflow") |
|
||||||
errClientReconnected = errors.New("client reconnected") |
|
||||||
errDead = errors.New("connection lost") |
|
||||||
) |
|
||||||
|
|
||||||
const ( |
|
||||||
// Timeouts
|
|
||||||
tcpKeepAliveInterval = 30 * time.Second |
|
||||||
defaultDialTimeout = 10 * time.Second // used if context has no deadline
|
|
||||||
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
|
|
||||||
) |
|
||||||
|
|
||||||
const ( |
|
||||||
// Subscriptions are removed when the subscriber cannot keep up.
|
|
||||||
//
|
|
||||||
// This can be worked around by supplying a channel with sufficiently sized buffer,
|
|
||||||
// but this can be inconvenient and hard to explain in the docs. Another issue with
|
|
||||||
// buffered channels is that the buffer is static even though it might not be needed
|
|
||||||
// most of the time.
|
|
||||||
//
|
|
||||||
// The approach taken here is to maintain a per-subscription linked list buffer
|
|
||||||
// shrinks on demand. If the buffer reaches the size below, the subscription is
|
|
||||||
// dropped.
|
|
||||||
maxClientSubscriptionBuffer = 20000 |
|
||||||
) |
|
||||||
|
|
||||||
// BatchElem is an element in a batch request.
|
|
||||||
type BatchElem struct { |
|
||||||
Method string |
|
||||||
Args []interface{} |
|
||||||
// The result is unmarshaled into this field. Result must be set to a
|
|
||||||
// non-nil pointer value of the desired type, otherwise the response will be
|
|
||||||
// discarded.
|
|
||||||
Result interface{} |
|
||||||
// Error is set if the server returns an error for this request, or if
|
|
||||||
// unmarshaling into Result fails. It is not set for I/O errors.
|
|
||||||
Error error |
|
||||||
} |
|
||||||
|
|
||||||
// Client represents a connection to an RPC server.
|
|
||||||
type Client struct { |
|
||||||
idgen func() ID // for subscriptions
|
|
||||||
isHTTP bool |
|
||||||
services *serviceRegistry |
|
||||||
|
|
||||||
idCounter uint32 |
|
||||||
|
|
||||||
// This function, if non-nil, is called when the connection is lost.
|
|
||||||
reconnectFunc reconnectFunc |
|
||||||
|
|
||||||
// writeConn is used for writing to the connection on the caller's goroutine. It should
|
|
||||||
// only be accessed outside of dispatch, with the write lock held. The write lock is
|
|
||||||
// taken by sending on requestOp and released by sending on sendDone.
|
|
||||||
writeConn jsonWriter |
|
||||||
|
|
||||||
// for dispatch
|
|
||||||
close chan struct{} |
|
||||||
closing chan struct{} // closed when client is quitting
|
|
||||||
didClose chan struct{} // closed when client quits
|
|
||||||
reconnected chan ServerCodec // where write/reconnect sends the new connection
|
|
||||||
readOp chan readOp // read messages
|
|
||||||
readErr chan error // errors from read
|
|
||||||
reqInit chan *requestOp // register response IDs, takes write lock
|
|
||||||
reqSent chan error // signals write completion, releases write lock
|
|
||||||
reqTimeout chan *requestOp // removes response IDs when call timeout expires
|
|
||||||
} |
|
||||||
|
|
||||||
type reconnectFunc func(ctx context.Context) (ServerCodec, error) |
|
||||||
|
|
||||||
type clientContextKey struct{} |
|
||||||
|
|
||||||
type clientConn struct { |
|
||||||
codec ServerCodec |
|
||||||
handler *handler |
|
||||||
} |
|
||||||
|
|
||||||
func (c *Client) newClientConn(conn ServerCodec) *clientConn { |
|
||||||
ctx := context.WithValue(context.Background(), clientContextKey{}, c) |
|
||||||
handler := newHandler(ctx, conn, c.idgen, c.services) |
|
||||||
return &clientConn{conn, handler} |
|
||||||
} |
|
||||||
|
|
||||||
func (cc *clientConn) close(err error, inflightReq *requestOp) { |
|
||||||
cc.handler.close(err, inflightReq) |
|
||||||
cc.codec.Close() |
|
||||||
} |
|
||||||
|
|
||||||
type readOp struct { |
|
||||||
msgs []*jsonrpcMessage |
|
||||||
batch bool |
|
||||||
} |
|
||||||
|
|
||||||
type requestOp struct { |
|
||||||
ids []json.RawMessage |
|
||||||
err error |
|
||||||
resp chan *jsonrpcMessage // receives up to len(ids) responses
|
|
||||||
sub *ClientSubscription // only set for EthSubscribe requests
|
|
||||||
} |
|
||||||
|
|
||||||
func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, error) { |
|
||||||
select { |
|
||||||
case <-ctx.Done(): |
|
||||||
// Send the timeout to dispatch so it can remove the request IDs.
|
|
||||||
select { |
|
||||||
case c.reqTimeout <- op: |
|
||||||
case <-c.closing: |
|
||||||
} |
|
||||||
return nil, ctx.Err() |
|
||||||
case resp := <-op.resp: |
|
||||||
return resp, op.err |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// Dial creates a new client for the given URL.
|
|
||||||
//
|
|
||||||
// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a
|
|
||||||
// file name with no URL scheme, a local socket connection is established using UNIX
|
|
||||||
// domain sockets on supported platforms and named pipes on Windows. If you want to
|
|
||||||
// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead.
|
|
||||||
//
|
|
||||||
// For websocket connections, the origin is set to the local host name.
|
|
||||||
//
|
|
||||||
// The client reconnects automatically if the connection is lost.
|
|
||||||
func Dial(rawurl string) (*Client, error) { |
|
||||||
return DialContext(context.Background(), rawurl) |
|
||||||
} |
|
||||||
|
|
||||||
// DialContext creates a new RPC client, just like Dial.
|
|
||||||
//
|
|
||||||
// The context is used to cancel or time out the initial connection establishment. It does
|
|
||||||
// not affect subsequent interactions with the client.
|
|
||||||
func DialContext(ctx context.Context, rawurl string) (*Client, error) { |
|
||||||
u, err := url.Parse(rawurl) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
switch u.Scheme { |
|
||||||
case "http", "https": |
|
||||||
return DialHTTP(rawurl) |
|
||||||
case "ws", "wss": |
|
||||||
return DialWebsocket(ctx, rawurl, "") |
|
||||||
case "stdio": |
|
||||||
return DialStdIO(ctx) |
|
||||||
case "": |
|
||||||
return DialIPC(ctx, rawurl) |
|
||||||
default: |
|
||||||
return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// ClientFromContext Client retrieves the client from the context, if any. This can be used to perform
|
|
||||||
// 'reverse calls' in a handler method.
|
|
||||||
func ClientFromContext(ctx context.Context) (*Client, bool) { |
|
||||||
client, ok := ctx.Value(clientContextKey{}).(*Client) |
|
||||||
return client, ok |
|
||||||
} |
|
||||||
|
|
||||||
func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) { |
|
||||||
conn, err := connect(initctx) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
c := initClient(conn, randomIDGenerator(), new(serviceRegistry)) |
|
||||||
c.reconnectFunc = connect |
|
||||||
return c, nil |
|
||||||
} |
|
||||||
|
|
||||||
func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { |
|
||||||
_, isHTTP := conn.(*httpConn) |
|
||||||
c := &Client{ |
|
||||||
idgen: idgen, |
|
||||||
isHTTP: isHTTP, |
|
||||||
services: services, |
|
||||||
writeConn: conn, |
|
||||||
close: make(chan struct{}), |
|
||||||
closing: make(chan struct{}), |
|
||||||
didClose: make(chan struct{}), |
|
||||||
reconnected: make(chan ServerCodec), |
|
||||||
readOp: make(chan readOp), |
|
||||||
readErr: make(chan error), |
|
||||||
reqInit: make(chan *requestOp), |
|
||||||
reqSent: make(chan error, 1), |
|
||||||
reqTimeout: make(chan *requestOp), |
|
||||||
} |
|
||||||
if !isHTTP { |
|
||||||
go c.dispatch(conn) |
|
||||||
} |
|
||||||
return c |
|
||||||
} |
|
||||||
|
|
||||||
// RegisterName creates a service for the given receiver type under the given name. When no
|
|
||||||
// methods on the given receiver match the criteria to be either a RPC method or a
|
|
||||||
// subscription an error is returned. Otherwise a new service is created and added to the
|
|
||||||
// service collection this client provides to the server.
|
|
||||||
func (c *Client) RegisterName(name string, receiver interface{}) error { |
|
||||||
return c.services.registerName(name, receiver) |
|
||||||
} |
|
||||||
|
|
||||||
func (c *Client) nextID() json.RawMessage { |
|
||||||
id := atomic.AddUint32(&c.idCounter, 1) |
|
||||||
return strconv.AppendUint(nil, uint64(id), 10) |
|
||||||
} |
|
||||||
|
|
||||||
// SupportedModules calls the rpc_modules method, retrieving the list of
|
|
||||||
// APIs that are available on the server.
|
|
||||||
func (c *Client) SupportedModules() (map[string]string, error) { |
|
||||||
var result map[string]string |
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) |
|
||||||
defer cancel() |
|
||||||
err := c.CallContext(ctx, &result, "rpc_modules") |
|
||||||
return result, err |
|
||||||
} |
|
||||||
|
|
||||||
// Close closes the client, aborting any in-flight requests.
|
|
||||||
func (c *Client) Close() { |
|
||||||
if c.isHTTP { |
|
||||||
return |
|
||||||
} |
|
||||||
select { |
|
||||||
case c.close <- struct{}{}: |
|
||||||
<-c.didClose |
|
||||||
case <-c.didClose: |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// Call performs a JSON-RPC call with the given arguments and unmarshals into
|
|
||||||
// result if no error occurred.
|
|
||||||
//
|
|
||||||
// The result must be a pointer so that package json can unmarshal into it. You
|
|
||||||
// can also pass nil, in which case the result is ignored.
|
|
||||||
func (c *Client) Call(result interface{}, method string, args ...interface{}) error { |
|
||||||
ctx := context.Background() |
|
||||||
return c.CallContext(ctx, result, method, args...) |
|
||||||
} |
|
||||||
|
|
||||||
// CallContext performs a JSON-RPC call with the given arguments. If the context is
|
|
||||||
// canceled before the call has successfully returned, CallContext returns immediately.
|
|
||||||
//
|
|
||||||
// The result must be a pointer so that package json can unmarshal into it. You
|
|
||||||
// can also pass nil, in which case the result is ignored.
|
|
||||||
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { |
|
||||||
msg, err := c.newMessage(method, args...) |
|
||||||
if err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} |
|
||||||
|
|
||||||
if c.isHTTP { |
|
||||||
err = c.sendHTTP(ctx, op, msg) |
|
||||||
} else { |
|
||||||
err = c.send(ctx, op, msg) |
|
||||||
} |
|
||||||
if err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
|
|
||||||
// dispatch has accepted the request and will close the channel when it quits.
|
|
||||||
switch resp, err := op.wait(ctx, c); { |
|
||||||
case err != nil: |
|
||||||
return err |
|
||||||
case resp.Error != nil: |
|
||||||
return resp.Error |
|
||||||
case len(resp.Result) == 0: |
|
||||||
return errNoResult |
|
||||||
default: |
|
||||||
return json.Unmarshal(resp.Result, &result) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// BatchCall sends all given requests as a single batch and waits for the server
|
|
||||||
// to return a response for all of them.
|
|
||||||
//
|
|
||||||
// In contrast to Call, BatchCall only returns I/O errors. Any error specific to
|
|
||||||
// a request is reported through the Error field of the corresponding BatchElem.
|
|
||||||
//
|
|
||||||
// Note that batch calls may not be executed atomically on the server side.
|
|
||||||
func (c *Client) BatchCall(b []BatchElem) error { |
|
||||||
ctx := context.Background() |
|
||||||
return c.BatchCallContext(ctx, b) |
|
||||||
} |
|
||||||
|
|
||||||
// BatchCallContext sends all given requests as a single batch and waits for the server
|
|
||||||
// to return a response for all of them. The wait duration is bounded by the
|
|
||||||
// context's deadline.
|
|
||||||
//
|
|
||||||
// In contrast to CallContext, BatchCallContext only returns errors that have occurred
|
|
||||||
// while sending the request. Any error specific to a request is reported through the
|
|
||||||
// Error field of the corresponding BatchElem.
|
|
||||||
//
|
|
||||||
// Note that batch calls may not be executed atomically on the server side.
|
|
||||||
func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { |
|
||||||
msgs := make([]*jsonrpcMessage, len(b)) |
|
||||||
op := &requestOp{ |
|
||||||
ids: make([]json.RawMessage, len(b)), |
|
||||||
resp: make(chan *jsonrpcMessage, len(b)), |
|
||||||
} |
|
||||||
for i, elem := range b { |
|
||||||
msg, err := c.newMessage(elem.Method, elem.Args...) |
|
||||||
if err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
msgs[i] = msg |
|
||||||
op.ids[i] = msg.ID |
|
||||||
} |
|
||||||
|
|
||||||
var err error |
|
||||||
if c.isHTTP { |
|
||||||
err = c.sendBatchHTTP(ctx, op, msgs) |
|
||||||
} else { |
|
||||||
err = c.send(ctx, op, msgs) |
|
||||||
} |
|
||||||
|
|
||||||
// Wait for all responses to come back.
|
|
||||||
for n := 0; n < len(b) && err == nil; n++ { |
|
||||||
var resp *jsonrpcMessage |
|
||||||
resp, err = op.wait(ctx, c) |
|
||||||
if err != nil { |
|
||||||
break |
|
||||||
} |
|
||||||
// Find the element corresponding to this response.
|
|
||||||
// The element is guaranteed to be present because dispatch
|
|
||||||
// only sends valid IDs to our channel.
|
|
||||||
var elem *BatchElem |
|
||||||
for i := range msgs { |
|
||||||
if bytes.Equal(msgs[i].ID, resp.ID) { |
|
||||||
elem = &b[i] |
|
||||||
break |
|
||||||
} |
|
||||||
} |
|
||||||
if resp.Error != nil { |
|
||||||
elem.Error = resp.Error |
|
||||||
continue |
|
||||||
} |
|
||||||
if len(resp.Result) == 0 { |
|
||||||
elem.Error = errNoResult |
|
||||||
continue |
|
||||||
} |
|
||||||
elem.Error = json.Unmarshal(resp.Result, elem.Result) |
|
||||||
} |
|
||||||
return err |
|
||||||
} |
|
||||||
|
|
||||||
// Notify sends a notification, i.e. a method call that doesn't expect a response.
|
|
||||||
func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) error { |
|
||||||
op := new(requestOp) |
|
||||||
msg, err := c.newMessage(method, args...) |
|
||||||
if err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
msg.ID = nil |
|
||||||
|
|
||||||
if c.isHTTP { |
|
||||||
return c.sendHTTP(ctx, op, msg) |
|
||||||
} |
|
||||||
return c.send(ctx, op, msg) |
|
||||||
} |
|
||||||
|
|
||||||
// EthSubscribe registers a subscripion under the "eth" namespace.
|
|
||||||
func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { |
|
||||||
return c.Subscribe(ctx, "eth", channel, args...) |
|
||||||
} |
|
||||||
|
|
||||||
// ShhSubscribe registers a subscripion under the "shh" namespace.
|
|
||||||
func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { |
|
||||||
return c.Subscribe(ctx, "shh", channel, args...) |
|
||||||
} |
|
||||||
|
|
||||||
// Subscribe calls the "<namespace>_subscribe" method with the given arguments,
|
|
||||||
// registering a subscription. Server notifications for the subscription are
|
|
||||||
// sent to the given channel. The element type of the channel must match the
|
|
||||||
// expected type of content returned by the subscription.
|
|
||||||
//
|
|
||||||
// The context argument cancels the RPC request that sets up the subscription but has no
|
|
||||||
// effect on the subscription after Subscribe has returned.
|
|
||||||
//
|
|
||||||
// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications
|
|
||||||
// before considering the subscriber dead. The subscription Err channel will receive
|
|
||||||
// errSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
|
|
||||||
// that the channel usually has at least one reader to prevent this issue.
|
|
||||||
func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) { |
|
||||||
// Check type of channel first.
|
|
||||||
chanVal := reflect.ValueOf(channel) |
|
||||||
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { |
|
||||||
panic("first argument to Subscribe must be a writable channel") |
|
||||||
} |
|
||||||
if chanVal.IsNil() { |
|
||||||
panic("channel given to Subscribe must not be nil") |
|
||||||
} |
|
||||||
if c.isHTTP { |
|
||||||
return nil, ErrNotificationsUnsupported |
|
||||||
} |
|
||||||
|
|
||||||
msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
op := &requestOp{ |
|
||||||
ids: []json.RawMessage{msg.ID}, |
|
||||||
resp: make(chan *jsonrpcMessage), |
|
||||||
sub: newClientSubscription(c, namespace, chanVal), |
|
||||||
} |
|
||||||
|
|
||||||
// Send the subscription request.
|
|
||||||
// The arrival and validity of the response is signaled on sub.quit.
|
|
||||||
if err := c.send(ctx, op, msg); err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
if _, err := op.wait(ctx, c); err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
return op.sub, nil |
|
||||||
} |
|
||||||
|
|
||||||
func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) { |
|
||||||
msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method} |
|
||||||
if paramsIn != nil { // prevent sending "params":null
|
|
||||||
var err error |
|
||||||
if msg.Params, err = json.Marshal(paramsIn); err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
} |
|
||||||
return msg, nil |
|
||||||
} |
|
||||||
|
|
||||||
// send registers op with the dispatch loop, then sends msg on the connection.
|
|
||||||
// if sending fails, op is deregistered.
|
|
||||||
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error { |
|
||||||
select { |
|
||||||
case c.reqInit <- op: |
|
||||||
err := c.write(ctx, msg) |
|
||||||
c.reqSent <- err |
|
||||||
return err |
|
||||||
case <-ctx.Done(): |
|
||||||
// This can happen if the client is overloaded or unable to keep up with
|
|
||||||
// subscription notifications.
|
|
||||||
return ctx.Err() |
|
||||||
case <-c.closing: |
|
||||||
return errClientQuit |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func (c *Client) write(ctx context.Context, msg interface{}) error { |
|
||||||
// The previous write failed. Try to establish a new connection.
|
|
||||||
if c.writeConn == nil { |
|
||||||
if err := c.reconnect(ctx); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
} |
|
||||||
err := c.writeConn.Write(ctx, msg) |
|
||||||
if err != nil { |
|
||||||
c.writeConn = nil |
|
||||||
} |
|
||||||
return err |
|
||||||
} |
|
||||||
|
|
||||||
func (c *Client) reconnect(ctx context.Context) error { |
|
||||||
if c.reconnectFunc == nil { |
|
||||||
return errDead |
|
||||||
} |
|
||||||
|
|
||||||
if _, ok := ctx.Deadline(); !ok { |
|
||||||
var cancel func() |
|
||||||
ctx, cancel = context.WithTimeout(ctx, defaultDialTimeout) |
|
||||||
defer cancel() |
|
||||||
} |
|
||||||
newconn, err := c.reconnectFunc(ctx) |
|
||||||
if err != nil { |
|
||||||
log.Trace("RPC client reconnect failed", "err", err) |
|
||||||
return err |
|
||||||
} |
|
||||||
select { |
|
||||||
case c.reconnected <- newconn: |
|
||||||
c.writeConn = newconn |
|
||||||
return nil |
|
||||||
case <-c.didClose: |
|
||||||
newconn.Close() |
|
||||||
return errClientQuit |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// dispatch is the main loop of the client.
|
|
||||||
// It sends read messages to waiting calls to Call and BatchCall
|
|
||||||
// and subscription notifications to registered subscriptions.
|
|
||||||
func (c *Client) dispatch(codec ServerCodec) { |
|
||||||
var ( |
|
||||||
lastOp *requestOp // tracks last send operation
|
|
||||||
reqInitLock = c.reqInit // nil while the send lock is held
|
|
||||||
conn = c.newClientConn(codec) |
|
||||||
reading = true |
|
||||||
) |
|
||||||
defer func() { |
|
||||||
close(c.closing) |
|
||||||
if reading { |
|
||||||
conn.close(errClientQuit, nil) |
|
||||||
c.drainRead() |
|
||||||
} |
|
||||||
close(c.didClose) |
|
||||||
}() |
|
||||||
|
|
||||||
// Spawn the initial read loop.
|
|
||||||
go c.read(codec) |
|
||||||
|
|
||||||
for { |
|
||||||
select { |
|
||||||
case <-c.close: |
|
||||||
return |
|
||||||
|
|
||||||
// Read path:
|
|
||||||
case op := <-c.readOp: |
|
||||||
if op.batch { |
|
||||||
conn.handler.handleBatch(op.msgs) |
|
||||||
} else { |
|
||||||
conn.handler.handleMsg(op.msgs[0]) |
|
||||||
} |
|
||||||
|
|
||||||
case err := <-c.readErr: |
|
||||||
conn.handler.log.Debug("RPC connection read error", "err", err) |
|
||||||
conn.close(err, lastOp) |
|
||||||
reading = false |
|
||||||
|
|
||||||
// Reconnect:
|
|
||||||
case newcodec := <-c.reconnected: |
|
||||||
log.Debug("RPC client reconnected", "reading", reading, "conn", newcodec.RemoteAddr()) |
|
||||||
if reading { |
|
||||||
// Wait for the previous read loop to exit. This is a rare case which
|
|
||||||
// happens if this loop isn't notified in time after the connection breaks.
|
|
||||||
// In those cases the caller will notice first and reconnect. Closing the
|
|
||||||
// handler terminates all waiting requests (closing op.resp) except for
|
|
||||||
// lastOp, which will be transferred to the new handler.
|
|
||||||
conn.close(errClientReconnected, lastOp) |
|
||||||
c.drainRead() |
|
||||||
} |
|
||||||
go c.read(newcodec) |
|
||||||
reading = true |
|
||||||
conn = c.newClientConn(newcodec) |
|
||||||
// Re-register the in-flight request on the new handler
|
|
||||||
// because that's where it will be sent.
|
|
||||||
conn.handler.addRequestOp(lastOp) |
|
||||||
|
|
||||||
// Send path:
|
|
||||||
case op := <-reqInitLock: |
|
||||||
// Stop listening for further requests until the current one has been sent.
|
|
||||||
reqInitLock = nil |
|
||||||
lastOp = op |
|
||||||
conn.handler.addRequestOp(op) |
|
||||||
|
|
||||||
case err := <-c.reqSent: |
|
||||||
if err != nil { |
|
||||||
// Remove response handlers for the last send. When the read loop
|
|
||||||
// goes down, it will signal all other current operations.
|
|
||||||
conn.handler.removeRequestOp(lastOp) |
|
||||||
} |
|
||||||
// Let the next request in.
|
|
||||||
reqInitLock = c.reqInit |
|
||||||
lastOp = nil |
|
||||||
|
|
||||||
case op := <-c.reqTimeout: |
|
||||||
conn.handler.removeRequestOp(op) |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// drainRead drops read messages until an error occurs.
|
|
||||||
func (c *Client) drainRead() { |
|
||||||
for { |
|
||||||
select { |
|
||||||
case <-c.readOp: |
|
||||||
case <-c.readErr: |
|
||||||
return |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// read decodes RPC messages from a codec, feeding them into dispatch.
|
|
||||||
func (c *Client) read(codec ServerCodec) { |
|
||||||
for { |
|
||||||
msgs, batch, err := codec.Read() |
|
||||||
if _, ok := err.(*json.SyntaxError); ok { |
|
||||||
codec.Write(context.Background(), errorMessage(&parseError{err.Error()})) |
|
||||||
} |
|
||||||
if err != nil { |
|
||||||
c.readErr <- err |
|
||||||
return |
|
||||||
} |
|
||||||
c.readOp <- readOp{msgs, batch} |
|
||||||
} |
|
||||||
} |
|
@ -1,79 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"net" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log" |
|
||||||
) |
|
||||||
|
|
||||||
// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules
|
|
||||||
func StartHTTPEndpoint(endpoint string, apis []API) (net.Listener, *Server, error) { |
|
||||||
// Register all the APIs exposed by the services
|
|
||||||
handler := NewServer() |
|
||||||
for _, api := range apis { |
|
||||||
if err := handler.RegisterName(api.Namespace, api.Service); err != nil { |
|
||||||
return nil, nil, err |
|
||||||
} |
|
||||||
log.Debug("HTTP registered", "namespace", api.Namespace) |
|
||||||
} |
|
||||||
// All APIs registered, start the HTTP listener
|
|
||||||
var ( |
|
||||||
listener net.Listener |
|
||||||
err error |
|
||||||
) |
|
||||||
if listener, err = net.Listen("tcp", endpoint); err != nil { |
|
||||||
return nil, nil, err |
|
||||||
} |
|
||||||
go NewHTTPServer(handler).Serve(listener) |
|
||||||
return listener, handler, err |
|
||||||
} |
|
||||||
|
|
||||||
// StartWSEndpoint starts a websocket endpoint
|
|
||||||
func StartWSEndpoint(endpoint string, apis []API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *Server, error) { |
|
||||||
|
|
||||||
// Generate the whitelist based on the allowed modules
|
|
||||||
whitelist := make(map[string]bool) |
|
||||||
for _, module := range modules { |
|
||||||
whitelist[module] = true |
|
||||||
} |
|
||||||
// Register all the APIs exposed by the services
|
|
||||||
handler := NewServer() |
|
||||||
for _, api := range apis { |
|
||||||
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { |
|
||||||
if err := handler.RegisterName(api.Namespace, api.Service); err != nil { |
|
||||||
return nil, nil, err |
|
||||||
} |
|
||||||
log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace) |
|
||||||
} |
|
||||||
} |
|
||||||
// All APIs registered, start the HTTP listener
|
|
||||||
var ( |
|
||||||
listener net.Listener |
|
||||||
err error |
|
||||||
) |
|
||||||
if listener, err = net.Listen("tcp", endpoint); err != nil { |
|
||||||
return nil, nil, err |
|
||||||
} |
|
||||||
go NewWSServer(wsOrigins, handler).Serve(listener) |
|
||||||
return listener, handler, err |
|
||||||
|
|
||||||
} |
|
||||||
|
|
||||||
// StartIPCEndpoint starts an IPC endpoint.
|
|
||||||
func StartIPCEndpoint(ipcEndpoint string, apis []API) (net.Listener, *Server, error) { |
|
||||||
// Register all the APIs exposed by the services.
|
|
||||||
handler := NewServer() |
|
||||||
for _, api := range apis { |
|
||||||
if err := handler.RegisterName(api.Namespace, api.Service); err != nil { |
|
||||||
return nil, nil, err |
|
||||||
} |
|
||||||
log.Debug("IPC registered", "namespace", api.Namespace) |
|
||||||
} |
|
||||||
// All APIs registered, start the IPC listener.
|
|
||||||
listener, err := ipcListen(ipcEndpoint) |
|
||||||
if err != nil { |
|
||||||
return nil, nil, err |
|
||||||
} |
|
||||||
go handler.ServeListener(listener) |
|
||||||
return listener, handler, nil |
|
||||||
} |
|
@ -1,49 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import "fmt" |
|
||||||
|
|
||||||
const defaultErrorCode = -32000 |
|
||||||
|
|
||||||
type methodNotFoundError struct{ method string } |
|
||||||
|
|
||||||
func (e *methodNotFoundError) ErrorCode() int { return -32601 } |
|
||||||
|
|
||||||
func (e *methodNotFoundError) Error() string { |
|
||||||
return fmt.Sprintf("the method %s does not exist/is not available", e.method) |
|
||||||
} |
|
||||||
|
|
||||||
type subscriptionNotFoundError struct{ namespace, subscription string } |
|
||||||
|
|
||||||
func (e *subscriptionNotFoundError) ErrorCode() int { return -32601 } |
|
||||||
|
|
||||||
func (e *subscriptionNotFoundError) Error() string { |
|
||||||
return fmt.Sprintf("no %q subscription in %s namespace", e.subscription, e.namespace) |
|
||||||
} |
|
||||||
|
|
||||||
// Invalid JSON was received by the server.
|
|
||||||
type parseError struct{ message string } |
|
||||||
|
|
||||||
func (e *parseError) ErrorCode() int { return -32700 } |
|
||||||
|
|
||||||
func (e *parseError) Error() string { return e.message } |
|
||||||
|
|
||||||
// received message isn't a valid request
|
|
||||||
type invalidRequestError struct{ message string } |
|
||||||
|
|
||||||
func (e *invalidRequestError) ErrorCode() int { return -32600 } |
|
||||||
|
|
||||||
func (e *invalidRequestError) Error() string { return e.message } |
|
||||||
|
|
||||||
// received message is invalid
|
|
||||||
type invalidMessageError struct{ message string } |
|
||||||
|
|
||||||
func (e *invalidMessageError) ErrorCode() int { return -32700 } |
|
||||||
|
|
||||||
func (e *invalidMessageError) Error() string { return e.message } |
|
||||||
|
|
||||||
// unable to decode supplied params, or an invalid number of parameters
|
|
||||||
type invalidParamsError struct{ message string } |
|
||||||
|
|
||||||
func (e *invalidParamsError) ErrorCode() int { return -32602 } |
|
||||||
|
|
||||||
func (e *invalidParamsError) Error() string { return e.message } |
|
@ -1,381 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"encoding/json" |
|
||||||
"reflect" |
|
||||||
"strconv" |
|
||||||
"strings" |
|
||||||
"sync" |
|
||||||
"time" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log" |
|
||||||
) |
|
||||||
|
|
||||||
// handler handles JSON-RPC messages. There is one handler per connection. Note that
|
|
||||||
// handler is not safe for concurrent use. Message handling never blocks indefinitely
|
|
||||||
// because RPCs are processed on background goroutines launched by handler.
|
|
||||||
//
|
|
||||||
// The entry points for incoming messages are:
|
|
||||||
//
|
|
||||||
// h.handleMsg(message)
|
|
||||||
// h.handleBatch(message)
|
|
||||||
//
|
|
||||||
// Outgoing calls use the requestOp struct. Register the request before sending it
|
|
||||||
// on the connection:
|
|
||||||
//
|
|
||||||
// op := &requestOp{ids: ...}
|
|
||||||
// h.addRequestOp(op)
|
|
||||||
//
|
|
||||||
// Now send the request, then wait for the reply to be delivered through handleMsg:
|
|
||||||
//
|
|
||||||
// if err := op.wait(...); err != nil {
|
|
||||||
// h.removeRequestOp(op) // timeout, etc.
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
type handler struct { |
|
||||||
reg *serviceRegistry |
|
||||||
unsubscribeCb *callback |
|
||||||
idgen func() ID // subscription ID generator
|
|
||||||
respWait map[string]*requestOp // active client requests
|
|
||||||
clientSubs map[string]*ClientSubscription // active client subscriptions
|
|
||||||
callWG sync.WaitGroup // pending call goroutines
|
|
||||||
rootCtx context.Context // canceled by close()
|
|
||||||
cancelRoot func() // cancel function for rootCtx
|
|
||||||
conn jsonWriter // where responses will be sent
|
|
||||||
log log.Logger |
|
||||||
allowSubscribe bool |
|
||||||
|
|
||||||
subLock sync.Mutex |
|
||||||
serverSubs map[ID]*Subscription |
|
||||||
} |
|
||||||
|
|
||||||
type callProc struct { |
|
||||||
ctx context.Context |
|
||||||
notifiers []*Notifier |
|
||||||
} |
|
||||||
|
|
||||||
func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { |
|
||||||
rootCtx, cancelRoot := context.WithCancel(connCtx) |
|
||||||
h := &handler{ |
|
||||||
reg: reg, |
|
||||||
idgen: idgen, |
|
||||||
conn: conn, |
|
||||||
respWait: make(map[string]*requestOp), |
|
||||||
clientSubs: make(map[string]*ClientSubscription), |
|
||||||
rootCtx: rootCtx, |
|
||||||
cancelRoot: cancelRoot, |
|
||||||
allowSubscribe: true, |
|
||||||
serverSubs: make(map[ID]*Subscription), |
|
||||||
log: log.Root(), |
|
||||||
} |
|
||||||
if conn.RemoteAddr() != "" { |
|
||||||
h.log = h.log.New("conn", conn.RemoteAddr()) |
|
||||||
} |
|
||||||
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe)) |
|
||||||
return h |
|
||||||
} |
|
||||||
|
|
||||||
// handleBatch executes all messages in a batch and returns the responses.
|
|
||||||
func (h *handler) handleBatch(msgs []*jsonrpcMessage) { |
|
||||||
// Emit error response for empty batches:
|
|
||||||
if len(msgs) == 0 { |
|
||||||
h.startCallProc(func(cp *callProc) { |
|
||||||
h.conn.Write(cp.ctx, errorMessage(&invalidRequestError{"empty batch"})) |
|
||||||
}) |
|
||||||
return |
|
||||||
} |
|
||||||
|
|
||||||
// Handle non-call messages first:
|
|
||||||
calls := make([]*jsonrpcMessage, 0, len(msgs)) |
|
||||||
for _, msg := range msgs { |
|
||||||
if handled := h.handleImmediate(msg); !handled { |
|
||||||
calls = append(calls, msg) |
|
||||||
} |
|
||||||
} |
|
||||||
if len(calls) == 0 { |
|
||||||
return |
|
||||||
} |
|
||||||
// Process calls on a goroutine because they may block indefinitely:
|
|
||||||
h.startCallProc(func(cp *callProc) { |
|
||||||
answers := make([]*jsonrpcMessage, 0, len(msgs)) |
|
||||||
for _, msg := range calls { |
|
||||||
if answer := h.handleCallMsg(cp, msg); answer != nil { |
|
||||||
answers = append(answers, answer) |
|
||||||
} |
|
||||||
} |
|
||||||
h.addSubscriptions(cp.notifiers) |
|
||||||
if len(answers) > 0 { |
|
||||||
h.conn.Write(cp.ctx, answers) |
|
||||||
} |
|
||||||
for _, n := range cp.notifiers { |
|
||||||
n.activate() |
|
||||||
} |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
// handleMsg handles a single message.
|
|
||||||
func (h *handler) handleMsg(msg *jsonrpcMessage) { |
|
||||||
if ok := h.handleImmediate(msg); ok { |
|
||||||
return |
|
||||||
} |
|
||||||
h.startCallProc(func(cp *callProc) { |
|
||||||
answer := h.handleCallMsg(cp, msg) |
|
||||||
h.addSubscriptions(cp.notifiers) |
|
||||||
if answer != nil { |
|
||||||
h.conn.Write(cp.ctx, answer) |
|
||||||
} |
|
||||||
for _, n := range cp.notifiers { |
|
||||||
n.activate() |
|
||||||
} |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
// close cancels all requests except for inflightReq and waits for
|
|
||||||
// call goroutines to shut down.
|
|
||||||
func (h *handler) close(err error, inflightReq *requestOp) { |
|
||||||
h.cancelAllRequests(err, inflightReq) |
|
||||||
h.cancelRoot() |
|
||||||
h.callWG.Wait() |
|
||||||
h.cancelServerSubscriptions(err) |
|
||||||
} |
|
||||||
|
|
||||||
// addRequestOp registers a request operation.
|
|
||||||
func (h *handler) addRequestOp(op *requestOp) { |
|
||||||
for _, id := range op.ids { |
|
||||||
h.respWait[string(id)] = op |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// removeRequestOps stops waiting for the given request IDs.
|
|
||||||
func (h *handler) removeRequestOp(op *requestOp) { |
|
||||||
for _, id := range op.ids { |
|
||||||
delete(h.respWait, string(id)) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// cancelAllRequests unblocks and removes pending requests and active subscriptions.
|
|
||||||
func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) { |
|
||||||
didClose := make(map[*requestOp]bool) |
|
||||||
if inflightReq != nil { |
|
||||||
didClose[inflightReq] = true |
|
||||||
} |
|
||||||
|
|
||||||
for id, op := range h.respWait { |
|
||||||
// Remove the op so that later calls will not close op.resp again.
|
|
||||||
delete(h.respWait, id) |
|
||||||
|
|
||||||
if !didClose[op] { |
|
||||||
op.err = err |
|
||||||
close(op.resp) |
|
||||||
didClose[op] = true |
|
||||||
} |
|
||||||
} |
|
||||||
for id, sub := range h.clientSubs { |
|
||||||
delete(h.clientSubs, id) |
|
||||||
sub.quitWithError(false, err) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func (h *handler) addSubscriptions(nn []*Notifier) { |
|
||||||
h.subLock.Lock() |
|
||||||
defer h.subLock.Unlock() |
|
||||||
|
|
||||||
for _, n := range nn { |
|
||||||
if sub := n.takeSubscription(); sub != nil { |
|
||||||
h.serverSubs[sub.ID] = sub |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// cancelServerSubscriptions removes all subscriptions and closes their error channels.
|
|
||||||
func (h *handler) cancelServerSubscriptions(err error) { |
|
||||||
h.subLock.Lock() |
|
||||||
defer h.subLock.Unlock() |
|
||||||
|
|
||||||
for id, s := range h.serverSubs { |
|
||||||
s.err <- err |
|
||||||
close(s.err) |
|
||||||
delete(h.serverSubs, id) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group.
|
|
||||||
func (h *handler) startCallProc(fn func(*callProc)) { |
|
||||||
h.callWG.Add(1) |
|
||||||
go func() { |
|
||||||
ctx, cancel := context.WithCancel(h.rootCtx) |
|
||||||
defer h.callWG.Done() |
|
||||||
defer cancel() |
|
||||||
fn(&callProc{ctx: ctx}) |
|
||||||
}() |
|
||||||
} |
|
||||||
|
|
||||||
// handleImmediate executes non-call messages. It returns false if the message is a
|
|
||||||
// call or requires a reply.
|
|
||||||
func (h *handler) handleImmediate(msg *jsonrpcMessage) bool { |
|
||||||
start := time.Now() |
|
||||||
switch { |
|
||||||
case msg.isNotification(): |
|
||||||
if strings.HasSuffix(msg.Method, notificationMethodSuffix) { |
|
||||||
h.handleSubscriptionResult(msg) |
|
||||||
return true |
|
||||||
} |
|
||||||
return false |
|
||||||
case msg.isResponse(): |
|
||||||
h.handleResponse(msg) |
|
||||||
h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start)) |
|
||||||
return true |
|
||||||
default: |
|
||||||
return false |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// handleSubscriptionResult processes subscription notifications.
|
|
||||||
func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) { |
|
||||||
var result subscriptionResult |
|
||||||
if err := json.Unmarshal(msg.Params, &result); err != nil { |
|
||||||
h.log.Debug("Dropping invalid subscription message") |
|
||||||
return |
|
||||||
} |
|
||||||
if h.clientSubs[result.ID] != nil { |
|
||||||
h.clientSubs[result.ID].deliver(result.Result) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// handleResponse processes method call responses.
|
|
||||||
func (h *handler) handleResponse(msg *jsonrpcMessage) { |
|
||||||
op := h.respWait[string(msg.ID)] |
|
||||||
if op == nil { |
|
||||||
h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID}) |
|
||||||
return |
|
||||||
} |
|
||||||
delete(h.respWait, string(msg.ID)) |
|
||||||
// For normal responses, just forward the reply to Call/BatchCall.
|
|
||||||
if op.sub == nil { |
|
||||||
op.resp <- msg |
|
||||||
return |
|
||||||
} |
|
||||||
// For subscription responses, start the subscription if the server
|
|
||||||
// indicates success. EthSubscribe gets unblocked in either case through
|
|
||||||
// the op.resp channel.
|
|
||||||
defer close(op.resp) |
|
||||||
if msg.Error != nil { |
|
||||||
op.err = msg.Error |
|
||||||
return |
|
||||||
} |
|
||||||
if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { |
|
||||||
go op.sub.start() |
|
||||||
h.clientSubs[op.sub.subid] = op.sub |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// handleCallMsg executes a call message and returns the answer.
|
|
||||||
func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage { |
|
||||||
start := time.Now() |
|
||||||
switch { |
|
||||||
case msg.isNotification(): |
|
||||||
h.handleCall(ctx, msg) |
|
||||||
h.log.Debug("Served "+msg.Method, "t", time.Since(start)) |
|
||||||
return nil |
|
||||||
case msg.isCall(): |
|
||||||
resp := h.handleCall(ctx, msg) |
|
||||||
if resp.Error != nil { |
|
||||||
h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message) |
|
||||||
} else { |
|
||||||
h.log.Debug("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start)) |
|
||||||
} |
|
||||||
return resp |
|
||||||
case msg.hasValidID(): |
|
||||||
return msg.errorResponse(&invalidRequestError{"invalid request"}) |
|
||||||
default: |
|
||||||
return errorMessage(&invalidRequestError{"invalid request"}) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// handleCall processes method calls.
|
|
||||||
func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { |
|
||||||
if msg.isSubscribe() { |
|
||||||
return h.handleSubscribe(cp, msg) |
|
||||||
} |
|
||||||
var callb *callback |
|
||||||
if msg.isUnsubscribe() { |
|
||||||
callb = h.unsubscribeCb |
|
||||||
} else { |
|
||||||
callb = h.reg.callback(msg.Method) |
|
||||||
} |
|
||||||
if callb == nil { |
|
||||||
return msg.errorResponse(&methodNotFoundError{method: msg.Method}) |
|
||||||
} |
|
||||||
args, err := parsePositionalArguments(msg.Params, callb.argTypes) |
|
||||||
if err != nil { |
|
||||||
return msg.errorResponse(&invalidParamsError{err.Error()}) |
|
||||||
} |
|
||||||
|
|
||||||
return h.runMethod(cp.ctx, msg, callb, args) |
|
||||||
} |
|
||||||
|
|
||||||
// handleSubscribe processes *_subscribe method calls.
|
|
||||||
func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage { |
|
||||||
if !h.allowSubscribe { |
|
||||||
return msg.errorResponse(ErrNotificationsUnsupported) |
|
||||||
} |
|
||||||
|
|
||||||
// Subscription method name is first argument.
|
|
||||||
name, err := parseSubscriptionName(msg.Params) |
|
||||||
if err != nil { |
|
||||||
return msg.errorResponse(&invalidParamsError{err.Error()}) |
|
||||||
} |
|
||||||
namespace := msg.namespace() |
|
||||||
callb := h.reg.subscription(namespace, name) |
|
||||||
if callb == nil { |
|
||||||
return msg.errorResponse(&subscriptionNotFoundError{namespace, name}) |
|
||||||
} |
|
||||||
|
|
||||||
// Parse subscription name arg too, but remove it before calling the callback.
|
|
||||||
argTypes := append([]reflect.Type{stringType}, callb.argTypes...) |
|
||||||
args, err := parsePositionalArguments(msg.Params, argTypes) |
|
||||||
if err != nil { |
|
||||||
return msg.errorResponse(&invalidParamsError{err.Error()}) |
|
||||||
} |
|
||||||
args = args[1:] |
|
||||||
|
|
||||||
// Install notifier in context so the subscription handler can find it.
|
|
||||||
n := &Notifier{h: h, namespace: namespace} |
|
||||||
cp.notifiers = append(cp.notifiers, n) |
|
||||||
ctx := context.WithValue(cp.ctx, notifierKey{}, n) |
|
||||||
|
|
||||||
return h.runMethod(ctx, msg, callb, args) |
|
||||||
} |
|
||||||
|
|
||||||
// runMethod runs the Go callback for an RPC method.
|
|
||||||
func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage { |
|
||||||
result, err := callb.call(ctx, msg.Method, args) |
|
||||||
if err != nil { |
|
||||||
return msg.errorResponse(err) |
|
||||||
} |
|
||||||
return msg.response(result) |
|
||||||
} |
|
||||||
|
|
||||||
// unsubscribe is the callback function for all *_unsubscribe calls.
|
|
||||||
func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) { |
|
||||||
h.subLock.Lock() |
|
||||||
defer h.subLock.Unlock() |
|
||||||
|
|
||||||
s := h.serverSubs[id] |
|
||||||
if s == nil { |
|
||||||
return false, ErrSubscriptionNotFound |
|
||||||
} |
|
||||||
close(s.err) |
|
||||||
delete(h.serverSubs, id) |
|
||||||
return true, nil |
|
||||||
} |
|
||||||
|
|
||||||
type idForLog struct{ json.RawMessage } |
|
||||||
|
|
||||||
func (id idForLog) String() string { |
|
||||||
if s, err := strconv.Unquote(string(id.RawMessage)); err == nil { |
|
||||||
return s |
|
||||||
} |
|
||||||
return string(id.RawMessage) |
|
||||||
} |
|
@ -1,322 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"bytes" |
|
||||||
"context" |
|
||||||
"encoding/json" |
|
||||||
"errors" |
|
||||||
"fmt" |
|
||||||
"io" |
|
||||||
"io/ioutil" |
|
||||||
"mime" |
|
||||||
"net" |
|
||||||
"net/http" |
|
||||||
"sync" |
|
||||||
"time" |
|
||||||
) |
|
||||||
|
|
||||||
const ( |
|
||||||
maxRequestContentLength = 1024 * 512 |
|
||||||
contentType = "application/json" |
|
||||||
) |
|
||||||
|
|
||||||
// https://www.jsonrpc.org/historical/json-rpc-over-http.html#id13
|
|
||||||
var acceptedContentTypes = []string{contentType, "application/json-rpc", "application/jsonrequest"} |
|
||||||
|
|
||||||
type httpConn struct { |
|
||||||
client *http.Client |
|
||||||
req *http.Request |
|
||||||
closeOnce sync.Once |
|
||||||
closed chan interface{} |
|
||||||
} |
|
||||||
|
|
||||||
// httpConn is treated specially by Client.
|
|
||||||
func (hc *httpConn) Write(context.Context, interface{}) error { |
|
||||||
panic("Write called on httpConn") |
|
||||||
} |
|
||||||
|
|
||||||
func (hc *httpConn) RemoteAddr() string { |
|
||||||
return hc.req.URL.String() |
|
||||||
} |
|
||||||
|
|
||||||
func (hc *httpConn) Read() ([]*jsonrpcMessage, bool, error) { |
|
||||||
<-hc.closed |
|
||||||
return nil, false, io.EOF |
|
||||||
} |
|
||||||
|
|
||||||
func (hc *httpConn) Close() { |
|
||||||
hc.closeOnce.Do(func() { close(hc.closed) }) |
|
||||||
} |
|
||||||
|
|
||||||
func (hc *httpConn) Closed() <-chan interface{} { |
|
||||||
return hc.closed |
|
||||||
} |
|
||||||
|
|
||||||
// HTTPTimeouts represents the configuration params for the HTTP RPC server.
|
|
||||||
type HTTPTimeouts struct { |
|
||||||
// ReadTimeout is the maximum duration for reading the entire
|
|
||||||
// request, including the body.
|
|
||||||
//
|
|
||||||
// Because ReadTimeout does not let Handlers make per-request
|
|
||||||
// decisions on each request body's acceptable deadline or
|
|
||||||
// upload rate, most users will prefer to use
|
|
||||||
// ReadHeaderTimeout. It is valid to use them both.
|
|
||||||
ReadTimeout time.Duration |
|
||||||
|
|
||||||
// WriteTimeout is the maximum duration before timing out
|
|
||||||
// writes of the response. It is reset whenever a new
|
|
||||||
// request's header is read. Like ReadTimeout, it does not
|
|
||||||
// let Handlers make decisions on a per-request basis.
|
|
||||||
WriteTimeout time.Duration |
|
||||||
|
|
||||||
// IdleTimeout is the maximum amount of time to wait for the
|
|
||||||
// next request when keep-alives are enabled. If IdleTimeout
|
|
||||||
// is zero, the value of ReadTimeout is used. If both are
|
|
||||||
// zero, ReadHeaderTimeout is used.
|
|
||||||
IdleTimeout time.Duration |
|
||||||
} |
|
||||||
|
|
||||||
// DefaultHTTPTimeouts represents the default timeout values used if further
|
|
||||||
// configuration is not provided.
|
|
||||||
var DefaultHTTPTimeouts = HTTPTimeouts{ |
|
||||||
ReadTimeout: 30 * time.Second, |
|
||||||
WriteTimeout: 30 * time.Second, |
|
||||||
IdleTimeout: 120 * time.Second, |
|
||||||
} |
|
||||||
|
|
||||||
// DialHTTPWithClient creates a new RPC client that connects to an RPC server over HTTP
|
|
||||||
// using the provided HTTP Client.
|
|
||||||
func DialHTTPWithClient(endpoint string, client *http.Client) (*Client, error) { |
|
||||||
req, err := http.NewRequest(http.MethodPost, endpoint, nil) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
req.Header.Set("Content-Type", contentType) |
|
||||||
req.Header.Set("Accept", contentType) |
|
||||||
|
|
||||||
initctx := context.Background() |
|
||||||
return newClient(initctx, func(context.Context) (ServerCodec, error) { |
|
||||||
return &httpConn{client: client, req: req, closed: make(chan interface{})}, nil |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
// DialHTTP creates a new RPC client that connects to an RPC server over HTTP.
|
|
||||||
func DialHTTP(endpoint string) (*Client, error) { |
|
||||||
return DialHTTPWithClient(endpoint, new(http.Client)) |
|
||||||
} |
|
||||||
|
|
||||||
func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error { |
|
||||||
hc := c.writeConn.(*httpConn) |
|
||||||
respBody, err := hc.doRequest(ctx, msg) |
|
||||||
if respBody != nil { |
|
||||||
defer respBody.Close() |
|
||||||
} |
|
||||||
|
|
||||||
if err != nil { |
|
||||||
if respBody != nil { |
|
||||||
buf := new(bytes.Buffer) |
|
||||||
if _, err2 := buf.ReadFrom(respBody); err2 == nil { |
|
||||||
return fmt.Errorf("%v %v", err, buf.String()) |
|
||||||
} |
|
||||||
} |
|
||||||
return err |
|
||||||
} |
|
||||||
var respmsg jsonrpcMessage |
|
||||||
if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
op.resp <- &respmsg |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { |
|
||||||
hc := c.writeConn.(*httpConn) |
|
||||||
respBody, err := hc.doRequest(ctx, msgs) |
|
||||||
if err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
defer respBody.Close() |
|
||||||
var respmsgs []jsonrpcMessage |
|
||||||
if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
for i := 0; i < len(respmsgs); i++ { |
|
||||||
op.resp <- &respmsgs[i] |
|
||||||
} |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) { |
|
||||||
body, err := json.Marshal(msg) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
req := hc.req.WithContext(ctx) |
|
||||||
req.Body = ioutil.NopCloser(bytes.NewReader(body)) |
|
||||||
req.ContentLength = int64(len(body)) |
|
||||||
|
|
||||||
resp, err := hc.client.Do(req) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 { |
|
||||||
return resp.Body, errors.New(resp.Status) |
|
||||||
} |
|
||||||
return resp.Body, nil |
|
||||||
} |
|
||||||
|
|
||||||
// httpServerConn turns a HTTP connection into a Conn.
|
|
||||||
type httpServerConn struct { |
|
||||||
io.Reader |
|
||||||
io.Writer |
|
||||||
r *http.Request |
|
||||||
} |
|
||||||
|
|
||||||
func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec { |
|
||||||
body := io.LimitReader(r.Body, maxRequestContentLength) |
|
||||||
conn := &httpServerConn{Reader: body, Writer: w, r: r} |
|
||||||
return NewJSONCodec(conn) |
|
||||||
} |
|
||||||
|
|
||||||
// Close does nothing and always returns nil.
|
|
||||||
func (t *httpServerConn) Close() error { return nil } |
|
||||||
|
|
||||||
// RemoteAddr returns the peer address of the underlying connection.
|
|
||||||
func (t *httpServerConn) RemoteAddr() string { |
|
||||||
return t.r.RemoteAddr |
|
||||||
} |
|
||||||
|
|
||||||
// SetWriteDeadline does nothing and always returns nil.
|
|
||||||
func (t *httpServerConn) SetWriteDeadline(time.Time) error { return nil } |
|
||||||
|
|
||||||
// NewHTTPServer creates a new HTTP RPC server around an API provider.
|
|
||||||
//
|
|
||||||
// Deprecated: Server implements http.Handler
|
|
||||||
func NewHTTPServer(srv http.Handler) *http.Server { |
|
||||||
// TODO(ricl): eth wraps it in CORS-handler. Might need to port if we want to use cors.
|
|
||||||
handler := srv |
|
||||||
|
|
||||||
// TODO(ricl): port timeout handlers if necessary
|
|
||||||
// Bundle and start the HTTP server
|
|
||||||
return &http.Server{ |
|
||||||
Handler: handler, |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// ServeHTTP serves JSON-RPC requests over HTTP.
|
|
||||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
|
||||||
// Permit dumb empty requests for remote health-checks (AWS)
|
|
||||||
if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" { |
|
||||||
return |
|
||||||
} |
|
||||||
if code, err := validateRequest(r); err != nil { |
|
||||||
http.Error(w, err.Error(), code) |
|
||||||
return |
|
||||||
} |
|
||||||
// All checks passed, create a codec that reads direct from the request body
|
|
||||||
// untilEOF and writes the response to w and order the server to process a
|
|
||||||
// single request.
|
|
||||||
ctx := r.Context() |
|
||||||
ctx = context.WithValue(ctx, interface{}("remote"), r.RemoteAddr) |
|
||||||
ctx = context.WithValue(ctx, interface{}("scheme"), r.Proto) |
|
||||||
ctx = context.WithValue(ctx, interface{}("local"), r.Host) |
|
||||||
if ua := r.Header.Get("User-Agent"); ua != "" { |
|
||||||
ctx = context.WithValue(ctx, interface{}("User-Agent"), ua) |
|
||||||
} |
|
||||||
if origin := r.Header.Get("Origin"); origin != "" { |
|
||||||
ctx = context.WithValue(ctx, interface{}("Origin"), origin) |
|
||||||
} |
|
||||||
|
|
||||||
w.Header().Set("content-type", contentType) |
|
||||||
codec := newHTTPServerConn(r, w) |
|
||||||
defer codec.Close() |
|
||||||
s.serveSingleRequest(ctx, codec) |
|
||||||
} |
|
||||||
|
|
||||||
// validateRequest returns a non-zero response code and error message if the
|
|
||||||
// request is invalid.
|
|
||||||
func validateRequest(r *http.Request) (int, error) { |
|
||||||
if r.Method == http.MethodPut || r.Method == http.MethodDelete { |
|
||||||
return http.StatusMethodNotAllowed, errors.New("method not allowed") |
|
||||||
} |
|
||||||
if r.ContentLength > maxRequestContentLength { |
|
||||||
err := fmt.Errorf("content length too large (%d>%d)", r.ContentLength, maxRequestContentLength) |
|
||||||
return http.StatusRequestEntityTooLarge, err |
|
||||||
} |
|
||||||
// Allow OPTIONS (regardless of content-type)
|
|
||||||
if r.Method == http.MethodOptions { |
|
||||||
return 0, nil |
|
||||||
} |
|
||||||
// Check content-type
|
|
||||||
if mt, _, err := mime.ParseMediaType(r.Header.Get("content-type")); err == nil { |
|
||||||
for _, accepted := range acceptedContentTypes { |
|
||||||
if accepted == mt { |
|
||||||
return 0, nil |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
// Invalid content-type
|
|
||||||
err := fmt.Errorf("invalid content type, only %s is supported", contentType) |
|
||||||
return http.StatusUnsupportedMediaType, err |
|
||||||
} |
|
||||||
|
|
||||||
// func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler {
|
|
||||||
// // disable CORS support if user has not specified a custom CORS configuration
|
|
||||||
// if len(allowedOrigins) == 0 {
|
|
||||||
// return srv
|
|
||||||
// }
|
|
||||||
// c := cors.New(cors.Options{
|
|
||||||
// AllowedOrigins: allowedOrigins,
|
|
||||||
// AllowedMethods: []string{http.MethodPost, http.MethodGet},
|
|
||||||
// MaxAge: 600,
|
|
||||||
// AllowedHeaders: []string{"*"},
|
|
||||||
// })
|
|
||||||
// return c.Handler(srv)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// virtualHostHandler is a handler which validates the Host-header of incoming requests.
|
|
||||||
// The virtualHostHandler can prevent DNS rebinding attacks, which do not utilize CORS-headers,
|
|
||||||
// since they do in-domain requests against the RPC api. Instead, we can see on the Host-header
|
|
||||||
// which domain was used, and validate that against a whitelist.
|
|
||||||
type virtualHostHandler struct { |
|
||||||
vhosts map[string]struct{} |
|
||||||
next http.Handler |
|
||||||
} |
|
||||||
|
|
||||||
// ServeHTTP serves JSON-RPC requests over HTTP, implements http.Handler
|
|
||||||
func (h *virtualHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
|
||||||
// if r.Host is not set, we can continue serving since a browser would set the Host header
|
|
||||||
if r.Host == "" { |
|
||||||
h.next.ServeHTTP(w, r) |
|
||||||
return |
|
||||||
} |
|
||||||
host, _, err := net.SplitHostPort(r.Host) |
|
||||||
if err != nil { |
|
||||||
// Either invalid (too many colons) or no port specified
|
|
||||||
host = r.Host |
|
||||||
} |
|
||||||
if ipAddr := net.ParseIP(host); ipAddr != nil { |
|
||||||
// It's an IP address, we can serve that
|
|
||||||
h.next.ServeHTTP(w, r) |
|
||||||
return |
|
||||||
|
|
||||||
} |
|
||||||
// Not an ip address, but a hostname. Need to validate
|
|
||||||
if _, exist := h.vhosts["*"]; exist { |
|
||||||
h.next.ServeHTTP(w, r) |
|
||||||
return |
|
||||||
} |
|
||||||
if _, exist := h.vhosts[host]; exist { |
|
||||||
h.next.ServeHTTP(w, r) |
|
||||||
return |
|
||||||
} |
|
||||||
http.Error(w, "invalid host specified", http.StatusForbidden) |
|
||||||
} |
|
||||||
|
|
||||||
// func newVHostHandler(vhosts []string, next http.Handler) http.Handler {
|
|
||||||
// vhostMap := make(map[string]struct{})
|
|
||||||
// for _, allowedHost := range vhosts {
|
|
||||||
// vhostMap[strings.ToLower(allowedHost)] = struct{}{}
|
|
||||||
// }
|
|
||||||
// return &virtualHostHandler{vhostMap, next}
|
|
||||||
// }
|
|
@ -1,40 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"net" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log" |
|
||||||
"github.com/ethereum/go-ethereum/p2p/netutil" |
|
||||||
) |
|
||||||
|
|
||||||
// ServeListener accepts connections on l, serving JSON-RPC on them.
|
|
||||||
func (s *Server) ServeListener(l net.Listener) error { |
|
||||||
for { |
|
||||||
conn, err := l.Accept() |
|
||||||
if netutil.IsTemporaryError(err) { |
|
||||||
log.Warn("RPC accept error", "err", err) |
|
||||||
continue |
|
||||||
} else if err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr()) |
|
||||||
go s.ServeCodec(NewJSONCodec(conn)) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes
|
|
||||||
// the endpoint is the full path to a unix socket, and Windows the endpoint is an
|
|
||||||
// identifier for a named pipe.
|
|
||||||
//
|
|
||||||
// The context is used for the initial connection establishment. It does not
|
|
||||||
// affect subsequent interactions with the client.
|
|
||||||
func DialIPC(ctx context.Context, endpoint string) (*Client, error) { |
|
||||||
return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { |
|
||||||
conn, err := newIPCConnection(ctx, endpoint) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
return NewJSONCodec(conn), err |
|
||||||
}) |
|
||||||
} |
|
@ -1,46 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"fmt" |
|
||||||
"net" |
|
||||||
"os" |
|
||||||
"path/filepath" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log" |
|
||||||
) |
|
||||||
|
|
||||||
/* |
|
||||||
#include <sys/un.h> |
|
||||||
|
|
||||||
int max_socket_path_size() { |
|
||||||
struct sockaddr_un s; |
|
||||||
return sizeof(s.sun_path); |
|
||||||
} |
|
||||||
*/ |
|
||||||
import "C" |
|
||||||
|
|
||||||
// ipcListen will create a Unix socket on the given endpoint.
|
|
||||||
func ipcListen(endpoint string) (net.Listener, error) { |
|
||||||
if len(endpoint) > int(C.max_socket_path_size()) { |
|
||||||
log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", C.max_socket_path_size()), |
|
||||||
"endpoint", endpoint) |
|
||||||
} |
|
||||||
|
|
||||||
// Ensure the IPC path exists and remove any previous leftover
|
|
||||||
if err := os.MkdirAll(filepath.Dir(endpoint), 0751); err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
os.Remove(endpoint) |
|
||||||
l, err := net.Listen("unix", endpoint) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
os.Chmod(endpoint, 0600) |
|
||||||
return l, nil |
|
||||||
} |
|
||||||
|
|
||||||
// newIPCConnection will connect to a Unix socket on the given endpoint.
|
|
||||||
func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) { |
|
||||||
return dialContext(ctx, "unix", endpoint) |
|
||||||
} |
|
@ -1,314 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"bytes" |
|
||||||
"context" |
|
||||||
"encoding/json" |
|
||||||
"errors" |
|
||||||
"fmt" |
|
||||||
"io" |
|
||||||
"reflect" |
|
||||||
"strings" |
|
||||||
"sync" |
|
||||||
"time" |
|
||||||
) |
|
||||||
|
|
||||||
const ( |
|
||||||
vsn = "2.0" |
|
||||||
serviceMethodSeparator = "_" |
|
||||||
subscribeMethodSuffix = "_subscribe" |
|
||||||
unsubscribeMethodSuffix = "_unsubscribe" |
|
||||||
notificationMethodSuffix = "_subscription" |
|
||||||
|
|
||||||
defaultWriteTimeout = 10 * time.Second // used if context has no deadline
|
|
||||||
) |
|
||||||
|
|
||||||
var null = json.RawMessage("null") |
|
||||||
|
|
||||||
type subscriptionResult struct { |
|
||||||
ID string `json:"subscription"` |
|
||||||
Result json.RawMessage `json:"result,omitempty"` |
|
||||||
} |
|
||||||
|
|
||||||
// A value of this type can a JSON-RPC request, notification, successful response or
|
|
||||||
// error response. Which one it is depends on the fields.
|
|
||||||
type jsonrpcMessage struct { |
|
||||||
Version string `json:"jsonrpc,omitempty"` |
|
||||||
ID json.RawMessage `json:"id,omitempty"` |
|
||||||
Method string `json:"method,omitempty"` |
|
||||||
Params json.RawMessage `json:"params,omitempty"` |
|
||||||
Error *jsonError `json:"error,omitempty"` |
|
||||||
Result json.RawMessage `json:"result,omitempty"` |
|
||||||
} |
|
||||||
|
|
||||||
func (msg *jsonrpcMessage) isNotification() bool { |
|
||||||
return msg.ID == nil && msg.Method != "" |
|
||||||
} |
|
||||||
|
|
||||||
func (msg *jsonrpcMessage) isCall() bool { |
|
||||||
return msg.hasValidID() && msg.Method != "" |
|
||||||
} |
|
||||||
|
|
||||||
func (msg *jsonrpcMessage) isResponse() bool { |
|
||||||
return msg.hasValidID() && msg.Method == "" && msg.Params == nil && (msg.Result != nil || msg.Error != nil) |
|
||||||
} |
|
||||||
|
|
||||||
func (msg *jsonrpcMessage) hasValidID() bool { |
|
||||||
return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '[' |
|
||||||
} |
|
||||||
|
|
||||||
func (msg *jsonrpcMessage) isSubscribe() bool { |
|
||||||
return strings.HasSuffix(msg.Method, subscribeMethodSuffix) |
|
||||||
} |
|
||||||
|
|
||||||
func (msg *jsonrpcMessage) isUnsubscribe() bool { |
|
||||||
return strings.HasSuffix(msg.Method, unsubscribeMethodSuffix) |
|
||||||
} |
|
||||||
|
|
||||||
func (msg *jsonrpcMessage) namespace() string { |
|
||||||
elem := strings.SplitN(msg.Method, serviceMethodSeparator, 2) |
|
||||||
return elem[0] |
|
||||||
} |
|
||||||
|
|
||||||
func (msg *jsonrpcMessage) String() string { |
|
||||||
b, _ := json.Marshal(msg) |
|
||||||
return string(b) |
|
||||||
} |
|
||||||
|
|
||||||
func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage { |
|
||||||
resp := errorMessage(err) |
|
||||||
resp.ID = msg.ID |
|
||||||
return resp |
|
||||||
} |
|
||||||
|
|
||||||
func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage { |
|
||||||
enc, err := json.Marshal(result) |
|
||||||
if err != nil { |
|
||||||
// TODO: wrap with 'internal server error'
|
|
||||||
return msg.errorResponse(err) |
|
||||||
} |
|
||||||
return &jsonrpcMessage{Version: vsn, ID: msg.ID, Result: enc} |
|
||||||
} |
|
||||||
|
|
||||||
func errorMessage(err error) *jsonrpcMessage { |
|
||||||
msg := &jsonrpcMessage{Version: vsn, ID: null, Error: &jsonError{ |
|
||||||
Code: defaultErrorCode, |
|
||||||
Message: err.Error(), |
|
||||||
}} |
|
||||||
ec, ok := err.(Error) |
|
||||||
if ok { |
|
||||||
msg.Error.Code = ec.ErrorCode() |
|
||||||
} |
|
||||||
return msg |
|
||||||
} |
|
||||||
|
|
||||||
type jsonError struct { |
|
||||||
Code int `json:"code"` |
|
||||||
Message string `json:"message"` |
|
||||||
Data interface{} `json:"data,omitempty"` |
|
||||||
} |
|
||||||
|
|
||||||
func (err *jsonError) Error() string { |
|
||||||
if err.Message == "" { |
|
||||||
return fmt.Sprintf("json-rpc error %d", err.Code) |
|
||||||
} |
|
||||||
return err.Message |
|
||||||
} |
|
||||||
|
|
||||||
func (err *jsonError) ErrorCode() int { |
|
||||||
return err.Code |
|
||||||
} |
|
||||||
|
|
||||||
// Conn is a subset of the methods of net.Conn which are sufficient for ServerCodec.
|
|
||||||
type Conn interface { |
|
||||||
io.ReadWriteCloser |
|
||||||
SetWriteDeadline(time.Time) error |
|
||||||
} |
|
||||||
|
|
||||||
// ConnRemoteAddr wraps the RemoteAddr operation, which returns a description
|
|
||||||
// of the peer address of a connection. If a Conn also implements ConnRemoteAddr, this
|
|
||||||
// description is used in log messages.
|
|
||||||
type ConnRemoteAddr interface { |
|
||||||
RemoteAddr() string |
|
||||||
} |
|
||||||
|
|
||||||
// connWithRemoteAddr overrides the remote address of a connection.
|
|
||||||
type connWithRemoteAddr struct { |
|
||||||
Conn |
|
||||||
addr string |
|
||||||
} |
|
||||||
|
|
||||||
func (c connWithRemoteAddr) RemoteAddr() string { return c.addr } |
|
||||||
|
|
||||||
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has
|
|
||||||
// support for parsing arguments and serializing (result) objects.
|
|
||||||
type jsonCodec struct { |
|
||||||
remoteAddr string |
|
||||||
closer sync.Once // close closed channel once
|
|
||||||
closed chan interface{} // closed on Close
|
|
||||||
decode func(v interface{}) error // decoder to allow multiple transports
|
|
||||||
encMu sync.Mutex // guards the encoder
|
|
||||||
encode func(v interface{}) error // encoder to allow multiple transports
|
|
||||||
conn Conn |
|
||||||
} |
|
||||||
|
|
||||||
// NewCodec creates a new RPC server codec with support for JSON-RPC 2.0 based
|
|
||||||
// on explicitly given encoding and decoding methods.
|
|
||||||
func NewCodec(conn Conn, encode, decode func(v interface{}) error) ServerCodec { |
|
||||||
codec := &jsonCodec{ |
|
||||||
closed: make(chan interface{}), |
|
||||||
encode: encode, |
|
||||||
decode: decode, |
|
||||||
conn: conn, |
|
||||||
} |
|
||||||
if ra, ok := conn.(ConnRemoteAddr); ok { |
|
||||||
codec.remoteAddr = ra.RemoteAddr() |
|
||||||
} |
|
||||||
return codec |
|
||||||
} |
|
||||||
|
|
||||||
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0.
|
|
||||||
func NewJSONCodec(conn Conn) ServerCodec { |
|
||||||
enc := json.NewEncoder(conn) |
|
||||||
dec := json.NewDecoder(conn) |
|
||||||
dec.UseNumber() |
|
||||||
return NewCodec(conn, enc.Encode, dec.Decode) |
|
||||||
} |
|
||||||
|
|
||||||
func (c *jsonCodec) RemoteAddr() string { |
|
||||||
return c.remoteAddr |
|
||||||
} |
|
||||||
|
|
||||||
func (c *jsonCodec) Read() (msg []*jsonrpcMessage, batch bool, err error) { |
|
||||||
// Decode the next JSON object in the input stream.
|
|
||||||
// This verifies basic syntax, etc.
|
|
||||||
var rawmsg json.RawMessage |
|
||||||
if err := c.decode(&rawmsg); err != nil { |
|
||||||
return nil, false, err |
|
||||||
} |
|
||||||
msg, batch = parseMessage(rawmsg) |
|
||||||
return msg, batch, nil |
|
||||||
} |
|
||||||
|
|
||||||
// Write sends a message to client.
|
|
||||||
func (c *jsonCodec) Write(ctx context.Context, v interface{}) error { |
|
||||||
c.encMu.Lock() |
|
||||||
defer c.encMu.Unlock() |
|
||||||
|
|
||||||
deadline, ok := ctx.Deadline() |
|
||||||
if !ok { |
|
||||||
deadline = time.Now().Add(defaultWriteTimeout) |
|
||||||
} |
|
||||||
c.conn.SetWriteDeadline(deadline) |
|
||||||
return c.encode(v) |
|
||||||
} |
|
||||||
|
|
||||||
// Close the underlying connection
|
|
||||||
func (c *jsonCodec) Close() { |
|
||||||
c.closer.Do(func() { |
|
||||||
close(c.closed) |
|
||||||
c.conn.Close() |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
// Closed returns a channel which will be closed when Close is called
|
|
||||||
func (c *jsonCodec) Closed() <-chan interface{} { |
|
||||||
return c.closed |
|
||||||
} |
|
||||||
|
|
||||||
// parseMessage parses raw bytes as a (batch of) JSON-RPC message(s). There are no error
|
|
||||||
// checks in this function because the raw message has already been syntax-checked when it
|
|
||||||
// is called. Any non-JSON-RPC messages in the input return the zero value of
|
|
||||||
// jsonrpcMessage.
|
|
||||||
func parseMessage(raw json.RawMessage) ([]*jsonrpcMessage, bool) { |
|
||||||
if !isBatch(raw) { |
|
||||||
msgs := []*jsonrpcMessage{{}} |
|
||||||
json.Unmarshal(raw, &msgs[0]) |
|
||||||
return msgs, false |
|
||||||
} |
|
||||||
dec := json.NewDecoder(bytes.NewReader(raw)) |
|
||||||
dec.Token() // skip '['
|
|
||||||
var msgs []*jsonrpcMessage |
|
||||||
for dec.More() { |
|
||||||
msgs = append(msgs, new(jsonrpcMessage)) |
|
||||||
dec.Decode(&msgs[len(msgs)-1]) |
|
||||||
} |
|
||||||
return msgs, true |
|
||||||
} |
|
||||||
|
|
||||||
// isBatch returns true when the first non-whitespace characters is '['
|
|
||||||
func isBatch(raw json.RawMessage) bool { |
|
||||||
for _, c := range raw { |
|
||||||
// skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt)
|
|
||||||
if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d { |
|
||||||
continue |
|
||||||
} |
|
||||||
return c == '[' |
|
||||||
} |
|
||||||
return false |
|
||||||
} |
|
||||||
|
|
||||||
// parsePositionalArguments tries to parse the given args to an array of values with the
|
|
||||||
// given types. It returns the parsed values or an error when the args could not be
|
|
||||||
// parsed. Missing optional arguments are returned as reflect.Zero values.
|
|
||||||
func parsePositionalArguments(rawArgs json.RawMessage, types []reflect.Type) ([]reflect.Value, error) { |
|
||||||
dec := json.NewDecoder(bytes.NewReader(rawArgs)) |
|
||||||
var args []reflect.Value |
|
||||||
tok, err := dec.Token() |
|
||||||
switch { |
|
||||||
case err == io.EOF || tok == nil && err == nil: |
|
||||||
// "params" is optional and may be empty. Also allow "params":null even though it's
|
|
||||||
// not in the spec because our own client used to send it.
|
|
||||||
case err != nil: |
|
||||||
return nil, err |
|
||||||
case tok == json.Delim('['): |
|
||||||
// Read argument array.
|
|
||||||
if args, err = parseArgumentArray(dec, types); err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
default: |
|
||||||
return nil, errors.New("non-array args") |
|
||||||
} |
|
||||||
// Set any missing args to nil.
|
|
||||||
for i := len(args); i < len(types); i++ { |
|
||||||
if types[i].Kind() != reflect.Ptr { |
|
||||||
return nil, fmt.Errorf("missing value for required argument %d", i) |
|
||||||
} |
|
||||||
args = append(args, reflect.Zero(types[i])) |
|
||||||
} |
|
||||||
return args, nil |
|
||||||
} |
|
||||||
|
|
||||||
func parseArgumentArray(dec *json.Decoder, types []reflect.Type) ([]reflect.Value, error) { |
|
||||||
args := make([]reflect.Value, 0, len(types)) |
|
||||||
for i := 0; dec.More(); i++ { |
|
||||||
if i >= len(types) { |
|
||||||
return args, fmt.Errorf("too many arguments, want at most %d", len(types)) |
|
||||||
} |
|
||||||
argval := reflect.New(types[i]) |
|
||||||
if err := dec.Decode(argval.Interface()); err != nil { |
|
||||||
return args, fmt.Errorf("invalid argument %d: %v", i, err) |
|
||||||
} |
|
||||||
if argval.IsNil() && types[i].Kind() != reflect.Ptr { |
|
||||||
return args, fmt.Errorf("missing value for required argument %d", i) |
|
||||||
} |
|
||||||
args = append(args, argval.Elem()) |
|
||||||
} |
|
||||||
// Read end of args array.
|
|
||||||
_, err := dec.Token() |
|
||||||
return args, err |
|
||||||
} |
|
||||||
|
|
||||||
// parseSubscriptionName extracts the subscription name from an encoded argument array.
|
|
||||||
func parseSubscriptionName(rawArgs json.RawMessage) (string, error) { |
|
||||||
dec := json.NewDecoder(bytes.NewReader(rawArgs)) |
|
||||||
if tok, _ := dec.Token(); tok != json.Delim('[') { |
|
||||||
return "", errors.New("non-array args") |
|
||||||
} |
|
||||||
v, _ := dec.Token() |
|
||||||
method, ok := v.(string) |
|
||||||
if !ok { |
|
||||||
return "", errors.New("expected subscription name as first argument") |
|
||||||
} |
|
||||||
return method, nil |
|
||||||
} |
|
@ -1,116 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"io" |
|
||||||
"sync/atomic" |
|
||||||
|
|
||||||
mapset "github.com/deckarep/golang-set" |
|
||||||
"github.com/ethereum/go-ethereum/log" |
|
||||||
) |
|
||||||
|
|
||||||
const metadataAPI = "rpc" |
|
||||||
|
|
||||||
// Server is an RPC server.
|
|
||||||
type Server struct { |
|
||||||
services serviceRegistry |
|
||||||
idgen func() ID |
|
||||||
run int32 |
|
||||||
codecs mapset.Set |
|
||||||
} |
|
||||||
|
|
||||||
// NewServer creates a new server instance with no registered handlers.
|
|
||||||
func NewServer() *Server { |
|
||||||
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet(), run: 1} |
|
||||||
// Register the default service providing meta information about the RPC service such
|
|
||||||
// as the services and methods it offers.
|
|
||||||
rpcService := &Service{server} |
|
||||||
server.RegisterName(metadataAPI, rpcService) |
|
||||||
return server |
|
||||||
} |
|
||||||
|
|
||||||
// RegisterName creates a service for the given receiver type under the given name. When no
|
|
||||||
// methods on the given receiver match the criteria to be either a RPC method or a
|
|
||||||
// subscription an error is returned. Otherwise a new service is created and added to the
|
|
||||||
// service collection this server provides to clients.
|
|
||||||
func (s *Server) RegisterName(name string, receiver interface{}) error { |
|
||||||
return s.services.registerName(name, receiver) |
|
||||||
} |
|
||||||
|
|
||||||
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes
|
|
||||||
// the response back using the given codec. It will block until the codec is closed or the
|
|
||||||
// server is stopped. In either case the codec is closed.
|
|
||||||
func (s *Server) ServeCodec(codec ServerCodec) { |
|
||||||
defer codec.Close() |
|
||||||
|
|
||||||
// Don't serve if server is stopped.
|
|
||||||
if atomic.LoadInt32(&s.run) == 0 { |
|
||||||
return |
|
||||||
} |
|
||||||
|
|
||||||
// Add the codec to the set so it can be closed by Stop.
|
|
||||||
s.codecs.Add(codec) |
|
||||||
defer s.codecs.Remove(codec) |
|
||||||
|
|
||||||
c := initClient(codec, s.idgen, &s.services) |
|
||||||
<-codec.Closed() |
|
||||||
c.Close() |
|
||||||
} |
|
||||||
|
|
||||||
// serveSingleRequest reads and processes a single RPC request from the given codec. This
|
|
||||||
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
|
|
||||||
// this mode.
|
|
||||||
func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { |
|
||||||
// Don't serve if server is stopped.
|
|
||||||
if atomic.LoadInt32(&s.run) == 0 { |
|
||||||
return |
|
||||||
} |
|
||||||
|
|
||||||
h := newHandler(ctx, codec, s.idgen, &s.services) |
|
||||||
h.allowSubscribe = false |
|
||||||
defer h.close(io.EOF, nil) |
|
||||||
|
|
||||||
reqs, batch, err := codec.Read() |
|
||||||
if err != nil { |
|
||||||
if err != io.EOF { |
|
||||||
codec.Write(ctx, errorMessage(&invalidMessageError{"parse error"})) |
|
||||||
} |
|
||||||
return |
|
||||||
} |
|
||||||
if batch { |
|
||||||
h.handleBatch(reqs) |
|
||||||
} else { |
|
||||||
h.handleMsg(reqs[0]) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// Stop stops reading new requests, waits for stopPendingRequestTimeout to allow pending
|
|
||||||
// requests to finish, then closes all codecs which will cancel pending requests and
|
|
||||||
// subscriptions.
|
|
||||||
func (s *Server) Stop() { |
|
||||||
if atomic.CompareAndSwapInt32(&s.run, 1, 0) { |
|
||||||
log.Debug("RPC server shutting down") |
|
||||||
s.codecs.Each(func(c interface{}) bool { |
|
||||||
c.(ServerCodec).Close() |
|
||||||
return true |
|
||||||
}) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// Service gives meta information about the server.
|
|
||||||
// e.g. gives information about the loaded modules.
|
|
||||||
type Service struct { |
|
||||||
server *Server |
|
||||||
} |
|
||||||
|
|
||||||
// Modules returns the list of RPC services with their version number
|
|
||||||
func (s *Service) Modules() map[string]string { |
|
||||||
s.server.services.mu.Lock() |
|
||||||
defer s.server.services.mu.Unlock() |
|
||||||
|
|
||||||
modules := make(map[string]string) |
|
||||||
for name := range s.server.services.services { |
|
||||||
modules[name] = "1.0" |
|
||||||
} |
|
||||||
return modules |
|
||||||
} |
|
@ -1,269 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"errors" |
|
||||||
"fmt" |
|
||||||
"reflect" |
|
||||||
"runtime" |
|
||||||
"strings" |
|
||||||
"sync" |
|
||||||
"unicode" |
|
||||||
"unicode/utf8" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log" |
|
||||||
) |
|
||||||
|
|
||||||
var ( |
|
||||||
contextType = reflect.TypeOf((*context.Context)(nil)).Elem() |
|
||||||
errorType = reflect.TypeOf((*error)(nil)).Elem() |
|
||||||
subscriptionType = reflect.TypeOf(Subscription{}) |
|
||||||
stringType = reflect.TypeOf("") |
|
||||||
) |
|
||||||
|
|
||||||
type serviceRegistry struct { |
|
||||||
mu sync.Mutex |
|
||||||
services map[string]service |
|
||||||
} |
|
||||||
|
|
||||||
// service represents a registered object.
|
|
||||||
type service struct { |
|
||||||
name string // name for service
|
|
||||||
callbacks map[string]*callback // registered handlers
|
|
||||||
subscriptions map[string]*callback // available subscriptions/notifications
|
|
||||||
} |
|
||||||
|
|
||||||
// callback is a method callback which was registered in the server
|
|
||||||
type callback struct { |
|
||||||
fn reflect.Value // the function
|
|
||||||
rcvr reflect.Value // receiver object of method, set if fn is method
|
|
||||||
argTypes []reflect.Type // input argument types
|
|
||||||
hasCtx bool // method's first argument is a context (not included in argTypes)
|
|
||||||
errPos int // err return idx, of -1 when method cannot return error
|
|
||||||
isSubscribe bool // true if this is a subscription callback
|
|
||||||
} |
|
||||||
|
|
||||||
func (r *serviceRegistry) registerName(name string, rcvr interface{}) error { |
|
||||||
rcvrVal := reflect.ValueOf(rcvr) |
|
||||||
if name == "" { |
|
||||||
return fmt.Errorf("no service name for type %s", rcvrVal.Type().String()) |
|
||||||
} |
|
||||||
callbacks := suitableCallbacks(rcvrVal) |
|
||||||
if len(callbacks) == 0 { |
|
||||||
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr) |
|
||||||
} |
|
||||||
|
|
||||||
r.mu.Lock() |
|
||||||
defer r.mu.Unlock() |
|
||||||
if r.services == nil { |
|
||||||
r.services = make(map[string]service) |
|
||||||
} |
|
||||||
svc, ok := r.services[name] |
|
||||||
if !ok { |
|
||||||
svc = service{ |
|
||||||
name: name, |
|
||||||
callbacks: make(map[string]*callback), |
|
||||||
subscriptions: make(map[string]*callback), |
|
||||||
} |
|
||||||
r.services[name] = svc |
|
||||||
} |
|
||||||
for name, cb := range callbacks { |
|
||||||
if cb.isSubscribe { |
|
||||||
svc.subscriptions[name] = cb |
|
||||||
} else { |
|
||||||
svc.callbacks[name] = cb |
|
||||||
} |
|
||||||
} |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
// callback returns the callback corresponding to the given RPC method name.
|
|
||||||
func (r *serviceRegistry) callback(method string) *callback { |
|
||||||
elem := strings.SplitN(method, serviceMethodSeparator, 2) |
|
||||||
if len(elem) != 2 { |
|
||||||
return nil |
|
||||||
} |
|
||||||
r.mu.Lock() |
|
||||||
defer r.mu.Unlock() |
|
||||||
return r.services[elem[0]].callbacks[elem[1]] |
|
||||||
} |
|
||||||
|
|
||||||
// subscription returns a subscription callback in the given service.
|
|
||||||
func (r *serviceRegistry) subscription(service, name string) *callback { |
|
||||||
r.mu.Lock() |
|
||||||
defer r.mu.Unlock() |
|
||||||
return r.services[service].subscriptions[name] |
|
||||||
} |
|
||||||
|
|
||||||
// suitableCallbacks iterates over the methods of the given type. It determines if a method
|
|
||||||
// satisfies the criteria for a RPC callback or a subscription callback and adds it to the
|
|
||||||
// collection of callbacks. See server documentation for a summary of these criteria.
|
|
||||||
func suitableCallbacks(receiver reflect.Value) map[string]*callback { |
|
||||||
typ := receiver.Type() |
|
||||||
callbacks := make(map[string]*callback) |
|
||||||
for m := 0; m < typ.NumMethod(); m++ { |
|
||||||
method := typ.Method(m) |
|
||||||
if method.PkgPath != "" { |
|
||||||
continue // method not exported
|
|
||||||
} |
|
||||||
cb := newCallback(receiver, method.Func) |
|
||||||
if cb == nil { |
|
||||||
continue // function invalid
|
|
||||||
} |
|
||||||
name := formatName(method.Name) |
|
||||||
callbacks[name] = cb |
|
||||||
} |
|
||||||
return callbacks |
|
||||||
} |
|
||||||
|
|
||||||
// newCallback turns fn (a function) into a callback object. It returns nil if the function
|
|
||||||
// is unsuitable as an RPC callback.
|
|
||||||
func newCallback(receiver, fn reflect.Value) *callback { |
|
||||||
fntype := fn.Type() |
|
||||||
c := &callback{fn: fn, rcvr: receiver, errPos: -1, isSubscribe: isPubSub(fntype)} |
|
||||||
// Determine parameter types. They must all be exported or builtin types.
|
|
||||||
c.makeArgTypes() |
|
||||||
if !allExportedOrBuiltin(c.argTypes) { |
|
||||||
return nil |
|
||||||
} |
|
||||||
// Verify return types. The function must return at most one error
|
|
||||||
// and/or one other non-error value.
|
|
||||||
outs := make([]reflect.Type, fntype.NumOut()) |
|
||||||
for i := 0; i < fntype.NumOut(); i++ { |
|
||||||
outs[i] = fntype.Out(i) |
|
||||||
} |
|
||||||
if len(outs) > 2 || !allExportedOrBuiltin(outs) { |
|
||||||
return nil |
|
||||||
} |
|
||||||
// If an error is returned, it must be the last returned value.
|
|
||||||
switch { |
|
||||||
case len(outs) == 1 && isErrorType(outs[0]): |
|
||||||
c.errPos = 0 |
|
||||||
case len(outs) == 2: |
|
||||||
if isErrorType(outs[0]) || !isErrorType(outs[1]) { |
|
||||||
return nil |
|
||||||
} |
|
||||||
c.errPos = 1 |
|
||||||
} |
|
||||||
return c |
|
||||||
} |
|
||||||
|
|
||||||
// makeArgTypes composes the argTypes list.
|
|
||||||
func (c *callback) makeArgTypes() { |
|
||||||
fntype := c.fn.Type() |
|
||||||
// Skip receiver and context.Context parameter (if present).
|
|
||||||
firstArg := 0 |
|
||||||
if c.rcvr.IsValid() { |
|
||||||
firstArg++ |
|
||||||
} |
|
||||||
if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType { |
|
||||||
c.hasCtx = true |
|
||||||
firstArg++ |
|
||||||
} |
|
||||||
// Add all remaining parameters.
|
|
||||||
c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg) |
|
||||||
for i := firstArg; i < fntype.NumIn(); i++ { |
|
||||||
c.argTypes[i-firstArg] = fntype.In(i) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// call invokes the callback.
|
|
||||||
func (c *callback) call(ctx context.Context, method string, args []reflect.Value) (res interface{}, errRes error) { |
|
||||||
// Create the argument slice.
|
|
||||||
fullargs := make([]reflect.Value, 0, 2+len(args)) |
|
||||||
if c.rcvr.IsValid() { |
|
||||||
fullargs = append(fullargs, c.rcvr) |
|
||||||
} |
|
||||||
if c.hasCtx { |
|
||||||
fullargs = append(fullargs, reflect.ValueOf(ctx)) |
|
||||||
} |
|
||||||
fullargs = append(fullargs, args...) |
|
||||||
|
|
||||||
// Catch panic while running the callback.
|
|
||||||
defer func() { |
|
||||||
if err := recover(); err != nil { |
|
||||||
const size = 64 << 10 |
|
||||||
buf := make([]byte, size) |
|
||||||
buf = buf[:runtime.Stack(buf, false)] |
|
||||||
log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf)) |
|
||||||
errRes = errors.New("method handler crashed") |
|
||||||
} |
|
||||||
}() |
|
||||||
// Run the callback.
|
|
||||||
results := c.fn.Call(fullargs) |
|
||||||
if len(results) == 0 { |
|
||||||
return nil, nil |
|
||||||
} |
|
||||||
if c.errPos >= 0 && !results[c.errPos].IsNil() { |
|
||||||
// Method has returned non-nil error value.
|
|
||||||
err := results[c.errPos].Interface().(error) |
|
||||||
return reflect.Value{}, err |
|
||||||
} |
|
||||||
return results[0].Interface(), nil |
|
||||||
} |
|
||||||
|
|
||||||
// Is this an exported - upper case - name?
|
|
||||||
func isExported(name string) bool { |
|
||||||
rune, _ := utf8.DecodeRuneInString(name) |
|
||||||
return unicode.IsUpper(rune) |
|
||||||
} |
|
||||||
|
|
||||||
// Are all those types exported or built-in?
|
|
||||||
func allExportedOrBuiltin(types []reflect.Type) bool { |
|
||||||
for _, typ := range types { |
|
||||||
for typ.Kind() == reflect.Ptr { |
|
||||||
typ = typ.Elem() |
|
||||||
} |
|
||||||
// PkgPath will be non-empty even for an exported type,
|
|
||||||
// so we need to check the type name as well.
|
|
||||||
if !isExported(typ.Name()) && typ.PkgPath() != "" { |
|
||||||
return false |
|
||||||
} |
|
||||||
} |
|
||||||
return true |
|
||||||
} |
|
||||||
|
|
||||||
// Is t context.Context or *context.Context?
|
|
||||||
func isContextType(t reflect.Type) bool { |
|
||||||
for t.Kind() == reflect.Ptr { |
|
||||||
t = t.Elem() |
|
||||||
} |
|
||||||
return t == contextType |
|
||||||
} |
|
||||||
|
|
||||||
// Does t satisfy the error interface?
|
|
||||||
func isErrorType(t reflect.Type) bool { |
|
||||||
for t.Kind() == reflect.Ptr { |
|
||||||
t = t.Elem() |
|
||||||
} |
|
||||||
return t.Implements(errorType) |
|
||||||
} |
|
||||||
|
|
||||||
// Is t Subscription or *Subscription?
|
|
||||||
func isSubscriptionType(t reflect.Type) bool { |
|
||||||
for t.Kind() == reflect.Ptr { |
|
||||||
t = t.Elem() |
|
||||||
} |
|
||||||
return t == subscriptionType |
|
||||||
} |
|
||||||
|
|
||||||
// isPubSub tests whether the given method has as as first argument a context.Context and
|
|
||||||
// returns the pair (Subscription, error).
|
|
||||||
func isPubSub(methodType reflect.Type) bool { |
|
||||||
// numIn(0) is the receiver type
|
|
||||||
if methodType.NumIn() < 2 || methodType.NumOut() != 2 { |
|
||||||
return false |
|
||||||
} |
|
||||||
return isContextType(methodType.In(1)) && |
|
||||||
isSubscriptionType(methodType.Out(0)) && |
|
||||||
isErrorType(methodType.Out(1)) |
|
||||||
} |
|
||||||
|
|
||||||
// formatName converts to first character of name to lowercase.
|
|
||||||
func formatName(name string) string { |
|
||||||
ret := []rune(name) |
|
||||||
if len(ret) > 0 { |
|
||||||
ret[0] = unicode.ToLower(ret[0]) |
|
||||||
} |
|
||||||
return string(ret) |
|
||||||
} |
|
@ -1,50 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"errors" |
|
||||||
"io" |
|
||||||
"net" |
|
||||||
"os" |
|
||||||
"time" |
|
||||||
) |
|
||||||
|
|
||||||
// DialStdIO creates a client on stdin/stdout.
|
|
||||||
func DialStdIO(ctx context.Context) (*Client, error) { |
|
||||||
return DialIO(ctx, os.Stdin, os.Stdout) |
|
||||||
} |
|
||||||
|
|
||||||
// DialIO creates a client which uses the given IO channels
|
|
||||||
func DialIO(ctx context.Context, in io.Reader, out io.Writer) (*Client, error) { |
|
||||||
return newClient(ctx, func(_ context.Context) (ServerCodec, error) { |
|
||||||
return NewJSONCodec(stdioConn{ |
|
||||||
in: in, |
|
||||||
out: out, |
|
||||||
}), nil |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
type stdioConn struct { |
|
||||||
in io.Reader |
|
||||||
out io.Writer |
|
||||||
} |
|
||||||
|
|
||||||
func (io stdioConn) Read(b []byte) (n int, err error) { |
|
||||||
return io.in.Read(b) |
|
||||||
} |
|
||||||
|
|
||||||
func (io stdioConn) Write(b []byte) (n int, err error) { |
|
||||||
return io.out.Write(b) |
|
||||||
} |
|
||||||
|
|
||||||
func (io stdioConn) Close() error { |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
func (io stdioConn) RemoteAddr() string { |
|
||||||
return "/dev/stdin" |
|
||||||
} |
|
||||||
|
|
||||||
func (io stdioConn) SetWriteDeadline(t time.Time) error { |
|
||||||
return &net.OpError{Op: "set", Net: "stdio", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} |
|
||||||
} |
|
@ -1,311 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"bufio" |
|
||||||
"container/list" |
|
||||||
"context" |
|
||||||
crand "crypto/rand" |
|
||||||
"encoding/binary" |
|
||||||
"encoding/hex" |
|
||||||
"encoding/json" |
|
||||||
"errors" |
|
||||||
"math/rand" |
|
||||||
"reflect" |
|
||||||
"strings" |
|
||||||
"sync" |
|
||||||
"time" |
|
||||||
) |
|
||||||
|
|
||||||
var ( |
|
||||||
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
|
|
||||||
ErrNotificationsUnsupported = errors.New("notifications not supported") |
|
||||||
// ErrSubscriptionNotFound is returned when the subscription for the given id is not found
|
|
||||||
ErrSubscriptionNotFound = errors.New("subscription not found") |
|
||||||
) |
|
||||||
|
|
||||||
var globalGen = randomIDGenerator() |
|
||||||
|
|
||||||
// ID defines a pseudo random number that is used to identify RPC subscriptions.
|
|
||||||
type ID string |
|
||||||
|
|
||||||
// NewID returns a new, random ID.
|
|
||||||
func NewID() ID { |
|
||||||
return globalGen() |
|
||||||
} |
|
||||||
|
|
||||||
// randomIDGenerator returns a function generates a random IDs.
|
|
||||||
func randomIDGenerator() func() ID { |
|
||||||
seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)) |
|
||||||
if err != nil { |
|
||||||
seed = int64(time.Now().Nanosecond()) |
|
||||||
} |
|
||||||
var ( |
|
||||||
mu sync.Mutex |
|
||||||
rng = rand.New(rand.NewSource(seed)) |
|
||||||
) |
|
||||||
return func() ID { |
|
||||||
mu.Lock() |
|
||||||
defer mu.Unlock() |
|
||||||
id := make([]byte, 16) |
|
||||||
rng.Read(id) |
|
||||||
return encodeID(id) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func encodeID(b []byte) ID { |
|
||||||
id := hex.EncodeToString(b) |
|
||||||
id = strings.TrimLeft(id, "0") |
|
||||||
if id == "" { |
|
||||||
id = "0" // ID's are RPC quantities, no leading zero's and 0 is 0x0.
|
|
||||||
} |
|
||||||
return ID("0x" + id) |
|
||||||
} |
|
||||||
|
|
||||||
type notifierKey struct{} |
|
||||||
|
|
||||||
// NotifierFromContext returns the Notifier value stored in ctx, if any.
|
|
||||||
func NotifierFromContext(ctx context.Context) (*Notifier, bool) { |
|
||||||
n, ok := ctx.Value(notifierKey{}).(*Notifier) |
|
||||||
return n, ok |
|
||||||
} |
|
||||||
|
|
||||||
// Notifier is tied to a RPC connection that supports subscriptions.
|
|
||||||
// Server callbacks use the notifier to send notifications.
|
|
||||||
type Notifier struct { |
|
||||||
h *handler |
|
||||||
namespace string |
|
||||||
|
|
||||||
mu sync.Mutex |
|
||||||
sub *Subscription |
|
||||||
buffer []json.RawMessage |
|
||||||
callReturned bool |
|
||||||
activated bool |
|
||||||
} |
|
||||||
|
|
||||||
// CreateSubscription returns a new subscription that is coupled to the
|
|
||||||
// RPC connection. By default subscriptions are inactive and notifications
|
|
||||||
// are dropped until the subscription is marked as active. This is done
|
|
||||||
// by the RPC server after the subscription ID is send to the client.
|
|
||||||
func (n *Notifier) CreateSubscription() *Subscription { |
|
||||||
n.mu.Lock() |
|
||||||
defer n.mu.Unlock() |
|
||||||
|
|
||||||
if n.sub != nil { |
|
||||||
panic("can't create multiple subscriptions with Notifier") |
|
||||||
} else if n.callReturned { |
|
||||||
panic("can't create subscription after subscribe call has returned") |
|
||||||
} |
|
||||||
n.sub = &Subscription{ID: n.h.idgen(), namespace: n.namespace, err: make(chan error, 1)} |
|
||||||
return n.sub |
|
||||||
} |
|
||||||
|
|
||||||
// Notify sends a notification to the client with the given data as payload.
|
|
||||||
// If an error occurs the RPC connection is closed and the error is returned.
|
|
||||||
func (n *Notifier) Notify(id ID, data interface{}) error { |
|
||||||
enc, err := json.Marshal(data) |
|
||||||
if err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
|
|
||||||
n.mu.Lock() |
|
||||||
defer n.mu.Unlock() |
|
||||||
|
|
||||||
if n.sub == nil { |
|
||||||
panic("can't Notify before subscription is created") |
|
||||||
} else if n.sub.ID != id { |
|
||||||
panic("Notify with wrong ID") |
|
||||||
} |
|
||||||
if n.activated { |
|
||||||
return n.send(n.sub, enc) |
|
||||||
} |
|
||||||
n.buffer = append(n.buffer, enc) |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
// Closed returns a channel that is closed when the RPC connection is closed.
|
|
||||||
// Deprecated: use subscription error channel
|
|
||||||
func (n *Notifier) Closed() <-chan interface{} { |
|
||||||
return n.h.conn.Closed() |
|
||||||
} |
|
||||||
|
|
||||||
// takeSubscription returns the subscription (if one has been created). No subscription can
|
|
||||||
// be created after this call.
|
|
||||||
func (n *Notifier) takeSubscription() *Subscription { |
|
||||||
n.mu.Lock() |
|
||||||
defer n.mu.Unlock() |
|
||||||
n.callReturned = true |
|
||||||
return n.sub |
|
||||||
} |
|
||||||
|
|
||||||
// acticate is called after the subscription ID was sent to client. Notifications are
|
|
||||||
// buffered before activation. This prevents notifications being sent to the client before
|
|
||||||
// the subscription ID is sent to the client.
|
|
||||||
func (n *Notifier) activate() error { |
|
||||||
n.mu.Lock() |
|
||||||
defer n.mu.Unlock() |
|
||||||
|
|
||||||
for _, data := range n.buffer { |
|
||||||
if err := n.send(n.sub, data); err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
} |
|
||||||
n.activated = true |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
func (n *Notifier) send(sub *Subscription, data json.RawMessage) error { |
|
||||||
params, _ := json.Marshal(&subscriptionResult{ID: string(sub.ID), Result: data}) |
|
||||||
ctx := context.Background() |
|
||||||
return n.h.conn.Write(ctx, &jsonrpcMessage{ |
|
||||||
Version: vsn, |
|
||||||
Method: n.namespace + notificationMethodSuffix, |
|
||||||
Params: params, |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
// A Subscription is created by a notifier and tight to that notifier. The client can use
|
|
||||||
// this subscription to wait for an unsubscribe request for the client, see Err().
|
|
||||||
type Subscription struct { |
|
||||||
ID ID |
|
||||||
namespace string |
|
||||||
err chan error // closed on unsubscribe
|
|
||||||
} |
|
||||||
|
|
||||||
// Err returns a channel that is closed when the client send an unsubscribe request.
|
|
||||||
func (s *Subscription) Err() <-chan error { |
|
||||||
return s.err |
|
||||||
} |
|
||||||
|
|
||||||
// MarshalJSON marshals a subscription as its ID.
|
|
||||||
func (s *Subscription) MarshalJSON() ([]byte, error) { |
|
||||||
return json.Marshal(s.ID) |
|
||||||
} |
|
||||||
|
|
||||||
// ClientSubscription is a subscription established through the Client's Subscribe or
|
|
||||||
// EthSubscribe methods.
|
|
||||||
type ClientSubscription struct { |
|
||||||
client *Client |
|
||||||
etype reflect.Type |
|
||||||
channel reflect.Value |
|
||||||
namespace string |
|
||||||
subid string |
|
||||||
in chan json.RawMessage |
|
||||||
|
|
||||||
quitOnce sync.Once // ensures quit is closed once
|
|
||||||
quit chan struct{} // quit is closed when the subscription exits
|
|
||||||
errOnce sync.Once // ensures err is closed once
|
|
||||||
err chan error |
|
||||||
} |
|
||||||
|
|
||||||
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { |
|
||||||
sub := &ClientSubscription{ |
|
||||||
client: c, |
|
||||||
namespace: namespace, |
|
||||||
etype: channel.Type().Elem(), |
|
||||||
channel: channel, |
|
||||||
quit: make(chan struct{}), |
|
||||||
err: make(chan error, 1), |
|
||||||
in: make(chan json.RawMessage), |
|
||||||
} |
|
||||||
return sub |
|
||||||
} |
|
||||||
|
|
||||||
// Err returns the subscription error channel. The intended use of Err is to schedule
|
|
||||||
// resubscription when the client connection is closed unexpectedly.
|
|
||||||
//
|
|
||||||
// The error channel receives a value when the subscription has ended due
|
|
||||||
// to an error. The received error is nil if Close has been called
|
|
||||||
// on the underlying client and no other error has occurred.
|
|
||||||
//
|
|
||||||
// The error channel is closed when Unsubscribe is called on the subscription.
|
|
||||||
func (sub *ClientSubscription) Err() <-chan error { |
|
||||||
return sub.err |
|
||||||
} |
|
||||||
|
|
||||||
// Unsubscribe unsubscribes the notification and closes the error channel.
|
|
||||||
// It can safely be called more than once.
|
|
||||||
func (sub *ClientSubscription) Unsubscribe() { |
|
||||||
sub.quitWithError(true, nil) |
|
||||||
sub.errOnce.Do(func() { close(sub.err) }) |
|
||||||
} |
|
||||||
|
|
||||||
func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) { |
|
||||||
sub.quitOnce.Do(func() { |
|
||||||
// The dispatch loop won't be able to execute the unsubscribe call
|
|
||||||
// if it is blocked on deliver. Close sub.quit first because it
|
|
||||||
// unblocks deliver.
|
|
||||||
close(sub.quit) |
|
||||||
if unsubscribeServer { |
|
||||||
sub.requestUnsubscribe() |
|
||||||
} |
|
||||||
if err != nil { |
|
||||||
if err == errClientQuit { |
|
||||||
err = nil // Adhere to subscription semantics.
|
|
||||||
} |
|
||||||
sub.err <- err |
|
||||||
} |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { |
|
||||||
select { |
|
||||||
case sub.in <- result: |
|
||||||
return true |
|
||||||
case <-sub.quit: |
|
||||||
return false |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func (sub *ClientSubscription) start() { |
|
||||||
sub.quitWithError(sub.forward()) |
|
||||||
} |
|
||||||
|
|
||||||
func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { |
|
||||||
cases := []reflect.SelectCase{ |
|
||||||
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, |
|
||||||
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, |
|
||||||
{Dir: reflect.SelectSend, Chan: sub.channel}, |
|
||||||
} |
|
||||||
buffer := list.New() |
|
||||||
defer buffer.Init() |
|
||||||
for { |
|
||||||
var chosen int |
|
||||||
var recv reflect.Value |
|
||||||
if buffer.Len() == 0 { |
|
||||||
// Idle, omit send case.
|
|
||||||
chosen, recv, _ = reflect.Select(cases[:2]) |
|
||||||
} else { |
|
||||||
// Non-empty buffer, send the first queued item.
|
|
||||||
cases[2].Send = reflect.ValueOf(buffer.Front().Value) |
|
||||||
chosen, recv, _ = reflect.Select(cases) |
|
||||||
} |
|
||||||
|
|
||||||
switch chosen { |
|
||||||
case 0: // <-sub.quit
|
|
||||||
return false, nil |
|
||||||
case 1: // <-sub.in
|
|
||||||
val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) |
|
||||||
if err != nil { |
|
||||||
return true, err |
|
||||||
} |
|
||||||
if buffer.Len() == maxClientSubscriptionBuffer { |
|
||||||
return true, errSubscriptionQueueOverflow |
|
||||||
} |
|
||||||
buffer.PushBack(val) |
|
||||||
case 2: // sub.channel<-
|
|
||||||
cases[2].Send = reflect.Value{} // Don't hold onto the value.
|
|
||||||
buffer.Remove(buffer.Front()) |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) { |
|
||||||
val := reflect.New(sub.etype) |
|
||||||
err := json.Unmarshal(result, val.Interface()) |
|
||||||
return val.Elem().Interface(), err |
|
||||||
} |
|
||||||
|
|
||||||
func (sub *ClientSubscription) requestUnsubscribe() error { |
|
||||||
var result interface{} |
|
||||||
return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid) |
|
||||||
} |
|
@ -1,94 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"context" |
|
||||||
"fmt" |
|
||||||
"math" |
|
||||||
"strings" |
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common/hexutil" |
|
||||||
) |
|
||||||
|
|
||||||
// API describes the set of methods offered over the RPC interface
|
|
||||||
type API struct { |
|
||||||
Namespace string // namespace under which the rpc methods of Service are exposed
|
|
||||||
Version string // api version for DApp's
|
|
||||||
Service interface{} // receiver instance which holds the methods
|
|
||||||
Public bool // indication if the methods must be considered safe for public use
|
|
||||||
} |
|
||||||
|
|
||||||
// Error wraps RPC errors, which contain an error code in addition to the message.
|
|
||||||
type Error interface { |
|
||||||
Error() string // returns the message
|
|
||||||
ErrorCode() int // returns the code
|
|
||||||
} |
|
||||||
|
|
||||||
// ServerCodec implements reading, parsing and writing RPC messages for the server side of
|
|
||||||
// a RPC session. Implementations must be go-routine safe since the codec can be called in
|
|
||||||
// multiple go-routines concurrently.
|
|
||||||
type ServerCodec interface { |
|
||||||
Read() (msgs []*jsonrpcMessage, isBatch bool, err error) |
|
||||||
Close() |
|
||||||
jsonWriter |
|
||||||
} |
|
||||||
|
|
||||||
// jsonWriter can write JSON messages to its underlying connection.
|
|
||||||
// Implementations must be safe for concurrent use.
|
|
||||||
type jsonWriter interface { |
|
||||||
Write(context.Context, interface{}) error |
|
||||||
// Closed returns a channel which is closed when the connection is closed.
|
|
||||||
Closed() <-chan interface{} |
|
||||||
// RemoteAddr returns the peer address of the connection.
|
|
||||||
RemoteAddr() string |
|
||||||
} |
|
||||||
|
|
||||||
// BlockNumber ...
|
|
||||||
type BlockNumber int64 |
|
||||||
|
|
||||||
const ( |
|
||||||
// PendingBlockNumber ...
|
|
||||||
PendingBlockNumber = BlockNumber(-2) |
|
||||||
latestBlockNumber = BlockNumber(-1) |
|
||||||
earliestBlockNumber = BlockNumber(0) |
|
||||||
) |
|
||||||
|
|
||||||
// UnmarshalJSON parses the given JSON fragment into a BlockNumber. It supports:
|
|
||||||
// - "latest", "earliest" or "pending" as string arguments
|
|
||||||
// - the block number
|
|
||||||
// Returned errors:
|
|
||||||
// - an invalid block number error when the given argument isn't a known strings
|
|
||||||
// - an out of range error when the given block number is either too little or too large
|
|
||||||
func (bn *BlockNumber) UnmarshalJSON(data []byte) error { |
|
||||||
input := strings.TrimSpace(string(data)) |
|
||||||
if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' { |
|
||||||
input = input[1 : len(input)-1] |
|
||||||
} |
|
||||||
|
|
||||||
switch input { |
|
||||||
case "earliest": |
|
||||||
*bn = earliestBlockNumber |
|
||||||
return nil |
|
||||||
case "latest": |
|
||||||
*bn = latestBlockNumber |
|
||||||
return nil |
|
||||||
case "pending": |
|
||||||
*bn = PendingBlockNumber |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
blckNum, err := hexutil.DecodeUint64(input) |
|
||||||
if err != nil { |
|
||||||
return err |
|
||||||
} |
|
||||||
if blckNum > math.MaxInt64 { |
|
||||||
return fmt.Errorf("Blocknumber too high") |
|
||||||
} |
|
||||||
|
|
||||||
*bn = BlockNumber(blckNum) |
|
||||||
return nil |
|
||||||
} |
|
||||||
|
|
||||||
// Int64 turns blockNumber to int64
|
|
||||||
func (bn BlockNumber) Int64() int64 { |
|
||||||
return (int64)(bn) |
|
||||||
} |
|
@ -1,222 +0,0 @@ |
|||||||
package rpc |
|
||||||
|
|
||||||
import ( |
|
||||||
"bytes" |
|
||||||
"context" |
|
||||||
"crypto/tls" |
|
||||||
"encoding/base64" |
|
||||||
"encoding/json" |
|
||||||
"errors" |
|
||||||
"fmt" |
|
||||||
"net" |
|
||||||
"net/http" |
|
||||||
"net/url" |
|
||||||
"os" |
|
||||||
"strings" |
|
||||||
"time" |
|
||||||
|
|
||||||
mapset "github.com/deckarep/golang-set" |
|
||||||
"github.com/ethereum/go-ethereum/log" |
|
||||||
"golang.org/x/net/websocket" |
|
||||||
) |
|
||||||
|
|
||||||
// websocketJSONCodec is a custom JSON codec with payload size enforcement and
|
|
||||||
// special number parsing.
|
|
||||||
var websocketJSONCodec = websocket.Codec{ |
|
||||||
// Marshal is the stock JSON marshaller used by the websocket library too.
|
|
||||||
Marshal: func(v interface{}) ([]byte, byte, error) { |
|
||||||
msg, err := json.Marshal(v) |
|
||||||
return msg, websocket.TextFrame, err |
|
||||||
}, |
|
||||||
// Unmarshal is a specialized unmarshaller to properly convert numbers.
|
|
||||||
Unmarshal: func(msg []byte, payloadType byte, v interface{}) error { |
|
||||||
dec := json.NewDecoder(bytes.NewReader(msg)) |
|
||||||
dec.UseNumber() |
|
||||||
|
|
||||||
return dec.Decode(v) |
|
||||||
}, |
|
||||||
} |
|
||||||
|
|
||||||
// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
|
|
||||||
//
|
|
||||||
// allowedOrigins should be a comma-separated list of allowed origin URLs.
|
|
||||||
// To allow connections with any origin, pass "*".
|
|
||||||
func (s *Server) WebsocketHandler(allowedOrigins []string) http.Handler { |
|
||||||
return websocket.Server{ |
|
||||||
Handshake: wsHandshakeValidator(allowedOrigins), |
|
||||||
Handler: func(conn *websocket.Conn) { |
|
||||||
codec := newWebsocketCodec(conn) |
|
||||||
s.ServeCodec(codec) |
|
||||||
}, |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
func newWebsocketCodec(conn *websocket.Conn) ServerCodec { |
|
||||||
// Create a custom encode/decode pair to enforce payload size and number encoding
|
|
||||||
conn.MaxPayloadBytes = maxRequestContentLength |
|
||||||
encoder := func(v interface{}) error { |
|
||||||
return websocketJSONCodec.Send(conn, v) |
|
||||||
} |
|
||||||
decoder := func(v interface{}) error { |
|
||||||
return websocketJSONCodec.Receive(conn, v) |
|
||||||
} |
|
||||||
rpcconn := Conn(conn) |
|
||||||
if conn.IsServerConn() { |
|
||||||
// Override remote address with the actual socket address because
|
|
||||||
// package websocket crashes if there is no request origin.
|
|
||||||
addr := conn.Request().RemoteAddr |
|
||||||
if wsaddr := conn.RemoteAddr().(*websocket.Addr); wsaddr.URL != nil { |
|
||||||
// Add origin if present.
|
|
||||||
addr += "(" + wsaddr.URL.String() + ")" |
|
||||||
} |
|
||||||
rpcconn = connWithRemoteAddr{conn, addr} |
|
||||||
} |
|
||||||
return NewCodec(rpcconn, encoder, decoder) |
|
||||||
} |
|
||||||
|
|
||||||
// NewWSServer creates a new websocket RPC server around an API provider.
|
|
||||||
//
|
|
||||||
// Deprecated: use Server.WebsocketHandler
|
|
||||||
func NewWSServer(allowedOrigins []string, srv *Server) *http.Server { |
|
||||||
return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)} |
|
||||||
} |
|
||||||
|
|
||||||
// wsHandshakeValidator returns a handler that verifies the origin during the
|
|
||||||
// websocket upgrade process. When a '*' is specified as an allowed origins all
|
|
||||||
// connections are accepted.
|
|
||||||
func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http.Request) error { |
|
||||||
origins := mapset.NewSet() |
|
||||||
allowAllOrigins := false |
|
||||||
|
|
||||||
for _, origin := range allowedOrigins { |
|
||||||
if origin == "*" { |
|
||||||
allowAllOrigins = true |
|
||||||
} |
|
||||||
if origin != "" { |
|
||||||
origins.Add(strings.ToLower(origin)) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// allow localhost if no allowedOrigins are specified.
|
|
||||||
if len(origins.ToSlice()) == 0 { |
|
||||||
origins.Add("http://localhost") |
|
||||||
if hostname, err := os.Hostname(); err == nil { |
|
||||||
origins.Add("http://" + strings.ToLower(hostname)) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
log.Debug(fmt.Sprintf("Allowed origin(s) for WS RPC interface %v", origins.ToSlice())) |
|
||||||
|
|
||||||
f := func(cfg *websocket.Config, req *http.Request) error { |
|
||||||
// Skip origin verification if no Origin header is present. The origin check
|
|
||||||
// is supposed to protect against browser based attacks. Browsers always set
|
|
||||||
// Origin. Non-browser software can put anything in origin and checking it doesn't
|
|
||||||
// provide additional security.
|
|
||||||
if _, ok := req.Header["Origin"]; !ok { |
|
||||||
return nil |
|
||||||
} |
|
||||||
// Verify origin against whitelist.
|
|
||||||
origin := strings.ToLower(req.Header.Get("Origin")) |
|
||||||
if allowAllOrigins || origins.Contains(origin) { |
|
||||||
return nil |
|
||||||
} |
|
||||||
log.Warn("Rejected WebSocket connection", "origin", origin) |
|
||||||
return errors.New("origin not allowed") |
|
||||||
} |
|
||||||
|
|
||||||
return f |
|
||||||
} |
|
||||||
|
|
||||||
func wsGetConfig(endpoint, origin string) (*websocket.Config, error) { |
|
||||||
if origin == "" { |
|
||||||
var err error |
|
||||||
if origin, err = os.Hostname(); err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
if strings.HasPrefix(endpoint, "wss") { |
|
||||||
origin = "https://" + strings.ToLower(origin) |
|
||||||
} else { |
|
||||||
origin = "http://" + strings.ToLower(origin) |
|
||||||
} |
|
||||||
} |
|
||||||
config, err := websocket.NewConfig(endpoint, origin) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
|
|
||||||
if config.Location.User != nil { |
|
||||||
b64auth := base64.StdEncoding.EncodeToString([]byte(config.Location.User.String())) |
|
||||||
config.Header.Add("Authorization", "Basic "+b64auth) |
|
||||||
config.Location.User = nil |
|
||||||
} |
|
||||||
return config, nil |
|
||||||
} |
|
||||||
|
|
||||||
// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
|
|
||||||
// that is listening on the given endpoint.
|
|
||||||
//
|
|
||||||
// The context is used for the initial connection establishment. It does not
|
|
||||||
// affect subsequent interactions with the client.
|
|
||||||
func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { |
|
||||||
config, err := wsGetConfig(endpoint, origin) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
|
|
||||||
return newClient(ctx, func(ctx context.Context) (ServerCodec, error) { |
|
||||||
conn, err := wsDialContext(ctx, config) |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
return newWebsocketCodec(conn), nil |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) { |
|
||||||
var conn net.Conn |
|
||||||
var err error |
|
||||||
switch config.Location.Scheme { |
|
||||||
case "ws": |
|
||||||
conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location)) |
|
||||||
case "wss": |
|
||||||
dialer := contextDialer(ctx) |
|
||||||
conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig) |
|
||||||
default: |
|
||||||
err = websocket.ErrBadScheme |
|
||||||
} |
|
||||||
if err != nil { |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
ws, err := websocket.NewClient(config, conn) |
|
||||||
if err != nil { |
|
||||||
conn.Close() |
|
||||||
return nil, err |
|
||||||
} |
|
||||||
return ws, err |
|
||||||
} |
|
||||||
|
|
||||||
var wsPortMap = map[string]string{"ws": "80", "wss": "443"} |
|
||||||
|
|
||||||
func wsDialAddress(location *url.URL) string { |
|
||||||
if _, ok := wsPortMap[location.Scheme]; ok { |
|
||||||
if _, _, err := net.SplitHostPort(location.Host); err != nil { |
|
||||||
return net.JoinHostPort(location.Host, wsPortMap[location.Scheme]) |
|
||||||
} |
|
||||||
} |
|
||||||
return location.Host |
|
||||||
} |
|
||||||
|
|
||||||
func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { |
|
||||||
d := &net.Dialer{KeepAlive: tcpKeepAliveInterval} |
|
||||||
return d.DialContext(ctx, network, addr) |
|
||||||
} |
|
||||||
|
|
||||||
func contextDialer(ctx context.Context) *net.Dialer { |
|
||||||
dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval} |
|
||||||
if deadline, ok := ctx.Deadline(); ok { |
|
||||||
dialer.Deadline = deadline |
|
||||||
} else { |
|
||||||
dialer.Deadline = time.Now().Add(defaultDialTimeout) |
|
||||||
} |
|
||||||
return dialer |
|
||||||
} |
|
Loading…
Reference in new issue