commit
1aa5cc8ea9
@ -1,21 +1,18 @@ |
||||
package beaconchain |
||||
|
||||
import ( |
||||
"net" |
||||
|
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/proto" |
||||
proto_identity "github.com/harmony-one/harmony/proto/identity" |
||||
) |
||||
|
||||
// BeaconChainHandler handles registration of new Identities
|
||||
func (bc *BeaconChain) BeaconChainHandler(conn net.Conn) { |
||||
content, err := p2p.ReadMessageContent(conn) |
||||
func (bc *BeaconChain) BeaconChainHandler(s p2p.Stream) { |
||||
content, err := p2p.ReadMessageContent(s) |
||||
if err != nil { |
||||
bc.log.Error("Read p2p data failed") |
||||
return |
||||
} |
||||
bc.log.Info("received connection", "connectionIp", conn.RemoteAddr()) |
||||
msgCategory, err := proto.GetMessageCategory(content) |
||||
if err != nil { |
||||
bc.log.Error("Read message category failed", "err", err) |
@ -1,183 +0,0 @@ |
||||
package node |
||||
|
||||
import ( |
||||
"bytes" |
||||
"encoding/gob" |
||||
"fmt" |
||||
"os" |
||||
|
||||
"github.com/harmony-one/harmony/blockchain" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/p2pv2" |
||||
"github.com/harmony-one/harmony/proto" |
||||
"github.com/harmony-one/harmony/proto/client" |
||||
"github.com/harmony-one/harmony/proto/consensus" |
||||
proto_identity "github.com/harmony-one/harmony/proto/identity" |
||||
proto_node "github.com/harmony-one/harmony/proto/node" |
||||
netp2p "github.com/libp2p/go-libp2p-net" |
||||
) |
||||
|
||||
// NodeHandlerV1 handles a new incoming connection.
|
||||
func (node *Node) NodeHandlerV1(s netp2p.Stream) { |
||||
defer s.Close() |
||||
|
||||
// Read p2p message payload
|
||||
content, err := p2pv2.ReadData(s) |
||||
|
||||
if err != nil { |
||||
node.log.Error("Read p2p data failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
// TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p.
|
||||
node.MaybeBroadcastAsValidator(content) |
||||
|
||||
consensusObj := node.Consensus |
||||
|
||||
msgCategory, err := proto.GetMessageCategory(content) |
||||
if err != nil { |
||||
node.log.Error("Read node type failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
msgType, err := proto.GetMessageType(content) |
||||
if err != nil { |
||||
node.log.Error("Read action type failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
msgPayload, err := proto.GetMessagePayload(content) |
||||
if err != nil { |
||||
node.log.Error("Read message payload failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
switch msgCategory { |
||||
case proto.Identity: |
||||
actionType := proto_identity.IDMessageType(msgType) |
||||
switch actionType { |
||||
case proto_identity.Identity: |
||||
messageType := proto_identity.MessageType(msgPayload[0]) |
||||
switch messageType { |
||||
case proto_identity.Register: |
||||
fmt.Println("received a identity message") |
||||
// TODO(ak): fix it.
|
||||
// node.processPOWMessage(msgPayload)
|
||||
node.log.Info("NET: received message: IDENTITY/REGISTER") |
||||
default: |
||||
node.log.Error("Announce message should be sent to IdentityChain") |
||||
} |
||||
} |
||||
case proto.Consensus: |
||||
actionType := consensus.ConMessageType(msgType) |
||||
switch actionType { |
||||
case consensus.Consensus: |
||||
if consensusObj.IsLeader { |
||||
node.log.Info("NET: received message: Consensus/Leader") |
||||
consensusObj.ProcessMessageLeader(msgPayload) |
||||
} else { |
||||
node.log.Info("NET: received message: Consensus/Validator") |
||||
consensusObj.ProcessMessageValidator(msgPayload) |
||||
} |
||||
} |
||||
case proto.Node: |
||||
actionType := proto_node.MessageType(msgType) |
||||
switch actionType { |
||||
case proto_node.Transaction: |
||||
node.log.Info("NET: received message: Node/Transaction") |
||||
node.transactionMessageHandler(msgPayload) |
||||
case proto_node.Block: |
||||
node.log.Info("NET: received message: Node/Block") |
||||
blockMsgType := proto_node.BlockMessageType(msgPayload[0]) |
||||
switch blockMsgType { |
||||
case proto_node.Sync: |
||||
decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Sync messge type
|
||||
blocks := new([]*blockchain.Block) |
||||
decoder.Decode(blocks) |
||||
if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil { |
||||
node.Client.UpdateBlocks(*blocks) |
||||
} |
||||
} |
||||
case proto_node.Client: |
||||
node.log.Info("NET: received message: Node/Client") |
||||
clientMsgType := proto_node.ClientMessageType(msgPayload[0]) |
||||
switch clientMsgType { |
||||
case proto_node.LookupUtxo: |
||||
decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the LookupUtxo messge type
|
||||
|
||||
fetchUtxoMessage := new(proto_node.FetchUtxoMessage) |
||||
decoder.Decode(fetchUtxoMessage) |
||||
|
||||
utxoMap := node.UtxoPool.GetUtxoMapByAddresses(fetchUtxoMessage.Addresses) |
||||
|
||||
p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) |
||||
} |
||||
case proto_node.Control: |
||||
node.log.Info("NET: received message: Node/Control") |
||||
controlType := msgPayload[0] |
||||
if proto_node.ControlMessageType(controlType) == proto_node.STOP { |
||||
if node.Chain == nil { |
||||
node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) |
||||
|
||||
sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap() |
||||
node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) |
||||
|
||||
avgBlockSizeInBytes := 0 |
||||
txCount := 0 |
||||
blockCount := 0 |
||||
totalTxCount := 0 |
||||
totalBlockCount := 0 |
||||
avgTxSize := 0 |
||||
|
||||
for _, block := range node.blockchain.Blocks { |
||||
if block.IsStateBlock() { |
||||
totalTxCount += int(block.State.NumTransactions) |
||||
totalBlockCount += int(block.State.NumBlocks) |
||||
} else { |
||||
byteBuffer := bytes.NewBuffer([]byte{}) |
||||
encoder := gob.NewEncoder(byteBuffer) |
||||
encoder.Encode(block) |
||||
avgBlockSizeInBytes += len(byteBuffer.Bytes()) |
||||
|
||||
txCount += len(block.Transactions) |
||||
blockCount++ |
||||
totalTxCount += len(block.TransactionIds) |
||||
totalBlockCount++ |
||||
|
||||
byteBuffer = bytes.NewBuffer([]byte{}) |
||||
encoder = gob.NewEncoder(byteBuffer) |
||||
encoder.Encode(block.Transactions) |
||||
avgTxSize += len(byteBuffer.Bytes()) |
||||
} |
||||
} |
||||
if blockCount != 0 { |
||||
avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount |
||||
avgTxSize = avgTxSize / txCount |
||||
} |
||||
|
||||
node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) |
||||
} else { |
||||
node.log.Debug("Stopping Node (Account Model)", "node", node, "CurBlockNum", node.Chain.CurrentHeader().Number, "numTxsProcessed", node.countNumTransactionsInBlockchainAccount()) |
||||
} |
||||
|
||||
os.Exit(0) |
||||
} |
||||
case proto_node.PING: |
||||
// Leader receives PING from new node.
|
||||
node.pingMessageHandler(msgPayload) |
||||
case proto_node.PONG: |
||||
// The new node receives back from leader.
|
||||
node.pongMessageHandler(msgPayload) |
||||
} |
||||
case proto.Client: |
||||
actionType := client.MessageType(msgType) |
||||
node.log.Info("NET: received message: Client/Transaction") |
||||
switch actionType { |
||||
case client.Transaction: |
||||
if node.Client != nil { |
||||
node.Client.TransactionMessageHandler(msgPayload) |
||||
} |
||||
} |
||||
default: |
||||
node.log.Error("Unknown", "MsgCateory:", msgCategory) |
||||
} |
||||
} |
@ -0,0 +1,21 @@ |
||||
package node |
||||
|
||||
import ( |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/p2p/host" |
||||
) |
||||
|
||||
// SendMessage sends data to ip, port
|
||||
func (node *Node) SendMessage(p p2p.Peer, data []byte) { |
||||
host.SendMessage(node.host, p, data) |
||||
} |
||||
|
||||
// BroadcastMessage broadcasts message to peers
|
||||
func (node *Node) BroadcastMessage(peers []p2p.Peer, data []byte) { |
||||
host.BroadcastMessage(node.host, peers, data) |
||||
} |
||||
|
||||
// GetHost returns the p2p host
|
||||
func (node *Node) GetHost() host.Host { |
||||
return node.host |
||||
} |
@ -1,55 +0,0 @@ |
||||
package p2p_test |
||||
|
||||
import ( |
||||
"bufio" |
||||
"net" |
||||
"testing" |
||||
|
||||
"github.com/harmony-one/harmony/p2p" |
||||
) |
||||
|
||||
func setUpTestServer(times int, t *testing.T, conCreated chan struct{}) { |
||||
t.Parallel() |
||||
ln, _ := net.Listen("tcp", ":8081") |
||||
conCreated <- struct{}{} |
||||
conn, _ := ln.Accept() |
||||
defer conn.Close() |
||||
|
||||
var ( |
||||
w = bufio.NewWriter(conn) |
||||
) |
||||
for times > 0 { |
||||
times-- |
||||
data, err := p2p.ReadMessageContent(conn) |
||||
if err != nil { |
||||
t.Fatalf("error when ReadMessageContent %v", err) |
||||
} |
||||
data = p2p.CreateMessage(byte(1), data) |
||||
w.Write(data) |
||||
w.Flush() |
||||
} |
||||
} |
||||
func TestNewNewNode(t *testing.T) { |
||||
times := 100 |
||||
conCreated := make(chan struct{}) |
||||
go setUpTestServer(times, t, conCreated) |
||||
<-conCreated |
||||
|
||||
conn, _ := net.Dial("tcp", "127.0.0.1:8081") |
||||
defer conn.Close() |
||||
|
||||
for times > 0 { |
||||
times-- |
||||
|
||||
myMsg := "minhdoan" |
||||
p2p.SendMessageContent(conn, []byte(myMsg)) |
||||
|
||||
data, err := p2p.ReadMessageContent(conn) |
||||
if err != nil { |
||||
t.Error("got an error when trying to receive an expected message from server.") |
||||
} |
||||
if string(data) != myMsg { |
||||
t.Error("did not receive expected message") |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,13 @@ |
||||
package host |
||||
|
||||
import ( |
||||
"github.com/harmony-one/harmony/p2p" |
||||
) |
||||
|
||||
// Host is the client + server in p2p network.
|
||||
type Host interface { |
||||
GetSelfPeer() p2p.Peer |
||||
SendMessage(p2p.Peer, []byte) error |
||||
BindHandlerAndServe(handler p2p.StreamHandler) |
||||
Close() error |
||||
} |
@ -0,0 +1,99 @@ |
||||
package hostv1 |
||||
|
||||
import ( |
||||
"io" |
||||
"net" |
||||
"time" |
||||
|
||||
"github.com/harmony-one/harmony/log" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
) |
||||
|
||||
// HostV1 is the version 1 p2p host, using direct socket call.
|
||||
type HostV1 struct { |
||||
self p2p.Peer |
||||
listener net.Listener |
||||
quit chan bool |
||||
} |
||||
|
||||
// New creates a HostV1
|
||||
func New(self p2p.Peer) *HostV1 { |
||||
h := &HostV1{ |
||||
self: self, |
||||
quit: make(chan bool, 1), |
||||
} |
||||
return h |
||||
} |
||||
|
||||
// GetSelfPeer gets self peer
|
||||
func (host *HostV1) GetSelfPeer() p2p.Peer { |
||||
return host.self |
||||
} |
||||
|
||||
// BindHandlerAndServe Version 0 p2p. Going to be deprecated.
|
||||
func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) { |
||||
port := host.self.Port |
||||
addr := net.JoinHostPort(host.self.IP, port) |
||||
var err error |
||||
host.listener, err = net.Listen("tcp4", addr) |
||||
if err != nil { |
||||
log.Error("Socket listen port failed", "addr", addr, "err", err) |
||||
return |
||||
} |
||||
if host.listener == nil { |
||||
log.Error("Listen returned nil", "addr", addr) |
||||
return |
||||
} |
||||
backoff := p2p.NewExpBackoff(250*time.Millisecond, 15*time.Second, 2.0) |
||||
for { // Keep listening
|
||||
select { |
||||
case <-host.quit: |
||||
return |
||||
default: |
||||
{ |
||||
conn, err := host.listener.Accept() |
||||
if err != nil { |
||||
log.Error("Error listening on port.", "port", port, |
||||
"err", err) |
||||
backoff.Sleep() |
||||
continue |
||||
} |
||||
// log.Debug("Received New connection", "local", conn.LocalAddr(), "remote", conn.RemoteAddr())
|
||||
go handler(conn) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// SendMessage sends message to peer
|
||||
func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) { |
||||
addr := net.JoinHostPort(peer.IP, peer.Port) |
||||
conn, err := net.Dial("tcp", addr) |
||||
// log.Debug("Dial from local to remote", "localID", net.JoinHostPort(host.self.IP, host.self.Port), "local", conn.LocalAddr(), "remote", addr)
|
||||
|
||||
if err != nil { |
||||
log.Warn("Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err) |
||||
return |
||||
} |
||||
defer conn.Close() |
||||
|
||||
nw, err := conn.Write(message) |
||||
if err != nil { |
||||
log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err) |
||||
return |
||||
} |
||||
if nw < len(message) { |
||||
log.Warn("Write() returned short count", |
||||
"addr", conn.RemoteAddr(), "actual", nw, "expected", len(message)) |
||||
return io.ErrShortWrite |
||||
} |
||||
|
||||
// No ack (reply) message from the receiver for now.
|
||||
return |
||||
} |
||||
|
||||
// Close closes the host
|
||||
func (host *HostV1) Close() error { |
||||
host.quit <- true |
||||
return host.listener.Close() |
||||
} |
@ -0,0 +1,94 @@ |
||||
package hostv2 |
||||
|
||||
import ( |
||||
"bufio" |
||||
"context" |
||||
"fmt" |
||||
|
||||
"github.com/harmony-one/harmony/log" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
libp2p "github.com/libp2p/go-libp2p" |
||||
libp2phost "github.com/libp2p/go-libp2p-host" |
||||
net "github.com/libp2p/go-libp2p-net" |
||||
peer "github.com/libp2p/go-libp2p-peer" |
||||
peerstore "github.com/libp2p/go-libp2p-peerstore" |
||||
multiaddr "github.com/multiformats/go-multiaddr" |
||||
) |
||||
|
||||
const ( |
||||
// BatchSizeInByte The batch size in byte (64MB) in which we return data
|
||||
BatchSizeInByte = 1 << 16 |
||||
// ProtocolID The ID of protocol used in stream handling.
|
||||
ProtocolID = "/harmony/0.0.1" |
||||
) |
||||
|
||||
// HostV2 is the version 2 p2p host
|
||||
type HostV2 struct { |
||||
h libp2phost.Host |
||||
self p2p.Peer |
||||
} |
||||
|
||||
// Peerstore returns the peer store
|
||||
func (host *HostV2) Peerstore() peerstore.Peerstore { |
||||
return host.h.Peerstore() |
||||
} |
||||
|
||||
// New creates a host for p2p communication
|
||||
func New(self p2p.Peer) *HostV2 { |
||||
addr := fmt.Sprintf("/ip4/%s/tcp/%s", self.IP, self.Port) |
||||
sourceAddr, err := multiaddr.NewMultiaddr(addr) |
||||
catchError(err) |
||||
priv := addrToPrivKey(addr) |
||||
p2pHost, err := libp2p.New(context.Background(), |
||||
libp2p.ListenAddrs(sourceAddr), |
||||
libp2p.Identity(priv), |
||||
libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves.
|
||||
// TODO(ricl): Other features to probe
|
||||
// libp2p.EnableRelay; libp2p.Routing;
|
||||
) |
||||
catchError(err) |
||||
log.Debug("Host is up!", "port", self.Port, "id", p2pHost.ID().Pretty(), "addrs", sourceAddr) |
||||
h := &HostV2{ |
||||
h: p2pHost, |
||||
self: self, |
||||
} |
||||
return h |
||||
} |
||||
|
||||
// GetSelfPeer gets self peer
|
||||
func (host *HostV2) GetSelfPeer() p2p.Peer { |
||||
return host.self |
||||
} |
||||
|
||||
// BindHandlerAndServe bind a streamHandler to the harmony protocol.
|
||||
func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) { |
||||
host.h.SetStreamHandler(ProtocolID, func(s net.Stream) { |
||||
handler(s) |
||||
}) |
||||
// Hang forever
|
||||
<-make(chan struct{}) |
||||
} |
||||
|
||||
// SendMessage a p2p message sending function with signature compatible to p2pv1.
|
||||
func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error { |
||||
addr := fmt.Sprintf("/ip4/%s/tcp/%s", p.IP, p.Port) |
||||
targetAddr, err := multiaddr.NewMultiaddr(addr) |
||||
|
||||
priv := addrToPrivKey(addr) |
||||
peerID, _ := peer.IDFromPrivateKey(priv) |
||||
host.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL) |
||||
s, err := host.h.NewStream(context.Background(), peerID, ProtocolID) |
||||
catchError(err) |
||||
|
||||
// Create a buffered stream so that read and writes are non blocking.
|
||||
w := bufio.NewWriter(bufio.NewWriter(s)) |
||||
|
||||
// Create a thread to read and write data.
|
||||
go writeData(w, message) |
||||
return nil |
||||
} |
||||
|
||||
// Close closes the host
|
||||
func (host *HostV2) Close() error { |
||||
return host.h.Close() |
||||
} |
@ -1,4 +1,4 @@ |
||||
package p2pv2 |
||||
package hostv2 |
||||
|
||||
import ( |
||||
"bufio" |
@ -0,0 +1,28 @@ |
||||
package p2p |
||||
|
||||
import ( |
||||
"time" |
||||
|
||||
"github.com/dedis/kyber" |
||||
) |
||||
|
||||
// Stream is abstract p2p stream from where we read message
|
||||
type Stream interface { |
||||
Read([]byte) (int, error) |
||||
Write([]byte) (int, error) |
||||
Close() error |
||||
SetReadDeadline(time.Time) error |
||||
} |
||||
|
||||
// StreamHandler handles incoming p2p message.
|
||||
type StreamHandler func(Stream) |
||||
|
||||
// Peer is the object for a p2p peer (node)
|
||||
type Peer struct { |
||||
IP string // IP address of the peer
|
||||
Port string // Port number of the peer
|
||||
PubKey kyber.Point // Public key of the peer
|
||||
Ready bool // Ready is true if the peer is ready to join consensus.
|
||||
ValidatorID int // -1 is the default value, means not assigned any validator ID in the shard
|
||||
// TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available.
|
||||
} |
@ -0,0 +1,25 @@ |
||||
package p2pimpl |
||||
|
||||
import ( |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/p2p/host" |
||||
"github.com/harmony-one/harmony/p2p/host/hostv1" |
||||
"github.com/harmony-one/harmony/p2p/host/hostv2" |
||||
) |
||||
|
||||
// Version The version number of p2p library
|
||||
// 1 - Direct socket connection
|
||||
// 2 - libp2p
|
||||
const Version = 1 |
||||
|
||||
// NewHost starts the host
|
||||
func NewHost(peer p2p.Peer) host.Host { |
||||
// log.Debug("New Host", "ip/port", net.JoinHostPort(peer.IP, peer.Port))
|
||||
if Version == 1 { |
||||
h := hostv1.New(peer) |
||||
return h |
||||
} |
||||
|
||||
h := hostv2.New(peer) |
||||
return h |
||||
} |
@ -1,146 +0,0 @@ |
||||
package p2pv2 |
||||
|
||||
import ( |
||||
"bufio" |
||||
"bytes" |
||||
"context" |
||||
"encoding/binary" |
||||
"fmt" |
||||
"io" |
||||
"time" |
||||
|
||||
"github.com/harmony-one/harmony/log" |
||||
libp2p "github.com/libp2p/go-libp2p" |
||||
host "github.com/libp2p/go-libp2p-host" |
||||
net "github.com/libp2p/go-libp2p-net" |
||||
peer "github.com/libp2p/go-libp2p-peer" |
||||
peerstore "github.com/libp2p/go-libp2p-peerstore" |
||||
multiaddr "github.com/multiformats/go-multiaddr" |
||||
) |
||||
|
||||
var ( |
||||
myHost host.Host // TODO(ricl): this should be a field in node.
|
||||
) |
||||
|
||||
const ( |
||||
// BatchSizeInByte The batch size in byte (64MB) in which we return data
|
||||
BatchSizeInByte = 1 << 16 |
||||
// ProtocolID The ID of protocol used in stream handling.
|
||||
ProtocolID = "/harmony/0.0.1" |
||||
) |
||||
|
||||
// InitHost Initialize a host for p2p communication
|
||||
func InitHost(ip, port string) { |
||||
addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) |
||||
sourceAddr, err := multiaddr.NewMultiaddr(addr) |
||||
catchError(err) |
||||
priv := addrToPrivKey(addr) |
||||
myHost, err = libp2p.New(context.Background(), |
||||
libp2p.ListenAddrs(sourceAddr), |
||||
libp2p.Identity(priv), |
||||
libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves.
|
||||
// TODO(ricl): Other features to probe
|
||||
// libp2p.EnableRelay; libp2p.Routing;
|
||||
) |
||||
catchError(err) |
||||
log.Debug("Host is up!", "port", port, "id", myHost.ID().Pretty(), "addrs", sourceAddr) |
||||
} |
||||
|
||||
// BindHandler bind a streamHandler to the harmony protocol.
|
||||
func BindHandler(handler net.StreamHandler) { |
||||
myHost.SetStreamHandler(ProtocolID, handler) |
||||
} |
||||
|
||||
// Send a p2p message sending function with signature compatible to p2pv1.
|
||||
func Send(ip, port string, message []byte) error { |
||||
addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) |
||||
targetAddr, err := multiaddr.NewMultiaddr(addr) |
||||
|
||||
priv := addrToPrivKey(addr) |
||||
peerID, _ := peer.IDFromPrivateKey(priv) |
||||
myHost.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL) |
||||
s, err := myHost.NewStream(context.Background(), peerID, ProtocolID) |
||||
catchError(err) |
||||
|
||||
// Create a buffered stream so that read and writes are non blocking.
|
||||
w := bufio.NewWriter(bufio.NewWriter(s)) |
||||
|
||||
// Create a thread to read and write data.
|
||||
go writeData(w, message) |
||||
return nil |
||||
} |
||||
|
||||
// ReadData Call this function in streamHandler to get the binary data.
|
||||
func ReadData(s net.Stream) ([]byte, error) { |
||||
timeoutDuration := 1 * time.Second |
||||
s.SetReadDeadline(time.Now().Add(timeoutDuration)) |
||||
|
||||
// Create a buffered stream so that read and writes are non blocking.
|
||||
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) |
||||
|
||||
contentBuf := bytes.NewBuffer([]byte{}) |
||||
// Read 1 byte for message type
|
||||
_, err := rw.ReadByte() |
||||
switch err { |
||||
case nil: |
||||
//log.Printf("Received p2p message type: %x\n", msgType)
|
||||
case io.EOF: |
||||
fallthrough |
||||
default: |
||||
log.Error("Error reading the p2p message type field", "err", err) |
||||
return contentBuf.Bytes(), err |
||||
} |
||||
// TODO: check on msgType and take actions accordingly
|
||||
|
||||
// Read 4 bytes for message size
|
||||
fourBytes := make([]byte, 4) |
||||
n, err := rw.Read(fourBytes) |
||||
if err != nil { |
||||
log.Error("Error reading the p2p message size field", "err", err) |
||||
return contentBuf.Bytes(), err |
||||
} else if n < len(fourBytes) { |
||||
log.Error("Invalid byte size", "bytes", n) |
||||
return contentBuf.Bytes(), err |
||||
} |
||||
|
||||
//log.Print(fourBytes)
|
||||
// Number of bytes for the message content
|
||||
bytesToRead := binary.BigEndian.Uint32(fourBytes) |
||||
//log.Printf("The content size is %d bytes.", bytesToRead)
|
||||
|
||||
// Read the content in chunk of size `BatchSizeInByte`
|
||||
tmpBuf := make([]byte, BatchSizeInByte) |
||||
ILOOP: |
||||
for { |
||||
// TODO(ricl): is this necessary? If yes, figure out how to make it work
|
||||
// timeoutDuration := 10 * time.Second
|
||||
// s.SetReadDeadline(time.Now().Add(timeoutDuration))
|
||||
if bytesToRead < BatchSizeInByte { |
||||
// Read the last number of bytes less than `BatchSizeInByte`
|
||||
tmpBuf = make([]byte, bytesToRead) |
||||
} |
||||
n, err := rw.Read(tmpBuf) |
||||
contentBuf.Write(tmpBuf[:n]) |
||||
|
||||
switch err { |
||||
case io.EOF: |
||||
// TODO: should we return error here, or just ignore it?
|
||||
log.Error("EOF reached while reading p2p message") |
||||
break ILOOP |
||||
case nil: |
||||
bytesToRead -= uint32(n) // TODO: think about avoid the casting in every loop
|
||||
if bytesToRead <= 0 { |
||||
break ILOOP |
||||
} |
||||
default: |
||||
log.Error("Error reading p2p message") |
||||
return []byte{}, err |
||||
} |
||||
} |
||||
return contentBuf.Bytes(), nil |
||||
} |
||||
|
||||
// GetHost Get the p2p host
|
||||
func GetHost() host.Host { |
||||
return myHost |
||||
} |
@ -0,0 +1,95 @@ |
||||
package explorer |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"log" |
||||
"net" |
||||
"net/http" |
||||
|
||||
"github.com/gorilla/mux" |
||||
) |
||||
|
||||
// Constants for explorer service.
|
||||
const ( |
||||
ExplorerServicePort = "5000" |
||||
) |
||||
|
||||
// Service is the struct for explorer service.
|
||||
type Service struct { |
||||
people []Person |
||||
router *mux.Router |
||||
} |
||||
|
||||
// Init is to do init for ExplorerService.
|
||||
func (s *Service) Init() { |
||||
s.people = append(s.people, Person{ID: "1", Firstname: "John", Lastname: "Doe", Address: &Address{City: "City X", State: "State X"}}) |
||||
s.people = append(s.people, Person{ID: "2", Firstname: "Koko", Lastname: "Doe", Address: &Address{City: "City Z", State: "State Y"}}) |
||||
} |
||||
|
||||
// Run is to run serving explorer.
|
||||
func (s *Service) Run() { |
||||
// Init address.
|
||||
addr := net.JoinHostPort("", ExplorerServicePort) |
||||
|
||||
// Set up router
|
||||
s.router = mux.NewRouter() |
||||
s.router.HandleFunc("/people", s.GetPeopleEndpoint).Methods("GET") |
||||
s.router.HandleFunc("/people/{id}", s.GetPersonEndpoint).Methods("GET") |
||||
s.router.HandleFunc("/people/{id}", s.CreatePersonEndpoint).Methods("POST") |
||||
s.router.HandleFunc("/people/{id}", s.DeletePersonEndpoint).Methods("DELETE") |
||||
// Do serving now.
|
||||
go log.Fatal(http.ListenAndServe(addr, s.router)) |
||||
} |
||||
|
||||
// Person is fake struct for testing.
|
||||
type Person struct { |
||||
ID string `json:"id,omitempty"` |
||||
Firstname string `json:"firstname,omitempty"` |
||||
Lastname string `json:"lastname,omitempty"` |
||||
Address *Address `json:"address,omitempty"` |
||||
} |
||||
|
||||
// Address is fake struct for testing.
|
||||
type Address struct { |
||||
City string `json:"city,omitempty"` |
||||
State string `json:"state,omitempty"` |
||||
} |
||||
|
||||
// GetPersonEndpoint is the specific person end point.
|
||||
func (s *Service) GetPersonEndpoint(w http.ResponseWriter, r *http.Request) { |
||||
params := mux.Vars(r) |
||||
for _, item := range s.people { |
||||
if item.ID == params["id"] { |
||||
json.NewEncoder(w).Encode(item) |
||||
return |
||||
} |
||||
} |
||||
json.NewEncoder(w).Encode(&Person{}) |
||||
} |
||||
|
||||
// GetPeopleEndpoint is the people end point.
|
||||
func (s *Service) GetPeopleEndpoint(w http.ResponseWriter, r *http.Request) { |
||||
json.NewEncoder(w).Encode(s.people) |
||||
} |
||||
|
||||
// CreatePersonEndpoint is post people/{id} end point.
|
||||
func (s *Service) CreatePersonEndpoint(w http.ResponseWriter, r *http.Request) { |
||||
params := mux.Vars(r) |
||||
var person Person |
||||
_ = json.NewDecoder(r.Body).Decode(&person) |
||||
person.ID = params["id"] |
||||
s.people = append(s.people, person) |
||||
json.NewEncoder(w).Encode(s.people) |
||||
} |
||||
|
||||
// DeletePersonEndpoint is delete people/{id} end point.
|
||||
func (s *Service) DeletePersonEndpoint(w http.ResponseWriter, r *http.Request) { |
||||
params := mux.Vars(r) |
||||
for index, item := range s.people { |
||||
if item.ID == params["id"] { |
||||
s.people = append(s.people[:index], s.people[index+1:]...) |
||||
break |
||||
} |
||||
json.NewEncoder(w).Encode(s.people) |
||||
} |
||||
} |
@ -0,0 +1,158 @@ |
||||
package explorer_test |
||||
|
||||
// http://www.golangprograms.com/golang-restful-api-using-grom-and-gorilla-mux.html
|
||||
// https://dev.to/codehakase/building-a-restful-api-with-go
|
||||
// https://thenewstack.io/make-a-restful-json-api-go/
|
||||
// https://medium.com/@kelvin_sp/building-and-testing-a-rest-api-in-golang-using-gorilla-mux-and-mysql-1f0518818ff6
|
||||
// var a App
|
||||
|
||||
// func TestMain(m *testing.M) {
|
||||
// a = App{}
|
||||
// a.Initialize("root", "", "rest_api_example")
|
||||
|
||||
// ensureTableExists()
|
||||
|
||||
// code := m.Run()
|
||||
|
||||
// clearTable()
|
||||
|
||||
// os.Exit(code)
|
||||
// }
|
||||
|
||||
// func ensureTableExists() {
|
||||
// if _, err := a.DB.Exec(tableCreationQuery); err != nil {
|
||||
// log.Fatal(err)
|
||||
// }
|
||||
// }
|
||||
|
||||
// func clearTable() {
|
||||
// a.DB.Exec("DELETE FROM users")
|
||||
// a.DB.Exec("ALTER TABLE users AUTO_INCREMENT = 1")
|
||||
// }
|
||||
|
||||
// func executeRequest(req *http.Request) *httptest.ResponseRecorder {
|
||||
// rr := httptest.NewRecorder()
|
||||
// a.Router.ServeHTTP(rr, req)
|
||||
|
||||
// return rr
|
||||
// }
|
||||
|
||||
// func checkResponseCode(t *testing.T, expected, actual int) {
|
||||
// if expected != actual {
|
||||
// t.Errorf("Expected response code %d. Got %d\n", expected, actual)
|
||||
// }
|
||||
// }
|
||||
|
||||
// func TestGetNonExistentUser(t *testing.T) {
|
||||
// clearTable()
|
||||
|
||||
// req, _ := http.NewRequest("GET", "/user/45", nil)
|
||||
// response := executeRequest(req)
|
||||
|
||||
// checkResponseCode(t, http.StatusNotFound, response.Code)
|
||||
|
||||
// var m map[string]string
|
||||
// json.Unmarshal(response.Body.Bytes(), &m)
|
||||
// if m["error"] != "User not found" {
|
||||
// t.Errorf("Expected the 'error' key of the response to be set to 'User not found'. Got '%s'", m["error"])
|
||||
// }
|
||||
// }
|
||||
|
||||
// func TestCreateUser(t *testing.T) {
|
||||
// clearTable()
|
||||
|
||||
// payload := []byte(`{"name":"test user","age":30}`)
|
||||
|
||||
// req, _ := http.NewRequest("POST", "/user", bytes.NewBuffer(payload))
|
||||
// response := executeRequest(req)
|
||||
|
||||
// checkResponseCode(t, http.StatusCreated, response.Code)
|
||||
|
||||
// var m map[string]interface{}
|
||||
// json.Unmarshal(response.Body.Bytes(), &m)
|
||||
|
||||
// if m["name"] != "test user" {
|
||||
// t.Errorf("Expected user name to be 'test user'. Got '%v'", m["name"])
|
||||
// }
|
||||
|
||||
// if m["age"] != 30.0 {
|
||||
// t.Errorf("Expected user age to be '30'. Got '%v'", m["age"])
|
||||
// }
|
||||
|
||||
// // the id is compared to 1.0 because JSON unmarshaling converts numbers to
|
||||
// // floats, when the target is a map[string]interface{}
|
||||
// if m["id"] != 1.0 {
|
||||
// t.Errorf("Expected product ID to be '1'. Got '%v'", m["id"])
|
||||
// }
|
||||
// }
|
||||
|
||||
// func addUsers(count int) {
|
||||
// if count < 1 {
|
||||
// count = 1
|
||||
// }
|
||||
|
||||
// for i := 0; i < count; i++ {
|
||||
// statement := fmt.Sprintf("INSERT INTO users(name, age) VALUES('%s', %d)", ("User " + strconv.Itoa(i+1)), ((i + 1) * 10))
|
||||
// a.DB.Exec(statement)
|
||||
// }
|
||||
// }
|
||||
|
||||
// func TestGetUser(t *testing.T) {
|
||||
// clearTable()
|
||||
// addUsers(1)
|
||||
|
||||
// req, _ := http.NewRequest("GET", "/user/1", nil)
|
||||
// response := executeRequest(req)
|
||||
|
||||
// checkResponseCode(t, http.StatusOK, response.Code)
|
||||
// }
|
||||
|
||||
// func TestUpdateUser(t *testing.T) {
|
||||
// clearTable()
|
||||
// addUsers(1)
|
||||
|
||||
// req, _ := http.NewRequest("GET", "/user/1", nil)
|
||||
// response := executeRequest(req)
|
||||
// var originalUser map[string]interface{}
|
||||
// json.Unmarshal(response.Body.Bytes(), &originalUser)
|
||||
|
||||
// payload := []byte(`{"name":"test user - updated name","age":21}`)
|
||||
|
||||
// req, _ = http.NewRequest("PUT", "/user/1", bytes.NewBuffer(payload))
|
||||
// response = executeRequest(req)
|
||||
|
||||
// checkResponseCode(t, http.StatusOK, response.Code)
|
||||
|
||||
// var m map[string]interface{}
|
||||
// json.Unmarshal(response.Body.Bytes(), &m)
|
||||
|
||||
// if m["id"] != originalUser["id"] {
|
||||
// t.Errorf("Expected the id to remain the same (%v). Got %v", originalUser["id"], m["id"])
|
||||
// }
|
||||
|
||||
// if m["name"] == originalUser["name"] {
|
||||
// t.Errorf("Expected the name to change from '%v' to '%v'. Got '%v'", originalUser["name"], m["name"], m["name"])
|
||||
// }
|
||||
|
||||
// if m["age"] == originalUser["age"] {
|
||||
// t.Errorf("Expected the age to change from '%v' to '%v'. Got '%v'", originalUser["age"], m["age"], m["age"])
|
||||
// }
|
||||
// }
|
||||
|
||||
// func TestDeleteUser(t *testing.T) {
|
||||
// clearTable()
|
||||
// addUsers(1)
|
||||
|
||||
// req, _ := http.NewRequest("GET", "/user/1", nil)
|
||||
// response := executeRequest(req)
|
||||
// checkResponseCode(t, http.StatusOK, response.Code)
|
||||
|
||||
// req, _ = http.NewRequest("DELETE", "/user/1", nil)
|
||||
// response = executeRequest(req)
|
||||
|
||||
// checkResponseCode(t, http.StatusOK, response.Code)
|
||||
|
||||
// req, _ = http.NewRequest("GET", "/user/1", nil)
|
||||
// response = executeRequest(req)
|
||||
// checkResponseCode(t, http.StatusNotFound, response.Code)
|
||||
// }
|
@ -1,7 +1,7 @@ |
||||
package downloader |
||||
|
||||
import ( |
||||
pb "github.com/harmony-one/harmony/syncing/downloader/proto" |
||||
pb "github.com/harmony-one/harmony/services/syncing/downloader/proto" |
||||
) |
||||
|
||||
// DownloadInterface is the interface for downloader package.
|
Loading…
Reference in new issue