commit
d2be940879
@ -0,0 +1,183 @@ |
||||
package node |
||||
|
||||
import ( |
||||
"bytes" |
||||
"encoding/gob" |
||||
"fmt" |
||||
"os" |
||||
|
||||
"github.com/harmony-one/harmony/blockchain" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/p2pv2" |
||||
"github.com/harmony-one/harmony/proto" |
||||
"github.com/harmony-one/harmony/proto/client" |
||||
"github.com/harmony-one/harmony/proto/consensus" |
||||
proto_identity "github.com/harmony-one/harmony/proto/identity" |
||||
proto_node "github.com/harmony-one/harmony/proto/node" |
||||
netp2p "github.com/libp2p/go-libp2p-net" |
||||
) |
||||
|
||||
// NodeHandlerV1 handles a new incoming connection.
|
||||
func (node *Node) NodeHandlerV1(s netp2p.Stream) { |
||||
defer s.Close() |
||||
|
||||
// Read p2p message payload
|
||||
content, err := p2pv2.ReadData(s) |
||||
|
||||
if err != nil { |
||||
node.log.Error("Read p2p data failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
// TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p.
|
||||
node.MaybeBroadcastAsValidator(content) |
||||
|
||||
consensusObj := node.Consensus |
||||
|
||||
msgCategory, err := proto.GetMessageCategory(content) |
||||
if err != nil { |
||||
node.log.Error("Read node type failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
msgType, err := proto.GetMessageType(content) |
||||
if err != nil { |
||||
node.log.Error("Read action type failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
msgPayload, err := proto.GetMessagePayload(content) |
||||
if err != nil { |
||||
node.log.Error("Read message payload failed", "err", err, "node", node) |
||||
return |
||||
} |
||||
|
||||
switch msgCategory { |
||||
case proto.Identity: |
||||
actionType := proto_identity.IDMessageType(msgType) |
||||
switch actionType { |
||||
case proto_identity.Identity: |
||||
messageType := proto_identity.MessageType(msgPayload[0]) |
||||
switch messageType { |
||||
case proto_identity.Register: |
||||
fmt.Println("received a identity message") |
||||
// TODO(ak): fix it.
|
||||
// node.processPOWMessage(msgPayload)
|
||||
node.log.Info("NET: received message: IDENTITY/REGISTER") |
||||
default: |
||||
node.log.Error("Announce message should be sent to IdentityChain") |
||||
} |
||||
} |
||||
case proto.Consensus: |
||||
actionType := consensus.ConMessageType(msgType) |
||||
switch actionType { |
||||
case consensus.Consensus: |
||||
if consensusObj.IsLeader { |
||||
node.log.Info("NET: received message: Consensus/Leader") |
||||
consensusObj.ProcessMessageLeader(msgPayload) |
||||
} else { |
||||
node.log.Info("NET: received message: Consensus/Validator") |
||||
consensusObj.ProcessMessageValidator(msgPayload) |
||||
} |
||||
} |
||||
case proto.Node: |
||||
actionType := proto_node.MessageType(msgType) |
||||
switch actionType { |
||||
case proto_node.Transaction: |
||||
node.log.Info("NET: received message: Node/Transaction") |
||||
node.transactionMessageHandler(msgPayload) |
||||
case proto_node.Block: |
||||
node.log.Info("NET: received message: Node/Block") |
||||
blockMsgType := proto_node.BlockMessageType(msgPayload[0]) |
||||
switch blockMsgType { |
||||
case proto_node.Sync: |
||||
decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Sync messge type
|
||||
blocks := new([]*blockchain.Block) |
||||
decoder.Decode(blocks) |
||||
if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil { |
||||
node.Client.UpdateBlocks(*blocks) |
||||
} |
||||
} |
||||
case proto_node.Client: |
||||
node.log.Info("NET: received message: Node/Client") |
||||
clientMsgType := proto_node.ClientMessageType(msgPayload[0]) |
||||
switch clientMsgType { |
||||
case proto_node.LookupUtxo: |
||||
decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the LookupUtxo messge type
|
||||
|
||||
fetchUtxoMessage := new(proto_node.FetchUtxoMessage) |
||||
decoder.Decode(fetchUtxoMessage) |
||||
|
||||
utxoMap := node.UtxoPool.GetUtxoMapByAddresses(fetchUtxoMessage.Addresses) |
||||
|
||||
p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) |
||||
} |
||||
case proto_node.Control: |
||||
node.log.Info("NET: received message: Node/Control") |
||||
controlType := msgPayload[0] |
||||
if proto_node.ControlMessageType(controlType) == proto_node.STOP { |
||||
if node.Chain == nil { |
||||
node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) |
||||
|
||||
sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap() |
||||
node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) |
||||
|
||||
avgBlockSizeInBytes := 0 |
||||
txCount := 0 |
||||
blockCount := 0 |
||||
totalTxCount := 0 |
||||
totalBlockCount := 0 |
||||
avgTxSize := 0 |
||||
|
||||
for _, block := range node.blockchain.Blocks { |
||||
if block.IsStateBlock() { |
||||
totalTxCount += int(block.State.NumTransactions) |
||||
totalBlockCount += int(block.State.NumBlocks) |
||||
} else { |
||||
byteBuffer := bytes.NewBuffer([]byte{}) |
||||
encoder := gob.NewEncoder(byteBuffer) |
||||
encoder.Encode(block) |
||||
avgBlockSizeInBytes += len(byteBuffer.Bytes()) |
||||
|
||||
txCount += len(block.Transactions) |
||||
blockCount++ |
||||
totalTxCount += len(block.TransactionIds) |
||||
totalBlockCount++ |
||||
|
||||
byteBuffer = bytes.NewBuffer([]byte{}) |
||||
encoder = gob.NewEncoder(byteBuffer) |
||||
encoder.Encode(block.Transactions) |
||||
avgTxSize += len(byteBuffer.Bytes()) |
||||
} |
||||
} |
||||
if blockCount != 0 { |
||||
avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount |
||||
avgTxSize = avgTxSize / txCount |
||||
} |
||||
|
||||
node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) |
||||
} else { |
||||
node.log.Debug("Stopping Node (Account Model)", "node", node, "CurBlockNum", node.Chain.CurrentHeader().Number, "numTxsProcessed", node.countNumTransactionsInBlockchainAccount()) |
||||
} |
||||
|
||||
os.Exit(0) |
||||
} |
||||
case proto_node.PING: |
||||
// Leader receives PING from new node.
|
||||
node.pingMessageHandler(msgPayload) |
||||
case proto_node.PONG: |
||||
// The new node receives back from leader.
|
||||
node.pongMessageHandler(msgPayload) |
||||
} |
||||
case proto.Client: |
||||
actionType := client.MessageType(msgType) |
||||
node.log.Info("NET: received message: Client/Transaction") |
||||
switch actionType { |
||||
case client.Transaction: |
||||
if node.Client != nil { |
||||
node.Client.TransactionMessageHandler(msgPayload) |
||||
} |
||||
} |
||||
default: |
||||
node.log.Error("Unknown", "MsgCateory:", msgCategory) |
||||
} |
||||
} |
@ -0,0 +1,146 @@ |
||||
package p2pv2 |
||||
|
||||
import ( |
||||
"bufio" |
||||
"bytes" |
||||
"context" |
||||
"encoding/binary" |
||||
"fmt" |
||||
"io" |
||||
"time" |
||||
|
||||
"github.com/harmony-one/harmony/log" |
||||
libp2p "github.com/libp2p/go-libp2p" |
||||
host "github.com/libp2p/go-libp2p-host" |
||||
net "github.com/libp2p/go-libp2p-net" |
||||
peer "github.com/libp2p/go-libp2p-peer" |
||||
peerstore "github.com/libp2p/go-libp2p-peerstore" |
||||
multiaddr "github.com/multiformats/go-multiaddr" |
||||
) |
||||
|
||||
var ( |
||||
myHost host.Host // TODO(ricl): this should be a field in node.
|
||||
) |
||||
|
||||
const ( |
||||
// BatchSizeInByte The batch size in byte (64MB) in which we return data
|
||||
BatchSizeInByte = 1 << 16 |
||||
// ProtocolID The ID of protocol used in stream handling.
|
||||
ProtocolID = "/harmony/0.0.1" |
||||
) |
||||
|
||||
// InitHost Initialize a host for p2p communication
|
||||
func InitHost(ip, port string) { |
||||
addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) |
||||
sourceAddr, err := multiaddr.NewMultiaddr(addr) |
||||
catchError(err) |
||||
priv := addrToPrivKey(addr) |
||||
myHost, err = libp2p.New(context.Background(), |
||||
libp2p.ListenAddrs(sourceAddr), |
||||
libp2p.Identity(priv), |
||||
libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves.
|
||||
// TODO(ricl): Other features to probe
|
||||
// libp2p.EnableRelay; libp2p.Routing;
|
||||
) |
||||
catchError(err) |
||||
log.Debug("Host is up!", "port", port, "id", myHost.ID().Pretty(), "addrs", sourceAddr) |
||||
} |
||||
|
||||
// BindHandler bind a streamHandler to the harmony protocol.
|
||||
func BindHandler(handler net.StreamHandler) { |
||||
myHost.SetStreamHandler(ProtocolID, handler) |
||||
} |
||||
|
||||
// Send a p2p message sending function with signature compatible to p2pv1.
|
||||
func Send(ip, port string, message []byte) error { |
||||
addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) |
||||
targetAddr, err := multiaddr.NewMultiaddr(addr) |
||||
|
||||
priv := addrToPrivKey(addr) |
||||
peerID, _ := peer.IDFromPrivateKey(priv) |
||||
myHost.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL) |
||||
s, err := myHost.NewStream(context.Background(), peerID, ProtocolID) |
||||
catchError(err) |
||||
|
||||
// Create a buffered stream so that read and writes are non blocking.
|
||||
w := bufio.NewWriter(bufio.NewWriter(s)) |
||||
|
||||
// Create a thread to read and write data.
|
||||
go writeData(w, message) |
||||
return nil |
||||
} |
||||
|
||||
// ReadData Call this function in streamHandler to get the binary data.
|
||||
func ReadData(s net.Stream) ([]byte, error) { |
||||
timeoutDuration := 1 * time.Second |
||||
s.SetReadDeadline(time.Now().Add(timeoutDuration)) |
||||
|
||||
// Create a buffered stream so that read and writes are non blocking.
|
||||
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) |
||||
|
||||
contentBuf := bytes.NewBuffer([]byte{}) |
||||
// Read 1 byte for message type
|
||||
_, err := rw.ReadByte() |
||||
switch err { |
||||
case nil: |
||||
//log.Printf("Received p2p message type: %x\n", msgType)
|
||||
case io.EOF: |
||||
fallthrough |
||||
default: |
||||
log.Error("Error reading the p2p message type field", "err", err) |
||||
return contentBuf.Bytes(), err |
||||
} |
||||
// TODO: check on msgType and take actions accordingly
|
||||
|
||||
// Read 4 bytes for message size
|
||||
fourBytes := make([]byte, 4) |
||||
n, err := rw.Read(fourBytes) |
||||
if err != nil { |
||||
log.Error("Error reading the p2p message size field", "err", err) |
||||
return contentBuf.Bytes(), err |
||||
} else if n < len(fourBytes) { |
||||
log.Error("Invalid byte size", "bytes", n) |
||||
return contentBuf.Bytes(), err |
||||
} |
||||
|
||||
//log.Print(fourBytes)
|
||||
// Number of bytes for the message content
|
||||
bytesToRead := binary.BigEndian.Uint32(fourBytes) |
||||
//log.Printf("The content size is %d bytes.", bytesToRead)
|
||||
|
||||
// Read the content in chunk of size `BatchSizeInByte`
|
||||
tmpBuf := make([]byte, BatchSizeInByte) |
||||
ILOOP: |
||||
for { |
||||
// TODO(ricl): is this necessary? If yes, figure out how to make it work
|
||||
// timeoutDuration := 10 * time.Second
|
||||
// s.SetReadDeadline(time.Now().Add(timeoutDuration))
|
||||
if bytesToRead < BatchSizeInByte { |
||||
// Read the last number of bytes less than `BatchSizeInByte`
|
||||
tmpBuf = make([]byte, bytesToRead) |
||||
} |
||||
n, err := rw.Read(tmpBuf) |
||||
contentBuf.Write(tmpBuf[:n]) |
||||
|
||||
switch err { |
||||
case io.EOF: |
||||
// TODO: should we return error here, or just ignore it?
|
||||
log.Error("EOF reached while reading p2p message") |
||||
break ILOOP |
||||
case nil: |
||||
bytesToRead -= uint32(n) // TODO: think about avoid the casting in every loop
|
||||
if bytesToRead <= 0 { |
||||
break ILOOP |
||||
} |
||||
default: |
||||
log.Error("Error reading p2p message") |
||||
return []byte{}, err |
||||
} |
||||
} |
||||
return contentBuf.Bytes(), nil |
||||
} |
||||
|
||||
// GetHost Get the p2p host
|
||||
func GetHost() host.Host { |
||||
return myHost |
||||
} |
@ -0,0 +1,31 @@ |
||||
package p2pv2 |
||||
|
||||
import ( |
||||
"bufio" |
||||
"hash/fnv" |
||||
"math/rand" |
||||
|
||||
"github.com/harmony-one/harmony/log" |
||||
ic "github.com/libp2p/go-libp2p-crypto" |
||||
) |
||||
|
||||
func catchError(err error) { |
||||
if err != nil { |
||||
log.Error("catchError", "err", err) |
||||
panic(err) |
||||
} |
||||
} |
||||
|
||||
func addrToPrivKey(addr string) ic.PrivKey { |
||||
h := fnv.New32a() |
||||
_, err := h.Write([]byte(addr)) |
||||
catchError(err) |
||||
r := rand.New(rand.NewSource(int64(h.Sum32()))) // Hack: forcing the random see to be the hash of addr so that we can recover priv from ip + port.
|
||||
priv, _, err := ic.GenerateKeyPairWithReader(ic.RSA, 512, r) |
||||
return priv |
||||
} |
||||
|
||||
func writeData(w *bufio.Writer, data []byte) { |
||||
w.Write(data) |
||||
w.Flush() |
||||
} |
Loading…
Reference in new issue