Merge pull request #119 from harmony-one/HAR-67_libp2p_integration_v2

[HAR-67]: libp2p integration v2
pull/135/head
Richard Liu 6 years ago committed by GitHub
commit b50cd7910b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      benchmark.go
  2. 79
      client/txgen/main.go
  3. 4
      client/wallet/main.go
  4. 4
      client/wallet_v2/main.go
  5. 9
      node/node.go
  6. 9
      node/node_test.go

@ -225,5 +225,5 @@ func main() {
} }
go currentNode.SupportSyncing() go currentNode.SupportSyncing()
currentNode.StartServer(*port) currentNode.StartServer()
} }

@ -3,7 +3,6 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"github.com/harmony-one/harmony/utils"
"os" "os"
"path" "path"
"runtime" "runtime"
@ -21,6 +20,7 @@ import (
"github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
proto_node "github.com/harmony-one/harmony/proto/node" proto_node "github.com/harmony-one/harmony/proto/node"
"github.com/harmony-one/harmony/utils"
) )
var ( var (
@ -89,56 +89,55 @@ func main() {
} }
// Client/txgenerator server node setup // Client/txgenerator server node setup
if clientPeer == nil {
panic("Client Peer is nil!")
}
consensusObj := consensus.New(*clientPeer, "0", nil, p2p.Peer{}) consensusObj := consensus.New(*clientPeer, "0", nil, p2p.Peer{})
clientNode := node.New(consensusObj, nil, *clientPeer) clientNode := node.New(consensusObj, nil, *clientPeer)
if clientPeer != nil { clientNode.Client = client.NewClient(&shardIDLeaderMap)
//p2pv2.InitHost(clientPeer.IP, clientPeer.Port) // TODO: this should be moved into client node.
clientNode.Client = client.NewClient(&shardIDLeaderMap) // This func is used to update the client's utxopool when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*blockchain.Block) {
// This func is used to update the client's utxopool when new blocks are received from the leaders log.Debug("Received new block from leader", "len", len(blocks))
updateBlocksFunc := func(blocks []*blockchain.Block) { for _, block := range blocks {
log.Debug("Received new block from leader", "len", len(blocks)) for _, node := range nodes {
for _, block := range blocks { shardID := block.ShardID
for _, node := range nodes {
shardID := block.ShardID
accountBlock := new(types.Block)
err := rlp.DecodeBytes(block.AccountBlock, accountBlock)
if err == nil {
shardID = accountBlock.ShardID()
}
if node.Consensus.ShardID == shardID {
log.Debug("Adding block from leader", "shardID", shardID)
// Add it to blockchain
node.AddNewBlock(block)
utxoPoolMutex.Lock()
node.UpdateUtxoAndState(block)
utxoPoolMutex.Unlock()
accountBlock := new(types.Block)
err := rlp.DecodeBytes(block.AccountBlock, accountBlock)
if err == nil {
shardID = accountBlock.ShardID()
}
if node.Consensus.ShardID == shardID {
log.Debug("Adding block from leader", "shardID", shardID)
// Add it to blockchain
node.AddNewBlock(block)
utxoPoolMutex.Lock()
node.UpdateUtxoAndState(block)
utxoPoolMutex.Unlock()
if err != nil {
log.Error("Failed decoding the block with RLP")
} else {
fmt.Println("RECEIVED NEW BLOCK ", len(accountBlock.Transactions()))
node.AddNewBlockAccount(accountBlock)
node.Worker.UpdateCurrent()
if err != nil { if err != nil {
log.Error("Failed decoding the block with RLP") log.Debug("Failed to add new block to worker", "Error", err)
} else {
fmt.Println("RECEIVED NEW BLOCK ", len(accountBlock.Transactions()))
node.AddNewBlockAccount(accountBlock)
node.Worker.UpdateCurrent()
if err != nil {
log.Debug("Failed to add new block to worker", "Error", err)
}
} }
} else {
continue
} }
} else {
continue
} }
} }
} }
clientNode.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go func() {
clientNode.StartServer(clientPeer.Port)
}()
} }
clientNode.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go clientNode.StartServer()
// Transaction generation process // Transaction generation process
time.Sleep(10 * time.Second) // wait for nodes to be ready time.Sleep(10 * time.Second) // wait for nodes to be ready
start := time.Now() start := time.Now()

@ -93,7 +93,7 @@ func main() {
fmt.Println("Private key imported...") fmt.Println("Private key imported...")
case "showBalance": case "showBalance":
walletNode := CreateWalletServerNode() walletNode := CreateWalletServerNode()
go walletNode.StartServer(walletNode.ClientPeer.Port) go walletNode.StartServer()
shardUtxoMap, err := FetchUtxos(ReadAddresses(), walletNode) shardUtxoMap, err := FetchUtxos(ReadAddresses(), walletNode)
if err != nil { if err != nil {
@ -163,7 +163,7 @@ func main() {
// Start client server // Start client server
walletNode := CreateWalletServerNode() walletNode := CreateWalletServerNode()
go walletNode.StartServer(walletNode.ClientPeer.Port) go walletNode.StartServer()
shardUtxoMap, err := FetchUtxos([][20]byte{senderAddressBytes}, walletNode) shardUtxoMap, err := FetchUtxos([][20]byte{senderAddressBytes}, walletNode)
if err != nil { if err != nil {

@ -93,7 +93,7 @@ func main() {
fmt.Println("Private key imported...") fmt.Println("Private key imported...")
case "showBalance": case "showBalance":
walletNode := CreateWalletServerNode() walletNode := CreateWalletServerNode()
go walletNode.StartServer(walletNode.ClientPeer.Port) go walletNode.StartServer()
shardUtxoMap, err := FetchUtxos(ReadAddresses(), walletNode) shardUtxoMap, err := FetchUtxos(ReadAddresses(), walletNode)
if err != nil { if err != nil {
@ -163,7 +163,7 @@ func main() {
// Start client server // Start client server
walletNode := CreateWalletServerNode() walletNode := CreateWalletServerNode()
go walletNode.StartServer(walletNode.ClientPeer.Port) go walletNode.StartServer()
shardUtxoMap, err := FetchUtxos([][20]byte{senderAddressBytes}, walletNode) shardUtxoMap, err := FetchUtxos([][20]byte{senderAddressBytes}, walletNode)
if err != nil { if err != nil {

@ -160,13 +160,12 @@ func (node *Node) getTransactionsForNewBlockAccount(maxNumTxs int) (types.Transa
} }
// StartServer starts a server and process the request by a handler. // StartServer starts a server and process the request by a handler.
func (node *Node) StartServer(port string) { func (node *Node) StartServer() {
if p2p.Version == 1 { if p2p.Version == 1 {
fmt.Println("going to start server on port:", port) node.log.Debug("Starting server", "node", node, "ip", node.SelfPeer.IP, "port", node.SelfPeer.Port)
//node.log.Debug("Starting server", "node", node, "port", port) node.listenOnPort(node.SelfPeer.Port)
node.listenOnPort(port)
} else { } else {
p2pv2.InitHost(node.SelfPeer.IP, port) p2pv2.InitHost(node.SelfPeer.IP, node.SelfPeer.Port)
p2pv2.BindHandler(node.NodeHandlerV1) p2pv2.BindHandler(node.NodeHandlerV1)
// Hang forever // Hang forever
<-make(chan struct{}) <-make(chan struct{})

@ -6,15 +6,12 @@ import (
"testing" "testing"
"time" "time"
"github.com/harmony-one/harmony/utils" "github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
proto_node "github.com/harmony-one/harmony/proto/node" proto_node "github.com/harmony-one/harmony/proto/node"
"github.com/harmony-one/harmony/utils"
) )
func TestNewNewNode(t *testing.T) { func TestNewNewNode(t *testing.T) {
@ -181,5 +178,5 @@ func TestPingPongHandler(test *testing.T) {
//go sendPingMessage(leader) //go sendPingMessage(leader)
go sendPongMessage(leader) go sendPongMessage(leader)
go exitServer() go exitServer()
node.StartServer("8881") node.StartServer()
} }

Loading…
Cancel
Save