Merge pull request #130 from harmony-one/HAR-67_libp2p
[HAR-67] abstract out two version hostspull/139/head
commit
290155b55b
@ -1,183 +0,0 @@ |
||||
package node |
||||
|
||||
import ( |
||||
"bytes" |
||||
"encoding/gob" |
||||
"fmt" |
||||
"os" |
||||
|
||||
"github.com/harmony-one/harmony/blockchain" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/p2pv2" |
||||
"github.com/harmony-one/harmony/proto" |
||||
"github.com/harmony-one/harmony/proto/client" |
||||
"github.com/harmony-one/harmony/proto/consensus" |
||||
proto_identity "github.com/harmony-one/harmony/proto/identity" |
||||
proto_node "github.com/harmony-one/harmony/proto/node" |
||||
netp2p "github.com/libp2p/go-libp2p-net" |
||||
) |
||||
|
||||
// NodeHandlerV1 handles a new incoming connection.
|
||||
func (node *Node) NodeHandlerV1(s netp2p.Stream) { |
||||
defer s.Close() |
||||
|
||||
// Read p2p message payload
|
||||
content, err := p2pv2.ReadData(s) |
||||
|
||||
if err != nil { |
||||
node.log.Error("Read p2p data failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
// TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p.
|
||||
node.MaybeBroadcastAsValidator(content) |
||||
|
||||
consensusObj := node.Consensus |
||||
|
||||
msgCategory, err := proto.GetMessageCategory(content) |
||||
if err != nil { |
||||
node.log.Error("Read node type failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
msgType, err := proto.GetMessageType(content) |
||||
if err != nil { |
||||
node.log.Error("Read action type failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
msgPayload, err := proto.GetMessagePayload(content) |
||||
if err != nil { |
||||
node.log.Error("Read message payload failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
switch msgCategory { |
||||
case proto.Identity: |
||||
actionType := proto_identity.IDMessageType(msgType) |
||||
switch actionType { |
||||
case proto_identity.Identity: |
||||
messageType := proto_identity.MessageType(msgPayload[0]) |
||||
switch messageType { |
||||
case proto_identity.Register: |
||||
fmt.Println("received a identity message") |
||||
// TODO(ak): fix it.
|
||||
// node.processPOWMessage(msgPayload)
|
||||
node.log.Info("NET: received message: IDENTITY/REGISTER") |
||||
default: |
||||
node.log.Error("Announce message should be sent to IdentityChain") |
||||
} |
||||
} |
||||
case proto.Consensus: |
||||
actionType := consensus.ConMessageType(msgType) |
||||
switch actionType { |
||||
case consensus.Consensus: |
||||
if consensusObj.IsLeader { |
||||
node.log.Info("NET: received message: Consensus/Leader") |
||||
consensusObj.ProcessMessageLeader(msgPayload) |
||||
} else { |
||||
node.log.Info("NET: received message: Consensus/Validator") |
||||
consensusObj.ProcessMessageValidator(msgPayload) |
||||
} |
||||
} |
||||
case proto.Node: |
||||
actionType := proto_node.MessageType(msgType) |
||||
switch actionType { |
||||
case proto_node.Transaction: |
||||
node.log.Info("NET: received message: Node/Transaction") |
||||
node.transactionMessageHandler(msgPayload) |
||||
case proto_node.Block: |
||||
node.log.Info("NET: received message: Node/Block") |
||||
blockMsgType := proto_node.BlockMessageType(msgPayload[0]) |
||||
switch blockMsgType { |
||||
case proto_node.Sync: |
||||
decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Sync messge type
|
||||
blocks := new([]*blockchain.Block) |
||||
decoder.Decode(blocks) |
||||
if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil { |
||||
node.Client.UpdateBlocks(*blocks) |
||||
} |
||||
} |
||||
case proto_node.Client: |
||||
node.log.Info("NET: received message: Node/Client") |
||||
clientMsgType := proto_node.ClientMessageType(msgPayload[0]) |
||||
switch clientMsgType { |
||||
case proto_node.LookupUtxo: |
||||
decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the LookupUtxo messge type
|
||||
|
||||
fetchUtxoMessage := new(proto_node.FetchUtxoMessage) |
||||
decoder.Decode(fetchUtxoMessage) |
||||
|
||||
utxoMap := node.UtxoPool.GetUtxoMapByAddresses(fetchUtxoMessage.Addresses) |
||||
|
||||
p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) |
||||
} |
||||
case proto_node.Control: |
||||
node.log.Info("NET: received message: Node/Control") |
||||
controlType := msgPayload[0] |
||||
if proto_node.ControlMessageType(controlType) == proto_node.STOP { |
||||
if node.Chain == nil { |
||||
node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) |
||||
|
||||
sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap() |
||||
node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) |
||||
|
||||
avgBlockSizeInBytes := 0 |
||||
txCount := 0 |
||||
blockCount := 0 |
||||
totalTxCount := 0 |
||||
totalBlockCount := 0 |
||||
avgTxSize := 0 |
||||
|
||||
for _, block := range node.blockchain.Blocks { |
||||
if block.IsStateBlock() { |
||||
totalTxCount += int(block.State.NumTransactions) |
||||
totalBlockCount += int(block.State.NumBlocks) |
||||
} else { |
||||
byteBuffer := bytes.NewBuffer([]byte{}) |
||||
encoder := gob.NewEncoder(byteBuffer) |
||||
encoder.Encode(block) |
||||
avgBlockSizeInBytes += len(byteBuffer.Bytes()) |
||||
|
||||
txCount += len(block.Transactions) |
||||
blockCount++ |
||||
totalTxCount += len(block.TransactionIds) |
||||
totalBlockCount++ |
||||
|
||||
byteBuffer = bytes.NewBuffer([]byte{}) |
||||
encoder = gob.NewEncoder(byteBuffer) |
||||
encoder.Encode(block.Transactions) |
||||
avgTxSize += len(byteBuffer.Bytes()) |
||||
} |
||||
} |
||||
if blockCount != 0 { |
||||
avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount |
||||
avgTxSize = avgTxSize / txCount |
||||
} |
||||
|
||||
node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) |
||||
} else { |
||||
node.log.Debug("Stopping Node (Account Model)", "node", node, "CurBlockNum", node.Chain.CurrentHeader().Number, "numTxsProcessed", node.countNumTransactionsInBlockchainAccount()) |
||||
} |
||||
|
||||
os.Exit(0) |
||||
} |
||||
case proto_node.PING: |
||||
// Leader receives PING from new node.
|
||||
node.pingMessageHandler(msgPayload) |
||||
case proto_node.PONG: |
||||
// The new node receives back from leader.
|
||||
node.pongMessageHandler(msgPayload) |
||||
} |
||||
case proto.Client: |
||||
actionType := client.MessageType(msgType) |
||||
node.log.Info("NET: received message: Client/Transaction") |
||||
switch actionType { |
||||
case client.Transaction: |
||||
if node.Client != nil { |
||||
node.Client.TransactionMessageHandler(msgPayload) |
||||
} |
||||
} |
||||
default: |
||||
node.log.Error("Unknown", "MsgCateory:", msgCategory) |
||||
} |
||||
} |
@ -0,0 +1,21 @@ |
||||
package node |
||||
|
||||
import ( |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/p2p/host" |
||||
) |
||||
|
||||
// SendMessage sends data to ip, port
|
||||
func (node *Node) SendMessage(p p2p.Peer, data []byte) { |
||||
host.SendMessage(node.host, p, data) |
||||
} |
||||
|
||||
// BroadcastMessage broadcasts message to peers
|
||||
func (node *Node) BroadcastMessage(peers []p2p.Peer, data []byte) { |
||||
host.BroadcastMessage(node.host, peers, data) |
||||
} |
||||
|
||||
// GetHost returns the p2p host
|
||||
func (node *Node) GetHost() host.Host { |
||||
return node.host |
||||
} |
@ -1,55 +0,0 @@ |
||||
package p2p_test |
||||
|
||||
import ( |
||||
"bufio" |
||||
"net" |
||||
"testing" |
||||
|
||||
"github.com/harmony-one/harmony/p2p" |
||||
) |
||||
|
||||
func setUpTestServer(times int, t *testing.T, conCreated chan struct{}) { |
||||
t.Parallel() |
||||
ln, _ := net.Listen("tcp", ":8081") |
||||
conCreated <- struct{}{} |
||||
conn, _ := ln.Accept() |
||||
defer conn.Close() |
||||
|
||||
var ( |
||||
w = bufio.NewWriter(conn) |
||||
) |
||||
for times > 0 { |
||||
times-- |
||||
data, err := p2p.ReadMessageContent(conn) |
||||
if err != nil { |
||||
t.Fatalf("error when ReadMessageContent %v", err) |
||||
} |
||||
data = p2p.CreateMessage(byte(1), data) |
||||
w.Write(data) |
||||
w.Flush() |
||||
} |
||||
} |
||||
func TestNewNewNode(t *testing.T) { |
||||
times := 100 |
||||
conCreated := make(chan struct{}) |
||||
go setUpTestServer(times, t, conCreated) |
||||
<-conCreated |
||||
|
||||
conn, _ := net.Dial("tcp", "127.0.0.1:8081") |
||||
defer conn.Close() |
||||
|
||||
for times > 0 { |
||||
times-- |
||||
|
||||
myMsg := "minhdoan" |
||||
p2p.SendMessageContent(conn, []byte(myMsg)) |
||||
|
||||
data, err := p2p.ReadMessageContent(conn) |
||||
if err != nil { |
||||
t.Error("got an error when trying to receive an expected message from server.") |
||||
} |
||||
if string(data) != myMsg { |
||||
t.Error("did not receive expected message") |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,13 @@ |
||||
package host |
||||
|
||||
import ( |
||||
"github.com/harmony-one/harmony/p2p" |
||||
) |
||||
|
||||
// Host is the client + server in p2p network.
|
||||
type Host interface { |
||||
GetSelfPeer() p2p.Peer |
||||
SendMessage(p2p.Peer, []byte) error |
||||
BindHandlerAndServe(handler p2p.StreamHandler) |
||||
Close() error |
||||
} |
@ -0,0 +1,99 @@ |
||||
package hostv1 |
||||
|
||||
import ( |
||||
"io" |
||||
"net" |
||||
"time" |
||||
|
||||
"github.com/harmony-one/harmony/log" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
) |
||||
|
||||
// HostV1 is the version 1 p2p host, using direct socket call.
|
||||
type HostV1 struct { |
||||
self p2p.Peer |
||||
listener net.Listener |
||||
quit chan bool |
||||
} |
||||
|
||||
// New creates a HostV1
|
||||
func New(self p2p.Peer) *HostV1 { |
||||
h := &HostV1{ |
||||
self: self, |
||||
quit: make(chan bool, 1), |
||||
} |
||||
return h |
||||
} |
||||
|
||||
// GetSelfPeer gets self peer
|
||||
func (host *HostV1) GetSelfPeer() p2p.Peer { |
||||
return host.self |
||||
} |
||||
|
||||
// BindHandlerAndServe Version 0 p2p. Going to be deprecated.
|
||||
func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) { |
||||
port := host.self.Port |
||||
addr := net.JoinHostPort(host.self.IP, port) |
||||
var err error |
||||
host.listener, err = net.Listen("tcp4", addr) |
||||
if err != nil { |
||||
log.Error("Socket listen port failed", "addr", addr, "err", err) |
||||
return |
||||
} |
||||
if host.listener == nil { |
||||
log.Error("Listen returned nil", "addr", addr) |
||||
return |
||||
} |
||||
backoff := p2p.NewExpBackoff(250*time.Millisecond, 15*time.Second, 2.0) |
||||
for { // Keep listening
|
||||
select { |
||||
case <-host.quit: |
||||
return |
||||
default: |
||||
{ |
||||
conn, err := host.listener.Accept() |
||||
if err != nil { |
||||
log.Error("Error listening on port.", "port", port, |
||||
"err", err) |
||||
backoff.Sleep() |
||||
continue |
||||
} |
||||
// log.Debug("Received New connection", "local", conn.LocalAddr(), "remote", conn.RemoteAddr())
|
||||
go handler(conn) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// SendMessage sends message to peer
|
||||
func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) { |
||||
addr := net.JoinHostPort(peer.IP, peer.Port) |
||||
conn, err := net.Dial("tcp", addr) |
||||
// log.Debug("Dial from local to remote", "localID", net.JoinHostPort(host.self.IP, host.self.Port), "local", conn.LocalAddr(), "remote", addr)
|
||||
|
||||
if err != nil { |
||||
log.Warn("Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err) |
||||
return |
||||
} |
||||
defer conn.Close() |
||||
|
||||
nw, err := conn.Write(message) |
||||
if err != nil { |
||||
log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err) |
||||
return |
||||
} |
||||
if nw < len(message) { |
||||
log.Warn("Write() returned short count", |
||||
"addr", conn.RemoteAddr(), "actual", nw, "expected", len(message)) |
||||
return io.ErrShortWrite |
||||
} |
||||
|
||||
// No ack (reply) message from the receiver for now.
|
||||
return |
||||
} |
||||
|
||||
// Close closes the host
|
||||
func (host *HostV1) Close() error { |
||||
host.quit <- true |
||||
return host.listener.Close() |
||||
} |
@ -0,0 +1,94 @@ |
||||
package hostv2 |
||||
|
||||
import ( |
||||
"bufio" |
||||
"context" |
||||
"fmt" |
||||
|
||||
"github.com/harmony-one/harmony/log" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
libp2p "github.com/libp2p/go-libp2p" |
||||
libp2phost "github.com/libp2p/go-libp2p-host" |
||||
net "github.com/libp2p/go-libp2p-net" |
||||
peer "github.com/libp2p/go-libp2p-peer" |
||||
peerstore "github.com/libp2p/go-libp2p-peerstore" |
||||
multiaddr "github.com/multiformats/go-multiaddr" |
||||
) |
||||
|
||||
const ( |
||||
// BatchSizeInByte The batch size in byte (64MB) in which we return data
|
||||
BatchSizeInByte = 1 << 16 |
||||
// ProtocolID The ID of protocol used in stream handling.
|
||||
ProtocolID = "/harmony/0.0.1" |
||||
) |
||||
|
||||
// HostV2 is the version 2 p2p host
|
||||
type HostV2 struct { |
||||
h libp2phost.Host |
||||
self p2p.Peer |
||||
} |
||||
|
||||
// Peerstore returns the peer store
|
||||
func (host *HostV2) Peerstore() peerstore.Peerstore { |
||||
return host.h.Peerstore() |
||||
} |
||||
|
||||
// New creates a host for p2p communication
|
||||
func New(self p2p.Peer) *HostV2 { |
||||
addr := fmt.Sprintf("/ip4/%s/tcp/%s", self.IP, self.Port) |
||||
sourceAddr, err := multiaddr.NewMultiaddr(addr) |
||||
catchError(err) |
||||
priv := addrToPrivKey(addr) |
||||
p2pHost, err := libp2p.New(context.Background(), |
||||
libp2p.ListenAddrs(sourceAddr), |
||||
libp2p.Identity(priv), |
||||
libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves.
|
||||
// TODO(ricl): Other features to probe
|
||||
// libp2p.EnableRelay; libp2p.Routing;
|
||||
) |
||||
catchError(err) |
||||
log.Debug("Host is up!", "port", self.Port, "id", p2pHost.ID().Pretty(), "addrs", sourceAddr) |
||||
h := &HostV2{ |
||||
h: p2pHost, |
||||
self: self, |
||||
} |
||||
return h |
||||
} |
||||
|
||||
// GetSelfPeer gets self peer
|
||||
func (host *HostV2) GetSelfPeer() p2p.Peer { |
||||
return host.self |
||||
} |
||||
|
||||
// BindHandlerAndServe bind a streamHandler to the harmony protocol.
|
||||
func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) { |
||||
host.h.SetStreamHandler(ProtocolID, func(s net.Stream) { |
||||
handler(s) |
||||
}) |
||||
// Hang forever
|
||||
<-make(chan struct{}) |
||||
} |
||||
|
||||
// SendMessage a p2p message sending function with signature compatible to p2pv1.
|
||||
func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error { |
||||
addr := fmt.Sprintf("/ip4/%s/tcp/%s", p.IP, p.Port) |
||||
targetAddr, err := multiaddr.NewMultiaddr(addr) |
||||
|
||||
priv := addrToPrivKey(addr) |
||||
peerID, _ := peer.IDFromPrivateKey(priv) |
||||
host.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL) |
||||
s, err := host.h.NewStream(context.Background(), peerID, ProtocolID) |
||||
catchError(err) |
||||
|
||||
// Create a buffered stream so that read and writes are non blocking.
|
||||
w := bufio.NewWriter(bufio.NewWriter(s)) |
||||
|
||||
// Create a thread to read and write data.
|
||||
go writeData(w, message) |
||||
return nil |
||||
} |
||||
|
||||
// Close closes the host
|
||||
func (host *HostV2) Close() error { |
||||
return host.h.Close() |
||||
} |
@ -1,4 +1,4 @@ |
||||
package p2pv2 |
||||
package hostv2 |
||||
|
||||
import ( |
||||
"bufio" |
@ -0,0 +1,28 @@ |
||||
package p2p |
||||
|
||||
import ( |
||||
"time" |
||||
|
||||
"github.com/dedis/kyber" |
||||
) |
||||
|
||||
// Stream is abstract p2p stream from where we read message
|
||||
type Stream interface { |
||||
Read([]byte) (int, error) |
||||
Write([]byte) (int, error) |
||||
Close() error |
||||
SetReadDeadline(time.Time) error |
||||
} |
||||
|
||||
// StreamHandler handles incoming p2p message.
|
||||
type StreamHandler func(Stream) |
||||
|
||||
// Peer is the object for a p2p peer (node)
|
||||
type Peer struct { |
||||
IP string // IP address of the peer
|
||||
Port string // Port number of the peer
|
||||
PubKey kyber.Point // Public key of the peer
|
||||
Ready bool // Ready is true if the peer is ready to join consensus.
|
||||
ValidatorID int // -1 is the default value, means not assigned any validator ID in the shard
|
||||
// TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available.
|
||||
} |
@ -0,0 +1,25 @@ |
||||
package p2pimpl |
||||
|
||||
import ( |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/p2p/host" |
||||
"github.com/harmony-one/harmony/p2p/host/hostv1" |
||||
"github.com/harmony-one/harmony/p2p/host/hostv2" |
||||
) |
||||
|
||||
// Version The version number of p2p library
|
||||
// 1 - Direct socket connection
|
||||
// 2 - libp2p
|
||||
const Version = 1 |
||||
|
||||
// NewHost starts the host
|
||||
func NewHost(peer p2p.Peer) host.Host { |
||||
// log.Debug("New Host", "ip/port", net.JoinHostPort(peer.IP, peer.Port))
|
||||
if Version == 1 { |
||||
h := hostv1.New(peer) |
||||
return h |
||||
} |
||||
|
||||
h := hostv2.New(peer) |
||||
return h |
||||
} |
@ -1,146 +0,0 @@ |
||||
package p2pv2 |
||||
|
||||
import ( |
||||
"bufio" |
||||
"bytes" |
||||
"context" |
||||
"encoding/binary" |
||||
"fmt" |
||||
"io" |
||||
"time" |
||||
|
||||
"github.com/harmony-one/harmony/log" |
||||
libp2p "github.com/libp2p/go-libp2p" |
||||
host "github.com/libp2p/go-libp2p-host" |
||||
net "github.com/libp2p/go-libp2p-net" |
||||
peer "github.com/libp2p/go-libp2p-peer" |
||||
peerstore "github.com/libp2p/go-libp2p-peerstore" |
||||
multiaddr "github.com/multiformats/go-multiaddr" |
||||
) |
||||
|
||||
var ( |
||||
myHost host.Host // TODO(ricl): this should be a field in node.
|
||||
) |
||||
|
||||
const ( |
||||
// BatchSizeInByte The batch size in byte (64MB) in which we return data
|
||||
BatchSizeInByte = 1 << 16 |
||||
// ProtocolID The ID of protocol used in stream handling.
|
||||
ProtocolID = "/harmony/0.0.1" |
||||
) |
||||
|
||||
// InitHost Initialize a host for p2p communication
|
||||
func InitHost(ip, port string) { |
||||
addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) |
||||
sourceAddr, err := multiaddr.NewMultiaddr(addr) |
||||
catchError(err) |
||||
priv := addrToPrivKey(addr) |
||||
myHost, err = libp2p.New(context.Background(), |
||||
libp2p.ListenAddrs(sourceAddr), |
||||
libp2p.Identity(priv), |
||||
libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves.
|
||||
// TODO(ricl): Other features to probe
|
||||
// libp2p.EnableRelay; libp2p.Routing;
|
||||
) |
||||
catchError(err) |
||||
log.Debug("Host is up!", "port", port, "id", myHost.ID().Pretty(), "addrs", sourceAddr) |
||||
} |
||||
|
||||
// BindHandler bind a streamHandler to the harmony protocol.
|
||||
func BindHandler(handler net.StreamHandler) { |
||||
myHost.SetStreamHandler(ProtocolID, handler) |
||||
} |
||||
|
||||
// Send a p2p message sending function with signature compatible to p2pv1.
|
||||
func Send(ip, port string, message []byte) error { |
||||
addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) |
||||
targetAddr, err := multiaddr.NewMultiaddr(addr) |
||||
|
||||
priv := addrToPrivKey(addr) |
||||
peerID, _ := peer.IDFromPrivateKey(priv) |
||||
myHost.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL) |
||||
s, err := myHost.NewStream(context.Background(), peerID, ProtocolID) |
||||
catchError(err) |
||||
|
||||
// Create a buffered stream so that read and writes are non blocking.
|
||||
w := bufio.NewWriter(bufio.NewWriter(s)) |
||||
|
||||
// Create a thread to read and write data.
|
||||
go writeData(w, message) |
||||
return nil |
||||
} |
||||
|
||||
// ReadData Call this function in streamHandler to get the binary data.
|
||||
func ReadData(s net.Stream) ([]byte, error) { |
||||
timeoutDuration := 1 * time.Second |
||||
s.SetReadDeadline(time.Now().Add(timeoutDuration)) |
||||
|
||||
// Create a buffered stream so that read and writes are non blocking.
|
||||
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) |
||||
|
||||
contentBuf := bytes.NewBuffer([]byte{}) |
||||
// Read 1 byte for message type
|
||||
_, err := rw.ReadByte() |
||||
switch err { |
||||
case nil: |
||||
//log.Printf("Received p2p message type: %x\n", msgType)
|
||||
case io.EOF: |
||||
fallthrough |
||||
default: |
||||
log.Error("Error reading the p2p message type field", "err", err) |
||||
return contentBuf.Bytes(), err |
||||
} |
||||
// TODO: check on msgType and take actions accordingly
|
||||
|
||||
// Read 4 bytes for message size
|
||||
fourBytes := make([]byte, 4) |
||||
n, err := rw.Read(fourBytes) |
||||
if err != nil { |
||||
log.Error("Error reading the p2p message size field", "err", err) |
||||
return contentBuf.Bytes(), err |
||||
} else if n < len(fourBytes) { |
||||
log.Error("Invalid byte size", "bytes", n) |
||||
return contentBuf.Bytes(), err |
||||
} |
||||
|
||||
//log.Print(fourBytes)
|
||||
// Number of bytes for the message content
|
||||
bytesToRead := binary.BigEndian.Uint32(fourBytes) |
||||
//log.Printf("The content size is %d bytes.", bytesToRead)
|
||||
|
||||
// Read the content in chunk of size `BatchSizeInByte`
|
||||
tmpBuf := make([]byte, BatchSizeInByte) |
||||
ILOOP: |
||||
for { |
||||
// TODO(ricl): is this necessary? If yes, figure out how to make it work
|
||||
// timeoutDuration := 10 * time.Second
|
||||
// s.SetReadDeadline(time.Now().Add(timeoutDuration))
|
||||
if bytesToRead < BatchSizeInByte { |
||||
// Read the last number of bytes less than `BatchSizeInByte`
|
||||
tmpBuf = make([]byte, bytesToRead) |
||||
} |
||||
n, err := rw.Read(tmpBuf) |
||||
contentBuf.Write(tmpBuf[:n]) |
||||
|
||||
switch err { |
||||
case io.EOF: |
||||
// TODO: should we return error here, or just ignore it?
|
||||
log.Error("EOF reached while reading p2p message") |
||||
break ILOOP |
||||
case nil: |
||||
bytesToRead -= uint32(n) // TODO: think about avoid the casting in every loop
|
||||
if bytesToRead <= 0 { |
||||
break ILOOP |
||||
} |
||||
default: |
||||
log.Error("Error reading p2p message") |
||||
return []byte{}, err |
||||
} |
||||
} |
||||
return contentBuf.Bytes(), nil |
||||
} |
||||
|
||||
// GetHost Get the p2p host
|
||||
func GetHost() host.Host { |
||||
return myHost |
||||
} |
Loading…
Reference in new issue