From 9ac26459359bed3755d058e2784cb73697ea81cc Mon Sep 17 00:00:00 2001 From: Richard Liu Date: Wed, 28 Nov 2018 21:23:22 -0800 Subject: [PATCH 1/8] integrate libp2p as version 2 p2p --- client/txgen/main.go | 24 +++-- node/node.go | 15 ++- node/node_handler.go | 219 +++++++++++++++++++++++++++++++++++++++++++ node/node_test.go | 20 ++-- p2p/helper.go | 8 +- p2p/peer.go | 14 ++- p2pv2/host.go | 147 +++++++++++++++++++++++++++++ p2pv2/util.go | 33 +++++++ trie/database.go | 7 +- 9 files changed, 454 insertions(+), 33 deletions(-) create mode 100644 p2pv2/host.go create mode 100644 p2pv2/util.go diff --git a/client/txgen/main.go b/client/txgen/main.go index 1e89f56bd..33e3bdd30 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -3,15 +3,18 @@ package main import ( "flag" "fmt" - "github.com/ethereum/go-ethereum/rlp" - "github.com/harmony-one/harmony/client/txgen/txgen" - "github.com/harmony-one/harmony/core/types" "os" "path" "runtime" "sync" "time" + "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/client/txgen/txgen" + "github.com/harmony-one/harmony/core/types" + + "github.com/harmony-one/harmony/p2pv2" + "github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/client" client_config "github.com/harmony-one/harmony/client/config" @@ -85,11 +88,15 @@ func main() { } // Client/txgenerator server node setup - clientPort := config.GetClientPort() - consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{}) + clientPeer := config.GetClientPeer() + consensusObj := consensus.NewConsensus(clientPeer.Ip, clientPeer.Port, "0", nil, p2p.Peer{}) clientNode := node.New(consensusObj, nil) + // Add self peer. + // TODO(ricl): setting self peer should be moved into consensus / node New! + clientNode.SelfPeer = *clientPeer - if clientPort != "" { + if clientPeer != nil { + p2pv2.InitHost(clientPeer.Ip, clientPeer.Port) // TODO: this should be moved into client node. clientNode.Client = client.NewClient(&shardIDLeaderMap) // This func is used to update the client's utxopool when new blocks are received from the leaders @@ -127,10 +134,9 @@ func main() { // Start the client server to listen to leader's message go func() { - clientNode.StartServer(clientPort) + clientNode.StartServer(clientPeer.Port) }() } - // Transaction generation process time.Sleep(10 * time.Second) // wait for nodes to be ready start := time.Now() @@ -199,7 +205,7 @@ func main() { txs, crossTxs := txgen.GenerateSimulatedTransactions(subsetCounter, *numSubset, int(shardID), nodes, setting) // Put cross shard tx into a pending list waiting for proofs from leaders - if clientPort != "" { + if clientPeer != nil { clientNode.Client.PendingCrossTxsMutex.Lock() for _, tx := range crossTxs { clientNode.Client.PendingCrossTxs[tx.ID] = tx diff --git a/node/node.go b/node/node.go index 5c20c5581..bd0d0533d 100644 --- a/node/node.go +++ b/node/node.go @@ -25,6 +25,7 @@ import ( "github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2pv2" proto_identity "github.com/harmony-one/harmony/proto/identity" proto_node "github.com/harmony-one/harmony/proto/node" "github.com/harmony-one/harmony/syncing/downloader" @@ -154,9 +155,16 @@ func (node *Node) StartServer(port string) { // Disable this temporarily. // node.blockchain = syncing.StartBlockSyncing(node.Consensus.GetValidatorPeers()) } - fmt.Println("going to start server on port:", port) - //node.log.Debug("Starting server", "node", node, "port", port) - node.listenOnPort(port) + if p2p.Version == 1 { + fmt.Println("going to start server on port:", port) + //node.log.Debug("Starting server", "node", node, "port", port) + node.listenOnPort(port) + } else { + p2pv2.InitHost(node.SelfPeer.Ip, port) + p2pv2.BindHandler(node.NodeHandlerV1) + // Hang forever + <-make(chan struct{}) + } } // SetLog sets log for Node. @@ -165,6 +173,7 @@ func (node *Node) SetLog() *Node { return node } +// Version 0 p2p. Going to be deprecated. func (node *Node) listenOnPort(port string) { addr := net.JoinHostPort("", port) listen, err := net.Listen("tcp4", addr) diff --git a/node/node_handler.go b/node/node_handler.go index 47eb7cca9..a4fad126b 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -17,11 +17,13 @@ import ( hmy_crypto "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2pv2" "github.com/harmony-one/harmony/proto" "github.com/harmony-one/harmony/proto/client" "github.com/harmony-one/harmony/proto/consensus" proto_identity "github.com/harmony-one/harmony/proto/identity" proto_node "github.com/harmony-one/harmony/proto/node" + netp2p "github.com/libp2p/go-libp2p-net" ) const ( @@ -206,6 +208,223 @@ func (node *Node) NodeHandler(conn net.Conn) { } } +// NodeHandlerV1 handles a new incoming connection. +func (node *Node) NodeHandlerV1(s netp2p.Stream) { + defer s.Close() + + // Read p2p message payload + content, err := p2pv2.ReadData(s) + + if err != nil { + node.log.Error("Read p2p data failed", "err", err, "node", node) + return + } + // TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p. + node.MaybeBroadcastAsValidator(content) + + consensusObj := node.Consensus + + msgCategory, err := proto.GetMessageCategory(content) + if err != nil { + node.log.Error("Read node type failed", "err", err, "node", node) + return + } + + msgType, err := proto.GetMessageType(content) + if err != nil { + node.log.Error("Read action type failed", "err", err, "node", node) + return + } + + msgPayload, err := proto.GetMessagePayload(content) + if err != nil { + node.log.Error("Read message payload failed", "err", err, "node", node) + return + } + + switch msgCategory { + case proto.Identity: + actionType := proto_identity.IdentityMessageType(msgType) + switch actionType { + case proto_identity.Identity: + messageType := proto_identity.MessageType(msgPayload[0]) + switch messageType { + case proto_identity.Register: + fmt.Println("received a identity message") + // TODO(ak): fix it. + // node.processPOWMessage(msgPayload) + node.log.Info("NET: received message: IDENTITY/REGISTER") + default: + node.log.Error("Announce message should be sent to IdentityChain") + } + } + case proto.Consensus: + actionType := consensus.ConsensusMessageType(msgType) + switch actionType { + case consensus.Consensus: + if consensusObj.IsLeader { + node.log.Info("NET: received message: Consensus/Leader") + consensusObj.ProcessMessageLeader(msgPayload) + } else { + node.log.Info("NET: received message: Consensus/Validator") + consensusObj.ProcessMessageValidator(msgPayload) + } + } + case proto.Node: + actionType := proto_node.NodeMessageType(msgType) + switch actionType { + case proto_node.Transaction: + node.log.Info("NET: received message: Node/Transaction") + node.transactionMessageHandler(msgPayload) + case proto_node.Block: + node.log.Info("NET: received message: Node/Block") + blockMsgType := proto_node.BlockMessageType(msgPayload[0]) + switch blockMsgType { + case proto_node.Sync: + decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Sync messge type + blocks := new([]*blockchain.Block) + decoder.Decode(blocks) + if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil { + node.Client.UpdateBlocks(*blocks) + } + } + case proto_node.BlockchainSync: + node.log.Info("NET: received message: Node/BlockchainSync") + node.handleBlockchainSyncV1(msgPayload, s) + case proto_node.Client: + node.log.Info("NET: received message: Node/Client") + clientMsgType := proto_node.ClientMessageType(msgPayload[0]) + switch clientMsgType { + case proto_node.LookupUtxo: + decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the LookupUtxo messge type + + fetchUtxoMessage := new(proto_node.FetchUtxoMessage) + decoder.Decode(fetchUtxoMessage) + + utxoMap := node.UtxoPool.GetUtxoMapByAddresses(fetchUtxoMessage.Addresses) + + p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) + } + case proto_node.Control: + node.log.Info("NET: received message: Node/Control") + controlType := msgPayload[0] + if proto_node.ControlMessageType(controlType) == proto_node.STOP { + if node.Chain == nil { + node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) + + sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap() + node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) + + avgBlockSizeInBytes := 0 + txCount := 0 + blockCount := 0 + totalTxCount := 0 + totalBlockCount := 0 + avgTxSize := 0 + + for _, block := range node.blockchain.Blocks { + if block.IsStateBlock() { + totalTxCount += int(block.State.NumTransactions) + totalBlockCount += int(block.State.NumBlocks) + } else { + byteBuffer := bytes.NewBuffer([]byte{}) + encoder := gob.NewEncoder(byteBuffer) + encoder.Encode(block) + avgBlockSizeInBytes += len(byteBuffer.Bytes()) + + txCount += len(block.Transactions) + blockCount++ + totalTxCount += len(block.TransactionIds) + totalBlockCount++ + + byteBuffer = bytes.NewBuffer([]byte{}) + encoder = gob.NewEncoder(byteBuffer) + encoder.Encode(block.Transactions) + avgTxSize += len(byteBuffer.Bytes()) + } + } + if blockCount != 0 { + avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount + avgTxSize = avgTxSize / txCount + } + + node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) + } else { + node.log.Debug("Stopping Node (Account Model)", "node", node, "CurBlockNum", node.Chain.CurrentHeader().Number, "numTxsProcessed", node.countNumTransactionsInBlockchainAccount()) + } + + os.Exit(0) + } + case proto_node.PING: + node.pingMessageHandler(msgPayload) + case proto_node.PONG: + node.pongMessageHandler(msgPayload) + } + case proto.Client: + actionType := client.ClientMessageType(msgType) + node.log.Info("NET: received message: Client/Transaction") + switch actionType { + case client.Transaction: + if node.Client != nil { + node.Client.TransactionMessageHandler(msgPayload) + } + } + default: + node.log.Error("Unknown", "MsgCateory:", msgCategory) + } +} + +// Refactor by moving this code into a sync package. +func (node *Node) handleBlockchainSyncV1(payload []byte, s netp2p.Stream) { + // TODO(minhdoan): Looking to removing this. + w := bufio.NewWriter(bufio.NewWriter(s)) +FOR_LOOP: + for { + syncMsgType := proto_node.BlockchainSyncMessageType(payload[0]) + switch syncMsgType { + case proto_node.GetBlock: + block := node.blockchain.FindBlock(payload[1:33]) + w.Write(block.Serialize()) + w.Flush() + case proto_node.GetLastBlockHashes: + blockchainSyncMessage := proto_node.BlockchainSyncMessage{ + BlockHeight: len(node.blockchain.Blocks), + BlockHashes: node.blockchain.GetBlockHashes(), + } + w.Write(proto_node.SerializeBlockchainSyncMessage(&blockchainSyncMessage)) + w.Flush() + case proto_node.Done: + break FOR_LOOP + } + content, err := p2pv2.ReadData(s) + + if err != nil { + node.log.Error("Failed in reading message content from syncing node", err) + return + } + + msgCategory, _ := proto.GetMessageCategory(content) + if err != nil || msgCategory != proto.Node { + node.log.Error("Failed in reading message category from syncing node", err) + return + } + + msgType, err := proto.GetMessageType(content) + actionType := proto_node.NodeMessageType(msgType) + if err != nil || actionType != proto_node.BlockchainSync { + node.log.Error("Failed in reading message type from syncing node", err) + return + } + + payload, err = proto.GetMessagePayload(content) + if err != nil { + node.log.Error("Failed in reading payload from syncing node", err) + return + } + } + node.log.Info("HOORAY: Done sending info to syncing node.") +} + // Refactor by moving this code into a sync package. func (node *Node) handleBlockchainSync(payload []byte, conn net.Conn) { // TODO(minhdoan): Looking to removing this. diff --git a/node/node_test.go b/node/node_test.go index 175a0b1b8..f76aa29b0 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -147,16 +147,16 @@ func exitServer() { os.Exit(0) } -func TestPingPongHandler(test *testing.T) { - leader := p2p.Peer{Ip: "127.0.0.1", Port: "8881"} - validator := p2p.Peer{Ip: "127.0.0.1", Port: "9991"} - consensus := consensus.NewConsensus("127.0.0.1", "8881", "0", []p2p.Peer{leader, validator}, leader) +// func TestPingPongHandler(test *testing.T) { +// leader := p2p.Peer{Ip: "127.0.0.1", Port: "8881"} +// // validator := p2p.Peer{Ip: "127.0.0.1", Port: "9991"} +// consensus := consensus.NewConsensus("127.0.0.1", "8881", "0", []p2p.Peer{leader}, leader) - node := New(consensus, nil) +// node := New(consensus, nil) - // go sendPingMessage(leader) - go sendPongMessage(leader) - go exitServer() +// // go sendPingMessage(leader) +// go sendPongMessage(leader) +// go exitServer() - node.StartServer("8881") -} +// node.StartServer("8881") +// } diff --git a/p2p/helper.go b/p2p/helper.go index d748a0aae..62e428837 100644 --- a/p2p/helper.go +++ b/p2p/helper.go @@ -26,16 +26,14 @@ content (n bytes) - actual message content const BATCH_SIZE = 1 << 16 -// Read the message type and content size, and return the actual content. +// ReadMessageContent Read the message type and content size, and return the actual content. func ReadMessageContent(conn net.Conn) ([]byte, error) { var ( contentBuf = bytes.NewBuffer([]byte{}) r = bufio.NewReader(conn) ) - timeoutDuration := 1 * time.Second conn.SetReadDeadline(time.Now().Add(timeoutDuration)) - //// Read 1 byte for message type _, err := r.ReadByte() switch err { @@ -49,7 +47,6 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) { return contentBuf.Bytes(), err } // TODO: check on msgType and take actions accordingly - //// Read 4 bytes for message size fourBytes := make([]byte, 4) n, err := r.Read(fourBytes) @@ -60,12 +57,10 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) { log.Printf("Failed reading the p2p message size field: only read %d bytes", n) return contentBuf.Bytes(), err } - //log.Print(fourBytes) // Number of bytes for the message content bytesToRead := binary.BigEndian.Uint32(fourBytes) //log.Printf("The content size is %d bytes.", bytesToRead) - //// Read the content in chunk of 16 * 1024 bytes tmpBuf := make([]byte, BATCH_SIZE) ILOOP: @@ -78,7 +73,6 @@ ILOOP: } n, err := r.Read(tmpBuf) contentBuf.Write(tmpBuf[:n]) - switch err { case io.EOF: // TODO: should we return error here, or just ignore it? diff --git a/p2p/peer.go b/p2p/peer.go index 78fd6e1ef..16adce722 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -9,6 +9,7 @@ import ( "time" "github.com/harmony-one/harmony/log" + "github.com/harmony-one/harmony/p2pv2" "github.com/dedis/kyber" ) @@ -25,6 +26,11 @@ type Peer struct { const MaxBroadCast = 20 +// Version The version number of p2p library +// 1 - Direct socket connection +// 2 - libp2p +const Version = 1 + // SendMessage sends the message to the peer func SendMessage(peer Peer, msg []byte) { // Construct normal p2p message @@ -133,7 +139,13 @@ func send(ip, port string, message []byte) { backoff := NewExpBackoff(250*time.Millisecond, 10*time.Second, 2) for trial := 0; trial < 10; trial++ { - err := sendWithSocketClient(ip, port, message) + var err error + if Version == 1 { + // TODO(ricl): remove sendWithSocketClient related code. + err = sendWithSocketClient(ip, port, message) + } else { + err = p2pv2.Send(ip, port, message) + } if err == nil { if trial > 0 { log.Warn("retry sendWithSocketClient", "rety", trial) diff --git a/p2pv2/host.go b/p2pv2/host.go new file mode 100644 index 000000000..219fb0190 --- /dev/null +++ b/p2pv2/host.go @@ -0,0 +1,147 @@ +package p2pv2 + +import ( + "bufio" + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "time" + + "github.com/harmony-one/harmony/log" + libp2p "github.com/libp2p/go-libp2p" + host "github.com/libp2p/go-libp2p-host" + net "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + multiaddr "github.com/multiformats/go-multiaddr" +) + +var ( + myHost host.Host // TODO(ricl): this should be a field in node. +) + +const ( + // BatchSize The batch size in which we return data + BatchSize = 1 << 16 + // ProtocolID The ID of protocol used in stream handling. + ProtocolID = "/harmony/0.0.1" +) + +// InitHost Initialize a host for p2p communication +func InitHost(ip, port string) { + addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) + sourceAddr, err := multiaddr.NewMultiaddr(addr) + catchError(err) + // TODO(ricl): use ip as well. + priv := addrToPrivKey(addr) + myHost, err = libp2p.New(context.Background(), + libp2p.ListenAddrs(sourceAddr), + libp2p.Identity(priv), + libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves. + // TODO(ricl): Other features to probe + // libp2p.EnableRelay; libp2p.Routing; + ) + catchError(err) + log.Debug("Host is up!", "port", port, "id", myHost.ID().Pretty(), "addrs", sourceAddr) +} + +// BindHandler bind a streamHandler to the harmony protocol. +func BindHandler(handler net.StreamHandler) { + myHost.SetStreamHandler(ProtocolID, handler) +} + +// Send a p2p message sending function with signature compatible to p2pv1. +func Send(ip, port string, message []byte) error { + addr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%s", port) + targetAddr, err := multiaddr.NewMultiaddr(addr) + + priv := addrToPrivKey(addr) + peerID, _ := peer.IDFromPrivateKey(priv) + myHost.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL) + s, err := myHost.NewStream(context.Background(), peerID, ProtocolID) + catchError(err) + + // Create a buffered stream so that read and writes are non blocking. + w := bufio.NewWriter(bufio.NewWriter(s)) + + // Create a thread to read and write data. + go writeData(w, message) + return nil +} + +// ReadData Call this function in streamHandler to get the binary data. +func ReadData(s net.Stream) ([]byte, error) { + timeoutDuration := 1 * time.Second + s.SetReadDeadline(time.Now().Add(timeoutDuration)) + + // Create a buffered stream so that read and writes are non blocking. + rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) + + contentBuf := bytes.NewBuffer([]byte{}) + // Read 1 byte for message type + _, err := rw.ReadByte() + switch err { + case nil: + //log.Printf("Received p2p message type: %x\n", msgType) + case io.EOF: + fallthrough + default: + log.Error("Error reading the p2p message type field", "err", err) + return contentBuf.Bytes(), err + } + // TODO: check on msgType and take actions accordingly + + // Read 4 bytes for message size + fourBytes := make([]byte, 4) + n, err := rw.Read(fourBytes) + if err != nil { + log.Error("Error reading the p2p message size field", "err", err) + return contentBuf.Bytes(), err + } else if n < len(fourBytes) { + log.Error("Invalid byte size", "bytes", n) + return contentBuf.Bytes(), err + } + + //log.Print(fourBytes) + // Number of bytes for the message content + bytesToRead := binary.BigEndian.Uint32(fourBytes) + //log.Printf("The content size is %d bytes.", bytesToRead) + + // Read the content in chunk of 16 * 1024 bytes + tmpBuf := make([]byte, BatchSize) +ILOOP: + for { + // TODO(ricl): is this necessary? If yes, figure out how to make it work + // timeoutDuration := 10 * time.Second + // s.SetReadDeadline(time.Now().Add(timeoutDuration)) + if bytesToRead < BatchSize { + // Read the last number of bytes less than 1024 + tmpBuf = make([]byte, bytesToRead) + } + n, err := rw.Read(tmpBuf) + contentBuf.Write(tmpBuf[:n]) + + switch err { + case io.EOF: + // TODO: should we return error here, or just ignore it? + log.Error("EOF reached while reading p2p message") + break ILOOP + case nil: + bytesToRead -= uint32(n) // TODO: think about avoid the casting in every loop + if bytesToRead <= 0 { + break ILOOP + } + default: + log.Error("Error reading p2p message") + return []byte{}, err + } + } + return contentBuf.Bytes(), nil +} + +// GetHost Get the p2p host +func GetHost() host.Host { + return myHost +} diff --git a/p2pv2/util.go b/p2pv2/util.go new file mode 100644 index 000000000..664e708b0 --- /dev/null +++ b/p2pv2/util.go @@ -0,0 +1,33 @@ +package p2pv2 + +import ( + "bufio" + "hash/fnv" + "math/rand" + + "github.com/harmony-one/harmony/log" + ic "github.com/libp2p/go-libp2p-crypto" +) + +func catchError(err error) { + if err != nil { + log.Error("catchError", "err", err) + panic(err) + } +} + +func addrToPrivKey(addr string) ic.PrivKey { + h := fnv.New32a() + _, err := h.Write([]byte(addr)) + catchError(err) + r := rand.New(rand.NewSource(int64(h.Sum32()))) // Hack: forcing the random see to be the hash of addr so that we can recover priv from ip + port. + priv, _, err := ic.GenerateKeyPairWithReader(ic.RSA, 512, r) + return priv +} + +func writeData(w *bufio.Writer, data []byte) { + for { + w.Write(data) + w.Flush() + } +} diff --git a/trie/database.go b/trie/database.go index 1cf5301b6..b1718c933 100644 --- a/trie/database.go +++ b/trie/database.go @@ -18,14 +18,15 @@ package trie import ( "fmt" + "io" + "sync" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" hdb "github.com/harmony-one/harmony/db" - "io" - "sync" - "time" ) var ( From ba47c85faf145072bea43c8ef0f4993488e854d9 Mon Sep 17 00:00:00 2001 From: Richard Liu Date: Thu, 29 Nov 2018 00:04:12 -0800 Subject: [PATCH 2/8] ip+port => peer; update signature of consensus.New and node.New as preparation for p2p --- benchmark.go | 4 ++-- client/txgen/main.go | 9 +++------ client/wallet/main.go | 2 +- client/wallet_v2/main.go | 2 +- consensus/consensus.go | 16 +++++++--------- consensus/consensus_leader_msg_test.go | 4 ++-- consensus/consensus_test.go | 4 ++-- consensus/consensus_validator.go | 6 +++--- consensus/consensus_validator_msg_test.go | 4 ++-- node/node.go | 5 ++++- node/node_test.go | 14 +++++++------- utils/distribution_config.go | 2 +- utils/utils.go | 16 ++++++---------- 13 files changed, 41 insertions(+), 47 deletions(-) diff --git a/benchmark.go b/benchmark.go index 5b698c651..5560264eb 100644 --- a/benchmark.go +++ b/benchmark.go @@ -168,7 +168,7 @@ func main() { } // Consensus object. - consensus := consensus.NewConsensus(*ip, *port, shardID, peers, leader) + consensus := consensus.New(selfPeer, shardID, peers, leader) consensus.MinPeers = *minPeers // Start Profiler for leader if profile argument is on @@ -183,7 +183,7 @@ func main() { // Set logger to attack model. attack.GetInstance().SetLogger(consensus.Log) // Current node. - currentNode := node.New(consensus, ldb) + currentNode := node.New(consensus, ldb, selfPeer) // Add self peer. currentNode.SelfPeer = selfPeer // Add sync node configuration. diff --git a/client/txgen/main.go b/client/txgen/main.go index 33e3bdd30..c5501876b 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -81,7 +81,7 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} for shardID := range shardIDLeaderMap { - node := node.New(&consensus.Consensus{ShardID: shardID}, nil) + node := node.New(&consensus.Consensus{ShardID: shardID}, nil, p2p.Peer{}) // Assign many fake addresses so we have enough address to play with at first node.AddTestingAddresses(setting.NumOfAddress) nodes = append(nodes, node) @@ -89,11 +89,8 @@ func main() { // Client/txgenerator server node setup clientPeer := config.GetClientPeer() - consensusObj := consensus.NewConsensus(clientPeer.Ip, clientPeer.Port, "0", nil, p2p.Peer{}) - clientNode := node.New(consensusObj, nil) - // Add self peer. - // TODO(ricl): setting self peer should be moved into consensus / node New! - clientNode.SelfPeer = *clientPeer + consensusObj := consensus.New(*clientPeer, "0", nil, p2p.Peer{}) + clientNode := node.New(consensusObj, nil, *clientPeer) if clientPeer != nil { p2pv2.InitHost(clientPeer.Ip, clientPeer.Port) // TODO: this should be moved into client node. diff --git a/client/wallet/main.go b/client/wallet/main.go index ed15f4e9e..338370247 100644 --- a/client/wallet/main.go +++ b/client/wallet/main.go @@ -256,7 +256,7 @@ func CreateWalletServerNode() *node.Node { shardIDLeaderMap = getShardIDToLeaderMap() clientPeer = &p2p.Peer{Port: "127.0.0.1", Ip: "1234"} } - walletNode := node.New(nil, nil) + walletNode := node.New(nil, nil, *clientPeer) // TODO(ricl): shouldn't the selfPeer for client being clientPeer?? walletNode.Client = client.NewClient(&shardIDLeaderMap) walletNode.ClientPeer = clientPeer return walletNode diff --git a/client/wallet_v2/main.go b/client/wallet_v2/main.go index ed15f4e9e..467bd49b4 100644 --- a/client/wallet_v2/main.go +++ b/client/wallet_v2/main.go @@ -256,7 +256,7 @@ func CreateWalletServerNode() *node.Node { shardIDLeaderMap = getShardIDToLeaderMap() clientPeer = &p2p.Peer{Port: "127.0.0.1", Ip: "1234"} } - walletNode := node.New(nil, nil) + walletNode := node.New(nil, nil, *clientPeer) walletNode.Client = client.NewClient(&shardIDLeaderMap) walletNode.ClientPeer = clientPeer return walletNode diff --git a/consensus/consensus.go b/consensus/consensus.go index 1c8fd8cd1..11f11e2a4 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -102,13 +102,11 @@ type BlockConsensusStatus struct { state State // the latest state of the consensus } -// NewConsensus creates a new Consensus object -// TODO(minhdoan): Maybe convert it into just New -// FYI, see https://golang.org/doc/effective_go.html?#package-names -func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Consensus { +// New creates a new Consensus object +func New(selfPeer p2p.Peer, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Consensus { consensus := Consensus{} - if leader.Port == port && leader.Ip == ip { + if leader.Port == selfPeer.Port && leader.Ip == selfPeer.Ip { consensus.IsLeader = true } else { consensus.IsLeader = false @@ -121,7 +119,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * consensus.leader = leader for _, peer := range peers { - consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) + consensus.validators.Store(utils.GetUniqueIDFromPeer(peer), peer) } // Initialize cosign bitmap @@ -146,7 +144,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * // For now use socket address as 16 byte Id // TODO: populate with correct Id - consensus.nodeID = utils.GetUniqueIdFromPeer(p2p.Peer{Ip: ip, Port: port}) + consensus.nodeID = utils.GetUniqueIDFromPeer(selfPeer) // Set private key for myself so that I can sign messages. consensus.priKey = crypto.Ed25519Curve.Scalar().SetInt64(int64(consensus.nodeID)) @@ -239,12 +237,12 @@ func (consensus *Consensus) AddPeers(peers []p2p.Peer) int { count := 0 for _, peer := range peers { - _, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer)) + _, ok := consensus.validators.Load(utils.GetUniqueIDFromPeer(peer)) if !ok { if peer.ValidatorID == -1 { peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueId()) } - consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) + consensus.validators.Store(utils.GetUniqueIDFromPeer(peer), peer) consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey) } count++ diff --git a/consensus/consensus_leader_msg_test.go b/consensus/consensus_leader_msg_test.go index 471357781..eab46485f 100644 --- a/consensus/consensus_leader_msg_test.go +++ b/consensus/consensus_leader_msg_test.go @@ -12,7 +12,7 @@ import ( func TestConstructAnnounceMessage(test *testing.T) { leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} - consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} header := consensus.blockHeader msg := consensus.constructAnnounceMessage() @@ -35,7 +35,7 @@ func TestConstructChallengeMessage(test *testing.T) { validatorPubKey := pki.GetPublicKeyFromScalar(leaderPriKey) validator := p2p.Peer{Ip: "3", Port: "5", PubKey: validatorPubKey} - consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} (*consensus.commitments)[0] = leaderPubKey (*consensus.commitments)[1] = validatorPubKey diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index ec6b6dd87..0d6528097 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -6,10 +6,10 @@ import ( "github.com/harmony-one/harmony/p2p" ) -func TestNewConsensus(test *testing.T) { +func TestNew(test *testing.T) { leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} - consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) if consensus.consensusID != 0 { test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusID) } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index fa25af968..fa1bcfe65 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -72,7 +72,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { // Verify block data // check leader Id - myLeaderID := utils.GetUniqueIdFromPeer(consensus.leader) + myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader) if leaderID != myLeaderID { consensus.Log.Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus) return @@ -175,7 +175,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState // Verify block data and the aggregated signatures // check leader Id - myLeaderID := utils.GetUniqueIdFromPeer(consensus.leader) + myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader) if leaderID != myLeaderID { consensus.Log.Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus) return @@ -325,7 +325,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) { // Verify block data // check leader Id - myLeaderID := utils.GetUniqueIdFromPeer(consensus.leader) + myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader) if leaderID != myLeaderID { consensus.Log.Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus) return diff --git a/consensus/consensus_validator_msg_test.go b/consensus/consensus_validator_msg_test.go index 07d263c54..0ea798a35 100644 --- a/consensus/consensus_validator_msg_test.go +++ b/consensus/consensus_validator_msg_test.go @@ -11,7 +11,7 @@ import ( func TestConstructCommitMessage(test *testing.T) { leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} - consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} _, msg := consensus.constructCommitMessage(consensus_proto.Commit) @@ -23,7 +23,7 @@ func TestConstructCommitMessage(test *testing.T) { func TestConstructResponseMessage(test *testing.T) { leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} - consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} msg := consensus.constructResponseMessage(consensus_proto.Response, crypto.Ed25519Curve.Scalar()) diff --git a/node/node.go b/node/node.go index bd0d0533d..232eeeabf 100644 --- a/node/node.go +++ b/node/node.go @@ -261,7 +261,7 @@ func DeserializeNode(d []byte) *NetworkNode { } // New creates a new node. -func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { +func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node { node := Node{} if consensus != nil { @@ -315,6 +315,9 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { node.BlockChannelAccount = make(chan *types.Block) node.Worker = worker.New(params.TestChainConfig, chain, bft.NewFaker()) } + + node.SelfPeer = selfPeer + // Logger node.log = log.New() if consensus.IsLeader { diff --git a/node/node_test.go b/node/node_test.go index f76aa29b0..5640e9d6d 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -18,9 +18,9 @@ import ( func TestNewNewNode(test *testing.T) { leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} - consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, nil) + node := New(consensus, nil, leader) if node.Consensus == nil { test.Error("Consensus is not initialized for the node") } @@ -45,9 +45,9 @@ func TestNewNewNode(test *testing.T) { func TestCountNumTransactionsInBlockchain(test *testing.T) { leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} - consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, nil) + node := New(consensus, nil, leader) node.AddTestingAddresses(1000) if node.countNumTransactionsInBlockchain() != 1001 { test.Error("Count of transactions in the blockchain is incorrect") @@ -79,9 +79,9 @@ func TestAddPeers(test *testing.T) { } leader := p2p.Peer{Ip: "1", Port: "2"} validator := p2p.Peer{Ip: "3", Port: "5"} - consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, nil) + node := New(consensus, nil, leader) r1 := node.AddPeers(peers1) e1 := 2 if r1 != e1 { @@ -150,7 +150,7 @@ func exitServer() { // func TestPingPongHandler(test *testing.T) { // leader := p2p.Peer{Ip: "127.0.0.1", Port: "8881"} // // validator := p2p.Peer{Ip: "127.0.0.1", Port: "9991"} -// consensus := consensus.NewConsensus("127.0.0.1", "8881", "0", []p2p.Peer{leader}, leader) +// consensus := consensus.New("127.0.0.1", "8881", "0", []p2p.Peer{leader}, leader) // node := New(consensus, nil) diff --git a/utils/distribution_config.go b/utils/distribution_config.go index 55ccb8cfa..ed93c8657 100644 --- a/utils/distribution_config.go +++ b/utils/distribution_config.go @@ -165,6 +165,6 @@ func (config *DistributionConfig) GetMyConfigEntry(ip string, port string) *Conf func setKey(peer *p2p.Peer) { // Get public key deterministically based on ip and port - priKey := crypto.Ed25519Curve.Scalar().SetInt64(int64(GetUniqueIdFromPeer(*peer))) // TODO: figure out why using a random hash value doesn't work for private key (schnorr) + priKey := crypto.Ed25519Curve.Scalar().SetInt64(int64(GetUniqueIDFromPeer(*peer))) // TODO: figure out why using a random hash value doesn't work for private key (schnorr) peer.PubKey = pki.GetPublicKeyFromScalar(priKey) } diff --git a/utils/utils.go b/utils/utils.go index e35e0756a..ec56a541c 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -25,17 +25,13 @@ func ConvertFixedDataIntoByteArray(data interface{}) []byte { } // TODO(minhdoan): this is probably a hack, probably needs some strong non-collision hash. -func GetUniqueIdFromPeer(peer p2p.Peer) uint16 { - reg, err := regexp.Compile("[^0-9]+") - if err != nil { - log.Panic("Regex Compilation Failed", "err", err) - } - socketId := reg.ReplaceAllString(peer.Ip+peer.Port, "") // A integer Id formed by unique IP/PORT pair - value, _ := strconv.Atoi(socketId) - return uint16(value) +// GetUniqueIDFromPeer -- +func GetUniqueIDFromPeer(peer p2p.Peer) uint16 { + return GetUniqueIDFromIPPort(peer.Ip, peer.Port) } -func GetUniqueIdFromIpPort(ip, port string) uint16 { +// GetUniqueIDFromIPPort -- +func GetUniqueIDFromIPPort(ip, port string) uint16 { reg, err := regexp.Compile("[^0-9]+") if err != nil { log.Panic("Regex Compilation Failed", "err", err) @@ -65,7 +61,7 @@ func RunCmd(name string, args ...string) error { } func GenKey(ip, port string) (kyber.Scalar, kyber.Point) { - priKey := crypto.Ed25519Curve.Scalar().SetInt64(int64(GetUniqueIdFromIpPort(ip, port))) // TODO: figure out why using a random hash value doesn't work for private key (schnorr) + priKey := crypto.Ed25519Curve.Scalar().SetInt64(int64(GetUniqueIDFromIPPort(ip, port))) // TODO: figure out why using a random hash value doesn't work for private key (schnorr) pubKey := pki.GetPublicKeyFromScalar(priKey) return priKey, pubKey From 94fdb1e20c8b6251e90a8583ce32777788498db2 Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Thu, 29 Nov 2018 21:55:12 -0800 Subject: [PATCH 3/8] HAR-84: fix golint warnings on proto/ directory Signed-off-by: Leo Chen --- node/node_handler.go | 6 +++--- proto/client/client.go | 17 ++++++++++------- proto/common.go | 14 +++++++------- proto/consensus/consensus.go | 16 ++++++++-------- proto/identity/identity.go | 12 ++++++------ 5 files changed, 34 insertions(+), 31 deletions(-) diff --git a/node/node_handler.go b/node/node_handler.go index 69b68e612..54626dab7 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -75,7 +75,7 @@ func (node *Node) NodeHandler(conn net.Conn) { switch msgCategory { case proto.Identity: - actionType := proto_identity.IdentityMessageType(msgType) + actionType := proto_identity.IDMessageType(msgType) switch actionType { case proto_identity.Identity: messageType := proto_identity.MessageType(msgPayload[0]) @@ -90,7 +90,7 @@ func (node *Node) NodeHandler(conn net.Conn) { } } case proto.Consensus: - actionType := consensus.ConsensusMessageType(msgType) + actionType := consensus.ConMessageType(msgType) switch actionType { case consensus.Consensus: if consensusObj.IsLeader { @@ -189,7 +189,7 @@ func (node *Node) NodeHandler(conn net.Conn) { node.pongMessageHandler(msgPayload) } case proto.Client: - actionType := client.ClientMessageType(msgType) + actionType := client.MessageType(msgType) node.log.Info("NET: received message: Client/Transaction") switch actionType { case client.Transaction: diff --git a/proto/client/client.go b/proto/client/client.go index 5bcc08177..be7644cb8 100644 --- a/proto/client/client.go +++ b/proto/client/client.go @@ -8,28 +8,31 @@ import ( "github.com/harmony-one/harmony/proto" ) -// The specific types of message under Client category -type ClientMessageType byte +// MessageType is the specific types of message under Client category +type MessageType byte +// Message type supported by client const ( - Transaction ClientMessageType = iota + Transaction MessageType = iota // TODO: add more types ) -// The types of messages used for Client/Transaction +// TransactionMessageType defines the types of messages used for Client/Transaction type TransactionMessageType int +// The proof of accept or reject returned by the leader to the client tnat issued cross shard transactions const ( - ProofOfLock TransactionMessageType = iota // The proof of accept or reject returned by the leader to the client tnat issued cross shard transactions. + ProofOfLock TransactionMessageType = iota UtxoResponse ) +// FetchUtxoResponseMessage is the data structure of UTXO map type FetchUtxoResponseMessage struct { UtxoMap blockchain.UtxoMap ShardID uint32 } -// [leader] Constructs the proof of accept or reject message that will be sent to client +// ConstructProofOfAcceptOrRejectMessage constructs the proof of accept or reject message that will be sent to client func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Client)}) byteBuffer.WriteByte(byte(Transaction)) @@ -40,7 +43,7 @@ func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof return byteBuffer.Bytes() } -// Constructs the response message to fetch utxo message +// ConstructFetchUtxoResponseMessage constructs the response message to fetch utxo message func ConstructFetchUtxoResponseMessage(utxoMap *blockchain.UtxoMap, shardID uint32) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Client)}) byteBuffer.WriteByte(byte(Transaction)) diff --git a/proto/common.go b/proto/common.go index be8100e0f..a552e98b3 100644 --- a/proto/common.go +++ b/proto/common.go @@ -21,7 +21,7 @@ n - 2 bytes - actual message payload ---- content end ----- */ -// The message category enum +// MessageCategory defines the message category enum type MessageCategory byte //Consensus and other message categories @@ -39,26 +39,26 @@ const MessageCategoryBytes = 1 // MessageTypeBytes is the number of bytes message type takes const MessageTypeBytes = 1 -// Get the message category from the p2p message content +// GetMessageCategory gets the message category from the p2p message content func GetMessageCategory(message []byte) (MessageCategory, error) { if len(message) < MessageCategoryBytes { - return 0, errors.New("Failed to get message category: no data available.") + return 0, errors.New("failed to get message category: no data available") } return MessageCategory(message[MessageCategoryBytes-1]), nil } -// Get the message type from the p2p message content +// GetMessageType gets the message type from the p2p message content func GetMessageType(message []byte) (byte, error) { if len(message) < MessageCategoryBytes+MessageTypeBytes { - return 0, errors.New("Failed to get message type: no data available.") + return 0, errors.New("failed to get message type: no data available") } return byte(message[MessageCategoryBytes+MessageTypeBytes-1]), nil } -// Get the node message payload from the p2p message content +// GetMessagePayload gets the node message payload from the p2p message content func GetMessagePayload(message []byte) ([]byte, error) { if len(message) < MessageCategoryBytes+MessageTypeBytes { - return []byte{}, errors.New("Failed to get message payload: no data available.") + return []byte{}, errors.New("failed to get message payload: no data available") } return message[MessageCategoryBytes+MessageTypeBytes:], nil } diff --git a/proto/consensus/consensus.go b/proto/consensus/consensus.go index d389dd4fb..94c086292 100644 --- a/proto/consensus/consensus.go +++ b/proto/consensus/consensus.go @@ -71,12 +71,12 @@ Response: // MessageTypeBytes is the number of bytes consensus message type occupies const MessageTypeBytes = 1 -// ConsensusMessageType is the specific types of message under Consensus category -type ConsensusMessageType byte +// ConMessageType is the specific types of message under Consensus category +type ConMessageType byte // Consensus message type constants. const ( - Consensus ConsensusMessageType = iota + Consensus ConMessageType = iota // TODO: add more types ) @@ -117,23 +117,23 @@ func (msgType MessageType) String() string { return names[msgType] } -// Get the consensus message type from the consensus message +// GetConsensusMessageType gets the consensus message type from the consensus message func GetConsensusMessageType(message []byte) (MessageType, error) { if len(message) < 1 { - return 0, errors.New("Failed to get consensus message type: no data available.") + return 0, errors.New("failed to get consensus message type: no data available") } return MessageType(message[0]), nil } -// Get the consensus message payload from the consensus message +// GetConsensusMessagePayload gets the consensus message payload from the consensus message func GetConsensusMessagePayload(message []byte) ([]byte, error) { if len(message) < 2 { - return []byte{}, errors.New("Failed to get consensus message payload: no data available.") + return []byte{}, errors.New("failed to get consensus message payload: no data available") } return message[MessageTypeBytes:], nil } -// Concatenate msgType as one byte with payload, and return the whole byte array +// ConstructConsensusMessage concatenates msgType as one byte with payload, and return the whole byte array func ConstructConsensusMessage(consensusMsgType MessageType, payload []byte) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Consensus)}) byteBuffer.WriteByte(byte(Consensus)) diff --git a/proto/identity/identity.go b/proto/identity/identity.go index 716dfe165..cafba9cc3 100644 --- a/proto/identity/identity.go +++ b/proto/identity/identity.go @@ -10,12 +10,12 @@ import ( // IdentityMessageTypeBytes is the number of bytes consensus message type occupies const IdentityMessageTypeBytes = 1 -// IdentityMessageType is the identity message type. -type IdentityMessageType byte +// IDMessageType is the identity message type. +type IDMessageType byte // Constants of IdentityMessageType. const ( - Identity IdentityMessageType = iota + Identity IDMessageType = iota // TODO: add more types ) @@ -44,7 +44,7 @@ func (msgType MessageType) String() string { // GetIdentityMessageType Get the identity message type from the identity message func GetIdentityMessageType(message []byte) (MessageType, error) { if len(message) < 1 { - return 0, errors.New("Failed to get identity message type: no data available.") + return 0, errors.New("failed to get identity message type: no data available") } return MessageType(message[0]), nil } @@ -52,12 +52,12 @@ func GetIdentityMessageType(message []byte) (MessageType, error) { // GetIdentityMessagePayload message payload from the identity message func GetIdentityMessagePayload(message []byte) ([]byte, error) { if len(message) < 2 { - return []byte{}, errors.New("Failed to get identity message payload: no data available.") + return []byte{}, errors.New("failed to get identity message payload: no data available") } return message[IdentityMessageTypeBytes:], nil } -// Concatenate msgType as one byte with payload, and return the whole byte array +// ConstructIdentityMessage concatenates msgType as one byte with payload, and return the whole byte array func ConstructIdentityMessage(identityMessageType MessageType, payload []byte) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Identity)}) byteBuffer.WriteByte(byte(Identity)) From b69bfd6a137a3f0e02f93dece2be3eda8a2045dc Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Thu, 29 Nov 2018 22:14:27 -0800 Subject: [PATCH 4/8] fix golint for client/txgent beaconchain --- beaconchain/beaconchain_handler.go | 3 +-- client/txgen/main.go | 16 +++++++++------- client/txgen/txgen/account_txs_generator.go | 2 ++ client/txgen/txgen/utxo_txs_generator.go | 11 ++++++----- client/wallet/main.go | 14 ++++++++------ client/wallet_v2/main.go | 14 ++++++++------ 6 files changed, 34 insertions(+), 26 deletions(-) diff --git a/beaconchain/beaconchain_handler.go b/beaconchain/beaconchain_handler.go index 5ed1c105f..c3b7ba6c9 100644 --- a/beaconchain/beaconchain_handler.go +++ b/beaconchain/beaconchain_handler.go @@ -17,9 +17,8 @@ func (IDC *BeaconChain) BeaconChainHandler(conn net.Conn) { if err != nil { IDC.log.Error("Read p2p data failed") return - } else { - IDC.log.Info("received connection") } + IDC.log.Info("received connection") msgCategory, err := proto.GetMessageCategory(content) if err != nil { IDC.log.Error("Read message category failed", "err", err) diff --git a/client/txgen/main.go b/client/txgen/main.go index a952ad1f7..08225b2d0 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -98,15 +98,15 @@ func main() { log.Debug("Received new block from leader", "len", len(blocks)) for _, block := range blocks { for _, node := range nodes { - shardId := block.ShardID + shardID := block.ShardID accountBlock := new(types.Block) err := rlp.DecodeBytes(block.AccountBlock, accountBlock) if err == nil { - shardId = accountBlock.ShardId() + shardID = accountBlock.ShardId() } - if node.Consensus.ShardID == shardId { - log.Debug("Adding block from leader", "shardID", shardId) + if node.Consensus.ShardID == shardID { + log.Debug("Adding block from leader", "shardID", shardID) // Add it to blockchain node.AddNewBlock(block) utxoPoolMutex.Lock() @@ -159,7 +159,7 @@ func main() { utxoPoolMutex.Lock() log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) - for shardID, _ := range shardIDLeaderMap { // Generate simulated transactions + for shardID := range shardIDLeaderMap { // Generate simulated transactions go func(shardID uint32) { txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting) @@ -200,7 +200,7 @@ func main() { utxoPoolMutex.Lock() log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) - for shardID, _ := range shardIDLeaderMap { // Generate simulated transactions + for shardID := range shardIDLeaderMap { // Generate simulated transactions go func(shardID uint32) { txs, crossTxs := txgen.GenerateSimulatedTransactions(subsetCounter, *numSubset, int(shardID), nodes, setting) @@ -217,7 +217,7 @@ func main() { // Put txs into corresponding shards shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...) for _, crossTx := range crossTxs { - for curShardID, _ := range client.GetInputShardIDsOfCrossShardTx(crossTx) { + for curShardID := range client.GetInputShardIDsOfCrossShardTx(crossTx) { shardIDTxsMap[curShardID] = append(shardIDTxsMap[curShardID], crossTx) } } @@ -248,12 +248,14 @@ func main() { time.Sleep(3000 * time.Millisecond) } +// SendTxsToLeader sends txs to leader. func SendTxsToLeader(leader p2p.Peer, txs []*blockchain.Transaction) { log.Debug("[Generator] Sending txs to...", "leader", leader, "numTxs", len(txs)) msg := proto_node.ConstructTransactionListMessage(txs) p2p.SendMessage(leader, msg) } +// SendTxsToLeaderAccount sends txs to leader account. func SendTxsToLeaderAccount(leader p2p.Peer, txs types.Transactions) { log.Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs)) msg := proto_node.ConstructTransactionListMessageAccount(txs) diff --git a/client/txgen/txgen/account_txs_generator.go b/client/txgen/txgen/account_txs_generator.go index 8b007a4c8..035c0a03c 100644 --- a/client/txgen/txgen/account_txs_generator.go +++ b/client/txgen/txgen/account_txs_generator.go @@ -9,6 +9,7 @@ import ( "github.com/harmony-one/harmony/node" ) +// TxGenSettings is the settings for TX generation. type TxGenSettings struct { NumOfAddress int CrossShard bool @@ -16,6 +17,7 @@ type TxGenSettings struct { CrossShardRatio int } +// GenerateSimulatedTransactionsAccount generates simulated transaction for account model. func GenerateSimulatedTransactionsAccount(shardID int, dataNodes []*node.Node, setting TxGenSettings) (types.Transactions, types.Transactions) { _ = setting // TODO: take use of settings node := dataNodes[shardID] diff --git a/client/txgen/txgen/utxo_txs_generator.go b/client/txgen/txgen/utxo_txs_generator.go index 852ca40f6..30bb4be54 100644 --- a/client/txgen/txgen/utxo_txs_generator.go +++ b/client/txgen/txgen/utxo_txs_generator.go @@ -12,6 +12,7 @@ import ( "github.com/harmony-one/harmony/node" ) +// TxInfo is the transaction info. type TxInfo struct { // Global Input shardID int @@ -27,7 +28,7 @@ type TxInfo struct { txCount int } -// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards. +// GenerateSimulatedTransactions generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards. // The transactions are generated by going through the existing utxos and // randomly select a subset of them as the input for each new transaction. The output // address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses. @@ -39,13 +40,13 @@ type TxInfo struct { // token (1000) to each address in [0 - N). See node.AddTestingAddresses() // // Params: -// subsetId - the which subset of the utxo to work on (used to select addresses) +// subsetID - the which subset of the utxo to work on (used to select addresses) // shardID - the shardID for current shard // dataNodes - nodes containing utxopools of all shards // Returns: // all single-shard txs // all cross-shard txs -func GenerateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node, setting TxGenSettings) ([]*blockchain.Transaction, []*blockchain.Transaction) { +func GenerateSimulatedTransactions(subsetID, numSubset int, shardID int, dataNodes []*node.Node, setting TxGenSettings) ([]*blockchain.Transaction, []*blockchain.Transaction) { /* UTXO map structure: address - [ @@ -68,7 +69,7 @@ func GenerateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNod UTXOLOOP: // Loop over all addresses for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap { - if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time + if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetID%numSubset { // Work on one subset of utxo at a time txInfo.address = address // Loop over all txIDs for the address for txIDStr, utxoMap := range txMap { @@ -82,7 +83,7 @@ UTXOLOOP: // Loop over all utxos for the txID utxoSize := len(utxoMap) batchSize := utxoSize / numSubset - i := subsetId % numSubset + i := subsetID % numSubset counter := 0 for index, value := range utxoMap { counter++ diff --git a/client/wallet/main.go b/client/wallet/main.go index 866dbe89c..98f3c70f5 100644 --- a/client/wallet/main.go +++ b/client/wallet/main.go @@ -244,6 +244,7 @@ func getShardIDToLeaderMap() map[uint32]p2p.Peer { return shardIDLeaderMap } +// CreateWalletServerNode creates wallet server node. func CreateWalletServerNode() *node.Node { configr := client_config.NewConfig() var shardIDLeaderMap map[uint32]p2p.Peer @@ -262,7 +263,7 @@ func CreateWalletServerNode() *node.Node { return walletNode } -// Issue the transaction to the Harmony network +// ExecuteTransaction issues the transaction to the Harmony network func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error { if tx.IsCrossShard() { walletNode.Client.PendingCrossTxsMutex.Lock() @@ -292,7 +293,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error } } -// Fetch utxos of specified address from the Harmony network +// FetchUtxos fetches utxos of specified address from the Harmony network func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockchain.UtxoMap, error) { fmt.Println("Fetching account balance...") walletNode.Client.ShardUtxoMap = make(map[uint32]blockchain.UtxoMap) @@ -316,6 +317,7 @@ func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockch } } +// PrintUtxoBalance prints utxo balance. func PrintUtxoBalance(shardUtxoMap map[uint32]blockchain.UtxoMap) { addressBalance := make(map[[20]byte]int) for _, utxoMap := range shardUtxoMap { @@ -338,7 +340,7 @@ func PrintUtxoBalance(shardUtxoMap map[uint32]blockchain.UtxoMap) { } } -// Read the addresses stored in local keystore +// ReadAddresses reads the addresses stored in local keystore func ReadAddresses() [][20]byte { priKeys := ReadPrivateKeys() addresses := [][20]byte{} @@ -348,7 +350,7 @@ func ReadAddresses() [][20]byte { return addresses } -// Store the specified private key in local keystore +// StorePrivateKey stores the specified private key in local keystore func StorePrivateKey(priKey []byte) { for _, address := range ReadAddresses() { if address == pki.GetAddressFromPrivateKey(crypto.Ed25519Curve.Scalar().SetBytes(priKey)) { @@ -369,12 +371,12 @@ func StorePrivateKey(priKey []byte) { f.Close() } -// Delete all data in the local keystore +// ClearKeystore deletes all data in the local keystore func ClearKeystore() { ioutil.WriteFile("keystore", []byte{}, 0644) } -// Read all the private key stored in local keystore +// ReadPrivateKeys reads all the private key stored in local keystore func ReadPrivateKeys() []kyber.Scalar { keys, err := ioutil.ReadFile("keystore") if err != nil { diff --git a/client/wallet_v2/main.go b/client/wallet_v2/main.go index 866dbe89c..cd4b8fea4 100644 --- a/client/wallet_v2/main.go +++ b/client/wallet_v2/main.go @@ -244,6 +244,7 @@ func getShardIDToLeaderMap() map[uint32]p2p.Peer { return shardIDLeaderMap } +// CreateWalletServerNode creates wallet server node. func CreateWalletServerNode() *node.Node { configr := client_config.NewConfig() var shardIDLeaderMap map[uint32]p2p.Peer @@ -262,7 +263,7 @@ func CreateWalletServerNode() *node.Node { return walletNode } -// Issue the transaction to the Harmony network +// ExecuteTransaction issues the transaction to the Harmony network func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error { if tx.IsCrossShard() { walletNode.Client.PendingCrossTxsMutex.Lock() @@ -292,7 +293,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error } } -// Fetch utxos of specified address from the Harmony network +// FetchUtxos fetches utxos of specified address from the Harmony network func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockchain.UtxoMap, error) { fmt.Println("Fetching account balance...") walletNode.Client.ShardUtxoMap = make(map[uint32]blockchain.UtxoMap) @@ -316,6 +317,7 @@ func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockch } } +// PrintUtxoBalance prints UTXO balance. func PrintUtxoBalance(shardUtxoMap map[uint32]blockchain.UtxoMap) { addressBalance := make(map[[20]byte]int) for _, utxoMap := range shardUtxoMap { @@ -338,7 +340,7 @@ func PrintUtxoBalance(shardUtxoMap map[uint32]blockchain.UtxoMap) { } } -// Read the addresses stored in local keystore +// ReadAddresses reads the addresses stored in local keystore func ReadAddresses() [][20]byte { priKeys := ReadPrivateKeys() addresses := [][20]byte{} @@ -348,7 +350,7 @@ func ReadAddresses() [][20]byte { return addresses } -// Store the specified private key in local keystore +// StorePrivateKey stores the specified private key in local keystore func StorePrivateKey(priKey []byte) { for _, address := range ReadAddresses() { if address == pki.GetAddressFromPrivateKey(crypto.Ed25519Curve.Scalar().SetBytes(priKey)) { @@ -369,12 +371,12 @@ func StorePrivateKey(priKey []byte) { f.Close() } -// Delete all data in the local keystore +// ClearKeystore deletes all data in the local keystore func ClearKeystore() { ioutil.WriteFile("keystore", []byte{}, 0644) } -// Read all the private key stored in local keystore +// ReadPrivateKeys reads all the private key stored in local keystore func ReadPrivateKeys() []kyber.Scalar { keys, err := ioutil.ReadFile("keystore") if err != nil { From 8c08f5ba5e6ad513b7d28d7b1f6e897e1f7024dd Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Thu, 29 Nov 2018 22:25:51 -0800 Subject: [PATCH 5/8] HAR-84: fix golint warnings in crytpo, p2p Signed-off-by: Leo Chen --- beaconchain/beaconchain_handler.go | 2 +- crypto/cosi.go | 2 +- p2p/backoff.go | 7 +++++-- p2p/helper.go | 11 +++++++---- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/beaconchain/beaconchain_handler.go b/beaconchain/beaconchain_handler.go index c3b7ba6c9..a7a2f0127 100644 --- a/beaconchain/beaconchain_handler.go +++ b/beaconchain/beaconchain_handler.go @@ -47,7 +47,7 @@ func (IDC *BeaconChain) BeaconChainHandler(conn net.Conn) { } switch msgCategory { case proto.Identity: - actionType := proto_identity.IdentityMessageType(msgType) + actionType := proto_identity.IDMessageType(msgType) switch actionType { case proto_identity.Identity: idMsgType, err := proto_identity.GetIdentityMessageType(msgPayload) diff --git a/crypto/cosi.go b/crypto/cosi.go index 675616053..4c98c2c69 100644 --- a/crypto/cosi.go +++ b/crypto/cosi.go @@ -1,5 +1,5 @@ /* -Package cosi implements the collective signing (CoSi) algorithm as presented in +Package crypto implements the collective signing (CoSi) algorithm as presented in the paper "Keeping Authorities 'Honest or Bust' with Decentralized Witness Cosigning" by Ewa Syta et al. See https://arxiv.org/abs/1503.08768. This package only provides the functionality for the cryptographic operations of diff --git a/p2p/backoff.go b/p2p/backoff.go index 2bac92e41..c57a636cb 100644 --- a/p2p/backoff.go +++ b/p2p/backoff.go @@ -16,6 +16,7 @@ type BackoffBase struct { Min, Cur, Max time.Duration } +// NewBackoffBase creates a new BackOffBase structure func NewBackoffBase(min, max time.Duration) *BackoffBase { return &BackoffBase{min, min, max} } @@ -40,21 +41,23 @@ func (b *BackoffBase) Sleep() { } } -// Adjust the duration. Subtypes shall implement this. +// Backoff adjusts the duration. Subtypes shall implement this. func (b *BackoffBase) Backoff() { // default implementation does not backoff } -// Exponential backoff. +// ExpBackoff is an exponential backoff data structure. type ExpBackoff struct { BackoffBase Factor float64 } +// NewExpBackoff creates a new ExpBackOff structure func NewExpBackoff(min, max time.Duration, factor float64) *ExpBackoff { return &ExpBackoff{*NewBackoffBase(min, max), factor} } +// Backoff implements the exponential backoff func (b *ExpBackoff) Backoff() { b.Cur = time.Duration(float64(b.Cur) * b.Factor) } diff --git a/p2p/helper.go b/p2p/helper.go index d748a0aae..b38c999d0 100644 --- a/p2p/helper.go +++ b/p2p/helper.go @@ -24,9 +24,10 @@ content (n bytes) - actual message content */ -const BATCH_SIZE = 1 << 16 +// BatchSize defines the size of buffer +const BatchSize = 1 << 16 -// Read the message type and content size, and return the actual content. +// ReadMessageContent reads the message type and content size, and return the actual content. func ReadMessageContent(conn net.Conn) ([]byte, error) { var ( contentBuf = bytes.NewBuffer([]byte{}) @@ -67,12 +68,12 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) { //log.Printf("The content size is %d bytes.", bytesToRead) //// Read the content in chunk of 16 * 1024 bytes - tmpBuf := make([]byte, BATCH_SIZE) + tmpBuf := make([]byte, BatchSize) ILOOP: for { timeoutDuration := 10 * time.Second conn.SetReadDeadline(time.Now().Add(timeoutDuration)) - if bytesToRead < BATCH_SIZE { + if bytesToRead < BatchSize { // Read the last number of bytes less than 1024 tmpBuf = make([]byte, bytesToRead) } @@ -97,6 +98,7 @@ ILOOP: return contentBuf.Bytes(), nil } +// CreateMessage create a general message. FIXME: this is not used func CreateMessage(msgType byte, data []byte) []byte { buffer := bytes.NewBuffer([]byte{}) @@ -110,6 +112,7 @@ func CreateMessage(msgType byte, data []byte) []byte { return buffer.Bytes() } +// SendMessageContent send message over net connection. FIXME: this is not used func SendMessageContent(conn net.Conn, data []byte) { msgToSend := CreateMessage(byte(1), data) w := bufio.NewWriter(conn) From a907a214b37028a06080729e2eda9b8f6f40be3f Mon Sep 17 00:00:00 2001 From: Richard Liu Date: Thu, 29 Nov 2018 23:54:53 -0800 Subject: [PATCH 6/8] BatchSize; remove infinite loop in writeData --- p2pv2/host.go | 14 +++++++------- p2pv2/util.go | 6 ++---- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/p2pv2/host.go b/p2pv2/host.go index 219fb0190..59427fe68 100644 --- a/p2pv2/host.go +++ b/p2pv2/host.go @@ -23,8 +23,8 @@ var ( ) const ( - // BatchSize The batch size in which we return data - BatchSize = 1 << 16 + // BatchSizeInByte The batch size in byte (64MB) in which we return data + BatchSizeInByte = 1 << 16 // ProtocolID The ID of protocol used in stream handling. ProtocolID = "/harmony/0.0.1" ) @@ -54,7 +54,7 @@ func BindHandler(handler net.StreamHandler) { // Send a p2p message sending function with signature compatible to p2pv1. func Send(ip, port string, message []byte) error { - addr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%s", port) + addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) targetAddr, err := multiaddr.NewMultiaddr(addr) priv := addrToPrivKey(addr) @@ -109,15 +109,15 @@ func ReadData(s net.Stream) ([]byte, error) { bytesToRead := binary.BigEndian.Uint32(fourBytes) //log.Printf("The content size is %d bytes.", bytesToRead) - // Read the content in chunk of 16 * 1024 bytes - tmpBuf := make([]byte, BatchSize) + // Read the content in chunk of size `BatchSizeInByte` + tmpBuf := make([]byte, BatchSizeInByte) ILOOP: for { // TODO(ricl): is this necessary? If yes, figure out how to make it work // timeoutDuration := 10 * time.Second // s.SetReadDeadline(time.Now().Add(timeoutDuration)) - if bytesToRead < BatchSize { - // Read the last number of bytes less than 1024 + if bytesToRead < BatchSizeInByte { + // Read the last number of bytes less than `BatchSizeInByte` tmpBuf = make([]byte, bytesToRead) } n, err := rw.Read(tmpBuf) diff --git a/p2pv2/util.go b/p2pv2/util.go index 664e708b0..52486bd78 100644 --- a/p2pv2/util.go +++ b/p2pv2/util.go @@ -26,8 +26,6 @@ func addrToPrivKey(addr string) ic.PrivKey { } func writeData(w *bufio.Writer, data []byte) { - for { - w.Write(data) - w.Flush() - } + w.Write(data) + w.Flush() } From f2c3290ccd14b07f7936ebc4ec1da803b875ac43 Mon Sep 17 00:00:00 2001 From: Richard Liu Date: Fri, 30 Nov 2018 01:23:30 -0800 Subject: [PATCH 7/8] fix test issues --- blockchain/merkle_tree_test.go | 2 +- node/node_test.go | 12 ++---------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/blockchain/merkle_tree_test.go b/blockchain/merkle_tree_test.go index 04a7adbee..70ce44ddc 100644 --- a/blockchain/merkle_tree_test.go +++ b/blockchain/merkle_tree_test.go @@ -13,7 +13,7 @@ func TestNewMerkleNode(t *testing.T) { []byte("node3"), } - fmt.Println("TEting") + fmt.Println("Testing") // Level 1 n1 := NewMerkleNode(nil, nil, data[0]) diff --git a/node/node_test.go b/node/node_test.go index 57a52c385..bb0458b4b 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -148,20 +148,12 @@ func exitServer() { } // func TestPingPongHandler(test *testing.T) { -<<<<<<< HEAD -// leader := p2p.Peer{Ip: "127.0.0.1", Port: "8881"} -// // validator := p2p.Peer{Ip: "127.0.0.1", Port: "9991"} -// consensus := consensus.New("127.0.0.1", "8881", "0", []p2p.Peer{leader}, leader) - -======= // leader := p2p.Peer{IP: "127.0.0.1", Port: "8881"} -// consensus := consensus.NewConsensus("127.0.0.1", "8881", "0", []p2p.Peer{leader}, leader) ->>>>>>> 8cddd1548aa2057a4d17d149addc5f579146fddd +// // validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"} +// consensus := consensus.New("127.0.0.1", "8881", "0", []p2p.Peer{leader}, leader) // node := New(consensus, nil) - // // go sendPingMessage(leader) // go sendPongMessage(leader) // go exitServer() - // node.StartServer("8881") // } From ddf0e3cc522685940351a42aa6bd7165b485173e Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Fri, 30 Nov 2018 11:09:07 -0800 Subject: [PATCH 8/8] move p2p logic into seperate files --- node/node.go | 1 + node/node_handler.go | 166 ----------------------------------- node/node_handler_p2p.go | 183 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 184 insertions(+), 166 deletions(-) create mode 100644 node/node_handler_p2p.go diff --git a/node/node.go b/node/node.go index bd18d2264..4e4ddaa21 100644 --- a/node/node.go +++ b/node/node.go @@ -352,6 +352,7 @@ func (node *Node) JoinShard(leader p2p.Peer) { ping := proto_node.NewPingMessage(node.SelfPeer) buffer := ping.ConstructPingMessage() + // Talk to leader. p2p.SendMessage(leader, buffer) } } diff --git a/node/node_handler.go b/node/node_handler.go index eff9d4fe6..54626dab7 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -9,8 +9,6 @@ import ( "strconv" "time" - "github.com/harmony-one/harmony/p2pv2" - "github.com/dedis/kyber" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/blockchain" @@ -23,7 +21,6 @@ import ( "github.com/harmony-one/harmony/proto/consensus" proto_identity "github.com/harmony-one/harmony/proto/identity" proto_node "github.com/harmony-one/harmony/proto/node" - netp2p "github.com/libp2p/go-libp2p-net" ) const ( @@ -205,169 +202,6 @@ func (node *Node) NodeHandler(conn net.Conn) { } } -// NodeHandler handles a new incoming connection. -func (node *Node) NodeHandlerV1(s netp2p.Stream) { - defer s.Close() - - // Read p2p message payload - content, err := p2pv2.ReadData(s) - - if err != nil { - node.log.Error("Read p2p data failed", "err", err, "node", node) - return - } - // TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p. - node.MaybeBroadcastAsValidator(content) - - consensusObj := node.Consensus - - msgCategory, err := proto.GetMessageCategory(content) - if err != nil { - node.log.Error("Read node type failed", "err", err, "node", node) - return - } - - msgType, err := proto.GetMessageType(content) - if err != nil { - node.log.Error("Read action type failed", "err", err, "node", node) - return - } - - msgPayload, err := proto.GetMessagePayload(content) - if err != nil { - node.log.Error("Read message payload failed", "err", err, "node", node) - return - } - - switch msgCategory { - case proto.Identity: - actionType := proto_identity.IDMessageType(msgType) - switch actionType { - case proto_identity.Identity: - messageType := proto_identity.MessageType(msgPayload[0]) - switch messageType { - case proto_identity.Register: - fmt.Println("received a identity message") - // TODO(ak): fix it. - // node.processPOWMessage(msgPayload) - node.log.Info("NET: received message: IDENTITY/REGISTER") - default: - node.log.Error("Announce message should be sent to IdentityChain") - } - } - case proto.Consensus: - actionType := consensus.ConMessageType(msgType) - switch actionType { - case consensus.Consensus: - if consensusObj.IsLeader { - node.log.Info("NET: received message: Consensus/Leader") - consensusObj.ProcessMessageLeader(msgPayload) - } else { - node.log.Info("NET: received message: Consensus/Validator") - consensusObj.ProcessMessageValidator(msgPayload) - } - } - case proto.Node: - actionType := proto_node.MessageType(msgType) - switch actionType { - case proto_node.Transaction: - node.log.Info("NET: received message: Node/Transaction") - node.transactionMessageHandler(msgPayload) - case proto_node.Block: - node.log.Info("NET: received message: Node/Block") - blockMsgType := proto_node.BlockMessageType(msgPayload[0]) - switch blockMsgType { - case proto_node.Sync: - decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Sync messge type - blocks := new([]*blockchain.Block) - decoder.Decode(blocks) - if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil { - node.Client.UpdateBlocks(*blocks) - } - } - case proto_node.Client: - node.log.Info("NET: received message: Node/Client") - clientMsgType := proto_node.ClientMessageType(msgPayload[0]) - switch clientMsgType { - case proto_node.LookupUtxo: - decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the LookupUtxo messge type - - fetchUtxoMessage := new(proto_node.FetchUtxoMessage) - decoder.Decode(fetchUtxoMessage) - - utxoMap := node.UtxoPool.GetUtxoMapByAddresses(fetchUtxoMessage.Addresses) - - p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) - } - case proto_node.Control: - node.log.Info("NET: received message: Node/Control") - controlType := msgPayload[0] - if proto_node.ControlMessageType(controlType) == proto_node.STOP { - if node.Chain == nil { - node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) - - sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap() - node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) - - avgBlockSizeInBytes := 0 - txCount := 0 - blockCount := 0 - totalTxCount := 0 - totalBlockCount := 0 - avgTxSize := 0 - - for _, block := range node.blockchain.Blocks { - if block.IsStateBlock() { - totalTxCount += int(block.State.NumTransactions) - totalBlockCount += int(block.State.NumBlocks) - } else { - byteBuffer := bytes.NewBuffer([]byte{}) - encoder := gob.NewEncoder(byteBuffer) - encoder.Encode(block) - avgBlockSizeInBytes += len(byteBuffer.Bytes()) - - txCount += len(block.Transactions) - blockCount++ - totalTxCount += len(block.TransactionIds) - totalBlockCount++ - - byteBuffer = bytes.NewBuffer([]byte{}) - encoder = gob.NewEncoder(byteBuffer) - encoder.Encode(block.Transactions) - avgTxSize += len(byteBuffer.Bytes()) - } - } - if blockCount != 0 { - avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount - avgTxSize = avgTxSize / txCount - } - - node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) - } else { - node.log.Debug("Stopping Node (Account Model)", "node", node, "CurBlockNum", node.Chain.CurrentHeader().Number, "numTxsProcessed", node.countNumTransactionsInBlockchainAccount()) - } - - os.Exit(0) - } - case proto_node.PING: - node.pingMessageHandler(msgPayload) - case proto_node.PONG: - node.pongMessageHandler(msgPayload) - } - case proto.Client: - actionType := client.MessageType(msgType) - node.log.Info("NET: received message: Client/Transaction") - switch actionType { - case client.Transaction: - if node.Client != nil { - node.Client.TransactionMessageHandler(msgPayload) - } - } - default: - node.log.Error("Unknown", "MsgCateory:", msgCategory) - } -} - func (node *Node) transactionMessageHandler(msgPayload []byte) { txMessageType := proto_node.TransactionMessageType(msgPayload[0]) diff --git a/node/node_handler_p2p.go b/node/node_handler_p2p.go new file mode 100644 index 000000000..a31b2b57d --- /dev/null +++ b/node/node_handler_p2p.go @@ -0,0 +1,183 @@ +package node + +import ( + "bytes" + "encoding/gob" + "fmt" + "os" + + "github.com/harmony-one/harmony/blockchain" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2pv2" + "github.com/harmony-one/harmony/proto" + "github.com/harmony-one/harmony/proto/client" + "github.com/harmony-one/harmony/proto/consensus" + proto_identity "github.com/harmony-one/harmony/proto/identity" + proto_node "github.com/harmony-one/harmony/proto/node" + netp2p "github.com/libp2p/go-libp2p-net" +) + +// NodeHandlerV1 handles a new incoming connection. +func (node *Node) NodeHandlerV1(s netp2p.Stream) { + defer s.Close() + + // Read p2p message payload + content, err := p2pv2.ReadData(s) + + if err != nil { + node.log.Error("Read p2p data failed", "err", err, "node", node) + return + } + // TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p. + node.MaybeBroadcastAsValidator(content) + + consensusObj := node.Consensus + + msgCategory, err := proto.GetMessageCategory(content) + if err != nil { + node.log.Error("Read node type failed", "err", err, "node", node) + return + } + + msgType, err := proto.GetMessageType(content) + if err != nil { + node.log.Error("Read action type failed", "err", err, "node", node) + return + } + + msgPayload, err := proto.GetMessagePayload(content) + if err != nil { + node.log.Error("Read message payload failed", "err", err, "node", node) + return + } + + switch msgCategory { + case proto.Identity: + actionType := proto_identity.IDMessageType(msgType) + switch actionType { + case proto_identity.Identity: + messageType := proto_identity.MessageType(msgPayload[0]) + switch messageType { + case proto_identity.Register: + fmt.Println("received a identity message") + // TODO(ak): fix it. + // node.processPOWMessage(msgPayload) + node.log.Info("NET: received message: IDENTITY/REGISTER") + default: + node.log.Error("Announce message should be sent to IdentityChain") + } + } + case proto.Consensus: + actionType := consensus.ConMessageType(msgType) + switch actionType { + case consensus.Consensus: + if consensusObj.IsLeader { + node.log.Info("NET: received message: Consensus/Leader") + consensusObj.ProcessMessageLeader(msgPayload) + } else { + node.log.Info("NET: received message: Consensus/Validator") + consensusObj.ProcessMessageValidator(msgPayload) + } + } + case proto.Node: + actionType := proto_node.MessageType(msgType) + switch actionType { + case proto_node.Transaction: + node.log.Info("NET: received message: Node/Transaction") + node.transactionMessageHandler(msgPayload) + case proto_node.Block: + node.log.Info("NET: received message: Node/Block") + blockMsgType := proto_node.BlockMessageType(msgPayload[0]) + switch blockMsgType { + case proto_node.Sync: + decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Sync messge type + blocks := new([]*blockchain.Block) + decoder.Decode(blocks) + if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil { + node.Client.UpdateBlocks(*blocks) + } + } + case proto_node.Client: + node.log.Info("NET: received message: Node/Client") + clientMsgType := proto_node.ClientMessageType(msgPayload[0]) + switch clientMsgType { + case proto_node.LookupUtxo: + decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the LookupUtxo messge type + + fetchUtxoMessage := new(proto_node.FetchUtxoMessage) + decoder.Decode(fetchUtxoMessage) + + utxoMap := node.UtxoPool.GetUtxoMapByAddresses(fetchUtxoMessage.Addresses) + + p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) + } + case proto_node.Control: + node.log.Info("NET: received message: Node/Control") + controlType := msgPayload[0] + if proto_node.ControlMessageType(controlType) == proto_node.STOP { + if node.Chain == nil { + node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) + + sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap() + node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) + + avgBlockSizeInBytes := 0 + txCount := 0 + blockCount := 0 + totalTxCount := 0 + totalBlockCount := 0 + avgTxSize := 0 + + for _, block := range node.blockchain.Blocks { + if block.IsStateBlock() { + totalTxCount += int(block.State.NumTransactions) + totalBlockCount += int(block.State.NumBlocks) + } else { + byteBuffer := bytes.NewBuffer([]byte{}) + encoder := gob.NewEncoder(byteBuffer) + encoder.Encode(block) + avgBlockSizeInBytes += len(byteBuffer.Bytes()) + + txCount += len(block.Transactions) + blockCount++ + totalTxCount += len(block.TransactionIds) + totalBlockCount++ + + byteBuffer = bytes.NewBuffer([]byte{}) + encoder = gob.NewEncoder(byteBuffer) + encoder.Encode(block.Transactions) + avgTxSize += len(byteBuffer.Bytes()) + } + } + if blockCount != 0 { + avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount + avgTxSize = avgTxSize / txCount + } + + node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) + } else { + node.log.Debug("Stopping Node (Account Model)", "node", node, "CurBlockNum", node.Chain.CurrentHeader().Number, "numTxsProcessed", node.countNumTransactionsInBlockchainAccount()) + } + + os.Exit(0) + } + case proto_node.PING: + // Leader receives PING from new node. + node.pingMessageHandler(msgPayload) + case proto_node.PONG: + // The new node receives back from leader. + node.pongMessageHandler(msgPayload) + } + case proto.Client: + actionType := client.MessageType(msgType) + node.log.Info("NET: received message: Client/Transaction") + switch actionType { + case client.Transaction: + if node.Client != nil { + node.Client.TransactionMessageHandler(msgPayload) + } + } + default: + node.log.Error("Unknown", "MsgCateory:", msgCategory) + } +}