From 64230da6ee5bb2af614c59f33c7bc94ef8791f53 Mon Sep 17 00:00:00 2001 From: Richard Liu Date: Fri, 7 Dec 2018 01:50:42 -0800 Subject: [PATCH] abstract out 2 versions of p2p --- beaconchain/beaconchain.go | 28 +--- beaconchain/beaconchain_handler.go | 7 +- benchmark.go | 13 +- client/client.go | 10 +- client/txgen/main.go | 42 +++-- client/wallet/main.go | 12 +- client/wallet_v2/main.go | 12 +- consensus/consensus.go | 13 +- consensus/consensus_leader.go | 7 +- consensus/consensus_leader_msg_test.go | 9 +- consensus/consensus_test.go | 4 +- consensus/consensus_validator.go | 7 +- consensus/consensus_validator_msg_test.go | 8 +- newnode/newnode.go | 78 ++------- newnode/newnode_handler.go | 12 +- node/node.go | 56 ++----- node/node_handler.go | 24 +-- node/node_handler_p2p.go | 183 ---------------------- node/node_test.go | 37 +++-- node/p2p.go | 21 +++ p2p/helper.go | 31 +--- p2p/helper_test.go | 55 ------- p2p/host/host.go | 13 ++ p2p/host/hostv1/hostv1.go | 99 ++++++++++++ p2p/host/hostv2/hostv2.go | 94 +++++++++++ {p2pv2 => p2p/host/hostv2}/util.go | 2 +- p2p/{peer.go => host/message.go} | 130 +++++---------- p2p/p2p.go | 28 ++++ p2p/p2pimpl/p2pimpl.go | 25 +++ p2pv2/host.go | 146 ----------------- runbeacon/run-beacon.go | 4 + 31 files changed, 483 insertions(+), 727 deletions(-) delete mode 100644 node/node_handler_p2p.go create mode 100644 node/p2p.go delete mode 100644 p2p/helper_test.go create mode 100644 p2p/host/host.go create mode 100644 p2p/host/hostv1/hostv1.go create mode 100644 p2p/host/hostv2/hostv2.go rename {p2pv2 => p2p/host/hostv2}/util.go (97%) rename p2p/{peer.go => host/message.go} (51%) create mode 100644 p2p/p2p.go create mode 100644 p2p/p2pimpl/p2pimpl.go delete mode 100644 p2pv2/host.go diff --git a/beaconchain/beaconchain.go b/beaconchain/beaconchain.go index c2554484a..2aca600ac 100644 --- a/beaconchain/beaconchain.go +++ b/beaconchain/beaconchain.go @@ -2,14 +2,14 @@ package beaconchain import ( "math/rand" - "net" - "os" "sync" "github.com/dedis/kyber" "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/host" + "github.com/harmony-one/harmony/p2p/p2pimpl" "github.com/harmony-one/harmony/proto/bcconn" proto_identity "github.com/harmony-one/harmony/proto/identity" "github.com/harmony-one/harmony/utils" @@ -28,6 +28,7 @@ type BeaconChain struct { NumberOfNodesAdded int IP string Port string + host host.Host } //New beaconchain initialization @@ -39,6 +40,7 @@ func New(numShards int, ip, port string) *BeaconChain { bc.NumberOfNodesAdded = 0 bc.Port = port bc.IP = ip + bc.host = p2pimpl.NewHost(p2p.Peer{IP: ip, Port: port}) return &bc } @@ -61,28 +63,10 @@ func (bc *BeaconChain) AcceptConnections(b []byte) { response := bcconn.ResponseRandomNumber{NumberOfShards: bc.NumberOfShards, NumberOfNodesAdded: bc.NumberOfNodesAdded, Leaders: bc.Leaders} msg := bcconn.SerializeRandomInfo(response) msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Acknowledge, msg) - p2p.SendMessage(Node.Self, msgToSend) + host.SendMessage(bc.host, Node.Self, msgToSend) } //StartServer a server and process the request by a handler. func (bc *BeaconChain) StartServer() { - bc.log.Info("Starting Beaconchain server ...") - ip := bc.IP - port := bc.Port - addr := net.JoinHostPort(ip, port) - listen, err := net.Listen("tcp", addr) - if err != nil { - bc.log.Crit("Socket listen port failed") - os.Exit(1) - } - defer listen.Close() - for { - bc.log.Info("beacon chain is now listening ..") - conn, err := listen.Accept() - if err != nil { - bc.log.Crit("Error listening on port. Exiting", "8081") - continue - } - go bc.BeaconChainHandler(conn) - } + bc.host.BindHandlerAndServe(bc.BeaconChainHandler) } diff --git a/beaconchain/beaconchain_handler.go b/beaconchain/beaconchain_handler.go index 351e39bf4..af1bc47e1 100644 --- a/beaconchain/beaconchain_handler.go +++ b/beaconchain/beaconchain_handler.go @@ -1,21 +1,18 @@ package beaconchain import ( - "net" - "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/proto" proto_identity "github.com/harmony-one/harmony/proto/identity" ) // BeaconChainHandler handles registration of new Identities -func (bc *BeaconChain) BeaconChainHandler(conn net.Conn) { - content, err := p2p.ReadMessageContent(conn) +func (bc *BeaconChain) BeaconChainHandler(s p2p.Stream) { + content, err := p2p.ReadMessageContent(s) if err != nil { bc.log.Error("Read p2p data failed") return } - bc.log.Info("received connection", "connectionIp", conn.RemoteAddr()) msgCategory, err := proto.GetMessageCategory(content) if err != nil { bc.log.Error("Read message category failed", "err", err) diff --git a/benchmark.go b/benchmark.go index 4e6a52049..9d591dc92 100644 --- a/benchmark.go +++ b/benchmark.go @@ -9,6 +9,8 @@ import ( "runtime" "time" + "github.com/harmony-one/harmony/p2p/p2pimpl" + "github.com/harmony-one/harmony/attack" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/db" @@ -117,13 +119,11 @@ func main() { if *peerDiscovery { candidateNode := pkg_newnode.New(*ip, *port) BCPeer := p2p.Peer{IP: *idcIP, Port: *idcPort} - service := candidateNode.NewService(*ip, *port) - candidateNode.ConnectBeaconChain(BCPeer) + candidateNode.ContactBeaconChain(BCPeer) shardID = candidateNode.GetShardID() leader = candidateNode.GetLeader() selfPeer = candidateNode.GetSelfPeer() clientPeer = candidateNode.GetClientPeer() - service.Stop() selfPeer.PubKey = candidateNode.PubK } else { @@ -162,8 +162,9 @@ func main() { ldb, _ = InitLDBDatabase(*ip, *port) } + host := p2pimpl.NewHost(selfPeer) // Consensus object. - consensus := consensus.New(selfPeer, shardID, peers, leader) + consensus := consensus.New(host, shardID, peers, leader) consensus.MinPeers = *minPeers // Start Profiler for leader if profile argument is on @@ -178,9 +179,7 @@ func main() { // Set logger to attack model. attack.GetInstance().SetLogger(consensus.Log) // Current node. - currentNode := node.New(consensus, ldb, selfPeer) - // Add self peer. - currentNode.SelfPeer = selfPeer + currentNode := node.New(host, consensus, ldb) // If there is a client configured in the node list. if clientPeer != nil { currentNode.ClientPeer = clientPeer diff --git a/client/client.go b/client/client.go index 86c93468b..b6088f89d 100644 --- a/client/client.go +++ b/client/client.go @@ -5,6 +5,7 @@ import ( "encoding/gob" "sync" + "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/proto/node" "github.com/harmony-one/harmony/blockchain" @@ -23,6 +24,9 @@ type Client struct { ShardUtxoMap map[uint32]blockchain.UtxoMap ShardUtxoMapMutex sync.Mutex // Mutex for the UTXO maps log log.Logger // Log utility + + // The p2p host used to send/receive p2p messages + host host.Host } // TransactionMessageHandler is the message handler for Client/Transaction messages. @@ -120,16 +124,16 @@ func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.F func (client *Client) sendCrossShardTxUnlockMessage(txsToSend []*blockchain.Transaction) { for shardID, txs := range BuildOutputShardTransactionMap(txsToSend) { - p2p.SendMessage((*client.Leaders)[shardID], node.ConstructUnlockToCommitOrAbortMessage(txs)) + host.SendMessage(client.host, (*client.Leaders)[shardID], node.ConstructUnlockToCommitOrAbortMessage(txs)) } } // NewClient creates a new Client -func NewClient(leaders *map[uint32]p2p.Peer) *Client { +func NewClient(host host.Host, leaders *map[uint32]p2p.Peer) *Client { client := Client{} client.PendingCrossTxs = make(map[[32]byte]*blockchain.Transaction) client.Leaders = leaders - + client.host = host // Logger client.log = log.New() return &client diff --git a/client/txgen/main.go b/client/txgen/main.go index 5801e20bb..6ef0ace6e 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -21,6 +21,7 @@ import ( "github.com/harmony-one/harmony/newnode" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/p2pimpl" proto_node "github.com/harmony-one/harmony/proto/node" "github.com/harmony-one/harmony/utils" ) @@ -70,21 +71,15 @@ func main() { var config *client_config.Config if *peerDiscovery { - candidateNode := newnode.New(*ip, *port) BCPeer := p2p.Peer{IP: *idcIP, Port: *idcPort} - service := candidateNode.NewService(*ip, *port) - candidateNode.ConnectBeaconChain(BCPeer) + candidateNode.ContactBeaconChain(BCPeer) peers = nil - - service.Stop() - clientPeer = &p2p.Peer{IP: *ip, Port: *port} _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) clientPeer.PubKey = pubKey shardIDLeaderMap = candidateNode.Leaders - } else { // Read the configs config = client_config.NewConfig() @@ -94,6 +89,9 @@ func main() { _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) clientPeer.PubKey = pubKey } + if clientPeer == nil { + panic("Client Peer is nil!") + } debugPrintShardIDLeaderMap(shardIDLeaderMap) // Do cross shard tx if there are more than one shard @@ -116,20 +114,20 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} for shardID := range shardIDLeaderMap { - node := node.New(&consensus.Consensus{ShardID: shardID}, nil, *clientPeer) + _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) + clientPeer.PubKey = pubKey + host := p2pimpl.NewHost(*clientPeer) + node := node.New(host, &consensus.Consensus{ShardID: shardID}, nil) // Assign many fake addresses so we have enough address to play with at first node.AddTestingAddresses(setting.NumOfAddress) nodes = append(nodes, node) } // Client/txgenerator server node setup - if clientPeer == nil { - panic("Client Peer is nil!") - } - consensusObj := consensus.New(*clientPeer, "0", nil, p2p.Peer{}) - clientNode := node.New(consensusObj, nil, *clientPeer) - - clientNode.Client = client.NewClient(&shardIDLeaderMap) + host := p2pimpl.NewHost(*clientPeer) + consensusObj := consensus.New(host, "0", nil, p2p.Peer{}) + clientNode := node.New(host, consensusObj, nil) + clientNode.Client = client.NewClient(clientNode.GetHost(), &shardIDLeaderMap) // This func is used to update the client's utxopool when new blocks are received from the leaders updateBlocksFunc := func(blocks []*blockchain.Block) { @@ -222,7 +220,7 @@ func main() { lock.Lock() for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards go func(shardID uint32, txs types.Transactions) { - SendTxsToLeaderAccount(shardIDLeaderMap[shardID], txs) + SendTxsToLeaderAccount(clientNode, shardIDLeaderMap[shardID], txs) }(shardID, txs) } lock.Unlock() @@ -275,7 +273,7 @@ func main() { lock.Lock() for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards go func(shardID uint32, txs []*blockchain.Transaction) { - SendTxsToLeader(shardIDLeaderMap[shardID], txs) + SendTxsToLeader(clientNode, shardIDLeaderMap[shardID], txs) }(shardID, txs) } lock.Unlock() @@ -292,22 +290,22 @@ func main() { } else { peers = append(config.GetValidators(), clientNode.Client.GetLeaders()...) } - p2p.BroadcastMessage(peers, msg) + clientNode.BroadcastMessage(peers, msg) time.Sleep(3000 * time.Millisecond) } // SendTxsToLeader sends txs to leader. -func SendTxsToLeader(leader p2p.Peer, txs []*blockchain.Transaction) { +func SendTxsToLeader(clientNode *node.Node, leader p2p.Peer, txs []*blockchain.Transaction) { log.Debug("[Generator] Sending txs to...", "leader", leader, "numTxs", len(txs)) msg := proto_node.ConstructTransactionListMessage(txs) - p2p.SendMessage(leader, msg) + clientNode.SendMessage(leader, msg) } // SendTxsToLeaderAccount sends txs to leader account. -func SendTxsToLeaderAccount(leader p2p.Peer, txs types.Transactions) { +func SendTxsToLeaderAccount(clientNode *node.Node, leader p2p.Peer, txs types.Transactions) { log.Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs)) msg := proto_node.ConstructTransactionListMessageAccount(txs) - p2p.SendMessage(leader, msg) + clientNode.SendMessage(leader, msg) } func debugPrintShardIDLeaderMap(leaderMap map[uint32]p2p.Peer) { diff --git a/client/wallet/main.go b/client/wallet/main.go index c7dffc57c..90261ee01 100644 --- a/client/wallet/main.go +++ b/client/wallet/main.go @@ -9,6 +9,8 @@ import ( "log" "strings" + "github.com/harmony-one/harmony/p2p/p2pimpl" + "io" "io/ioutil" math_rand "math/rand" @@ -257,9 +259,9 @@ func CreateWalletServerNode() *node.Node { shardIDLeaderMap = getShardIDToLeaderMap() clientPeer = &p2p.Peer{Port: "127.0.0.1", IP: "1234"} } - walletNode := node.New(nil, nil, *clientPeer) // TODO(ricl): shouldn't the selfPeer for client being clientPeer?? - walletNode.Client = client.NewClient(&shardIDLeaderMap) - walletNode.ClientPeer = clientPeer + host := p2pimpl.NewHost(*clientPeer) + walletNode := node.New(host, nil, nil) + walletNode.Client = client.NewClient(walletNode.GetHost(), &shardIDLeaderMap) return walletNode } @@ -272,7 +274,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error } msg := proto_node.ConstructTransactionListMessage([]*blockchain.Transaction{&tx}) - p2p.BroadcastMessage(walletNode.Client.GetLeaders(), msg) + walletNode.BroadcastMessage(walletNode.Client.GetLeaders(), msg) doneSignal := make(chan int) go func() { @@ -297,7 +299,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockchain.UtxoMap, error) { fmt.Println("Fetching account balance...") walletNode.Client.ShardUtxoMap = make(map[uint32]blockchain.UtxoMap) - p2p.BroadcastMessage(walletNode.Client.GetLeaders(), proto_node.ConstructFetchUtxoMessage(*walletNode.ClientPeer, addresses)) + walletNode.BroadcastMessage(walletNode.Client.GetLeaders(), proto_node.ConstructFetchUtxoMessage(*walletNode.ClientPeer, addresses)) doneSignal := make(chan int) go func() { diff --git a/client/wallet_v2/main.go b/client/wallet_v2/main.go index 31aea0a98..3e1ff68be 100644 --- a/client/wallet_v2/main.go +++ b/client/wallet_v2/main.go @@ -9,6 +9,8 @@ import ( "log" "strings" + "github.com/harmony-one/harmony/p2p/p2pimpl" + "io" "io/ioutil" math_rand "math/rand" @@ -257,9 +259,9 @@ func CreateWalletServerNode() *node.Node { shardIDLeaderMap = getShardIDToLeaderMap() clientPeer = &p2p.Peer{Port: "127.0.0.1", IP: "1234"} } - walletNode := node.New(nil, nil, *clientPeer) - walletNode.Client = client.NewClient(&shardIDLeaderMap) - walletNode.ClientPeer = clientPeer + host := p2pimpl.NewHost(*clientPeer) + walletNode := node.New(host, nil, nil) + walletNode.Client = client.NewClient(walletNode.GetHost(), &shardIDLeaderMap) return walletNode } @@ -272,7 +274,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error } msg := proto_node.ConstructTransactionListMessage([]*blockchain.Transaction{&tx}) - p2p.BroadcastMessage(walletNode.Client.GetLeaders(), msg) + walletNode.BroadcastMessage(walletNode.Client.GetLeaders(), msg) doneSignal := make(chan int) go func() { @@ -297,7 +299,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockchain.UtxoMap, error) { fmt.Println("Fetching account balance...") walletNode.Client.ShardUtxoMap = make(map[uint32]blockchain.UtxoMap) - p2p.BroadcastMessage(walletNode.Client.GetLeaders(), proto_node.ConstructFetchUtxoMessage(*walletNode.ClientPeer, addresses)) + walletNode.BroadcastMessage(walletNode.Client.GetLeaders(), proto_node.ConstructFetchUtxoMessage(*walletNode.ClientPeer, addresses)) doneSignal := make(chan int) go func() { diff --git a/consensus/consensus.go b/consensus/consensus.go index bfea5de2a..dce293105 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -19,6 +19,7 @@ import ( "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/utils" ) @@ -95,6 +96,9 @@ type Consensus struct { Log log.Logger uniqueIDInstance *utils.UniqueValidatorID + + // The p2p host used to send/receive p2p messages + host host.Host } // BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far @@ -109,9 +113,11 @@ type BlockConsensusStatus struct { } // New creates a new Consensus object -func New(selfPeer p2p.Peer, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Consensus { +func New(host host.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Consensus { consensus := Consensus{} + consensus.host = host + selfPeer := host.GetSelfPeer() if leader.Port == selfPeer.Port && leader.IP == selfPeer.IP { consensus.IsLeader = true } else { @@ -411,3 +417,8 @@ func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header func (consensus *Consensus) GetNodeID() uint16 { return consensus.nodeID } + +// SendMessage sends message thru p2p host to peer. +func (consensus *Consensus) SendMessage(peer p2p.Peer, message []byte) { + host.SendMessage(consensus.host, peer, message) +} diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 08651bceb..10e1e9341 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/profiler" @@ -133,7 +134,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { consensus.Log.Debug("Stop encoding block") msgToSend := consensus.constructAnnounceMessage() - p2p.BroadcastMessageFromLeader(consensus.GetValidatorPeers(), msgToSend) + host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend) // Set state to AnnounceDone consensus.state = AnnounceDone consensus.commitByLeader(true) @@ -260,7 +261,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta consensus.responseByLeader(challengeScalar, targetState == ChallengeDone) // Broadcast challenge message - p2p.BroadcastMessageFromLeader(consensus.GetValidatorPeers(), msgToSend) + host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend) // Set state to targetState (ChallengeDone or FinalChallengeDone) consensus.state = targetState @@ -417,7 +418,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S // Start the second round of Cosi msgToSend := consensus.constructCollectiveSigMessage(collectiveSig, bitmap) - p2p.BroadcastMessageFromLeader(consensus.GetValidatorPeers(), msgToSend) + host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend) consensus.commitByLeader(false) } else { consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(*responses)) diff --git a/consensus/consensus_leader_msg_test.go b/consensus/consensus_leader_msg_test.go index b04830172..e606ecb51 100644 --- a/consensus/consensus_leader_msg_test.go +++ b/consensus/consensus_leader_msg_test.go @@ -3,6 +3,8 @@ package consensus import ( "testing" + "github.com/harmony-one/harmony/p2p/p2pimpl" + "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/p2p" @@ -12,7 +14,8 @@ import ( func TestConstructAnnounceMessage(test *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) + host := p2pimpl.NewHost(leader) + consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} header := consensus.blockHeader msg := consensus.constructAnnounceMessage() @@ -34,8 +37,8 @@ func TestConstructChallengeMessage(test *testing.T) { validatorPriKey.UnmarshalBinary(priKeyInBytes[:]) validatorPubKey := pki.GetPublicKeyFromScalar(leaderPriKey) validator := p2p.Peer{IP: "3", Port: "5", PubKey: validatorPubKey} - - consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) + host := p2pimpl.NewHost(leader) + consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} (*consensus.commitments)[0] = leaderPubKey (*consensus.commitments)[1] = validatorPubKey diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index a84182815..b48454c7b 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -4,12 +4,14 @@ import ( "testing" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/p2pimpl" ) func TestNew(test *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) + host := p2pimpl.NewHost(leader) + consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) if consensus.consensusID != 0 { test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusID) } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 5d6ca8346..6ebc71f2c 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -10,7 +10,6 @@ import ( "github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/log" - "github.com/harmony-one/harmony/p2p" proto_consensus "github.com/harmony-one/harmony/proto/consensus" "github.com/harmony-one/harmony/utils" ) @@ -130,7 +129,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { // Store the commitment secret consensus.secret[consensusID] = secret - p2p.SendMessage(consensus.leader, msgToSend) + consensus.SendMessage(consensus.leader, msgToSend) // consensus.Log.Warn("Sending Commit to leader", "state", targetState) // Set state to CommitDone @@ -249,7 +248,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState } msgToSend := consensus.constructResponseMessage(msgTypeToSend, response) - p2p.SendMessage(consensus.leader, msgToSend) + consensus.SendMessage(consensus.leader, msgToSend) // consensus.Log.Warn("Sending Response to leader", "state", targetState) // Set state to target state (ResponseDone, FinalResponseDone) consensus.state = targetState @@ -367,7 +366,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) { // Store the commitment secret consensus.secret[consensusID] = secret - p2p.SendMessage(consensus.leader, msgToSend) + consensus.SendMessage(consensus.leader, msgToSend) // Set state to CommitDone consensus.state = FinalCommitDone diff --git a/consensus/consensus_validator_msg_test.go b/consensus/consensus_validator_msg_test.go index a96352462..032378619 100644 --- a/consensus/consensus_validator_msg_test.go +++ b/consensus/consensus_validator_msg_test.go @@ -3,6 +3,8 @@ package consensus import ( "testing" + "github.com/harmony-one/harmony/p2p/p2pimpl" + "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/p2p" consensus_proto "github.com/harmony-one/harmony/proto/consensus" @@ -11,7 +13,8 @@ import ( func TestConstructCommitMessage(test *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) + host := p2pimpl.NewHost(leader) + consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} _, msg := consensus.constructCommitMessage(consensus_proto.Commit) @@ -23,7 +26,8 @@ func TestConstructCommitMessage(test *testing.T) { func TestConstructResponseMessage(test *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) + host := p2pimpl.NewHost(leader) + consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} msg := consensus.constructResponseMessage(consensus_proto.Response, crypto.Ed25519Curve.Scalar()) diff --git a/newnode/newnode.go b/newnode/newnode.go index fd1802998..b584544fd 100644 --- a/newnode/newnode.go +++ b/newnode/newnode.go @@ -2,27 +2,22 @@ package newnode import ( "fmt" - "net" "os" "strconv" - "sync" "time" + "github.com/harmony-one/harmony/p2p/p2pimpl" + "github.com/dedis/kyber" "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/proto/bcconn" proto_identity "github.com/harmony-one/harmony/proto/identity" "github.com/harmony-one/harmony/utils" ) -// Service is the server for listening. -type Service struct { - ch chan bool - waitGroup *sync.WaitGroup -} - //NewNode is ther struct for a candidate node type NewNode struct { Role string @@ -36,7 +31,7 @@ type NewNode struct { priK kyber.Scalar log log.Logger SetInfo bool - Service *Service + host host.Host } // New candidatenode initialization @@ -48,6 +43,7 @@ func New(ip string, port string) *NewNode { node.Self = p2p.Peer{IP: ip, Port: port, PubKey: pubKey, ValidatorID: -1} node.log = log.New() node.SetInfo = false + node.host = p2pimpl.NewHost(node.Self) node.Leaders = map[uint32]p2p.Peer{} return &node } @@ -58,66 +54,18 @@ type registerResponseRandomNumber struct { Leaders []*bcconn.NodeInfo } -// NewService starts a newservice in the candidate node -func (node *NewNode) NewService(ip, port string) *Service { - laddr, err := net.ResolveTCPAddr("tcp", ip+":"+port) - if nil != err { - node.log.Crit("cannot resolve the tcp address of the new node", err) - } - listener, err := net.ListenTCP("tcp", laddr) - if nil != err { - node.log.Crit("cannot start a listener for new node", err) - } - node.log.Debug("listening on", "address", laddr.String()) - node.Service = &Service{ - ch: make(chan bool), - waitGroup: &sync.WaitGroup{}, - } - node.Service.waitGroup.Add(1) - go node.Serve(listener) - return node.Service -} - -// Serve Accept connections and spawn a goroutine to serve each one. Stop listening -// if anything is received on the service's channel. -func (node *NewNode) Serve(listener *net.TCPListener) { - defer node.Service.waitGroup.Done() - for { - select { - case <-node.Service.ch: - node.log.Debug("stopping listening on", "address", listener.Addr()) - listener.Close() - node.log.Debug("stopped listening") - return - default: - } - listener.SetDeadline(time.Now().Add(1e9)) // This deadline is for 1 second to accept new connections. - conn, err := listener.AcceptTCP() - if nil != err { - if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { - continue - } - node.log.Error(err.Error()) - - } - node.Service.waitGroup.Add(1) - go node.NodeHandler(conn) - } -} - -// Stop the service by closing the service's channel. Block until the service -// is really stopped. -func (s *Service) Stop() { - close(s.ch) - s.waitGroup.Wait() +// ContactBeaconChain starts a newservice in the candidate node +func (node *NewNode) ContactBeaconChain(BCPeer p2p.Peer) { + go node.host.BindHandlerAndServe(node.NodeHandler) + node.requestBeaconChain(BCPeer) } func (node NewNode) String() string { - return fmt.Sprintf("idc: %v:%v and node infi %v", node.Self.IP, node.Self.Port, node.SetInfo) + return fmt.Sprintf("idc: %v:%v and node info %v", node.Self.IP, node.Self.Port, node.SetInfo) } -// ConnectBeaconChain connects to beacon chain -func (node *NewNode) ConnectBeaconChain(BCPeer p2p.Peer) { +// RequestBeaconChain requests beacon chain for identity data +func (node *NewNode) requestBeaconChain(BCPeer p2p.Peer) { node.log.Info("connecting to beacon chain now ...") pubk, err := node.PubK.MarshalBinary() if err != nil { @@ -141,7 +89,7 @@ checkLoop: gotShardInfo = true break checkLoop } else { - p2p.SendMessage(BCPeer, msgToSend) + host.SendMessage(node.host, BCPeer, msgToSend) } } } diff --git a/newnode/newnode_handler.go b/newnode/newnode_handler.go index 3a9fe695c..9cbc48239 100644 --- a/newnode/newnode_handler.go +++ b/newnode/newnode_handler.go @@ -1,7 +1,7 @@ package newnode import ( - "net" + "time" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/proto" @@ -9,11 +9,11 @@ import ( ) // NodeHandler handles a new incoming connection. -func (node *NewNode) NodeHandler(conn net.Conn) { - defer conn.Close() - defer node.Service.waitGroup.Done() - content, err := p2p.ReadMessageContent(conn) - +func (node *NewNode) NodeHandler(s p2p.Stream) { + defer s.Close() + defer node.host.Close() + s.SetReadDeadline(time.Now().Add(1 * time.Second)) // This deadline is for 1 second to accept new connections. + content, err := p2p.ReadMessageContent(s) if err != nil { node.log.Error("Read p2p data failed", "err", err, "node", node) return diff --git a/node/node.go b/node/node.go index 02ab4f15a..e08b351ce 100644 --- a/node/node.go +++ b/node/node.go @@ -7,7 +7,6 @@ import ( "fmt" "math/big" "math/rand" - "net" "os" "strconv" "strings" @@ -28,7 +27,7 @@ import ( "github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/p2pv2" + "github.com/harmony-one/harmony/p2p/host" proto_node "github.com/harmony-one/harmony/proto/node" "github.com/harmony-one/harmony/syncing" "github.com/harmony-one/harmony/syncing/downloader" @@ -106,6 +105,8 @@ type Node struct { // Test only TestBankKeys []*ecdsa.PrivateKey + // The p2p host used to send/receive p2p messages + host host.Host // Channel to stop sending ping message StopPing chan int @@ -164,15 +165,7 @@ func (node *Node) getTransactionsForNewBlockAccount(maxNumTxs int) (types.Transa // StartServer starts a server and process the request by a handler. func (node *Node) StartServer() { - if p2p.Version == 1 { - node.log.Debug("Starting server", "node", node, "ip", node.SelfPeer.IP, "port", node.SelfPeer.Port) - node.listenOnPort(node.SelfPeer.Port) - } else { - p2pv2.InitHost(node.SelfPeer.IP, node.SelfPeer.Port) - p2pv2.BindHandler(node.NodeHandlerV1) - // Hang forever - <-make(chan struct{}) - } + node.host.BindHandlerAndServe(node.StreamHandler) } // SetLog sets log for Node. @@ -181,32 +174,6 @@ func (node *Node) SetLog() *Node { return node } -// Version 0 p2p. Going to be deprecated. -func (node *Node) listenOnPort(port string) { - addr := net.JoinHostPort("", port) - listen, err := net.Listen("tcp4", addr) - if err != nil { - node.log.Error("Socket listen port failed", "addr", addr, "err", err) - return - } - if listen == nil { - node.log.Error("Listen returned nil", "addr", addr) - return - } - defer listen.Close() - backoff := p2p.NewExpBackoff(250*time.Millisecond, 15*time.Second, 2.0) - for { - conn, err := listen.Accept() - if err != nil { - node.log.Error("Error listening on port.", "port", port, - "err", err) - backoff.Sleep() - continue - } - go node.NodeHandler(conn) - } -} - func (node *Node) String() string { return node.Consensus.String() } @@ -261,10 +228,15 @@ func DeserializeNode(d []byte) *NetworkNode { } // New creates a new node. -func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node { +func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { node := Node{} - if consensus != nil { + if host != nil { + node.host = host + node.SelfPeer = host.GetSelfPeer() + } + + if host != nil && consensus != nil { // Consensus and associated channel to communicate blocks node.Consensus = consensus node.BlockChannel = make(chan blockchain.Block) @@ -316,11 +288,9 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node node.Chain = chain node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain) node.BlockChannelAccount = make(chan *types.Block) - node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(selfPeer.PubKey)) + node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) } - node.SelfPeer = selfPeer - // Logger node.log = log.New() if consensus.IsLeader { @@ -425,7 +395,7 @@ func (node *Node) JoinShard(leader p2p.Peer) { buffer := ping.ConstructPingMessage() // Talk to leader. - p2p.SendMessage(leader, buffer) + node.SendMessage(leader, buffer) case <-timeout.C: node.log.Info("JoinShard timeout") return diff --git a/node/node_handler.go b/node/node_handler.go index bb96fb7c5..a8e57987e 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/gob" "fmt" - "net" "os" "strconv" "time" @@ -16,6 +15,7 @@ import ( hmy_crypto "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/proto" "github.com/harmony-one/harmony/proto/client" "github.com/harmony-one/harmony/proto/consensus" @@ -34,17 +34,17 @@ const ( // MaybeBroadcastAsValidator returns if the node is a validator node. func (node *Node) MaybeBroadcastAsValidator(content []byte) { - if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID <= p2p.MaxBroadCast { - go p2p.BroadcastMessageFromValidator(node.SelfPeer, node.Consensus.GetValidatorPeers(), content) + if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID <= host.MaxBroadCast { + go host.BroadcastMessageFromValidator(node.host, node.SelfPeer, node.Consensus.GetValidatorPeers(), content) } } -// NodeHandler handles a new incoming connection. -func (node *Node) NodeHandler(conn net.Conn) { - defer conn.Close() +// StreamHandler handles a new incoming connection. +func (node *Node) StreamHandler(s p2p.Stream) { + defer s.Close() // Read p2p message payload - content, err := p2p.ReadMessageContent(conn) + content, err := p2p.ReadMessageContent(s) if err != nil { node.log.Error("Read p2p data failed", "err", err, "node", node) @@ -137,7 +137,7 @@ func (node *Node) NodeHandler(conn net.Conn) { utxoMap := node.UtxoPool.GetUtxoMapByAddresses(fetchUtxoMessage.Addresses) - p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) + node.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) } case proto_node.Control: node.log.Info("NET: received message: Node/Control") @@ -204,7 +204,7 @@ func (node *Node) NodeHandler(conn net.Conn) { } } default: - node.log.Error("Unknown", "MsgCateory:", msgCategory) + node.log.Error("Unknown", "MsgCategory", msgCategory) } // Post processing after receiving messsages. @@ -385,7 +385,7 @@ func (node *Node) SendBackProofOfAcceptOrReject() { node.crossTxToReturnMutex.Unlock() node.log.Debug("SENDING PROOF TO CLIENT", "proofs", len(proofs)) - p2p.SendMessage(*node.ClientPeer, client.ConstructProofOfAcceptOrRejectMessage(proofs)) + node.SendMessage(*node.ClientPeer, client.ConstructProofOfAcceptOrRejectMessage(proofs)) } } @@ -394,7 +394,7 @@ func (node *Node) SendBackProofOfAcceptOrReject() { func (node *Node) BroadcastNewBlock(newBlock *blockchain.Block) { if node.ClientPeer != nil { node.log.Debug("NET: SENDING NEW BLOCK TO CLIENT") - p2p.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]blockchain.Block{*newBlock})) + node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]blockchain.Block{*newBlock})) } } @@ -566,7 +566,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int { // Broadcast the message to all validators, as publicKeys is updated // FIXME: HAR-89 use a separate nodefind/neighbor message - p2p.BroadcastMessageFromLeader(peers, buffer) + host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer) return len(peers) } diff --git a/node/node_handler_p2p.go b/node/node_handler_p2p.go deleted file mode 100644 index a31b2b57d..000000000 --- a/node/node_handler_p2p.go +++ /dev/null @@ -1,183 +0,0 @@ -package node - -import ( - "bytes" - "encoding/gob" - "fmt" - "os" - - "github.com/harmony-one/harmony/blockchain" - "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/p2pv2" - "github.com/harmony-one/harmony/proto" - "github.com/harmony-one/harmony/proto/client" - "github.com/harmony-one/harmony/proto/consensus" - proto_identity "github.com/harmony-one/harmony/proto/identity" - proto_node "github.com/harmony-one/harmony/proto/node" - netp2p "github.com/libp2p/go-libp2p-net" -) - -// NodeHandlerV1 handles a new incoming connection. -func (node *Node) NodeHandlerV1(s netp2p.Stream) { - defer s.Close() - - // Read p2p message payload - content, err := p2pv2.ReadData(s) - - if err != nil { - node.log.Error("Read p2p data failed", "err", err, "node", node) - return - } - // TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p. - node.MaybeBroadcastAsValidator(content) - - consensusObj := node.Consensus - - msgCategory, err := proto.GetMessageCategory(content) - if err != nil { - node.log.Error("Read node type failed", "err", err, "node", node) - return - } - - msgType, err := proto.GetMessageType(content) - if err != nil { - node.log.Error("Read action type failed", "err", err, "node", node) - return - } - - msgPayload, err := proto.GetMessagePayload(content) - if err != nil { - node.log.Error("Read message payload failed", "err", err, "node", node) - return - } - - switch msgCategory { - case proto.Identity: - actionType := proto_identity.IDMessageType(msgType) - switch actionType { - case proto_identity.Identity: - messageType := proto_identity.MessageType(msgPayload[0]) - switch messageType { - case proto_identity.Register: - fmt.Println("received a identity message") - // TODO(ak): fix it. - // node.processPOWMessage(msgPayload) - node.log.Info("NET: received message: IDENTITY/REGISTER") - default: - node.log.Error("Announce message should be sent to IdentityChain") - } - } - case proto.Consensus: - actionType := consensus.ConMessageType(msgType) - switch actionType { - case consensus.Consensus: - if consensusObj.IsLeader { - node.log.Info("NET: received message: Consensus/Leader") - consensusObj.ProcessMessageLeader(msgPayload) - } else { - node.log.Info("NET: received message: Consensus/Validator") - consensusObj.ProcessMessageValidator(msgPayload) - } - } - case proto.Node: - actionType := proto_node.MessageType(msgType) - switch actionType { - case proto_node.Transaction: - node.log.Info("NET: received message: Node/Transaction") - node.transactionMessageHandler(msgPayload) - case proto_node.Block: - node.log.Info("NET: received message: Node/Block") - blockMsgType := proto_node.BlockMessageType(msgPayload[0]) - switch blockMsgType { - case proto_node.Sync: - decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Sync messge type - blocks := new([]*blockchain.Block) - decoder.Decode(blocks) - if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil { - node.Client.UpdateBlocks(*blocks) - } - } - case proto_node.Client: - node.log.Info("NET: received message: Node/Client") - clientMsgType := proto_node.ClientMessageType(msgPayload[0]) - switch clientMsgType { - case proto_node.LookupUtxo: - decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the LookupUtxo messge type - - fetchUtxoMessage := new(proto_node.FetchUtxoMessage) - decoder.Decode(fetchUtxoMessage) - - utxoMap := node.UtxoPool.GetUtxoMapByAddresses(fetchUtxoMessage.Addresses) - - p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) - } - case proto_node.Control: - node.log.Info("NET: received message: Node/Control") - controlType := msgPayload[0] - if proto_node.ControlMessageType(controlType) == proto_node.STOP { - if node.Chain == nil { - node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) - - sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap() - node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) - - avgBlockSizeInBytes := 0 - txCount := 0 - blockCount := 0 - totalTxCount := 0 - totalBlockCount := 0 - avgTxSize := 0 - - for _, block := range node.blockchain.Blocks { - if block.IsStateBlock() { - totalTxCount += int(block.State.NumTransactions) - totalBlockCount += int(block.State.NumBlocks) - } else { - byteBuffer := bytes.NewBuffer([]byte{}) - encoder := gob.NewEncoder(byteBuffer) - encoder.Encode(block) - avgBlockSizeInBytes += len(byteBuffer.Bytes()) - - txCount += len(block.Transactions) - blockCount++ - totalTxCount += len(block.TransactionIds) - totalBlockCount++ - - byteBuffer = bytes.NewBuffer([]byte{}) - encoder = gob.NewEncoder(byteBuffer) - encoder.Encode(block.Transactions) - avgTxSize += len(byteBuffer.Bytes()) - } - } - if blockCount != 0 { - avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount - avgTxSize = avgTxSize / txCount - } - - node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) - } else { - node.log.Debug("Stopping Node (Account Model)", "node", node, "CurBlockNum", node.Chain.CurrentHeader().Number, "numTxsProcessed", node.countNumTransactionsInBlockchainAccount()) - } - - os.Exit(0) - } - case proto_node.PING: - // Leader receives PING from new node. - node.pingMessageHandler(msgPayload) - case proto_node.PONG: - // The new node receives back from leader. - node.pongMessageHandler(msgPayload) - } - case proto.Client: - actionType := client.MessageType(msgType) - node.log.Info("NET: received message: Client/Transaction") - switch actionType { - case client.Transaction: - if node.Client != nil { - node.Client.TransactionMessageHandler(msgPayload) - } - } - default: - node.log.Error("Unknown", "MsgCateory:", msgCategory) - } -} diff --git a/node/node_test.go b/node/node_test.go index 1f33b9720..eb8c485e6 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/p2pimpl" proto_node "github.com/harmony-one/harmony/proto/node" "github.com/harmony-one/harmony/utils" ) @@ -18,9 +19,9 @@ func TestNewNewNode(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) - - node := New(consensus, nil, leader) + host := p2pimpl.NewHost(leader) + consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) + node := New(host, consensus, nil) if node.Consensus == nil { t.Error("Consensus is not initialized for the node") } @@ -46,9 +47,10 @@ func TestCountNumTransactionsInBlockchain(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) + host := p2pimpl.NewHost(leader) + consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, nil, leader) + node := New(host, consensus, nil) node.AddTestingAddresses(1000) if node.countNumTransactionsInBlockchain() != 1001 { t.Error("Count of transactions in the blockchain is incorrect") @@ -59,9 +61,10 @@ func TestGetSyncingPeers(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) + host := p2pimpl.NewHost(leader) + consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, nil, leader) + node := New(host, consensus, nil) peer := p2p.Peer{IP: "1.1.1.1", Port: "2000"} peer2 := p2p.Peer{IP: "2.1.1.1", Port: "2000"} node.Neighbors.Store("minh", peer) @@ -101,9 +104,10 @@ func TestAddPeers(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) + host := p2pimpl.NewHost(leader) + consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, nil, leader) + node := New(host, consensus, nil) r1 := node.AddPeers(peers1) e1 := 2 if r1 != e1 { @@ -116,7 +120,7 @@ func TestAddPeers(t *testing.T) { } } -func sendPingMessage(leader p2p.Peer) { +func sendPingMessage(node *Node, leader p2p.Peer) { priKey1 := crypto.Ed25519Curve.Scalar().SetInt64(int64(333)) pubKey1 := pki.GetPublicKeyFromScalar(priKey1) @@ -132,11 +136,11 @@ func sendPingMessage(leader p2p.Peer) { fmt.Println("waiting for 5 seconds ...") time.Sleep(5 * time.Second) - p2p.SendMessage(leader, buf1) + node.SendMessage(leader, buf1) fmt.Println("sent ping message ...") } -func sendPongMessage(leader p2p.Peer) { +func sendPongMessage(node *Node, leader p2p.Peer) { priKey1 := crypto.Ed25519Curve.Scalar().SetInt64(int64(333)) pubKey1 := pki.GetPublicKeyFromScalar(priKey1) p1 := p2p.Peer{ @@ -158,7 +162,7 @@ func sendPongMessage(leader p2p.Peer) { fmt.Println("waiting for 10 seconds ...") time.Sleep(10 * time.Second) - p2p.SendMessage(leader, buf1) + node.SendMessage(leader, buf1) fmt.Println("sent pong message ...") } @@ -173,10 +177,11 @@ func TestPingPongHandler(test *testing.T) { _, pubKey := utils.GenKey("127.0.0.1", "8881") leader := p2p.Peer{IP: "127.0.0.1", Port: "8881", PubKey: pubKey} // validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"} - consensus := consensus.New(leader, "0", []p2p.Peer{leader}, leader) - node := New(consensus, nil, leader) + host := p2pimpl.NewHost(leader) + consensus := consensus.New(host, "0", []p2p.Peer{leader}, leader) + node := New(host, consensus, nil) //go sendPingMessage(leader) - go sendPongMessage(leader) + go sendPongMessage(node, leader) go exitServer() node.StartServer() } diff --git a/node/p2p.go b/node/p2p.go new file mode 100644 index 000000000..836922c0b --- /dev/null +++ b/node/p2p.go @@ -0,0 +1,21 @@ +package node + +import ( + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/host" +) + +// SendMessage sends data to ip, port +func (node *Node) SendMessage(p p2p.Peer, data []byte) { + host.SendMessage(node.host, p, data) +} + +// BroadcastMessage broadcasts message to peers +func (node *Node) BroadcastMessage(peers []p2p.Peer, data []byte) { + host.BroadcastMessage(node.host, peers, data) +} + +// GetHost returns the p2p host +func (node *Node) GetHost() host.Host { + return node.host +} diff --git a/p2p/helper.go b/p2p/helper.go index 2be326afd..e3619e68d 100644 --- a/p2p/helper.go +++ b/p2p/helper.go @@ -6,7 +6,6 @@ import ( "encoding/binary" "io" "log" - "net" "time" ) @@ -28,13 +27,13 @@ content (n bytes) - actual message content const BatchSizeInByte = 1 << 16 // ReadMessageContent reads the message type and content size, and return the actual content. -func ReadMessageContent(conn net.Conn) ([]byte, error) { +func ReadMessageContent(s Stream) ([]byte, error) { var ( contentBuf = bytes.NewBuffer([]byte{}) - r = bufio.NewReader(conn) + r = bufio.NewReader(s) ) timeoutDuration := 1 * time.Second - conn.SetReadDeadline(time.Now().Add(timeoutDuration)) + s.SetReadDeadline(time.Now().Add(timeoutDuration)) //// Read 1 byte for message type _, err := r.ReadByte() switch err { @@ -67,7 +66,7 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) { ILOOP: for { timeoutDuration := 10 * time.Second - conn.SetReadDeadline(time.Now().Add(timeoutDuration)) + s.SetReadDeadline(time.Now().Add(timeoutDuration)) if bytesToRead < BatchSizeInByte { // Read the last number of bytes less than 1024 tmpBuf = make([]byte, bytesToRead) @@ -91,25 +90,3 @@ ILOOP: } return contentBuf.Bytes(), nil } - -// CreateMessage create a general message. FIXME: this is not used -func CreateMessage(msgType byte, data []byte) []byte { - buffer := bytes.NewBuffer([]byte{}) - - buffer.WriteByte(msgType) - - fourBytes := make([]byte, 4) - binary.BigEndian.PutUint32(fourBytes, uint32(len(data))) - buffer.Write(fourBytes) - - buffer.Write(data) - return buffer.Bytes() -} - -// SendMessageContent send message over net connection. FIXME: this is not used -func SendMessageContent(conn net.Conn, data []byte) { - msgToSend := CreateMessage(byte(1), data) - w := bufio.NewWriter(conn) - w.Write(msgToSend) - w.Flush() -} diff --git a/p2p/helper_test.go b/p2p/helper_test.go deleted file mode 100644 index c10187d80..000000000 --- a/p2p/helper_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package p2p_test - -import ( - "bufio" - "net" - "testing" - - "github.com/harmony-one/harmony/p2p" -) - -func setUpTestServer(times int, t *testing.T, conCreated chan struct{}) { - t.Parallel() - ln, _ := net.Listen("tcp", ":8081") - conCreated <- struct{}{} - conn, _ := ln.Accept() - defer conn.Close() - - var ( - w = bufio.NewWriter(conn) - ) - for times > 0 { - times-- - data, err := p2p.ReadMessageContent(conn) - if err != nil { - t.Fatalf("error when ReadMessageContent %v", err) - } - data = p2p.CreateMessage(byte(1), data) - w.Write(data) - w.Flush() - } -} -func TestNewNewNode(t *testing.T) { - times := 100 - conCreated := make(chan struct{}) - go setUpTestServer(times, t, conCreated) - <-conCreated - - conn, _ := net.Dial("tcp", "127.0.0.1:8081") - defer conn.Close() - - for times > 0 { - times-- - - myMsg := "minhdoan" - p2p.SendMessageContent(conn, []byte(myMsg)) - - data, err := p2p.ReadMessageContent(conn) - if err != nil { - t.Error("got an error when trying to receive an expected message from server.") - } - if string(data) != myMsg { - t.Error("did not receive expected message") - } - } -} diff --git a/p2p/host/host.go b/p2p/host/host.go new file mode 100644 index 000000000..c9755cd44 --- /dev/null +++ b/p2p/host/host.go @@ -0,0 +1,13 @@ +package host + +import ( + "github.com/harmony-one/harmony/p2p" +) + +// Host is the client + server in p2p network. +type Host interface { + GetSelfPeer() p2p.Peer + SendMessage(p2p.Peer, []byte) error + BindHandlerAndServe(handler p2p.StreamHandler) + Close() error +} diff --git a/p2p/host/hostv1/hostv1.go b/p2p/host/hostv1/hostv1.go new file mode 100644 index 000000000..f9976e6f0 --- /dev/null +++ b/p2p/host/hostv1/hostv1.go @@ -0,0 +1,99 @@ +package hostv1 + +import ( + "io" + "net" + "time" + + "github.com/harmony-one/harmony/log" + "github.com/harmony-one/harmony/p2p" +) + +// HostV1 is the version 1 p2p host, using direct socket call. +type HostV1 struct { + self p2p.Peer + listener net.Listener + quit chan bool +} + +// New creates a HostV1 +func New(self p2p.Peer) *HostV1 { + h := &HostV1{ + self: self, + quit: make(chan bool, 1), + } + return h +} + +// GetSelfPeer gets self peer +func (host *HostV1) GetSelfPeer() p2p.Peer { + return host.self +} + +// BindHandlerAndServe Version 0 p2p. Going to be deprecated. +func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) { + port := host.self.Port + addr := net.JoinHostPort(host.self.IP, port) + var err error + host.listener, err = net.Listen("tcp4", addr) + if err != nil { + log.Error("Socket listen port failed", "addr", addr, "err", err) + return + } + if host.listener == nil { + log.Error("Listen returned nil", "addr", addr) + return + } + backoff := p2p.NewExpBackoff(250*time.Millisecond, 15*time.Second, 2.0) + for { // Keep listening + select { + case <-host.quit: + return + default: + { + conn, err := host.listener.Accept() + if err != nil { + log.Error("Error listening on port.", "port", port, + "err", err) + backoff.Sleep() + continue + } + // log.Debug("Received New connection", "local", conn.LocalAddr(), "remote", conn.RemoteAddr()) + go handler(conn) + } + } + } +} + +// SendMessage sends message to peer +func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) { + addr := net.JoinHostPort(peer.IP, peer.Port) + conn, err := net.Dial("tcp", addr) + // log.Debug("Dial from local to remote", "localID", net.JoinHostPort(host.self.IP, host.self.Port), "local", conn.LocalAddr(), "remote", addr) + + if err != nil { + log.Warn("Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err) + return + } + defer conn.Close() + + nw, err := conn.Write(message) + if err != nil { + log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err) + return + } + if nw < len(message) { + log.Warn("Write() returned short count", + "addr", conn.RemoteAddr(), "actual", nw, "expected", len(message)) + return io.ErrShortWrite + } + + // No ack (reply) message from the receiver for now. + return +} + +// Close closes the host +func (host *HostV1) Close() error { + host.quit <- true + return host.listener.Close() +} diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go new file mode 100644 index 000000000..a6164ccf9 --- /dev/null +++ b/p2p/host/hostv2/hostv2.go @@ -0,0 +1,94 @@ +package hostv2 + +import ( + "bufio" + "context" + "fmt" + + "github.com/harmony-one/harmony/log" + "github.com/harmony-one/harmony/p2p" + libp2p "github.com/libp2p/go-libp2p" + libp2phost "github.com/libp2p/go-libp2p-host" + net "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + multiaddr "github.com/multiformats/go-multiaddr" +) + +const ( + // BatchSizeInByte The batch size in byte (64MB) in which we return data + BatchSizeInByte = 1 << 16 + // ProtocolID The ID of protocol used in stream handling. + ProtocolID = "/harmony/0.0.1" +) + +// HostV2 is the version 2 p2p host +type HostV2 struct { + h libp2phost.Host + self p2p.Peer +} + +// Peerstore returns the peer store +func (host *HostV2) Peerstore() peerstore.Peerstore { + return host.h.Peerstore() +} + +// New creates a host for p2p communication +func New(self p2p.Peer) *HostV2 { + addr := fmt.Sprintf("/ip4/%s/tcp/%s", self.IP, self.Port) + sourceAddr, err := multiaddr.NewMultiaddr(addr) + catchError(err) + priv := addrToPrivKey(addr) + p2pHost, err := libp2p.New(context.Background(), + libp2p.ListenAddrs(sourceAddr), + libp2p.Identity(priv), + libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves. + // TODO(ricl): Other features to probe + // libp2p.EnableRelay; libp2p.Routing; + ) + catchError(err) + log.Debug("Host is up!", "port", self.Port, "id", p2pHost.ID().Pretty(), "addrs", sourceAddr) + h := &HostV2{ + h: p2pHost, + self: self, + } + return h +} + +// GetSelfPeer gets self peer +func (host *HostV2) GetSelfPeer() p2p.Peer { + return host.self +} + +// BindHandlerAndServe bind a streamHandler to the harmony protocol. +func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) { + host.h.SetStreamHandler(ProtocolID, func(s net.Stream) { + handler(s) + }) + // Hang forever + <-make(chan struct{}) +} + +// SendMessage a p2p message sending function with signature compatible to p2pv1. +func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error { + addr := fmt.Sprintf("/ip4/%s/tcp/%s", p.IP, p.Port) + targetAddr, err := multiaddr.NewMultiaddr(addr) + + priv := addrToPrivKey(addr) + peerID, _ := peer.IDFromPrivateKey(priv) + host.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL) + s, err := host.h.NewStream(context.Background(), peerID, ProtocolID) + catchError(err) + + // Create a buffered stream so that read and writes are non blocking. + w := bufio.NewWriter(bufio.NewWriter(s)) + + // Create a thread to read and write data. + go writeData(w, message) + return nil +} + +// Close closes the host +func (host *HostV2) Close() error { + return host.h.Close() +} diff --git a/p2pv2/util.go b/p2p/host/hostv2/util.go similarity index 97% rename from p2pv2/util.go rename to p2p/host/hostv2/util.go index 52486bd78..01c29a86d 100644 --- a/p2pv2/util.go +++ b/p2p/host/hostv2/util.go @@ -1,4 +1,4 @@ -package p2pv2 +package hostv2 import ( "bufio" diff --git a/p2p/peer.go b/p2p/host/message.go similarity index 51% rename from p2p/peer.go rename to p2p/host/message.go index 476e9d4b5..f3e576cf0 100644 --- a/p2p/peer.go +++ b/p2p/host/message.go @@ -1,46 +1,26 @@ -package p2p +package host import ( "bytes" "encoding/binary" - "io" "net" "runtime" "time" "github.com/harmony-one/harmony/log" - "github.com/harmony-one/harmony/p2pv2" - - "github.com/dedis/kyber" + "github.com/harmony-one/harmony/p2p" ) -// Peer is the object for a p2p peer (node) -type Peer struct { - IP string // IP address of the peer - Port string // Port number of the peer - PubKey kyber.Point // Public key of the peer - Ready bool // Ready is true if the peer is ready to join consensus. - ValidatorID int // -1 is the default value, means not assigned any validator ID in the shard - // TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available. -} - -// MaxBroadCast is the maximum number of neighbors to broadcast -const MaxBroadCast = 20 - -// Version The version number of p2p library -// 1 - Direct socket connection -// 2 - libp2p -const Version = 1 - -// SendMessage sends the message to the peer -func SendMessage(peer Peer, msg []byte) { +// SendMessage is to connect a socket given a port and send the given message. +// TODO(minhdoan, rj): need to check if a peer is reachable or not. +func SendMessage(host Host, p p2p.Peer, message []byte) { // Construct normal p2p message - content := ConstructP2pMessage(byte(0), msg) - go send(peer.IP, peer.Port, content) + content := ConstructP2pMessage(byte(0), message) + go send(host, p, content) } // BroadcastMessage sends the message to a list of peers -func BroadcastMessage(peers []Peer, msg []byte) { +func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) { if len(peers) == 0 { return } @@ -52,7 +32,7 @@ func BroadcastMessage(peers []Peer, msg []byte) { start := time.Now() for _, peer := range peers { peerCopy := peer - go send(peerCopy.IP, peerCopy.Port, content) + go send(h, peerCopy, content) } log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds()) @@ -63,31 +43,6 @@ func BroadcastMessage(peers []Peer, msg []byte) { } } -// SelectMyPeers chooses a list of peers based on the range of ValidatorID -// This is a quick hack of the current p2p networking model -func SelectMyPeers(peers []Peer, min int, max int) []Peer { - res := []Peer{} - for _, peer := range peers { - if peer.ValidatorID >= min && peer.ValidatorID <= max { - res = append(res, peer) - } - } - return res -} - -// BroadcastMessageFromLeader sends the message to a list of peers from a leader. -func BroadcastMessageFromLeader(peers []Peer, msg []byte) { - // TODO(minhdoan): Enable back for multicast. - peers = SelectMyPeers(peers, 1, MaxBroadCast) - BroadcastMessage(peers, msg) -} - -// BroadcastMessageFromValidator sends the message to a list of peers from a validator. -func BroadcastMessageFromValidator(selfPeer Peer, peers []Peer, msg []byte) { - peers = SelectMyPeers(peers, selfPeer.ValidatorID*MaxBroadCast+1, (selfPeer.ValidatorID+1)*MaxBroadCast) - BroadcastMessage(peers, msg) -} - // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content] func ConstructP2pMessage(msgType byte, content []byte) []byte { @@ -103,61 +58,56 @@ func ConstructP2pMessage(msgType byte, content []byte) []byte { return byteBuffer.Bytes() } -// SocketClient is to connect a socket given a port and send the given message. -// TODO(minhdoan, rj): need to check if a peer is reachable or not. -func sendWithSocketClient(ip, port string, message []byte) (err error) { - //log.Printf("Sending message to ip %s and port %s\n", ip, port) - addr := net.JoinHostPort(ip, port) - conn, err := net.Dial("tcp", addr) - - if err != nil { - log.Warn("Dial() failed", "addr", addr, "error", err) - return - } - defer conn.Close() +// BroadcastMessageFromLeader sends the message to a list of peers from a leader. +func BroadcastMessageFromLeader(h Host, peers []p2p.Peer, msg []byte) { + // TODO(minhdoan): Enable back for multicast. + peers = SelectMyPeers(peers, 1, MaxBroadCast) + BroadcastMessage(h, peers, msg) + log.Info("Done sending from leader") +} - nw, err := conn.Write(message) - if err != nil { - log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err) - return - } - if nw < len(message) { - log.Warn("Write() returned short count", - "addr", conn.RemoteAddr(), "actual", nw, "expected", len(message)) - return io.ErrShortWrite - } +// BroadcastMessageFromValidator sends the message to a list of peers from a validator. +func BroadcastMessageFromValidator(h Host, selfPeer p2p.Peer, peers []p2p.Peer, msg []byte) { + peers = SelectMyPeers(peers, selfPeer.ValidatorID*MaxBroadCast+1, (selfPeer.ValidatorID+1)*MaxBroadCast) + BroadcastMessage(h, peers, msg) + log.Info("Done sending from validator") +} - //log.Printf("Sent to ip %s and port %s: %s\n", ip, port, message) +// MaxBroadCast is the maximum number of neighbors to broadcast +const MaxBroadCast = 20 - // No ack (reply) message from the receiver for now. - return +// SelectMyPeers chooses a list of peers based on the range of ValidatorID +// This is a quick hack of the current p2p networking model +func SelectMyPeers(peers []p2p.Peer, min int, max int) []p2p.Peer { + res := []p2p.Peer{} + for _, peer := range peers { + if peer.ValidatorID >= min && peer.ValidatorID <= max { + res = append(res, peer) + } + } + return res } // Send a message to another node with given port. -func send(ip, port string, message []byte) { +func send(h Host, peer p2p.Peer, message []byte) { // Add attack code here. //attack.GetInstance().Run() - backoff := NewExpBackoff(250*time.Millisecond, 10*time.Second, 2) + backoff := p2p.NewExpBackoff(250*time.Millisecond, 10*time.Second, 2) for trial := 0; trial < 10; trial++ { var err error - if Version == 1 { - // TODO(ricl): remove sendWithSocketClient related code. - err = sendWithSocketClient(ip, port, message) - } else { - err = p2pv2.Send(ip, port, message) - } + h.SendMessage(peer, message) if err == nil { if trial > 0 { - log.Warn("retry sendWithSocketClient", "rety", trial) + log.Warn("retry send", "rety", trial) } return } log.Info("sleeping before trying to send again", - "duration", backoff.Cur, "addr", net.JoinHostPort(ip, port)) + "duration", backoff.Cur, "addr", net.JoinHostPort(peer.IP, peer.Port)) backoff.Sleep() } - log.Error("gave up sending a message", "addr", net.JoinHostPort(ip, port)) + log.Error("gave up sending a message", "addr", net.JoinHostPort(peer.IP, peer.Port)) } // DialWithSocketClient joins host port and establishes connection diff --git a/p2p/p2p.go b/p2p/p2p.go new file mode 100644 index 000000000..06ddd0c0b --- /dev/null +++ b/p2p/p2p.go @@ -0,0 +1,28 @@ +package p2p + +import ( + "time" + + "github.com/dedis/kyber" +) + +// Stream is abstract p2p stream from where we read message +type Stream interface { + Read([]byte) (int, error) + Write([]byte) (int, error) + Close() error + SetReadDeadline(time.Time) error +} + +// StreamHandler handles incoming p2p message. +type StreamHandler func(Stream) + +// Peer is the object for a p2p peer (node) +type Peer struct { + IP string // IP address of the peer + Port string // Port number of the peer + PubKey kyber.Point // Public key of the peer + Ready bool // Ready is true if the peer is ready to join consensus. + ValidatorID int // -1 is the default value, means not assigned any validator ID in the shard + // TODO(minhdoan, rj): use this Ready to not send/broadcast to this peer if it wasn't available. +} diff --git a/p2p/p2pimpl/p2pimpl.go b/p2p/p2pimpl/p2pimpl.go new file mode 100644 index 000000000..620915953 --- /dev/null +++ b/p2p/p2pimpl/p2pimpl.go @@ -0,0 +1,25 @@ +package p2pimpl + +import ( + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/host" + "github.com/harmony-one/harmony/p2p/host/hostv1" + "github.com/harmony-one/harmony/p2p/host/hostv2" +) + +// Version The version number of p2p library +// 1 - Direct socket connection +// 2 - libp2p +const Version = 1 + +// NewHost starts the host +func NewHost(peer p2p.Peer) host.Host { + // log.Debug("New Host", "ip/port", net.JoinHostPort(peer.IP, peer.Port)) + if Version == 1 { + h := hostv1.New(peer) + return h + } + + h := hostv2.New(peer) + return h +} diff --git a/p2pv2/host.go b/p2pv2/host.go deleted file mode 100644 index 4f7e5f650..000000000 --- a/p2pv2/host.go +++ /dev/null @@ -1,146 +0,0 @@ -package p2pv2 - -import ( - "bufio" - "bytes" - "context" - "encoding/binary" - "fmt" - "io" - "time" - - "github.com/harmony-one/harmony/log" - libp2p "github.com/libp2p/go-libp2p" - host "github.com/libp2p/go-libp2p-host" - net "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" - multiaddr "github.com/multiformats/go-multiaddr" -) - -var ( - myHost host.Host // TODO(ricl): this should be a field in node. -) - -const ( - // BatchSizeInByte The batch size in byte (64MB) in which we return data - BatchSizeInByte = 1 << 16 - // ProtocolID The ID of protocol used in stream handling. - ProtocolID = "/harmony/0.0.1" -) - -// InitHost Initialize a host for p2p communication -func InitHost(ip, port string) { - addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) - sourceAddr, err := multiaddr.NewMultiaddr(addr) - catchError(err) - priv := addrToPrivKey(addr) - myHost, err = libp2p.New(context.Background(), - libp2p.ListenAddrs(sourceAddr), - libp2p.Identity(priv), - libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves. - // TODO(ricl): Other features to probe - // libp2p.EnableRelay; libp2p.Routing; - ) - catchError(err) - log.Debug("Host is up!", "port", port, "id", myHost.ID().Pretty(), "addrs", sourceAddr) -} - -// BindHandler bind a streamHandler to the harmony protocol. -func BindHandler(handler net.StreamHandler) { - myHost.SetStreamHandler(ProtocolID, handler) -} - -// Send a p2p message sending function with signature compatible to p2pv1. -func Send(ip, port string, message []byte) error { - addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) - targetAddr, err := multiaddr.NewMultiaddr(addr) - - priv := addrToPrivKey(addr) - peerID, _ := peer.IDFromPrivateKey(priv) - myHost.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL) - s, err := myHost.NewStream(context.Background(), peerID, ProtocolID) - catchError(err) - - // Create a buffered stream so that read and writes are non blocking. - w := bufio.NewWriter(bufio.NewWriter(s)) - - // Create a thread to read and write data. - go writeData(w, message) - return nil -} - -// ReadData Call this function in streamHandler to get the binary data. -func ReadData(s net.Stream) ([]byte, error) { - timeoutDuration := 1 * time.Second - s.SetReadDeadline(time.Now().Add(timeoutDuration)) - - // Create a buffered stream so that read and writes are non blocking. - rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) - - contentBuf := bytes.NewBuffer([]byte{}) - // Read 1 byte for message type - _, err := rw.ReadByte() - switch err { - case nil: - //log.Printf("Received p2p message type: %x\n", msgType) - case io.EOF: - fallthrough - default: - log.Error("Error reading the p2p message type field", "err", err) - return contentBuf.Bytes(), err - } - // TODO: check on msgType and take actions accordingly - - // Read 4 bytes for message size - fourBytes := make([]byte, 4) - n, err := rw.Read(fourBytes) - if err != nil { - log.Error("Error reading the p2p message size field", "err", err) - return contentBuf.Bytes(), err - } else if n < len(fourBytes) { - log.Error("Invalid byte size", "bytes", n) - return contentBuf.Bytes(), err - } - - //log.Print(fourBytes) - // Number of bytes for the message content - bytesToRead := binary.BigEndian.Uint32(fourBytes) - //log.Printf("The content size is %d bytes.", bytesToRead) - - // Read the content in chunk of size `BatchSizeInByte` - tmpBuf := make([]byte, BatchSizeInByte) -ILOOP: - for { - // TODO(ricl): is this necessary? If yes, figure out how to make it work - // timeoutDuration := 10 * time.Second - // s.SetReadDeadline(time.Now().Add(timeoutDuration)) - if bytesToRead < BatchSizeInByte { - // Read the last number of bytes less than `BatchSizeInByte` - tmpBuf = make([]byte, bytesToRead) - } - n, err := rw.Read(tmpBuf) - contentBuf.Write(tmpBuf[:n]) - - switch err { - case io.EOF: - // TODO: should we return error here, or just ignore it? - log.Error("EOF reached while reading p2p message") - break ILOOP - case nil: - bytesToRead -= uint32(n) // TODO: think about avoid the casting in every loop - if bytesToRead <= 0 { - break ILOOP - } - default: - log.Error("Error reading p2p message") - return []byte{}, err - } - } - return contentBuf.Bytes(), nil -} - -// GetHost Get the p2p host -func GetHost() host.Host { - return myHost -} diff --git a/runbeacon/run-beacon.go b/runbeacon/run-beacon.go index 42739dc40..9500541a9 100644 --- a/runbeacon/run-beacon.go +++ b/runbeacon/run-beacon.go @@ -7,6 +7,7 @@ import ( "path" "github.com/harmony-one/harmony/beaconchain" + "github.com/harmony-one/harmony/log" ) var ( @@ -33,6 +34,9 @@ func main() { printVersion(os.Args[0]) } + h := log.StdoutHandler + log.Root().SetHandler(h) + bc := beaconchain.New(*numShards, *ip, *port) bc.StartServer() }