diff --git a/.travis.yml b/.travis.yml index e56346534..b2105d342 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go go: - - master + - stable install: - export GOPATH=$HOME/gopath - export CGO_CPPFLAGS="-I$GOPATH/src/github.com/harmony-one/bls/include -I$GOPATH/src/github.com/harmony-one/mcl/include" diff --git a/api/proto/node/pingpong.go b/api/proto/node/pingpong.go index 18d626380..a461f3d5d 100644 --- a/api/proto/node/pingpong.go +++ b/api/proto/node/pingpong.go @@ -54,6 +54,10 @@ type Info struct { PeerID peer.ID // Peerstore ID } +func (info Info) String() string { + return fmt.Sprintf("Info:%v/%v=>%v/%v", info.IP, info.Port, info.ValidatorID, info.PeerID) +} + // PingMessageType defines the data structure of the Ping message type PingMessageType struct { Version uint16 // version of the protocol diff --git a/cmd/beaconchain/main.go b/cmd/beaconchain/main.go index f44c65df0..7a502a2ce 100644 --- a/cmd/beaconchain/main.go +++ b/cmd/beaconchain/main.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/log" beaconchain "github.com/harmony-one/harmony/internal/beaconchain/libs" + "github.com/harmony-one/harmony/internal/utils" ) var ( @@ -46,7 +47,9 @@ func main() { } else { fmt.Printf("Starting new beaconchain\n") beaconchain.SetSaveFile(*resetFlag) - bc = beaconchain.New(*numShards, *ip, *port) + + priKey, _, _ := utils.GenKeyP2P(*ip, *port) + bc = beaconchain.New(*numShards, *ip, *port, priKey) } fmt.Printf("Beacon Chain Started: /ip4/%s/tcp/%v/ipfs/%s\n", *ip, *port, bc.GetID().Pretty()) diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index 97a9e6e56..7efe4cd31 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -16,6 +16,7 @@ import ( "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/newnode" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/p2pimpl" @@ -65,6 +66,7 @@ func main() { var bcPeer *p2p.Peer var shardIDLeaderMap map[uint32]p2p.Peer + priKey, _, err := utils.GenKeyP2P(*ip, *port) if *bcAddr != "" { // Turn the destination into a multiaddr. @@ -84,7 +86,7 @@ func main() { bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort} } - candidateNode := newnode.New(*ip, *port) + candidateNode := newnode.New(*ip, *port, priKey) candidateNode.AddPeer(bcPeer) candidateNode.ContactBeaconChain(*bcPeer) selfPeer := candidateNode.GetSelfPeer() @@ -113,7 +115,7 @@ func main() { // Nodes containing blockchain data to mirror the shards' data in the network nodes := []*node.Node{} - host, err := p2pimpl.NewHost(&selfPeer) + host, err := p2pimpl.NewHost(&selfPeer, priKey) if err != nil { panic("unable to new host in txgen") } diff --git a/cmd/client/wallet/main.go b/cmd/client/wallet/main.go index fd3c8adc1..7c697a208 100644 --- a/cmd/client/wallet/main.go +++ b/cmd/client/wallet/main.go @@ -24,6 +24,7 @@ import ( "github.com/harmony-one/harmony/core/types" libs "github.com/harmony-one/harmony/internal/beaconchain/libs" beaconchain "github.com/harmony-one/harmony/internal/beaconchain/rpc" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/p2pimpl" @@ -299,7 +300,8 @@ func CreateWalletNode() *node.Node { // dummy host for wallet self := p2p.Peer{IP: "127.0.0.1", Port: "6789"} - host, _ := p2pimpl.NewHost(&self) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "6789") + host, _ := p2pimpl.NewHost(&self, priKey) walletNode := node.New(host, nil, nil) walletNode.Client = client.NewClient(walletNode.GetHost(), &shardIDLeaderMap) return walletNode diff --git a/cmd/harmony.go b/cmd/harmony.go index eac75505d..b8a2b4dcd 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -119,6 +119,7 @@ func main() { var selfPeer p2p.Peer var clientPeer *p2p.Peer var BCPeer *p2p.Peer + priKey, _, err := utils.GenKeyP2P(*ip, *port) if *bcAddr != "" { // Turn the destination into a multiaddr. @@ -139,7 +140,7 @@ func main() { } //Use Peer Discovery to get shard/leader/peer/... - candidateNode := pkg_newnode.New(*ip, *port) + candidateNode := pkg_newnode.New(*ip, *port, priKey) candidateNode.AddPeer(BCPeer) candidateNode.ContactBeaconChain(*BCPeer) @@ -149,7 +150,7 @@ func main() { clientPeer = candidateNode.GetClientPeer() selfPeer.PubKey = candidateNode.PubK - // fmt.Println(peers, leader, selfPeer, clientPeer, *logFolder, *minPeers) //TODO: to be replaced by a logger later: ak, rl + fmt.Println("Harmnoy", leader, selfPeer) var role string if leader.IP == *ip && leader.Port == *port { @@ -172,7 +173,7 @@ func main() { ldb, _ = InitLDBDatabase(*ip, *port, *freshDB) } - host, err := p2pimpl.NewHost(&selfPeer) + host, err := p2pimpl.NewHost(&selfPeer, priKey) if err != nil { panic("unable to new host in harmony") } diff --git a/consensus/consensus_leader_msg_test.go b/consensus/consensus_leader_msg_test.go index 84ddee43e..cc33ad357 100644 --- a/consensus/consensus_leader_msg_test.go +++ b/consensus/consensus_leader_msg_test.go @@ -8,15 +8,17 @@ import ( consensus_proto "github.com/harmony-one/harmony/api/consensus" "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/crypto/pki" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" ) func TestConstructAnnounceMessage(test *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "19999"} validator := p2p.Peer{IP: "127.0.0.1", Port: "55555"} - host, err := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) if err != nil { - test.Fatalf("new host failed: %v", err) + test.Fatalf("newhost failure: %v", err) } consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} @@ -39,9 +41,10 @@ func TestConstructChallengeMessage(test *testing.T) { validatorPriKey.UnmarshalBinary(priKeyInBytes[:]) validatorPubKey := pki.GetPublicKeyFromScalar(leaderPriKey) validator := p2p.Peer{IP: "127.0.0.1", Port: "5555", PubKey: validatorPubKey} - host, err := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) if err != nil { - test.Fatalf("new host failed: %v", err) + test.Fatalf("newhost failure: %v", err) } consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} diff --git a/consensus/consensus_leader_test.go b/consensus/consensus_leader_test.go index eb705e404..76d2c4e6c 100644 --- a/consensus/consensus_leader_test.go +++ b/consensus/consensus_leader_test.go @@ -1,6 +1,8 @@ package consensus import ( + "fmt" + "github.com/golang/mock/gomock" "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/internal/utils" @@ -15,19 +17,25 @@ import ( "github.com/harmony-one/harmony/p2p" ) +var ( + ip = "127.0.0.1" +) + func TestProcessMessageLeaderCommit(test *testing.T) { ctrl := gomock.NewController(test) defer ctrl.Finish() - leader := p2p.Peer{IP: "127.0.0.1", Port: "7777"} + leader := p2p.Peer{IP: ip, Port: "7777"} _, leader.PubKey = utils.GenKey(leader.IP, leader.Port) - validator1 := p2p.Peer{IP: "127.0.0.1", Port: "7778", ValidatorID: 1} - _, validator1.PubKey = utils.GenKey(validator1.IP, validator1.Port) - validator2 := p2p.Peer{IP: "127.0.0.1", Port: "7776", ValidatorID: 2} - _, validator2.PubKey = utils.GenKey(validator2.IP, validator2.Port) - validator3 := p2p.Peer{IP: "127.0.0.1", Port: "7779", ValidatorID: 3} - _, validator3.PubKey = utils.GenKey(validator3.IP, validator3.Port) + validators := make([]p2p.Peer, 3) + hosts := make([]p2p.Host, 3) + + for i := 0; i < 3; i++ { + port := fmt.Sprintf("%d", 7788+i) + validators[i] = p2p.Peer{IP: ip, Port: port, ValidatorID: i + 1} + _, validators[i].PubKey = utils.GenKey(validators[i].IP, validators[i].Port) + } m := mock_host.NewMockHost(ctrl) // Asserts that the first and only call to Bar() is passed 99. @@ -35,26 +43,23 @@ func TestProcessMessageLeaderCommit(test *testing.T) { m.EXPECT().GetSelfPeer().Return(leader) m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(3) - consensusLeader := New(m, "0", []p2p.Peer{validator1, validator2, validator3}, leader) + consensusLeader := New(m, "0", validators, leader) consensusLeader.blockHash = [32]byte{} - host1, _ := p2pimpl.NewHost(&validator1) - consensusValidator1 := New(host1, "0", []p2p.Peer{validator1, validator2, validator3}, leader) - consensusValidator1.blockHash = [32]byte{} - _, msg := consensusValidator1.constructCommitMessage(consensus_proto.MessageType_COMMIT) - consensusLeader.ProcessMessageLeader(msg[1:]) - - host2, _ := p2pimpl.NewHost(&validator2) - consensusValidator2 := New(host2, "0", []p2p.Peer{validator1, validator2, validator3}, leader) - consensusValidator2.blockHash = [32]byte{} - _, msg = consensusValidator2.constructCommitMessage(consensus_proto.MessageType_COMMIT) - consensusLeader.ProcessMessageLeader(msg[1:]) - - host3, _ := p2pimpl.NewHost(&validator3) - consensusValidator3 := New(host3, "0", []p2p.Peer{validator1, validator2, validator3}, leader) - consensusValidator3.blockHash = [32]byte{} - _, msg = consensusValidator3.constructCommitMessage(consensus_proto.MessageType_COMMIT) - consensusLeader.ProcessMessageLeader(msg[1:]) + consensusValidators := make([]*Consensus, 3) + for i := 0; i < 3; i++ { + priKey, _, _ := utils.GenKeyP2P(validators[i].IP, validators[i].Port) + host, err := p2pimpl.NewHost(&validators[i], priKey) + if err != nil { + test.Fatalf("newhost error: %v", err) + } + hosts[i] = host + + consensusValidators[i] = New(hosts[i], "0", validators, leader) + consensusValidators[i].blockHash = [32]byte{} + _, msg := consensusValidators[i].constructCommitMessage(consensus_proto.MessageType_COMMIT) + consensusLeader.ProcessMessageLeader(msg[1:]) + } assert.Equal(test, ChallengeDone, consensusLeader.state) @@ -65,15 +70,17 @@ func TestProcessMessageLeaderResponse(test *testing.T) { ctrl := gomock.NewController(test) defer ctrl.Finish() - leader := p2p.Peer{IP: "127.0.0.1", Port: "8889"} + leader := p2p.Peer{IP: ip, Port: "8889"} _, leader.PubKey = utils.GenKey(leader.IP, leader.Port) - validator1 := p2p.Peer{IP: "127.0.0.1", Port: "8887", ValidatorID: 1} - _, validator1.PubKey = utils.GenKey(validator1.IP, validator1.Port) - validator2 := p2p.Peer{IP: "127.0.0.1", Port: "8888", ValidatorID: 2} - _, validator2.PubKey = utils.GenKey(validator2.IP, validator2.Port) - validator3 := p2p.Peer{IP: "127.0.0.1", Port: "8899", ValidatorID: 3} - _, validator3.PubKey = utils.GenKey(validator3.IP, validator3.Port) + validators := make([]p2p.Peer, 3) + hosts := make([]p2p.Host, 3) + + for i := 0; i < 3; i++ { + port := fmt.Sprintf("%d", 8788+i) + validators[i] = p2p.Peer{IP: ip, Port: port, ValidatorID: i + 1} + _, validators[i].PubKey = utils.GenKey(validators[i].IP, validators[i].Port) + } m := mock_host.NewMockHost(ctrl) // Asserts that the first and only call to Bar() is passed 99. @@ -81,35 +88,30 @@ func TestProcessMessageLeaderResponse(test *testing.T) { m.EXPECT().GetSelfPeer().Return(leader) m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(6) - consensusLeader := New(m, "0", []p2p.Peer{validator1, validator2, validator3}, leader) - consensusLeader.blockHash = [32]byte{} - - host1, _ := p2pimpl.NewHost(&validator1) - consensusValidator1 := New(host1, "0", []p2p.Peer{validator1, validator2, validator3}, leader) - consensusValidator1.blockHash = [32]byte{} - _, msg := consensusValidator1.constructCommitMessage(consensus_proto.MessageType_COMMIT) - consensusLeader.ProcessMessageLeader(msg[1:]) - - host2, _ := p2pimpl.NewHost(&validator2) - consensusValidator2 := New(host2, "0", []p2p.Peer{validator1, validator2, validator3}, leader) - consensusValidator2.blockHash = [32]byte{} - _, msg = consensusValidator2.constructCommitMessage(consensus_proto.MessageType_COMMIT) - consensusLeader.ProcessMessageLeader(msg[1:]) + for i := 0; i < 3; i++ { + priKey, _, _ := utils.GenKeyP2P(validators[i].IP, validators[i].Port) + host, err := p2pimpl.NewHost(&validators[i], priKey) + if err != nil { + test.Fatalf("newhost error: %v", err) + } + hosts[i] = host + } - host3, _ := p2pimpl.NewHost(&validator3) - consensusValidator3 := New(host3, "0", []p2p.Peer{validator1, validator2, validator3}, leader) - consensusValidator3.blockHash = [32]byte{} - _, msg = consensusValidator3.constructCommitMessage(consensus_proto.MessageType_COMMIT) - consensusLeader.ProcessMessageLeader(msg[1:]) - - msg = consensusValidator1.constructResponseMessage(consensus_proto.MessageType_RESPONSE, crypto.Ed25519Curve.Scalar().One()) - consensusLeader.ProcessMessageLeader(msg[1:]) - - msg = consensusValidator2.constructResponseMessage(consensus_proto.MessageType_RESPONSE, crypto.Ed25519Curve.Scalar().One()) - consensusLeader.ProcessMessageLeader(msg[1:]) + consensusLeader := New(m, "0", validators, leader) + consensusLeader.blockHash = [32]byte{} - msg = consensusValidator3.constructResponseMessage(consensus_proto.MessageType_RESPONSE, crypto.Ed25519Curve.Scalar().One()) - consensusLeader.ProcessMessageLeader(msg[1:]) + consensusValidators := make([]*Consensus, 3) + for i := 0; i < 3; i++ { + consensusValidators[i] = New(hosts[i], "0", validators, leader) + consensusValidators[i].blockHash = [32]byte{} + _, msg := consensusValidators[i].constructCommitMessage(consensus_proto.MessageType_COMMIT) + consensusLeader.ProcessMessageLeader(msg[1:]) + } + + for i := 0; i < 3; i++ { + msg := consensusValidators[i].constructResponseMessage(consensus_proto.MessageType_RESPONSE, crypto.Ed25519Curve.Scalar().One()) + consensusLeader.ProcessMessageLeader(msg[1:]) + } assert.Equal(test, CollectiveSigDone, consensusLeader.state) diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index c773ba509..41f7f85b0 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -11,7 +11,11 @@ import ( func TestNew(test *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"} validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"} - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + test.Fatalf("newhost failure: %v", err) + } consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) if consensus.consensusID != 0 { test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusID) @@ -47,7 +51,11 @@ func TestRemovePeers(t *testing.T) { peerRemove := []p2p.Peer{p1, p2} leader := p2p.Peer{IP: "127.0.0.1", Port: "9000", PubKey: pk5} - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + t.Fatalf("newhost failure: %v", err) + } consensus := New(host, "0", peers, leader) // consensus.DebugPrintPublicKeys() @@ -61,7 +69,8 @@ func TestRemovePeers(t *testing.T) { func TestGetPeerFromID(t *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"} validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"} - host, err := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) if err != nil { t.Fatalf("newhost failure: %v", err) } diff --git a/consensus/consensus_validator_msg_test.go b/consensus/consensus_validator_msg_test.go index 6d40e6417..290736da6 100644 --- a/consensus/consensus_validator_msg_test.go +++ b/consensus/consensus_validator_msg_test.go @@ -7,13 +7,18 @@ import ( consensus_proto "github.com/harmony-one/harmony/api/consensus" "github.com/harmony-one/harmony/crypto" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" ) func TestConstructCommitMessage(test *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "9992"} validator := p2p.Peer{IP: "127.0.0.1", Port: "9995"} - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + test.Fatalf("newhost failure: %v", err) + } consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} _, msg := consensus.constructCommitMessage(consensus_proto.MessageType_COMMIT) @@ -26,7 +31,11 @@ func TestConstructCommitMessage(test *testing.T) { func TestConstructResponseMessage(test *testing.T) { leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"} validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"} - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + test.Fatalf("newhost failure: %v", err) + } consensus := New(host, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} msg := consensus.constructResponseMessage(consensus_proto.MessageType_RESPONSE, crypto.Ed25519Curve.Scalar()) diff --git a/consensus/consensus_validator_test.go b/consensus/consensus_validator_test.go index 1d451fc4c..5dcbddcbd 100644 --- a/consensus/consensus_validator_test.go +++ b/consensus/consensus_validator_test.go @@ -36,7 +36,11 @@ func TestProcessMessageValidatorAnnounce(test *testing.T) { m.EXPECT().GetSelfPeer().Return(leader) m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(1) - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + test.Fatalf("newhost failure: %v", err) + } consensusLeader := New(host, "0", []p2p.Peer{validator1, validator2, validator3}, leader) blockBytes, err := hex.DecodeString("f90461f90222a0f7007987c6f26b20cbd6384e3587445eca556beb6716f8eb6a2f590ce8ed3925940000000000000000000000000000000000000000a0f4f2f8416b65c98890630b105f016370abaab236c92faf7fc73a13d037958c52a0db025c6f785698feb447b509908fe488486062e4607afaae85c3336692445b01a03688be0d6b3d0651911204b4539e11096045cacbb676401e2655653823014c8cb90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008001850254a0e6f88303295d845c2e4f0e80a00000000000000000000000000000000000000000000000000000000000000000880000000000000000840000000080b842000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f90238f9023580808083081650808b069e10de76676d08000000b901db6080604052678ac7230489e8000060015560028054600160a060020a031916331790556101aa806100316000396000f3fe608060405260043610610045577c0100000000000000000000000000000000000000000000000000000000600035046327c78c42811461004a5780634ddd108a1461008c575b600080fd5b34801561005657600080fd5b5061008a6004803603602081101561006d57600080fd5b503573ffffffffffffffffffffffffffffffffffffffff166100b3565b005b34801561009857600080fd5b506100a1610179565b60408051918252519081900360200190f35b60025473ffffffffffffffffffffffffffffffffffffffff1633146100d757600080fd5b600154303110156100e757600080fd5b73ffffffffffffffffffffffffffffffffffffffff811660009081526020819052604090205460ff161561011a57600080fd5b73ffffffffffffffffffffffffffffffffffffffff8116600081815260208190526040808220805460ff1916600190811790915554905181156108fc0292818181858888f19350505050158015610175573d6000803e3d6000fd5b5050565b30319056fea165627a7a723058203e799228fee2fa7c5d15e71c04267a0cc2687c5eff3b48b98f21f355e1064ab300291ba0a87b9130f7f127af3a713a270610da48d56dedc9501e624bdfe04871859c88f3a05a94b087c05c6395825c5fc35d5ce96b2e61f0ce5f2d67b28f9b2d1178fa90f0c0") consensusLeader.block = blockBytes @@ -86,7 +90,11 @@ func TestProcessMessageValidatorChallenge(test *testing.T) { m.EXPECT().GetSelfPeer().Return(leader) m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(2) - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + test.Fatalf("newhost failure: %v", err) + } consensusLeader := New(host, "0", []p2p.Peer{validator1, validator2, validator3}, leader) blockBytes, err := hex.DecodeString("f90461f90222a0f7007987c6f26b20cbd6384e3587445eca556beb6716f8eb6a2f590ce8ed3925940000000000000000000000000000000000000000a0f4f2f8416b65c98890630b105f016370abaab236c92faf7fc73a13d037958c52a0db025c6f785698feb447b509908fe488486062e4607afaae85c3336692445b01a03688be0d6b3d0651911204b4539e11096045cacbb676401e2655653823014c8cb90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008001850254a0e6f88303295d845c2e4f0e80a00000000000000000000000000000000000000000000000000000000000000000880000000000000000840000000080b842000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f90238f9023580808083081650808b069e10de76676d08000000b901db6080604052678ac7230489e8000060015560028054600160a060020a031916331790556101aa806100316000396000f3fe608060405260043610610045577c0100000000000000000000000000000000000000000000000000000000600035046327c78c42811461004a5780634ddd108a1461008c575b600080fd5b34801561005657600080fd5b5061008a6004803603602081101561006d57600080fd5b503573ffffffffffffffffffffffffffffffffffffffff166100b3565b005b34801561009857600080fd5b506100a1610179565b60408051918252519081900360200190f35b60025473ffffffffffffffffffffffffffffffffffffffff1633146100d757600080fd5b600154303110156100e757600080fd5b73ffffffffffffffffffffffffffffffffffffffff811660009081526020819052604090205460ff161561011a57600080fd5b73ffffffffffffffffffffffffffffffffffffffff8116600081815260208190526040808220805460ff1916600190811790915554905181156108fc0292818181858888f19350505050158015610175573d6000803e3d6000fd5b5050565b30319056fea165627a7a723058203e799228fee2fa7c5d15e71c04267a0cc2687c5eff3b48b98f21f355e1064ab300291ba0a87b9130f7f127af3a713a270610da48d56dedc9501e624bdfe04871859c88f3a05a94b087c05c6395825c5fc35d5ce96b2e61f0ce5f2d67b28f9b2d1178fa90f0c0") consensusLeader.block = blockBytes diff --git a/internal/beaconchain/libs/beaconchain.go b/internal/beaconchain/libs/beaconchain.go index 6f6ee8bb6..65bb63297 100644 --- a/internal/beaconchain/libs/beaconchain.go +++ b/internal/beaconchain/libs/beaconchain.go @@ -18,6 +18,7 @@ import ( "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/p2pimpl" + p2p_crypto "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" ) @@ -93,12 +94,12 @@ func (bc *BeaconChain) GetShardLeaderMap() map[int]*node.Info { } //New beaconchain initialization -func New(numShards int, ip, port string) *BeaconChain { +func New(numShards int, ip, port string, key p2p_crypto.PrivKey) *BeaconChain { bc := BeaconChain{} bc.log = log.New() bc.PubKey = generateBCKey() bc.Self = p2p.Peer{IP: ip, Port: port} - bc.host, _ = p2pimpl.NewHost(&bc.Self) + bc.host, _ = p2pimpl.NewHost(&bc.Self, key) bcinfo := &BCInfo{NumberOfShards: numShards, NumberOfNodesAdded: 0, IP: ip, Port: port, diff --git a/internal/beaconchain/libs/beaconchain_test.go b/internal/beaconchain/libs/beaconchain_test.go index fbc50abf5..ebabfc95f 100644 --- a/internal/beaconchain/libs/beaconchain_test.go +++ b/internal/beaconchain/libs/beaconchain_test.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/api/proto/bcconn" "github.com/harmony-one/harmony/api/proto/node" beaconchain "github.com/harmony-one/harmony/internal/beaconchain/rpc" + "github.com/harmony-one/harmony/internal/utils" "github.com/stretchr/testify/assert" ) @@ -28,7 +29,8 @@ func TestNewNode(t *testing.T) { ip = "127.0.0.1" port = "7523" numshards := 2 - bc := New(numshards, ip, port) + priKey, _, _ := utils.GenKeyP2P(ip, port) + bc := New(numshards, ip, port, priKey) if bc.PubKey == nil { t.Error("beacon chain public key not initialized") @@ -48,7 +50,8 @@ func TestShardLeaderMap(t *testing.T) { ip = "127.0.0.1" beaconport := "7523" numshards := 1 - bc := New(numshards, ip, beaconport) + priKey, _, _ := utils.GenKeyP2P(ip, beaconport) + bc := New(numshards, ip, beaconport, priKey) bc.BCInfo.Leaders = leaders if !reflect.DeepEqual(bc.GetShardLeaderMap(), shardLeaderMap) { t.Error("The function GetShardLeaderMap doesn't work well") @@ -61,7 +64,8 @@ func TestFetchLeaders(t *testing.T) { ip = "127.0.0.1" beaconport := "7523" numshards := 1 - bc := New(numshards, ip, beaconport) + priKey, _, _ := utils.GenKeyP2P(ip, beaconport) + bc := New(numshards, ip, beaconport, priKey) bc.BCInfo.Leaders = leaders bc.rpcServer = beaconchain.NewServer(bc.GetShardLeaderMap) bc.StartRPCServer() @@ -80,7 +84,8 @@ func TestAcceptNodeInfo(t *testing.T) { ip = "127.0.0.1" beaconport := "7523" numshards := 1 - bc := New(numshards, ip, beaconport) + priKey, _, _ := utils.GenKeyP2P(ip, beaconport) + bc := New(numshards, ip, beaconport, priKey) b := bcconn.SerializeNodeInfo(leader1) node := bc.AcceptNodeInfo(b) if !reflect.DeepEqual(node, leader1) { @@ -97,7 +102,8 @@ func TestRespondRandomness(t *testing.T) { ip = "127.0.0.1" beaconport := "7523" numshards := 1 - bc := New(numshards, ip, beaconport) + priKey, _, _ := utils.GenKeyP2P(ip, beaconport) + bc := New(numshards, ip, beaconport, priKey) bc.RespondRandomness(leader1) assert.Equal(t, RandomInfoSent, bc.state) } @@ -107,7 +113,8 @@ func TestAcceptConnections(t *testing.T) { ip = "127.0.0.1" beaconport := "7523" numshards := 1 - bc := New(numshards, ip, beaconport) + priKey, _, _ := utils.GenKeyP2P(ip, beaconport) + bc := New(numshards, ip, beaconport, priKey) b := bcconn.SerializeNodeInfo(leader1) bc.AcceptConnections(b) assert.Equal(t, RandomInfoSent, bc.state) diff --git a/internal/newnode/newnode.go b/internal/newnode/newnode.go index 42947120e..72e5f8687 100644 --- a/internal/newnode/newnode.go +++ b/internal/newnode/newnode.go @@ -19,6 +19,7 @@ import ( "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/p2pimpl" + p2p_crypto "github.com/libp2p/go-libp2p-crypto" multiaddr "github.com/multiformats/go-multiaddr" ) @@ -39,7 +40,7 @@ type NewNode struct { } // New candidatenode initialization -func New(ip string, port string) *NewNode { +func New(ip string, port string, nodePk p2p_crypto.PrivKey) *NewNode { priKey, pubKey := utils.GenKey(ip, port) var node NewNode var err error @@ -48,8 +49,7 @@ func New(ip string, port string) *NewNode { node.Self = p2p.Peer{IP: ip, Port: port, PubKey: pubKey, ValidatorID: -1} node.log = utils.GetLogInstance() node.SetInfo = make(chan bool) - node.host, err = p2pimpl.NewHost(&node.Self) - node.log.Info("NewNode New", "Self", node.Self) + node.host, err = p2pimpl.NewHost(&node.Self, nodePk) if err != nil { node.log.Error("failed to create new host", "msg", err) return nil @@ -81,6 +81,7 @@ func (node *NewNode) requestBeaconChain(BCPeer p2p.Peer) (err error) { if err != nil { node.log.Error("Could not Marshall public key into binary") } + fmt.Printf("[New Node]: %v\n", *node) nodeInfo := &proto_node.Info{IP: node.Self.IP, Port: node.Self.Port, PubKey: pubk, PeerID: node.Self.PeerID} msg := bcconn.SerializeNodeInfo(nodeInfo) msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Register, msg) diff --git a/internal/newnode/newnode_test.go b/internal/newnode/newnode_test.go index 148ad38ba..5f6ede7cb 100644 --- a/internal/newnode/newnode_test.go +++ b/internal/newnode/newnode_test.go @@ -6,6 +6,7 @@ import ( "time" beaconchain "github.com/harmony-one/harmony/internal/beaconchain/libs" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" peerstore "github.com/libp2p/go-libp2p-peerstore" multiaddr "github.com/multiformats/go-multiaddr" @@ -15,7 +16,8 @@ func TestNewNode(t *testing.T) { var ip, port string ip = "127.0.0.1" port = "8088" - nnode := New(ip, port) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "8088") + nnode := New(ip, port, priKey) if nnode.PubK == nil { t.Error("new node public key not initialized") @@ -29,8 +31,12 @@ func TestBeaconChainConnect(t *testing.T) { beaconport = "8081" nodeport = "9081" - nnode := New(ip, nodeport) - bc := beaconchain.New(1, ip, beaconport) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9081") + nnode := New(ip, nodeport, priKey) + + priKey, _, _ = utils.GenKeyP2P("127.0.0.1", "8081") + bc := beaconchain.New(1, ip, beaconport, priKey) + bcma = fmt.Sprintf("/ip4/%s/tcp/%s/ipfs/%s", bc.Self.IP, bc.Self.Port, bc.GetID().Pretty()) go bc.StartServer() diff --git a/node/node_handler_test.go b/node/node_handler_test.go index bfe43bc2e..a5106d257 100644 --- a/node/node_handler_test.go +++ b/node/node_handler_test.go @@ -13,7 +13,11 @@ func TestNodeStreamHandler(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey} validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"} - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + t.Fatalf("newhost failure: %v", err) + } consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) node := New(host, consensus, nil) @@ -33,7 +37,11 @@ func TestAddNewBlock(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "127.0.0.1", Port: "9882", PubKey: pubKey} validator := p2p.Peer{IP: "127.0.0.1", Port: "9885"} - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + t.Fatalf("newhost failure: %v", err) + } consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) node := New(host, consensus, nil) @@ -52,7 +60,11 @@ func TestVerifyNewBlock(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey} validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"} - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + t.Fatalf("newhost failure: %v", err) + } consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) node := New(host, consensus, nil) diff --git a/node/node_test.go b/node/node_test.go index 6b48834f4..62be0b025 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -19,7 +19,11 @@ func TestNewNode(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey} validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"} - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + t.Fatalf("newhost failure: %v", err) + } consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) node := New(host, consensus, nil) if node.Consensus == nil { @@ -39,7 +43,8 @@ func TestGetSyncingPeers(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey} validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"} - host, err := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) if err != nil { t.Fatalf("newhost failure: %v", err) } @@ -86,7 +91,11 @@ func TestAddPeers(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "127.0.0.1", Port: "8982", PubKey: pubKey} validator := p2p.Peer{IP: "127.0.0.1", Port: "8985"} - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + t.Fatalf("newhost failure: %v", err) + } consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) node := New(host, consensus, nil) @@ -155,11 +164,15 @@ func exitServer() { os.Exit(0) } -func TestPingPongHandler(test *testing.T) { +func TestPingPongHandler(t *testing.T) { _, pubKey := utils.GenKey("127.0.0.1", "8881") leader := p2p.Peer{IP: "127.0.0.1", Port: "8881", PubKey: pubKey} // validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"} - host, _ := p2pimpl.NewHost(&leader) + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + t.Fatalf("newhost failure: %v", err) + } consensus := consensus.New(host, "0", []p2p.Peer{leader}, leader) node := New(host, consensus, nil) //go sendPingMessage(leader) diff --git a/p2p/helper.go b/p2p/helper.go index 9196f2a2a..bcada959d 100644 --- a/p2p/helper.go +++ b/p2p/helper.go @@ -2,7 +2,6 @@ package p2p import ( "bufio" - "bytes" "encoding/binary" "io" "time" @@ -28,46 +27,39 @@ content (n bytes) - actual message content const BatchSizeInByte = 1 << 16 // ReadMessageContent reads the message type and content size, and return the actual content. -func ReadMessageContent(s Stream) ([]byte, error) { +func ReadMessageContent(s Stream) (content []byte, err error) { var ( - contentBuf = bytes.NewBuffer([]byte{}) - r = bufio.NewReader(s) + r = bufio.NewReader(s) ) timeoutDuration := 1 * time.Second - s.SetReadDeadline(time.Now().Add(timeoutDuration)) + if err = s.SetReadDeadline(time.Now().Add(timeoutDuration)); err != nil { + log.Error("cannot reset deadline for message header read", "error", err) + return + } //// Read 1 byte for message type - _, err := r.ReadByte() - switch err { - case io.EOF: - log.Error("Error reading the p2p message type field", "io.EOF", err) - return contentBuf.Bytes(), err - case nil: - //log.Printf("Received p2p message type: %x\n", msgType) - default: - log.Error("Error reading the p2p message type field", "msg", err) - return contentBuf.Bytes(), err + if _, err = r.ReadByte(); err != nil { + log.Error("failed to read p2p message type field", "error", err) + return } // TODO: check on msgType and take actions accordingly //// Read 4 bytes for message size fourBytes := make([]byte, 4) - n, err := r.Read(fourBytes) - if err != nil { - log.Error("Error reading the p2p message size field") - return contentBuf.Bytes(), err - } else if n < len(fourBytes) { - log.Error("Failed reading the p2p message size field: only read", "Num of bytes", n) - return contentBuf.Bytes(), err + if _, err = io.ReadFull(r, fourBytes); err != nil { + log.Error("failed to read p2p message size field", "error", err) + return } contentLength := int(binary.BigEndian.Uint32(fourBytes)) - tmpBuf := make([]byte, contentLength) + contentBuf := make([]byte, contentLength) timeoutDuration = 20 * time.Second - s.SetReadDeadline(time.Now().Add(timeoutDuration)) - m, err := io.ReadFull(r, tmpBuf) - if err != nil || m < contentLength { - log.Error("Read %v bytes, we need %v bytes", m, contentLength) - return []byte{}, err + if err = s.SetReadDeadline(time.Now().Add(timeoutDuration)); err != nil { + log.Error("cannot reset deadline for message content read", "error", err) + return + } + if _, err = io.ReadFull(r, contentBuf); err != nil { + log.Error("failed to read p2p message contents", "error", err) + return } - contentBuf.Write(tmpBuf) - return contentBuf.Bytes(), nil + content = contentBuf + return } diff --git a/p2p/host/hostv1/hostv1.go b/p2p/host/hostv1/hostv1.go index d8ac3e439..8ff031607 100644 --- a/p2p/host/hostv1/hostv1.go +++ b/p2p/host/hostv1/hostv1.go @@ -80,24 +80,25 @@ func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) { } // SendMessage sends message to peer -func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) { +func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) error { + logger := log.New("from", host.self, "to", peer, "PeerID", peer.PeerID) addr := net.JoinHostPort(peer.IP, peer.Port) conn, err := net.Dial("tcp", addr) - if err != nil { - log.Warn("HostV1 SendMessage Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err) - return fmt.Errorf("Dail Failed") + logger.Warn("Dial() failed", "address", addr, "error", err) + return fmt.Errorf("Dial(%s) failed: %v", addr, err) } - defer conn.Close() + defer func() { + if err := conn.Close(); err != nil { + logger.Warn("Close() failed", "error", err) + } + }() - nw, err := conn.Write(message) - if err != nil { - log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err) - return fmt.Errorf("Write Failed") - } - if nw < len(message) { - log.Warn("Write() returned short count", - "addr", conn.RemoteAddr(), "actual", nw, "expected", len(message)) + if nw, err := conn.Write(message); err != nil { + logger.Warn("Write() failed", "error", err) + return fmt.Errorf("Write() failed: %v", err) + } else if nw < len(message) { + logger.Warn("Short Write()", "actual", nw, "expected", len(message)) return io.ErrShortWrite } diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go index 8f07027e5..b081c9ec6 100644 --- a/p2p/host/hostv2/hostv2.go +++ b/p2p/host/hostv2/hostv2.go @@ -3,6 +3,7 @@ package hostv2 import ( "context" "fmt" + "io" "github.com/ethereum/go-ethereum/log" "github.com/harmony-one/harmony/p2p" @@ -12,7 +13,7 @@ import ( net "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" peerstore "github.com/libp2p/go-libp2p-peerstore" - multiaddr "github.com/multiformats/go-multiaddr" + ma "github.com/multiformats/go-multiaddr" ) const ( @@ -44,7 +45,7 @@ func (host *HostV2) AddPeer(p *p2p.Peer) error { // reconstruct the multiaddress based on ip/port // PeerID has to be known for the ip/port addr := fmt.Sprintf("/ip4/%s/tcp/%s", p.IP, p.Port) - targetAddr, err := multiaddr.NewMultiaddr(addr) + targetAddr, err := ma.NewMultiaddr(addr) if err != nil { log.Error("AddPeer NewMultiaddr error", "error", err) return err @@ -64,23 +65,29 @@ func (host *HostV2) Peerstore() peerstore.Peerstore { } // New creates a host for p2p communication -func New(self p2p.Peer, priKey p2p_crypto.PrivKey) *HostV2 { - +func New(self *p2p.Peer, priKey p2p_crypto.PrivKey) *HostV2 { + listenAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", self.Port)) + if err != nil { + log.Error("New MA Error", "IP", self.IP, "Port", self.Port) + return nil + } // TODO (leo), use the [0] of Addrs for now, need to find a reliable way of using listenAddr p2pHost, err := libp2p.New(context.Background(), - libp2p.ListenAddrs(self.Addrs[0]), + libp2p.ListenAddrs(listenAddr), libp2p.Identity(priKey), // TODO(ricl): Other features to probe // libp2p.EnableRelay; libp2p.Routing; ) + self.PeerID = p2pHost.ID() + catchError(err) - log.Debug("HostV2 is up!", "port", self.Port, "id", p2pHost.ID().Pretty(), "addr", self.Addrs) + log.Debug("HostV2 is up!", "port", self.Port, "id", p2pHost.ID().Pretty(), "addr", listenAddr) // has to save the private key for host h := &HostV2{ h: p2pHost, - self: self, + self: *self, priKey: priKey, } @@ -108,13 +115,21 @@ func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) { // SendMessage a p2p message sending function with signature compatible to p2pv1. func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error { + logger := log.New("from", host.self, "to", p, "PeerID", p.PeerID) s, err := host.h.NewStream(context.Background(), p.PeerID, ProtocolID) if err != nil { - log.Error("Failed to send message", "from", host.self, "to", p, "error", err, "PeerID", p.PeerID) - return err + logger.Error("NewStream() failed", "peerID", p.PeerID, + "protocolID", ProtocolID, "error", err) + return fmt.Errorf("NewStream(%v, %v) failed: %v", p.PeerID, + ProtocolID, err) + } + if nw, err := s.Write(message); err != nil { + logger.Error("Write() failed", "error", err) + return fmt.Errorf("Write() failed: %v", err) + } else if nw < len(message) { + logger.Error("Short Write()", "expected", len(message), "actual", nw) + return io.ErrShortWrite } - - s.Write(message) return nil } diff --git a/p2p/host/hostv2/util.go b/p2p/host/hostv2/util.go index 3c3307625..e72c4ce28 100644 --- a/p2p/host/hostv2/util.go +++ b/p2p/host/hostv2/util.go @@ -1,8 +1,6 @@ package hostv2 import ( - "bufio" - "github.com/ethereum/go-ethereum/log" ) @@ -12,8 +10,3 @@ func catchError(err error) { panic(err) } } - -func writeData(w *bufio.Writer, data []byte) { - w.Write(data) - w.Flush() -} diff --git a/p2p/host/message.go b/p2p/host/message.go index 26b9a3fce..c19e1375e 100644 --- a/p2p/host/message.go +++ b/p2p/host/message.go @@ -1,7 +1,6 @@ package host import ( - "bytes" "encoding/binary" "net" "runtime" @@ -52,17 +51,11 @@ func BroadcastMessageFromLeader(h p2p.Host, peers []p2p.Peer, msg []byte, lostPe // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content] func ConstructP2pMessage(msgType byte, content []byte) []byte { - - firstByte := byte(17) // messageType 0x11 - sizeBytes := make([]byte, 4) // contentSize - - binary.BigEndian.PutUint32(sizeBytes, uint32(len(content))) - - byteBuffer := bytes.NewBuffer([]byte{}) - byteBuffer.WriteByte(firstByte) - byteBuffer.Write(sizeBytes) - byteBuffer.Write(content) - return byteBuffer.Bytes() + message := make([]byte, 5+len(content)) + message[0] = 17 // messageType 0x11 + binary.BigEndian.PutUint32(message[1:5], uint32(len(content))) + copy(message[5:], content) + return message } // BroadcastMessageFromValidator sends the message to a list of peers from a validator. @@ -93,9 +86,7 @@ func send(h p2p.Host, peer p2p.Peer, message []byte, lostPeer chan p2p.Peer) { backoff := p2p.NewExpBackoff(150*time.Millisecond, 5*time.Second, 2) for trial := 0; trial < 10; trial++ { - var err error - err = h.SendMessage(peer, message) - if err == nil { + if err := h.SendMessage(peer, message); err == nil { if trial > 0 { log.Warn("retry send", "rety", trial) } diff --git a/p2p/host/message_test.go b/p2p/host/message_test.go index 694d59cb9..2290bfb9e 100644 --- a/p2p/host/message_test.go +++ b/p2p/host/message_test.go @@ -21,7 +21,7 @@ func TestSendMessage(test *testing.T) { priKey1, pubKey1, _ := utils.GenKeyP2P(peer1.IP, peer1.Port) peerID1, _ := peer.IDFromPublicKey(pubKey1) peer1.PeerID = peerID1 - host1 := hostv2.New(peer1, priKey1) + host1 := hostv2.New(&peer1, priKey1) peer2 := p2p.Peer{IP: "127.0.0.1", Port: "9001"} selfAddr2, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", peer2.Port)) @@ -29,10 +29,12 @@ func TestSendMessage(test *testing.T) { priKey2, pubKey2, _ := utils.GenKeyP2P(peer2.IP, peer2.Port) peerID2, _ := peer.IDFromPublicKey(pubKey2) peer2.PeerID = peerID2 - host2 := hostv2.New(peer2, priKey2) + host2 := hostv2.New(&peer2, priKey2) msg := []byte{0x00, 0x01, 0x02, 0x03, 0x04} - host1.AddPeer(&peer2) + if err := host1.AddPeer(&peer2); err != nil { + test.Fatalf("cannot add peer2 to host1: %v", err) + } go host2.BindHandlerAndServe(handler) SendMessage(host1, peer2, msg, nil) @@ -40,7 +42,11 @@ func TestSendMessage(test *testing.T) { } func handler(s p2p.Stream) { - defer s.Close() + defer func() { + if err := s.Close(); err != nil { + panic(fmt.Sprintf("Close(%v) failed: %v", s, err)) + } + }() content, err := p2p.ReadMessageContent(s) if err != nil { panic("Read p2p data failed") diff --git a/p2p/host/mock/host_mock.go b/p2p/host/mock/host_mock.go index 2155523a6..c6e023ab5 100644 --- a/p2p/host/mock/host_mock.go +++ b/p2p/host/mock/host_mock.go @@ -5,10 +5,11 @@ package mock import ( + reflect "reflect" + gomock "github.com/golang/mock/gomock" p2p "github.com/harmony-one/harmony/p2p" peer "github.com/libp2p/go-libp2p-peer" - reflect "reflect" ) // MockHost is a mock of Host interface diff --git a/p2p/mock_stream.go b/p2p/mock_stream.go index 6046d6fd5..27ce7fac5 100644 --- a/p2p/mock_stream.go +++ b/p2p/mock_stream.go @@ -5,9 +5,10 @@ package p2p import ( - gomock "github.com/golang/mock/gomock" reflect "reflect" time "time" + + gomock "github.com/golang/mock/gomock" ) // MockStream is a mock of Stream interface diff --git a/p2p/p2pimpl/p2pimpl.go b/p2p/p2pimpl/p2pimpl.go index 5c589c86a..ec6cc4b7f 100644 --- a/p2p/p2pimpl/p2pimpl.go +++ b/p2p/p2pimpl/p2pimpl.go @@ -1,7 +1,6 @@ package p2pimpl import ( - "fmt" "net" "github.com/harmony-one/harmony/p2p" @@ -9,8 +8,7 @@ import ( "github.com/harmony-one/harmony/p2p/host/hostv2" "github.com/harmony-one/harmony/internal/utils" - peer "github.com/libp2p/go-libp2p-peer" - ma "github.com/multiformats/go-multiaddr" + p2p_crypto "github.com/libp2p/go-libp2p-crypto" ) // Version The version number of p2p library @@ -22,33 +20,13 @@ const Version = 2 // for hostv2, it generates multiaddress, keypair and add PeerID to peer, add priKey to host // TODO (leo) the PriKey of the host has to be persistent in disk, so that we don't need to regenerate it // on the same host if the node software restarted. The peerstore has to be persistent as well. -func NewHost(self *p2p.Peer) (p2p.Host, error) { +func NewHost(self *p2p.Peer, key p2p_crypto.PrivKey) (p2p.Host, error) { if Version == 1 { h := hostv1.New(self) return h, nil } - selfAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", self.Port)) - if err != nil { - return nil, err - } - self.Addrs = append(self.Addrs, selfAddr) - - // TODO (leo), change to GenKeyP2PRand() to generate random key. Right now, the key is predictable as the - // seed is fixed. - priKey, pubKey, err := utils.GenKeyP2P(self.IP, self.Port) - if err != nil { - return nil, err - } - - peerID, err := peer.IDFromPublicKey(pubKey) - - if err != nil { - return nil, err - } - - self.PeerID = peerID - h := hostv2.New(*self, priKey) + h := hostv2.New(self, key) utils.GetLogInstance().Info("NewHost", "self", net.JoinHostPort(self.IP, self.Port), "PeerID", self.PeerID)