diff --git a/p2p/helper.go b/p2p/helper.go index 9196f2a2a..bcada959d 100644 --- a/p2p/helper.go +++ b/p2p/helper.go @@ -2,7 +2,6 @@ package p2p import ( "bufio" - "bytes" "encoding/binary" "io" "time" @@ -28,46 +27,39 @@ content (n bytes) - actual message content const BatchSizeInByte = 1 << 16 // ReadMessageContent reads the message type and content size, and return the actual content. -func ReadMessageContent(s Stream) ([]byte, error) { +func ReadMessageContent(s Stream) (content []byte, err error) { var ( - contentBuf = bytes.NewBuffer([]byte{}) - r = bufio.NewReader(s) + r = bufio.NewReader(s) ) timeoutDuration := 1 * time.Second - s.SetReadDeadline(time.Now().Add(timeoutDuration)) + if err = s.SetReadDeadline(time.Now().Add(timeoutDuration)); err != nil { + log.Error("cannot reset deadline for message header read", "error", err) + return + } //// Read 1 byte for message type - _, err := r.ReadByte() - switch err { - case io.EOF: - log.Error("Error reading the p2p message type field", "io.EOF", err) - return contentBuf.Bytes(), err - case nil: - //log.Printf("Received p2p message type: %x\n", msgType) - default: - log.Error("Error reading the p2p message type field", "msg", err) - return contentBuf.Bytes(), err + if _, err = r.ReadByte(); err != nil { + log.Error("failed to read p2p message type field", "error", err) + return } // TODO: check on msgType and take actions accordingly //// Read 4 bytes for message size fourBytes := make([]byte, 4) - n, err := r.Read(fourBytes) - if err != nil { - log.Error("Error reading the p2p message size field") - return contentBuf.Bytes(), err - } else if n < len(fourBytes) { - log.Error("Failed reading the p2p message size field: only read", "Num of bytes", n) - return contentBuf.Bytes(), err + if _, err = io.ReadFull(r, fourBytes); err != nil { + log.Error("failed to read p2p message size field", "error", err) + return } contentLength := int(binary.BigEndian.Uint32(fourBytes)) - tmpBuf := make([]byte, contentLength) + contentBuf := make([]byte, contentLength) timeoutDuration = 20 * time.Second - s.SetReadDeadline(time.Now().Add(timeoutDuration)) - m, err := io.ReadFull(r, tmpBuf) - if err != nil || m < contentLength { - log.Error("Read %v bytes, we need %v bytes", m, contentLength) - return []byte{}, err + if err = s.SetReadDeadline(time.Now().Add(timeoutDuration)); err != nil { + log.Error("cannot reset deadline for message content read", "error", err) + return + } + if _, err = io.ReadFull(r, contentBuf); err != nil { + log.Error("failed to read p2p message contents", "error", err) + return } - contentBuf.Write(tmpBuf) - return contentBuf.Bytes(), nil + content = contentBuf + return } diff --git a/p2p/host/hostv1/hostv1.go b/p2p/host/hostv1/hostv1.go index d8ac3e439..8ff031607 100644 --- a/p2p/host/hostv1/hostv1.go +++ b/p2p/host/hostv1/hostv1.go @@ -80,24 +80,25 @@ func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) { } // SendMessage sends message to peer -func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) { +func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) error { + logger := log.New("from", host.self, "to", peer, "PeerID", peer.PeerID) addr := net.JoinHostPort(peer.IP, peer.Port) conn, err := net.Dial("tcp", addr) - if err != nil { - log.Warn("HostV1 SendMessage Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err) - return fmt.Errorf("Dail Failed") + logger.Warn("Dial() failed", "address", addr, "error", err) + return fmt.Errorf("Dial(%s) failed: %v", addr, err) } - defer conn.Close() + defer func() { + if err := conn.Close(); err != nil { + logger.Warn("Close() failed", "error", err) + } + }() - nw, err := conn.Write(message) - if err != nil { - log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err) - return fmt.Errorf("Write Failed") - } - if nw < len(message) { - log.Warn("Write() returned short count", - "addr", conn.RemoteAddr(), "actual", nw, "expected", len(message)) + if nw, err := conn.Write(message); err != nil { + logger.Warn("Write() failed", "error", err) + return fmt.Errorf("Write() failed: %v", err) + } else if nw < len(message) { + logger.Warn("Short Write()", "actual", nw, "expected", len(message)) return io.ErrShortWrite } diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go index 8f07027e5..80f9157ed 100644 --- a/p2p/host/hostv2/hostv2.go +++ b/p2p/host/hostv2/hostv2.go @@ -3,6 +3,7 @@ package hostv2 import ( "context" "fmt" + "io" "github.com/ethereum/go-ethereum/log" "github.com/harmony-one/harmony/p2p" @@ -108,13 +109,21 @@ func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) { // SendMessage a p2p message sending function with signature compatible to p2pv1. func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error { + logger := log.New("from", host.self, "to", p, "PeerID", p.PeerID) s, err := host.h.NewStream(context.Background(), p.PeerID, ProtocolID) if err != nil { - log.Error("Failed to send message", "from", host.self, "to", p, "error", err, "PeerID", p.PeerID) - return err + logger.Error("NewStream() failed", "peerID", p.PeerID, + "protocolID", ProtocolID, "error", err) + return fmt.Errorf("NewStream(%v, %v) failed: %v", p.PeerID, + ProtocolID, err) + } + if nw, err := s.Write(message); err != nil { + logger.Error("Write() failed", "error", err) + return fmt.Errorf("Write() failed: %v", err) + } else if nw < len(message) { + logger.Error("Short Write()", "expected", len(message), "actual", nw) + return io.ErrShortWrite } - - s.Write(message) return nil } diff --git a/p2p/host/hostv2/util.go b/p2p/host/hostv2/util.go index 3c3307625..e72c4ce28 100644 --- a/p2p/host/hostv2/util.go +++ b/p2p/host/hostv2/util.go @@ -1,8 +1,6 @@ package hostv2 import ( - "bufio" - "github.com/ethereum/go-ethereum/log" ) @@ -12,8 +10,3 @@ func catchError(err error) { panic(err) } } - -func writeData(w *bufio.Writer, data []byte) { - w.Write(data) - w.Flush() -} diff --git a/p2p/host/message.go b/p2p/host/message.go index 26b9a3fce..c19e1375e 100644 --- a/p2p/host/message.go +++ b/p2p/host/message.go @@ -1,7 +1,6 @@ package host import ( - "bytes" "encoding/binary" "net" "runtime" @@ -52,17 +51,11 @@ func BroadcastMessageFromLeader(h p2p.Host, peers []p2p.Peer, msg []byte, lostPe // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content] func ConstructP2pMessage(msgType byte, content []byte) []byte { - - firstByte := byte(17) // messageType 0x11 - sizeBytes := make([]byte, 4) // contentSize - - binary.BigEndian.PutUint32(sizeBytes, uint32(len(content))) - - byteBuffer := bytes.NewBuffer([]byte{}) - byteBuffer.WriteByte(firstByte) - byteBuffer.Write(sizeBytes) - byteBuffer.Write(content) - return byteBuffer.Bytes() + message := make([]byte, 5+len(content)) + message[0] = 17 // messageType 0x11 + binary.BigEndian.PutUint32(message[1:5], uint32(len(content))) + copy(message[5:], content) + return message } // BroadcastMessageFromValidator sends the message to a list of peers from a validator. @@ -93,9 +86,7 @@ func send(h p2p.Host, peer p2p.Peer, message []byte, lostPeer chan p2p.Peer) { backoff := p2p.NewExpBackoff(150*time.Millisecond, 5*time.Second, 2) for trial := 0; trial < 10; trial++ { - var err error - err = h.SendMessage(peer, message) - if err == nil { + if err := h.SendMessage(peer, message); err == nil { if trial > 0 { log.Warn("retry send", "rety", trial) } diff --git a/p2p/host/message_test.go b/p2p/host/message_test.go index 694d59cb9..197a5ff81 100644 --- a/p2p/host/message_test.go +++ b/p2p/host/message_test.go @@ -32,7 +32,9 @@ func TestSendMessage(test *testing.T) { host2 := hostv2.New(peer2, priKey2) msg := []byte{0x00, 0x01, 0x02, 0x03, 0x04} - host1.AddPeer(&peer2) + if err := host1.AddPeer(&peer2); err != nil { + test.Fatalf("cannot add peer2 to host1: %v", err) + } go host2.BindHandlerAndServe(handler) SendMessage(host1, peer2, msg, nil) @@ -40,7 +42,11 @@ func TestSendMessage(test *testing.T) { } func handler(s p2p.Stream) { - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + panic(fmt.Sprintf("Close(%v) failed: %v", s, err)) + } + }() content, err := p2p.ReadMessageContent(s) if err != nil { panic("Read p2p data failed") diff --git a/p2p/host/mock/host_mock.go b/p2p/host/mock/host_mock.go index 2155523a6..c6e023ab5 100644 --- a/p2p/host/mock/host_mock.go +++ b/p2p/host/mock/host_mock.go @@ -5,10 +5,11 @@ package mock import ( + reflect "reflect" + gomock "github.com/golang/mock/gomock" p2p "github.com/harmony-one/harmony/p2p" peer "github.com/libp2p/go-libp2p-peer" - reflect "reflect" ) // MockHost is a mock of Host interface diff --git a/p2p/mock_stream.go b/p2p/mock_stream.go index 6046d6fd5..27ce7fac5 100644 --- a/p2p/mock_stream.go +++ b/p2p/mock_stream.go @@ -5,9 +5,10 @@ package p2p import ( - gomock "github.com/golang/mock/gomock" reflect "reflect" time "time" + + gomock "github.com/golang/mock/gomock" ) // MockStream is a mock of Stream interface