gometalinter p2p/...

Mostly fixes error handling, but there are a few functional changes:

* ReadMessageContent() returns an error upon SetReadDeadline() failures.
* hostv2 SendMessage() returns an error upon write failures.
pull/304/head
Eugene Kim 6 years ago
parent 8edebd2250
commit 1e39837f5a
  1. 52
      p2p/helper.go
  2. 27
      p2p/host/hostv1/hostv1.go
  3. 17
      p2p/host/hostv2/hostv2.go
  4. 7
      p2p/host/hostv2/util.go
  5. 21
      p2p/host/message.go
  6. 10
      p2p/host/message_test.go
  7. 3
      p2p/host/mock/host_mock.go
  8. 3
      p2p/mock_stream.go

@ -2,7 +2,6 @@ package p2p
import ( import (
"bufio" "bufio"
"bytes"
"encoding/binary" "encoding/binary"
"io" "io"
"time" "time"
@ -28,46 +27,39 @@ content (n bytes) - actual message content
const BatchSizeInByte = 1 << 16 const BatchSizeInByte = 1 << 16
// ReadMessageContent reads the message type and content size, and return the actual content. // ReadMessageContent reads the message type and content size, and return the actual content.
func ReadMessageContent(s Stream) ([]byte, error) { func ReadMessageContent(s Stream) (content []byte, err error) {
var ( var (
contentBuf = bytes.NewBuffer([]byte{}) r = bufio.NewReader(s)
r = bufio.NewReader(s)
) )
timeoutDuration := 1 * time.Second timeoutDuration := 1 * time.Second
s.SetReadDeadline(time.Now().Add(timeoutDuration)) if err = s.SetReadDeadline(time.Now().Add(timeoutDuration)); err != nil {
log.Error("cannot reset deadline for message header read", "error", err)
return
}
//// Read 1 byte for message type //// Read 1 byte for message type
_, err := r.ReadByte() if _, err = r.ReadByte(); err != nil {
switch err { log.Error("failed to read p2p message type field", "error", err)
case io.EOF: return
log.Error("Error reading the p2p message type field", "io.EOF", err)
return contentBuf.Bytes(), err
case nil:
//log.Printf("Received p2p message type: %x\n", msgType)
default:
log.Error("Error reading the p2p message type field", "msg", err)
return contentBuf.Bytes(), err
} }
// TODO: check on msgType and take actions accordingly // TODO: check on msgType and take actions accordingly
//// Read 4 bytes for message size //// Read 4 bytes for message size
fourBytes := make([]byte, 4) fourBytes := make([]byte, 4)
n, err := r.Read(fourBytes) if _, err = io.ReadFull(r, fourBytes); err != nil {
if err != nil { log.Error("failed to read p2p message size field", "error", err)
log.Error("Error reading the p2p message size field") return
return contentBuf.Bytes(), err
} else if n < len(fourBytes) {
log.Error("Failed reading the p2p message size field: only read", "Num of bytes", n)
return contentBuf.Bytes(), err
} }
contentLength := int(binary.BigEndian.Uint32(fourBytes)) contentLength := int(binary.BigEndian.Uint32(fourBytes))
tmpBuf := make([]byte, contentLength) contentBuf := make([]byte, contentLength)
timeoutDuration = 20 * time.Second timeoutDuration = 20 * time.Second
s.SetReadDeadline(time.Now().Add(timeoutDuration)) if err = s.SetReadDeadline(time.Now().Add(timeoutDuration)); err != nil {
m, err := io.ReadFull(r, tmpBuf) log.Error("cannot reset deadline for message content read", "error", err)
if err != nil || m < contentLength { return
log.Error("Read %v bytes, we need %v bytes", m, contentLength) }
return []byte{}, err if _, err = io.ReadFull(r, contentBuf); err != nil {
log.Error("failed to read p2p message contents", "error", err)
return
} }
contentBuf.Write(tmpBuf) content = contentBuf
return contentBuf.Bytes(), nil return
} }

@ -80,24 +80,25 @@ func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) {
} }
// SendMessage sends message to peer // SendMessage sends message to peer
func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) { func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) error {
logger := log.New("from", host.self, "to", peer, "PeerID", peer.PeerID)
addr := net.JoinHostPort(peer.IP, peer.Port) addr := net.JoinHostPort(peer.IP, peer.Port)
conn, err := net.Dial("tcp", addr) conn, err := net.Dial("tcp", addr)
if err != nil { if err != nil {
log.Warn("HostV1 SendMessage Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err) logger.Warn("Dial() failed", "address", addr, "error", err)
return fmt.Errorf("Dail Failed") return fmt.Errorf("Dial(%s) failed: %v", addr, err)
} }
defer conn.Close() defer func() {
if err := conn.Close(); err != nil {
logger.Warn("Close() failed", "error", err)
}
}()
nw, err := conn.Write(message) if nw, err := conn.Write(message); err != nil {
if err != nil { logger.Warn("Write() failed", "error", err)
log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err) return fmt.Errorf("Write() failed: %v", err)
return fmt.Errorf("Write Failed") } else if nw < len(message) {
} logger.Warn("Short Write()", "actual", nw, "expected", len(message))
if nw < len(message) {
log.Warn("Write() returned short count",
"addr", conn.RemoteAddr(), "actual", nw, "expected", len(message))
return io.ErrShortWrite return io.ErrShortWrite
} }

@ -3,6 +3,7 @@ package hostv2
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -108,13 +109,21 @@ func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) {
// SendMessage a p2p message sending function with signature compatible to p2pv1. // SendMessage a p2p message sending function with signature compatible to p2pv1.
func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error { func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error {
logger := log.New("from", host.self, "to", p, "PeerID", p.PeerID)
s, err := host.h.NewStream(context.Background(), p.PeerID, ProtocolID) s, err := host.h.NewStream(context.Background(), p.PeerID, ProtocolID)
if err != nil { if err != nil {
log.Error("Failed to send message", "from", host.self, "to", p, "error", err, "PeerID", p.PeerID) logger.Error("NewStream() failed", "peerID", p.PeerID,
return err "protocolID", ProtocolID, "error", err)
return fmt.Errorf("NewStream(%v, %v) failed: %v", p.PeerID,
ProtocolID, err)
}
if nw, err := s.Write(message); err != nil {
logger.Error("Write() failed", "error", err)
return fmt.Errorf("Write() failed: %v", err)
} else if nw < len(message) {
logger.Error("Short Write()", "expected", len(message), "actual", nw)
return io.ErrShortWrite
} }
s.Write(message)
return nil return nil
} }

@ -1,8 +1,6 @@
package hostv2 package hostv2
import ( import (
"bufio"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
@ -12,8 +10,3 @@ func catchError(err error) {
panic(err) panic(err)
} }
} }
func writeData(w *bufio.Writer, data []byte) {
w.Write(data)
w.Flush()
}

@ -1,7 +1,6 @@
package host package host
import ( import (
"bytes"
"encoding/binary" "encoding/binary"
"net" "net"
"runtime" "runtime"
@ -52,17 +51,11 @@ func BroadcastMessageFromLeader(h p2p.Host, peers []p2p.Peer, msg []byte, lostPe
// ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content] // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]
func ConstructP2pMessage(msgType byte, content []byte) []byte { func ConstructP2pMessage(msgType byte, content []byte) []byte {
message := make([]byte, 5+len(content))
firstByte := byte(17) // messageType 0x11 message[0] = 17 // messageType 0x11
sizeBytes := make([]byte, 4) // contentSize binary.BigEndian.PutUint32(message[1:5], uint32(len(content)))
copy(message[5:], content)
binary.BigEndian.PutUint32(sizeBytes, uint32(len(content))) return message
byteBuffer := bytes.NewBuffer([]byte{})
byteBuffer.WriteByte(firstByte)
byteBuffer.Write(sizeBytes)
byteBuffer.Write(content)
return byteBuffer.Bytes()
} }
// BroadcastMessageFromValidator sends the message to a list of peers from a validator. // BroadcastMessageFromValidator sends the message to a list of peers from a validator.
@ -93,9 +86,7 @@ func send(h p2p.Host, peer p2p.Peer, message []byte, lostPeer chan p2p.Peer) {
backoff := p2p.NewExpBackoff(150*time.Millisecond, 5*time.Second, 2) backoff := p2p.NewExpBackoff(150*time.Millisecond, 5*time.Second, 2)
for trial := 0; trial < 10; trial++ { for trial := 0; trial < 10; trial++ {
var err error if err := h.SendMessage(peer, message); err == nil {
err = h.SendMessage(peer, message)
if err == nil {
if trial > 0 { if trial > 0 {
log.Warn("retry send", "rety", trial) log.Warn("retry send", "rety", trial)
} }

@ -32,7 +32,9 @@ func TestSendMessage(test *testing.T) {
host2 := hostv2.New(peer2, priKey2) host2 := hostv2.New(peer2, priKey2)
msg := []byte{0x00, 0x01, 0x02, 0x03, 0x04} msg := []byte{0x00, 0x01, 0x02, 0x03, 0x04}
host1.AddPeer(&peer2) if err := host1.AddPeer(&peer2); err != nil {
test.Fatalf("cannot add peer2 to host1: %v", err)
}
go host2.BindHandlerAndServe(handler) go host2.BindHandlerAndServe(handler)
SendMessage(host1, peer2, msg, nil) SendMessage(host1, peer2, msg, nil)
@ -40,7 +42,11 @@ func TestSendMessage(test *testing.T) {
} }
func handler(s p2p.Stream) { func handler(s p2p.Stream) {
defer s.Close() defer func() {
if err := s.Close(); err != nil {
panic(fmt.Sprintf("Close(%v) failed: %v", s, err))
}
}()
content, err := p2p.ReadMessageContent(s) content, err := p2p.ReadMessageContent(s)
if err != nil { if err != nil {
panic("Read p2p data failed") panic("Read p2p data failed")

@ -5,10 +5,11 @@
package mock package mock
import ( import (
reflect "reflect"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
p2p "github.com/harmony-one/harmony/p2p" p2p "github.com/harmony-one/harmony/p2p"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
reflect "reflect"
) )
// MockHost is a mock of Host interface // MockHost is a mock of Host interface

@ -5,9 +5,10 @@
package p2p package p2p
import ( import (
gomock "github.com/golang/mock/gomock"
reflect "reflect" reflect "reflect"
time "time" time "time"
gomock "github.com/golang/mock/gomock"
) )
// MockStream is a mock of Stream interface // MockStream is a mock of Stream interface

Loading…
Cancel
Save