[cleanup] remove libp2p unicast support

fix test cases after remove libp2p unicast

Signed-off-by: Leo Chen <leo@harmony.one>
pull/506/head
Leo Chen 6 years ago
parent 6c8f0f4b02
commit f3ffc65401
  1. 3
      api/beaconchain/beaconchain.go
  2. 256
      api/beaconchain/beaconchain.pb.go
  3. 23
      api/beaconchain/beaconchain.proto
  4. 70
      cmd/client/txgen/main.go
  5. 18
      cmd/client/wallet/lib/lib.go
  6. 1
      cmd/client/wallet/lib/lib_test.go
  7. 12
      cmd/harmony.go
  8. 9
      consensus/consensus.go
  9. 12
      consensus/consensus_leader.go
  10. 5
      consensus/consensus_leader_test.go
  11. 8
      consensus/consensus_validator.go
  12. 6
      consensus/consensus_validator_test.go
  13. 4
      drand/drand_leader.go
  14. 4
      drand/drand_validator.go
  15. 3
      internal/beaconchain/README.md
  16. 200
      internal/beaconchain/libs/beaconchain.go
  17. 59
      internal/beaconchain/libs/beaconchain_handler.go
  18. 150
      internal/beaconchain/libs/beaconchain_test.go
  19. 51
      internal/beaconchain/rpc/client.go
  20. 51
      internal/beaconchain/rpc/server.go
  21. 1
      internal/newnode/README.md
  22. 170
      internal/newnode/newnode.go
  23. 63
      internal/newnode/newnode_handler.go
  24. 65
      internal/newnode/newnode_test.go
  25. 4
      internal/utils/singleton.go
  26. 43
      node/node.go
  27. 56
      node/node_handler.go
  28. 25
      node/node_handler_test.go
  29. 16
      node/node_test.go
  30. 11
      node/p2p.go
  31. 2
      p2p/host.go
  32. 43
      p2p/host/hostv2/hostv2.go
  33. 99
      p2p/host/message.go
  34. 59
      p2p/host/message_test.go
  35. 26
      p2p/host/mock/host_mock.go
  36. 2
      scripts/go_executable_build.sh
  37. 28
      test/deploy.sh

@ -1,3 +0,0 @@
package beaconchain
//go:generate protoc beaconchain.proto --go_out=plugins=grpc:.

@ -1,256 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: beaconchain.proto
package beaconchain
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// FetchLeadersRequest is the request to fetch the current leaders.
type FetchLeadersRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FetchLeadersRequest) Reset() { *m = FetchLeadersRequest{} }
func (m *FetchLeadersRequest) String() string { return proto.CompactTextString(m) }
func (*FetchLeadersRequest) ProtoMessage() {}
func (*FetchLeadersRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_474fd8061d1037cf, []int{0}
}
func (m *FetchLeadersRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FetchLeadersRequest.Unmarshal(m, b)
}
func (m *FetchLeadersRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_FetchLeadersRequest.Marshal(b, m, deterministic)
}
func (m *FetchLeadersRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_FetchLeadersRequest.Merge(m, src)
}
func (m *FetchLeadersRequest) XXX_Size() int {
return xxx_messageInfo_FetchLeadersRequest.Size(m)
}
func (m *FetchLeadersRequest) XXX_DiscardUnknown() {
xxx_messageInfo_FetchLeadersRequest.DiscardUnknown(m)
}
var xxx_messageInfo_FetchLeadersRequest proto.InternalMessageInfo
// FetchLeadersResponse is the response of FetchLeadersRequest.
type FetchLeadersResponse struct {
Leaders []*FetchLeadersResponse_Leader `protobuf:"bytes,1,rep,name=leaders,proto3" json:"leaders,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FetchLeadersResponse) Reset() { *m = FetchLeadersResponse{} }
func (m *FetchLeadersResponse) String() string { return proto.CompactTextString(m) }
func (*FetchLeadersResponse) ProtoMessage() {}
func (*FetchLeadersResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_474fd8061d1037cf, []int{1}
}
func (m *FetchLeadersResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FetchLeadersResponse.Unmarshal(m, b)
}
func (m *FetchLeadersResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_FetchLeadersResponse.Marshal(b, m, deterministic)
}
func (m *FetchLeadersResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_FetchLeadersResponse.Merge(m, src)
}
func (m *FetchLeadersResponse) XXX_Size() int {
return xxx_messageInfo_FetchLeadersResponse.Size(m)
}
func (m *FetchLeadersResponse) XXX_DiscardUnknown() {
xxx_messageInfo_FetchLeadersResponse.DiscardUnknown(m)
}
var xxx_messageInfo_FetchLeadersResponse proto.InternalMessageInfo
func (m *FetchLeadersResponse) GetLeaders() []*FetchLeadersResponse_Leader {
if m != nil {
return m.Leaders
}
return nil
}
type FetchLeadersResponse_Leader struct {
Ip string `protobuf:"bytes,1,opt,name=ip,proto3" json:"ip,omitempty"`
Port string `protobuf:"bytes,2,opt,name=port,proto3" json:"port,omitempty"`
ShardId uint32 `protobuf:"varint,3,opt,name=shardId,proto3" json:"shardId,omitempty"`
PeerID string `protobuf:"bytes,4,opt,name=peerID,proto3" json:"peerID,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FetchLeadersResponse_Leader) Reset() { *m = FetchLeadersResponse_Leader{} }
func (m *FetchLeadersResponse_Leader) String() string { return proto.CompactTextString(m) }
func (*FetchLeadersResponse_Leader) ProtoMessage() {}
func (*FetchLeadersResponse_Leader) Descriptor() ([]byte, []int) {
return fileDescriptor_474fd8061d1037cf, []int{1, 0}
}
func (m *FetchLeadersResponse_Leader) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FetchLeadersResponse_Leader.Unmarshal(m, b)
}
func (m *FetchLeadersResponse_Leader) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_FetchLeadersResponse_Leader.Marshal(b, m, deterministic)
}
func (m *FetchLeadersResponse_Leader) XXX_Merge(src proto.Message) {
xxx_messageInfo_FetchLeadersResponse_Leader.Merge(m, src)
}
func (m *FetchLeadersResponse_Leader) XXX_Size() int {
return xxx_messageInfo_FetchLeadersResponse_Leader.Size(m)
}
func (m *FetchLeadersResponse_Leader) XXX_DiscardUnknown() {
xxx_messageInfo_FetchLeadersResponse_Leader.DiscardUnknown(m)
}
var xxx_messageInfo_FetchLeadersResponse_Leader proto.InternalMessageInfo
func (m *FetchLeadersResponse_Leader) GetIp() string {
if m != nil {
return m.Ip
}
return ""
}
func (m *FetchLeadersResponse_Leader) GetPort() string {
if m != nil {
return m.Port
}
return ""
}
func (m *FetchLeadersResponse_Leader) GetShardId() uint32 {
if m != nil {
return m.ShardId
}
return 0
}
func (m *FetchLeadersResponse_Leader) GetPeerID() string {
if m != nil {
return m.PeerID
}
return ""
}
func init() {
proto.RegisterType((*FetchLeadersRequest)(nil), "beaconchain.FetchLeadersRequest")
proto.RegisterType((*FetchLeadersResponse)(nil), "beaconchain.FetchLeadersResponse")
proto.RegisterType((*FetchLeadersResponse_Leader)(nil), "beaconchain.FetchLeadersResponse.Leader")
}
func init() { proto.RegisterFile("beaconchain.proto", fileDescriptor_474fd8061d1037cf) }
var fileDescriptor_474fd8061d1037cf = []byte{
// 222 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x90, 0xcd, 0x4a, 0xc4, 0x30,
0x14, 0x85, 0x4d, 0x67, 0xe8, 0xe0, 0x1d, 0x15, 0xbc, 0xfe, 0x10, 0x66, 0x15, 0xbb, 0xca, 0xaa,
0x8b, 0xf1, 0x0d, 0xaa, 0x08, 0x05, 0x57, 0x11, 0xb7, 0x42, 0x9a, 0x5e, 0x68, 0x50, 0x9a, 0x98,
0x44, 0x1f, 0xce, 0xa7, 0x13, 0x53, 0x0b, 0x15, 0x44, 0x77, 0x39, 0x1f, 0x39, 0xe4, 0xcb, 0x81,
0xd3, 0x8e, 0xb4, 0x71, 0xa3, 0x19, 0xb4, 0x1d, 0x6b, 0x1f, 0x5c, 0x72, 0xb8, 0x5d, 0xa0, 0xea,
0x02, 0xce, 0xee, 0x28, 0x99, 0xe1, 0x9e, 0x74, 0x4f, 0x21, 0x2a, 0x7a, 0x7d, 0xa3, 0x98, 0xaa,
0x0f, 0x06, 0xe7, 0x3f, 0x79, 0xf4, 0x6e, 0x8c, 0x84, 0x0d, 0x6c, 0x5e, 0x26, 0xc4, 0x99, 0x58,
0xc9, 0xed, 0x5e, 0xd6, 0xcb, 0x17, 0x7e, 0xeb, 0xd4, 0x53, 0x56, 0x73, 0x71, 0xf7, 0x04, 0xe5,
0x84, 0xf0, 0x04, 0x0a, 0xeb, 0x39, 0x13, 0x4c, 0x1e, 0xaa, 0xc2, 0x7a, 0x44, 0x58, 0x7b, 0x17,
0x12, 0x2f, 0x32, 0xc9, 0x67, 0xe4, 0xb0, 0x89, 0x83, 0x0e, 0x7d, 0xdb, 0xf3, 0x95, 0x60, 0xf2,
0x58, 0xcd, 0x11, 0x2f, 0xa1, 0xf4, 0x44, 0xa1, 0xbd, 0xe5, 0xeb, 0x7c, 0xff, 0x3b, 0xed, 0x9f,
0x01, 0x9b, 0xec, 0x74, 0xf3, 0xe5, 0xf4, 0x40, 0xe1, 0xdd, 0x1a, 0xc2, 0x47, 0x38, 0x5a, 0xda,
0xa1, 0xf8, 0x43, 0x3c, 0x8f, 0xb0, 0xbb, 0xfa, 0xf7, 0x6b, 0xd5, 0x41, 0x57, 0xe6, 0x51, 0xaf,
0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x35, 0x50, 0x26, 0x86, 0x69, 0x01, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// BeaconChainServiceClient is the client API for BeaconChainService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type BeaconChainServiceClient interface {
FetchLeaders(ctx context.Context, in *FetchLeadersRequest, opts ...grpc.CallOption) (*FetchLeadersResponse, error)
}
type beaconChainServiceClient struct {
cc *grpc.ClientConn
}
func NewBeaconChainServiceClient(cc *grpc.ClientConn) BeaconChainServiceClient {
return &beaconChainServiceClient{cc}
}
func (c *beaconChainServiceClient) FetchLeaders(ctx context.Context, in *FetchLeadersRequest, opts ...grpc.CallOption) (*FetchLeadersResponse, error) {
out := new(FetchLeadersResponse)
err := c.cc.Invoke(ctx, "/beaconchain.BeaconChainService/FetchLeaders", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// BeaconChainServiceServer is the server API for BeaconChainService service.
type BeaconChainServiceServer interface {
FetchLeaders(context.Context, *FetchLeadersRequest) (*FetchLeadersResponse, error)
}
func RegisterBeaconChainServiceServer(s *grpc.Server, srv BeaconChainServiceServer) {
s.RegisterService(&_BeaconChainService_serviceDesc, srv)
}
func _BeaconChainService_FetchLeaders_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(FetchLeadersRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BeaconChainServiceServer).FetchLeaders(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/beaconchain.BeaconChainService/FetchLeaders",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BeaconChainServiceServer).FetchLeaders(ctx, req.(*FetchLeadersRequest))
}
return interceptor(ctx, in, info, handler)
}
var _BeaconChainService_serviceDesc = grpc.ServiceDesc{
ServiceName: "beaconchain.BeaconChainService",
HandlerType: (*BeaconChainServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "FetchLeaders",
Handler: _BeaconChainService_FetchLeaders_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "beaconchain.proto",
}

@ -1,23 +0,0 @@
syntax = "proto3";
package beaconchain;
// BeaconChainService is the service used for any beacon chain requests.
service BeaconChainService {
rpc FetchLeaders(FetchLeadersRequest) returns (FetchLeadersResponse) {}
}
// FetchLeadersRequest is the request to fetch the current leaders.
message FetchLeadersRequest {
}
// FetchLeadersResponse is the response of FetchLeadersRequest.
message FetchLeadersResponse {
message Leader {
string ip = 1;
string port = 2;
uint32 shardId = 3;
string peerID = 4;
}
repeated Leader leaders = 1;
}

@ -15,14 +15,11 @@ import (
"github.com/harmony-one/harmony/cmd/client/txgen/txgen"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/newnode"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl"
peerstore "github.com/libp2p/go-libp2p-peerstore"
multiaddr "github.com/multiformats/go-multiaddr"
)
var (
@ -50,18 +47,10 @@ func main() {
versionFlag := flag.Bool("version", false, "Output version info")
crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.")
bcIP := flag.String("bc", "127.0.0.1", "IP of the identity chain")
bcPort := flag.String("bc_port", "8081", "port of the identity chain")
bcAddr := flag.String("bc_addr", "", "MultiAddr of the identity chain")
// Key file to store the private key
keyFile := flag.String("key", "./.txgenkey", "the private key file of the txgen")
flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddress")
// LibP2P peer discovery integration test
libp2pPD := flag.Bool("libp2p_pd", false, "enable libp2p based peer discovery")
flag.Parse()
if *versionFlag {
@ -95,41 +84,9 @@ func main() {
selfPeer := p2p.Peer{IP: *ip, Port: *port, ValidatorID: -1, PubKey: peerPubKey}
if !*libp2pPD {
var bcPeer *p2p.Peer
if *bcAddr != "" {
// Turn the destination into a multiaddr.
maddr, err := multiaddr.NewMultiaddr(*bcAddr)
if err != nil {
panic(err)
}
// Extract the peer ID from the multiaddr.
info, err := peerstore.InfoFromP2pAddr(maddr)
if err != nil {
panic(err)
}
bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort, Addrs: info.Addrs, PeerID: info.ID}
} else {
bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort}
}
candidateNode := newnode.New(*ip, *port, nodePriKey)
candidateNode.AddPeer(bcPeer)
candidateNode.ContactBeaconChain(*bcPeer)
selfPeer := candidateNode.GetSelfPeer()
selfPeer.PubKey = candidateNode.PubK
shardIDLeaderMap = candidateNode.Leaders
debugPrintShardIDLeaderMap(shardIDLeaderMap)
} else {
// Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard
shardIDLeaderMap = make(map[uint32]p2p.Peer)
shardIDLeaderMap[0] = p2p.Peer{}
utils.UseLibP2P = true
}
// Do cross shard tx if there are more than one shard
setting := txgen.Settings{
@ -195,27 +152,14 @@ func main() {
}
clientNode.Client.UpdateBlocks = updateBlocksFunc
for _, leader := range shardIDLeaderMap {
if !*libp2pPD {
clientNode.GetHost().AddPeer(&leader)
utils.GetLogInstance().Debug("Client Join Shard", "leader", leader)
go clientNode.JoinShard(leader)
}
clientNode.State = node.NodeReadyForConsensus
}
if *libp2pPD {
clientNode.Role = node.ClientNode
clientNode.ServiceManagerSetup()
clientNode.RunServices()
go clientNode.StartServer()
} else {
// Start the client server to listen to leader's message
go clientNode.StartServer()
// wait for 1 seconds for client to send ping message to leader
time.Sleep(time.Second)
clientNode.StopPing <- struct{}{}
}
clientNode.State = node.NodeReadyForConsensus
// Transaction generation process
@ -258,23 +202,17 @@ func main() {
// Send a stop message to stop the nodes at the end
msg := proto_node.ConstructStopMessage()
if utils.UseLibP2P {
clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
} else {
clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg)
}
time.Sleep(3000 * time.Millisecond)
clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, p2p_host.ConstructP2pMessage(byte(0), msg))
time.Sleep(3 * time.Second)
}
// SendTxsToLeader sends txs to leader account.
func SendTxsToLeader(clientNode *node.Node, leader p2p.Peer, txs types.Transactions) {
utils.GetLogInstance().Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs))
msg := proto_node.ConstructTransactionListMessageAccount(txs)
if utils.UseLibP2P {
clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
} else {
clientNode.SendMessage(leader, msg)
}
}
func debugPrintShardIDLeaderMap(leaderMap map[uint32]p2p.Peer) {

@ -2,28 +2,21 @@ package lib
import (
"fmt"
"strconv"
"time"
"github.com/harmony-one/harmony/api/client"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/core/types"
libs "github.com/harmony-one/harmony/internal/beaconchain/libs"
beaconchain "github.com/harmony-one/harmony/internal/beaconchain/rpc"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
peer "github.com/libp2p/go-libp2p-peer"
)
// CreateWalletNode creates wallet server node.
func CreateWalletNode() *node.Node {
shardIDLeaderMap := make(map[uint32]p2p.Peer)
port, _ := strconv.Atoi("9999")
bcClient := beaconchain.NewClient("54.183.5.66", strconv.Itoa(port+libs.BeaconchainServicePortDiff))
response := bcClient.GetLeaders()
// port, _ := strconv.Atoi("9999")
// dummy host for wallet
self := p2p.Peer{IP: "127.0.0.1", Port: "6789"}
@ -33,6 +26,7 @@ func CreateWalletNode() *node.Node {
panic(err)
}
/*
for _, leader := range response.Leaders {
peerID, err := peer.IDB58Decode(leader.PeerID)
if err != nil {
@ -42,7 +36,7 @@ func CreateWalletNode() *node.Node {
shardIDLeaderMap[leader.ShardId] = leaderPeer
host.AddPeer(&leaderPeer)
}
*/
walletNode := node.New(host, nil, nil)
walletNode.Client = client.NewClient(walletNode.GetHost(), shardIDLeaderMap)
return walletNode
@ -50,9 +44,9 @@ func CreateWalletNode() *node.Node {
// SubmitTransaction submits the transaction to the Harmony network
func SubmitTransaction(tx *types.Transaction, walletNode *node.Node, shardID uint32) error {
msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})
leader := walletNode.Client.Leaders[shardID]
walletNode.SendMessage(leader, msg)
// msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})
// leader := walletNode.Client.Leaders[shardID]
// walletNode.SendMessage(leader, msg)
fmt.Printf("Transaction Id for shard %d: %s\n", int(shardID), tx.Hash().Hex())
time.Sleep(300 * time.Millisecond)
return nil

@ -29,7 +29,6 @@ func TestSubmitTransaction(test *testing.T) {
m := mock_host.NewMockHost(ctrl)
m.EXPECT().GetSelfPeer().AnyTimes()
m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(1)
walletNode := node.New(m, nil, nil)
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9990")

@ -101,9 +101,6 @@ func main() {
keyFile := flag.String("key", "./.hmykey", "the private key file of the harmony node")
flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddress")
// LibP2P peer discovery integration test
libp2pPD := flag.Bool("libp2p_pd", false, "enable libp2p based peer discovery")
// isBeacon indicates this node is a beacon chain node
isBeacon := flag.Bool("is_beacon", false, "true means this node is a beacon chain node")
@ -165,7 +162,6 @@ func main() {
} else {
role = "validator"
}
utils.UseLibP2P = true
// Init logging.
loggingInit(*logFolder, role, *ip, *port, *onlyLogTps)
@ -250,17 +246,9 @@ func main() {
consensus.OnConsensusDone = currentNode.PostConsensusProcessing
currentNode.State = node.NodeWaitToJoin
if !*libp2pPD {
if consensus.IsLeader {
currentNode.State = node.NodeLeader
} else {
go currentNode.JoinShard(leader)
}
} else {
if consensus.IsLeader {
go currentNode.SendPongMessage()
}
}
go currentNode.SupportSyncing()
currentNode.ServiceManagerSetup()

@ -472,11 +472,7 @@ func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys, consensus.leader.PubKey)
buffer := pong.ConstructPongMessage()
if utils.UseLibP2P {
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), buffer))
} else {
host.BroadcastMessageFromLeader(consensus.host, validators, buffer, consensus.OfflinePeers)
}
}
return count2
@ -637,11 +633,6 @@ func (consensus *Consensus) GetPeerFromID(peerID uint32) (p2p.Peer, bool) {
return value, true
}
// SendMessage sends message thru p2p host to peer.
func (consensus *Consensus) SendMessage(peer p2p.Peer, message []byte) {
host.SendMessage(consensus.host, peer, message, nil)
}
// Populates the common basic fields for all consensus message.
func (consensus *Consensus) populateMessageFields(request *msg_pb.ConsensusRequest) {
// TODO(minhdoan): Maybe look into refactor this.

@ -146,13 +146,9 @@ func (consensus *Consensus) startConsensus(newBlock *types.Block) {
// Leader sign the block hash itself
consensus.prepareSigs[consensus.nodeID] = consensus.priKey.SignHash(consensus.blockHash[:])
if utils.UseLibP2P {
// Construct broadcast p2p message
utils.GetLogInstance().Warn("[Consensus]", "sent announce message", len(msgToSend))
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
}
}
// processPrepareMessage processes the prepare message sent from validators
@ -217,12 +213,8 @@ func (consensus *Consensus) processPrepareMessage(message *msg_pb.Message) {
msgToSend, aggSig := consensus.constructPreparedMessage()
consensus.aggregatedPrepareSig = aggSig
if utils.UseLibP2P {
utils.GetLogInstance().Warn("[Consensus]", "sent prepared message", len(msgToSend))
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
}
// Set state to targetState
consensus.state = targetState
@ -296,12 +288,8 @@ func (consensus *Consensus) processCommitMessage(message *msg_pb.Message) {
msgToSend, aggSig := consensus.constructCommittedMessage()
consensus.aggregatedCommitSig = aggSig
if utils.UseLibP2P {
utils.GetLogInstance().Warn("[Consensus]", "sent committed message", len(msgToSend))
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
}
var blockObj types.Block
err := rlp.DecodeBytes(consensus.block, &blockObj)

@ -45,7 +45,7 @@ func TestProcessMessageLeaderPrepare(test *testing.T) {
// Asserts that the first and only call to Bar() is passed 99.
// Anything else will fail.
m.EXPECT().GetSelfPeer().Return(leader)
m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(3)
m.EXPECT().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, gomock.Any())
consensusLeader := New(m, "0", validators, leader)
consensusLeader.blockHash = blockHash
@ -91,7 +91,6 @@ func TestProcessMessageLeaderPrepareInvalidSignature(test *testing.T) {
// Asserts that the first and only call to Bar() is passed 99.
// Anything else will fail.
m.EXPECT().GetSelfPeer().Return(leader)
m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(0)
consensusLeader := New(m, "0", validators, leader)
consensusLeader.blockHash = blockHash
@ -146,7 +145,7 @@ func TestProcessMessageLeaderCommit(test *testing.T) {
// Asserts that the first and only call to Bar() is passed 99.
// Anything else will fail.
m.EXPECT().GetSelfPeer().Return(leader)
m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(3)
m.EXPECT().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, gomock.Any())
for i := 0; i < 3; i++ {
priKey, _, _ := utils.GenKeyP2P(validators[i].IP, validators[i].Port)

@ -122,12 +122,8 @@ func (consensus *Consensus) processAnnounceMessage(message *msg_pb.Message) {
// Construct and send prepare message
msgToSend := consensus.constructPrepareMessage()
if utils.UseLibP2P {
utils.GetLogInstance().Warn("[Consensus]", "sent prepare message", len(msgToSend))
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
consensus.SendMessage(consensus.leader, msgToSend)
}
consensus.state = PrepareDone
}
@ -189,12 +185,8 @@ func (consensus *Consensus) processPreparedMessage(message *msg_pb.Message) {
// Construct and send the commit message
multiSigAndBitmap := append(multiSig, bitmap...)
msgToSend := consensus.constructCommitMessage(multiSigAndBitmap)
if utils.UseLibP2P {
utils.GetLogInstance().Warn("[Consensus]", "sent commit message", len(msgToSend))
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
consensus.SendMessage(consensus.leader, msgToSend)
}
consensus.state = CommitDone
}

@ -36,7 +36,7 @@ func TestProcessMessageValidatorAnnounce(test *testing.T) {
// Asserts that the first and only call to Bar() is passed 99.
// Anything else will fail.
m.EXPECT().GetSelfPeer().Return(leader)
m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(1)
m.EXPECT().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, gomock.Any())
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
host, err := p2pimpl.NewHost(&leader, priKey)
@ -92,7 +92,7 @@ func TestProcessMessageValidatorPrepared(test *testing.T) {
// Asserts that the first and only call to Bar() is passed 99.
// Anything else will fail.
m.EXPECT().GetSelfPeer().Return(leader)
m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(2)
m.EXPECT().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, gomock.Any()).Times(2)
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
host, err := p2pimpl.NewHost(&leader, priKey)
@ -162,7 +162,7 @@ func TestProcessMessageValidatorCommitted(test *testing.T) {
// Asserts that the first and only call to Bar() is passed 99.
// Anything else will fail.
m.EXPECT().GetSelfPeer().Return(leader)
m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(2)
m.EXPECT().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, gomock.Any()).Times(2)
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
host, err := p2pimpl.NewHost(&leader, priKey)

@ -74,12 +74,8 @@ func (dRand *DRand) init(epochBlock *types.Block) {
(*dRand.vrfs)[dRand.nodeID] = append(rand[:], proof...)
if utils.UseLibP2P {
utils.GetLogInstance().Info("[DRG] sent init", "msg", msgToSend, "leader.PubKey", dRand.leader.PubKey)
dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
host.BroadcastMessageFromLeader(dRand.host, dRand.GetValidatorPeers(), msgToSend, nil)
}
}
// ProcessMessageLeader dispatches messages for the leader to corresponding processors.

@ -52,9 +52,5 @@ func (dRand *DRand) processInitMessage(message drand_proto.Message) {
msgToSend := dRand.constructCommitMessage(rand, proof)
// Send the commit message back to leader
if utils.UseLibP2P {
dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
host.SendMessage(dRand.host, dRand.leader, msgToSend, nil)
}
}

@ -1,3 +0,0 @@
The beaconchain package currently is a centralized service that allocates every potential new node (uses newnode package) a specific shard.
If N is the number of shards, supplied as a parameter at bootup, then first N joining nodes are assigned to be the leaders of those N shards. The nodes that come after that are then assigned shards based on their order of entry.
In the future, the generation of randomness would be decentralized. Such randomness would be provided to a new node once its PoS has been verified and then the node would be able to calculate its own shard automatically.

@ -1,200 +0,0 @@
package beaconchain
import (
"math/rand"
"os"
"strconv"
"sync"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/proto/bcconn"
proto_identity "github.com/harmony-one/harmony/api/proto/identity"
"github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/crypto/pki"
beaconchain "github.com/harmony-one/harmony/internal/beaconchain/rpc"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl"
p2p_crypto "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
)
//BCState keeps track of the state the beaconchain is in
type BCState int
var mutex sync.Mutex
var identityPerBlock = 100000
// BeaconchainServicePortDiff is the positive port diff from beacon chain's self port
const BeaconchainServicePortDiff = 4444
//BCInfo is the information that needs to be stored on the disk in order to allow for a restart.
type BCInfo struct {
Leaders []*node.Info `json:"leaders"`
ShardLeaderMap map[int]*node.Info `json:"shardLeaderMap"`
NumberOfShards int `json:"numShards"`
NumberOfNodesAdded int `json:"numNodesAdded"`
IP string `json:"ip"`
Port string `json:"port"`
}
// BeaconChain (Blockchain) keeps Identities per epoch, currently centralized!
type BeaconChain struct {
BCInfo BCInfo
ShardLeaderMap map[int]*node.Info
PubKey *bls.PublicKey
host p2p.Host
state BCState
rpcServer *beaconchain.Server
Peer p2p.Peer
Self p2p.Peer // self Peer
}
//SaveFile is to store the file in which beaconchain info will be stored.
var SaveFile string
// Followings are the set of states of that beaconchain can be in.
const (
NodeInfoReceived BCState = iota
RandomInfoSent
)
// SupportRPC initializes and starts the rpc service
func (bc *BeaconChain) SupportRPC() {
bc.InitRPCServer()
bc.StartRPCServer()
}
// InitRPCServer initializes Rpc server.
func (bc *BeaconChain) InitRPCServer() {
bc.rpcServer = beaconchain.NewServer(bc.GetShardLeaderMap)
}
// StartRPCServer starts Rpc server.
func (bc *BeaconChain) StartRPCServer() {
port, err := strconv.Atoi(bc.BCInfo.Port)
if err != nil {
port = 0
}
utils.GetLogInstance().Info("support_client: StartRpcServer on port:", "port", strconv.Itoa(port+BeaconchainServicePortDiff))
bc.rpcServer.Start(bc.BCInfo.IP, strconv.Itoa(port+BeaconchainServicePortDiff))
}
// GetShardLeaderMap returns the map from shard id to leader.
func (bc *BeaconChain) GetShardLeaderMap() map[int]*node.Info {
result := make(map[int]*node.Info)
for i, leader := range bc.BCInfo.Leaders {
result[i] = leader
}
return result
}
//New beaconchain initialization
func New(numShards int, ip, port string, key p2p_crypto.PrivKey) *BeaconChain {
bc := BeaconChain{}
bc.PubKey = generateBCKey()
bc.Self = p2p.Peer{IP: ip, Port: port}
bc.host, _ = p2pimpl.NewHost(&bc.Self, key)
bcinfo := &BCInfo{NumberOfShards: numShards, NumberOfNodesAdded: 0,
IP: ip,
Port: port,
ShardLeaderMap: make(map[int]*node.Info)}
bc.BCInfo = *bcinfo
return &bc
}
func generateBCKey() *bls.PublicKey {
r := rand.Intn(1000)
priKey := pki.GetBLSPrivateKeyFromInt(r)
pubkey := priKey.GetPublicKey()
return pubkey
}
//AcceptNodeInfo deserializes node information received via beaconchain handler
func (bc *BeaconChain) AcceptNodeInfo(b []byte) *node.Info {
Node := bcconn.DeserializeNodeInfo(b)
utils.GetLogInstance().Info("New Node Connection", "IP", Node.IP, "Port", Node.Port, "PeerID", Node.PeerID)
bc.Peer = p2p.Peer{IP: Node.IP, Port: Node.Port, PeerID: Node.PeerID}
bc.host.AddPeer(&bc.Peer)
bc.BCInfo.NumberOfNodesAdded = bc.BCInfo.NumberOfNodesAdded + 1
shardNum, isLeader := utils.AllocateShard(bc.BCInfo.NumberOfNodesAdded, bc.BCInfo.NumberOfShards)
if isLeader {
bc.BCInfo.Leaders = append(bc.BCInfo.Leaders, Node)
bc.BCInfo.ShardLeaderMap[shardNum] = Node
}
go SaveBeaconChainInfo(SaveFile, bc)
bc.state = NodeInfoReceived
return Node
}
//RespondRandomness sends a randomness beacon to the node inorder for it process what shard it will be in
func (bc *BeaconChain) RespondRandomness(Node *node.Info) {
bci := bc.BCInfo
response := bcconn.ResponseRandomNumber{NumberOfShards: bci.NumberOfShards, NumberOfNodesAdded: bci.NumberOfNodesAdded, Leaders: bci.Leaders}
msg := bcconn.SerializeRandomInfo(response)
msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Acknowledge, msg)
utils.GetLogInstance().Info("Sent Out Msg", "# Nodes", response.NumberOfNodesAdded)
for i, n := range response.Leaders {
utils.GetLogInstance().Info("Sent Out Msg", "leader", i, "nodeInfo", n.PeerID)
}
host.SendMessage(bc.host, bc.Peer, msgToSend, nil)
bc.state = RandomInfoSent
}
//AcceptConnections welcomes new connections
func (bc *BeaconChain) AcceptConnections(b []byte) {
node := bc.AcceptNodeInfo(b)
bc.RespondRandomness(node)
}
//StartServer a server and process the request by a handler.
func (bc *BeaconChain) StartServer() {
bc.host.BindHandlerAndServe(bc.BeaconChainHandler)
}
//SaveBeaconChainInfo to disk
func SaveBeaconChainInfo(filePath string, bc *BeaconChain) error {
bci := BCtoBCI(bc)
err := utils.Save(filePath, bci)
return err
}
//LoadBeaconChainInfo from disk
func LoadBeaconChainInfo(path string) (*BeaconChain, error) {
bci := &BCInfo{}
var err error
if _, err := os.Stat(path); err != nil {
return nil, err
}
err = utils.Load(path, bci)
var bc *BeaconChain
if err != nil {
return nil, err
}
bc = BCItoBC(bci)
return bc, err
}
// BCtoBCI converts beaconchain into beaconchaininfo
func BCtoBCI(bc *BeaconChain) *BCInfo {
bci := &BCInfo{Leaders: bc.BCInfo.Leaders, ShardLeaderMap: bc.BCInfo.ShardLeaderMap, NumberOfShards: bc.BCInfo.NumberOfShards, NumberOfNodesAdded: bc.BCInfo.NumberOfNodesAdded, IP: bc.BCInfo.IP, Port: bc.BCInfo.Port}
return bci
}
//BCItoBC converts beconchaininfo to beaconchain
func BCItoBC(bci *BCInfo) *BeaconChain {
bc := &BeaconChain{BCInfo: *bci}
return bc
}
//SetSaveFile sets the filepath where beaconchain will be saved
func SetSaveFile(path string) {
SaveFile = path
}
//GetID return ID
func (bc *BeaconChain) GetID() peer.ID {
return bc.host.GetID()
}

@ -1,59 +0,0 @@
package beaconchain
import (
"github.com/harmony-one/harmony/api/proto"
proto_identity "github.com/harmony-one/harmony/api/proto/identity"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
)
// BeaconChainHandler handles registration of new Identities
func (bc *BeaconChain) BeaconChainHandler(s p2p.Stream) {
content, err := p2p.ReadMessageContent(s)
if err != nil {
utils.GetLogInstance().Error("Read p2p data failed")
return
}
msgCategory, err := proto.GetMessageCategory(content)
if err != nil {
utils.GetLogInstance().Error("Read message category failed", "err", err)
return
}
msgType, err := proto.GetMessageType(content)
if err != nil {
utils.GetLogInstance().Error("Read action type failed")
return
}
msgPayload, err := proto.GetMessagePayload(content)
if err != nil {
utils.GetLogInstance().Error("Read message payload failed")
return
}
identityMsgPayload, err := proto_identity.GetIdentityMessagePayload(msgPayload)
if err != nil {
utils.GetLogInstance().Error("Read message payload failed")
return
}
switch msgCategory {
case proto.Identity:
actionType := proto_identity.IDMessageType(msgType)
switch actionType {
case proto_identity.Identity:
utils.GetLogInstance().Info("Message category is of the type identity protocol, which is correct!")
idMsgType, err := proto_identity.GetIdentityMessageType(msgPayload)
if err != nil {
utils.GetLogInstance().Error("Error finding the identity message type")
}
switch idMsgType {
case proto_identity.Register:
utils.GetLogInstance().Info("Identity Message Type is of the type Register")
bc.AcceptConnections(identityMsgPayload)
default:
utils.GetLogInstance().Error("Unrecognized identity message type", "type", idMsgType)
}
default:
utils.GetLogInstance().Error("Unrecognized message category", "actionType", actionType)
}
}
}

@ -1,150 +0,0 @@
package beaconchain
import (
"log"
"os"
"reflect"
"strconv"
"testing"
"github.com/harmony-one/harmony/api/proto/bcconn"
"github.com/harmony-one/harmony/api/proto/node"
beaconchain "github.com/harmony-one/harmony/internal/beaconchain/rpc"
"github.com/harmony-one/harmony/internal/utils"
"github.com/stretchr/testify/assert"
)
var (
leader1 = &node.Info{IP: "127.0.0.1", Port: "9981"}
leader2 = &node.Info{IP: "127.0.0.1", Port: "9982"}
leaders = []*node.Info{leader1, leader2}
shardLeaderMap = map[int]*node.Info{
0: leader1,
1: leader2,
}
)
func TestNewNode(t *testing.T) {
var ip, port string
ip = "127.0.0.1"
port = "7523"
numshards := 2
priKey, _, _ := utils.GenKeyP2P(ip, port)
bc := New(numshards, ip, port, priKey)
if bc.PubKey == nil {
t.Error("beacon chain public key not initialized")
}
if bc.BCInfo.NumberOfNodesAdded != 0 {
t.Error("beacon chain number of nodes starting with is not zero! (should be zero)")
}
if bc.BCInfo.NumberOfShards != numshards {
t.Error("beacon chain number of shards not initialized to given number of desired shards")
}
}
func TestShardLeaderMap(t *testing.T) {
var ip string
ip = "127.0.0.1"
beaconport := "7523"
numshards := 1
priKey, _, _ := utils.GenKeyP2P(ip, beaconport)
bc := New(numshards, ip, beaconport, priKey)
bc.BCInfo.Leaders = leaders
if !reflect.DeepEqual(bc.GetShardLeaderMap(), shardLeaderMap) {
t.Error("The function GetShardLeaderMap doesn't work well")
}
}
func TestFetchLeaders(t *testing.T) {
var ip string
ip = "127.0.0.1"
beaconport := "7523"
numshards := 1
priKey, _, _ := utils.GenKeyP2P(ip, beaconport)
bc := New(numshards, ip, beaconport, priKey)
bc.BCInfo.Leaders = leaders
bc.rpcServer = beaconchain.NewServer(bc.GetShardLeaderMap)
bc.StartRPCServer()
port, _ := strconv.Atoi(beaconport)
bcClient := beaconchain.NewClient("127.0.0.1", strconv.Itoa(port+BeaconchainServicePortDiff))
response := bcClient.GetLeaders()
retleaders := response.GetLeaders()
if !(retleaders[0].GetIp() == leaders[0].IP || retleaders[0].GetPort() == leaders[0].Port || retleaders[1].GetPort() == leaders[1].Port) {
t.Error("Fetch leaders response is not as expected")
}
}
func TestAcceptNodeInfo(t *testing.T) {
var ip string
ip = "127.0.0.1"
beaconport := "7523"
numshards := 1
priKey, _, _ := utils.GenKeyP2P(ip, beaconport)
bc := New(numshards, ip, beaconport, priKey)
b := bcconn.SerializeNodeInfo(leader1)
node := bc.AcceptNodeInfo(b)
if !reflect.DeepEqual(node, leader1) {
t.Error("Beaconchain is unable to deserialize incoming node info")
}
if len(bc.BCInfo.Leaders) != 1 {
t.Error("Beaconchain was unable to update the leader array")
}
}
func TestRespondRandomness(t *testing.T) {
var ip string
ip = "127.0.0.1"
beaconport := "7523"
numshards := 1
priKey, _, _ := utils.GenKeyP2P(ip, beaconport)
bc := New(numshards, ip, beaconport, priKey)
bc.RespondRandomness(leader1)
assert.Equal(t, RandomInfoSent, bc.state)
}
func TestAcceptConnections(t *testing.T) {
var ip string
ip = "127.0.0.1"
beaconport := "7523"
numshards := 1
priKey, _, _ := utils.GenKeyP2P(ip, beaconport)
bc := New(numshards, ip, beaconport, priKey)
b := bcconn.SerializeNodeInfo(leader1)
bc.AcceptConnections(b)
assert.Equal(t, RandomInfoSent, bc.state)
}
func TestSaveBC(t *testing.T) {
var ip, port string
ip = "127.0.0.1"
port = "7523"
numshards := 2
bci := &BCInfo{IP: ip, Port: port, NumberOfShards: numshards}
bc := &BeaconChain{BCInfo: *bci}
err := SaveBeaconChainInfo("test.json", bc)
if err != nil {
log.Fatalln(err)
}
bc2, err2 := LoadBeaconChainInfo("test.json")
if err2 != nil {
log.Fatalln(err2)
}
if !reflect.DeepEqual(bc, bc2) {
t.Error("beacon chain info objects are not same")
}
os.Remove("test.json")
}
func TestSaveFile(t *testing.T) {
filepath := "test"
SetSaveFile(filepath)
if !reflect.DeepEqual(filepath, SaveFile) {
t.Error("Could not set savefile")
}
}

@ -1,51 +0,0 @@
package beaconchain
import (
"context"
"fmt"
"log"
"time"
proto "github.com/harmony-one/harmony/api/beaconchain"
"google.golang.org/grpc"
)
// Client is the client model for beaconchain service.
type Client struct {
beaconChainServiceClient proto.BeaconChainServiceClient
opts []grpc.DialOption
conn *grpc.ClientConn
}
// NewClient setups a Client given ip and port.
func NewClient(ip, port string) *Client {
client := Client{}
client.opts = append(client.opts, grpc.WithInsecure())
var err error
client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
return nil
}
client.beaconChainServiceClient = proto.NewBeaconChainServiceClient(client.conn)
return &client
}
// Close closes the Client.
func (client *Client) Close() {
client.conn.Close()
}
// GetLeaders gets current leaders from beacon chain
func (client *Client) GetLeaders() *proto.FetchLeadersResponse {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
request := &proto.FetchLeadersRequest{}
response, err := client.beaconChainServiceClient.FetchLeaders(ctx, request)
if err != nil {
log.Fatalf("Error fetching leaders from beacon chain: %s", err)
}
return response
}

@ -1,51 +0,0 @@
package beaconchain
import (
"context"
"log"
"net"
"github.com/harmony-one/harmony/api/proto/node"
"google.golang.org/grpc"
proto "github.com/harmony-one/harmony/api/beaconchain"
)
// Server is the Server struct for beacon chain package.
type Server struct {
shardLeaderMap func() map[int]*node.Info
}
// FetchLeaders implements the FetchLeaders interface to return current leaders.
func (s *Server) FetchLeaders(ctx context.Context, request *proto.FetchLeadersRequest) (*proto.FetchLeadersResponse, error) {
log.Println("Returning FetchLeadersResponse")
leaders := []*proto.FetchLeadersResponse_Leader{}
for shardID, leader := range s.shardLeaderMap() {
leaders = append(leaders, &proto.FetchLeadersResponse_Leader{Ip: leader.IP, Port: leader.Port, ShardId: uint32(shardID), PeerID: leader.PeerID.Pretty()})
}
log.Println(leaders)
return &proto.FetchLeadersResponse{Leaders: leaders}, nil
}
// Start starts the Server on given ip and port.
func (s *Server) Start(ip, port string) (*grpc.Server, error) {
// TODO(minhdoan): Currently not using ip. Fix it later.
addr := net.JoinHostPort("", port)
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
proto.RegisterBeaconChainServiceServer(grpcServer, s)
go grpcServer.Serve(lis)
return grpcServer, nil
}
// NewServer creates new Server which implements BeaconChainServiceServer interface.
func NewServer(shardLeaderMap func() map[int]*node.Info) *Server {
s := &Server{shardLeaderMap}
return s
}

@ -1 +0,0 @@
Newnode package is for handling the interactions of a new candidate node that wants to join the network. Such interaction at the moment is about contacting the beaconchain and getting assigned a shard and findingout the shardleader. In future this package will be merged into the node package.

@ -1,170 +0,0 @@
package newnode
import (
"errors"
"fmt"
"os"
"strconv"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/proto/bcconn"
proto_identity "github.com/harmony-one/harmony/api/proto/identity"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl"
p2p_crypto "github.com/libp2p/go-libp2p-crypto"
multiaddr "github.com/multiformats/go-multiaddr"
)
//NewNode is the struct for a candidate node
type NewNode struct {
Role string
ShardID int
ValidatorID int // Validator ID in its shard.
leader p2p.Peer
isLeader bool
Self p2p.Peer
Leaders map[uint32]p2p.Peer
PubK *bls.PublicKey
priK *bls.SecretKey
log log.Logger
SetInfo chan bool
host p2p.Host
}
// New candidate node initialization
func New(ip string, port string, nodePk p2p_crypto.PrivKey) *NewNode {
priKey, pubKey := utils.GenKey(ip, port)
var node NewNode
var err error
node.PubK = pubKey
node.priK = priKey
node.Self = p2p.Peer{IP: ip, Port: port, PubKey: pubKey, ValidatorID: -1}
node.log = utils.GetLogInstance()
node.SetInfo = make(chan bool)
node.host, err = p2pimpl.NewHost(&node.Self, nodePk)
if err != nil {
node.log.Error("failed to create new host", "msg", err)
return nil
}
node.Leaders = map[uint32]p2p.Peer{}
return &node
}
type registerResponseRandomNumber struct {
NumberOfShards int
NumberOfNodesAdded int
Leaders []*proto_node.Info
}
// ContactBeaconChain starts a newservice in the candidate node
func (node *NewNode) ContactBeaconChain(BCPeer p2p.Peer) error {
go node.host.BindHandlerAndServe(node.NodeHandler)
return node.requestBeaconChain(BCPeer)
}
func (node NewNode) String() string {
return fmt.Sprintf("bc: %v:%v => %v", node.Self.IP, node.Self.Port, node.Self.PeerID)
}
// RequestBeaconChain requests beacon chain for identity data
func (node *NewNode) requestBeaconChain(BCPeer p2p.Peer) (err error) {
node.log.Info("connecting to beacon chain now ...")
pubk := node.PubK.Serialize()
if err != nil {
node.log.Error("Could not Marshall public key into binary")
}
fmt.Printf("[New Node]: %v\n", *node)
nodeInfo := &proto_node.Info{IP: node.Self.IP, Port: node.Self.Port, PubKey: pubk, PeerID: node.Self.PeerID}
msg := bcconn.SerializeNodeInfo(nodeInfo)
msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Register, msg)
gotShardInfo := false
timeout := time.After(2 * time.Minute)
tick := time.Tick(3 * time.Second)
checkLoop:
for {
select {
case <-timeout:
gotShardInfo = false
break checkLoop
case <-tick:
select {
case setinfo := <-node.SetInfo:
if setinfo {
gotShardInfo = true
break checkLoop
}
default:
host.SendMessage(node.host, BCPeer, msgToSend, nil)
}
}
}
if !gotShardInfo {
err = errors.New("could not create connection")
node.log.Crit("Could not get sharding info after 2 minutes")
os.Exit(10)
}
return
}
// ProcessShardInfo
func (node *NewNode) processShardInfo(msgPayload []byte) bool {
leadersInfo := bcconn.DeserializeRandomInfo(msgPayload)
leaders := leadersInfo.Leaders
shardNum, isLeader := utils.AllocateShard(leadersInfo.NumberOfNodesAdded, leadersInfo.NumberOfShards)
for n, v := range leaders {
leaderPeer := p2p.Peer{IP: v.IP, Port: v.Port, PeerID: v.PeerID}
addr := fmt.Sprintf("/ip4/%s/tcp/%s", leaderPeer.IP, leaderPeer.Port)
targetAddr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
log.Error("processShardInfo NewMultiaddr error", "error", err)
return false
}
leaderPeer.Addrs = append(leaderPeer.Addrs, targetAddr)
leaderPeer.PubKey = &bls.PublicKey{}
err = leaderPeer.PubKey.Deserialize(v.PubKey[:])
if err != nil {
node.log.Error("Could not unmarshall leaders public key from binary")
}
node.Leaders[uint32(n)] = leaderPeer
}
node.leader = node.Leaders[uint32(shardNum-1)]
node.isLeader = isLeader
node.ShardID = shardNum - 1 //0 indexing.
node.SetInfo <- true
node.log.Info("Shard information obtained ..")
return true
}
// GetShardID gives shardid of node
func (node *NewNode) GetShardID() string {
return strconv.Itoa(node.ShardID)
}
// GetLeader gives the leader of the node
func (node *NewNode) GetLeader() p2p.Peer {
return node.leader
}
// GetClientPeer gives the client of the node
func (node *NewNode) GetClientPeer() *p2p.Peer {
return nil
}
// GetSelfPeer gives the peer part of the node's own struct
func (node *NewNode) GetSelfPeer() p2p.Peer {
return node.Self
}
// AddPeer add new peer for newnode
func (node *NewNode) AddPeer(p *p2p.Peer) error {
return node.host.AddPeer(p)
}

@ -1,63 +0,0 @@
package newnode
import (
"time"
"github.com/harmony-one/harmony/api/proto"
proto_identity "github.com/harmony-one/harmony/api/proto/identity"
"github.com/harmony-one/harmony/p2p"
)
// NodeHandler handles a new incoming connection.
func (node *NewNode) NodeHandler(s p2p.Stream) {
defer s.Close()
defer node.host.Close()
s.SetReadDeadline(time.Now().Add(1 * time.Second)) // This deadline is for 1 second to accept new connections.
content, err := p2p.ReadMessageContent(s)
if err != nil {
node.log.Error("Read p2p data failed", "err", err, "node", node)
return
}
msgCategory, err := proto.GetMessageCategory(content)
if err != nil {
node.log.Error("Read node type failed", "err", err, "node", node)
return
}
msgType, err := proto.GetMessageType(content)
if err != nil {
node.log.Error("Read action type failed", "err", err, "node", node)
return
}
msgPayload, err := proto.GetMessagePayload(content)
if err != nil {
node.log.Error("Read message payload failed", "err", err, "node", node)
return
}
identityMsgPayload, err := proto_identity.GetIdentityMessagePayload(msgPayload)
if err != nil {
node.log.Error("Read message payload failed")
return
}
switch msgCategory {
case proto.Identity:
actionType := proto_identity.IDMessageType(msgType)
switch actionType {
case proto_identity.Identity:
idMsgType, err := proto_identity.GetIdentityMessageType(msgPayload)
if err != nil {
node.log.Error("Error finding the identity message type", err)
}
switch idMsgType {
case proto_identity.Acknowledge:
node.processShardInfo(identityMsgPayload)
default:
panic("The identity message type is wrong/missing and newnode does not handle this identity message type")
}
default:
panic("The msgCategory is wrong/missing and newnode does not handle this protocol message type")
}
}
}

@ -1,65 +0,0 @@
package newnode
import (
"fmt"
"testing"
"time"
beaconchain "github.com/harmony-one/harmony/internal/beaconchain/libs"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
peerstore "github.com/libp2p/go-libp2p-peerstore"
multiaddr "github.com/multiformats/go-multiaddr"
)
func TestNewNode(t *testing.T) {
var ip, port string
ip = "127.0.0.1"
port = "8088"
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "8088")
nnode := New(ip, port, priKey)
if nnode.PubK == nil {
t.Error("new node public key not initialized")
}
}
func TestBeaconChainConnect(t *testing.T) {
var ip, beaconport, bcma, nodeport string
ip = "127.0.0.1"
beaconport = "8081"
nodeport = "9081"
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9081")
nnode := New(ip, nodeport, priKey)
priKey, _, _ = utils.GenKeyP2P("127.0.0.1", "8081")
bc := beaconchain.New(1, ip, beaconport, priKey)
bcma = fmt.Sprintf("/ip4/%s/tcp/%s/ipfs/%s", bc.Self.IP, bc.Self.Port, bc.GetID().Pretty())
go bc.StartServer()
time.Sleep(3 * time.Second)
maddr, err := multiaddr.NewMultiaddr(bcma)
if err != nil {
t.Errorf("new multiaddr error: %v", err)
}
// Extract the peer ID from the multiaddr.
info, err2 := peerstore.InfoFromP2pAddr(maddr)
if err2 != nil {
t.Errorf("info from p2p addr error: %v", err2)
}
BCPeer := &p2p.Peer{IP: ip, Port: beaconport, Addrs: info.Addrs, PeerID: info.ID}
nnode.AddPeer(BCPeer)
err3 := nnode.ContactBeaconChain(*BCPeer)
if err3 != nil {
t.Errorf("could not read from connection: %v", err3)
}
}

@ -13,10 +13,6 @@ import (
var (
Port string
IP string
// Global Variable to use libp2p for networking
// FIXME: this is a temporary hack, once we totally switch to libp2p
// this variable shouldn't be used
UseLibP2P bool
)
// SetPortAndIP used to print out loggings of node with Port and IP.

@ -15,9 +15,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/api/client"
clientService "github.com/harmony-one/harmony/api/client/service"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/api/service/syncing"
"github.com/harmony-one/harmony/api/service/syncing/downloader"
@ -140,9 +138,6 @@ type Node struct {
// The p2p host used to send/receive p2p messages
host p2p.Host
// Channel to stop sending ping message
StopPing chan struct{}
// Signal channel for lost validators
OfflinePeers chan p2p.Peer
@ -218,11 +213,7 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions {
// StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() {
if utils.UseLibP2P {
select {}
} else {
node.host.BindHandlerAndServe(node.StreamHandler)
}
}
// Count the total number of transactions in the blockchain
@ -282,7 +273,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *N
}
// Setup initial state of syncing.
node.StopPing = make(chan struct{})
node.peerRegistrationRecord = make(map[uint32]*syncConfig)
node.OfflinePeers = make(chan p2p.Peer)
@ -291,11 +281,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *N
// start the goroutine to receive group message
go node.ReceiveGroupMessage()
if utils.UseLibP2P {
node.startConsensus = make(chan struct{})
} else {
node.startConsensus = nil
}
return &node
}
@ -325,35 +311,6 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int {
return count
}
// JoinShard helps a new node to join a shard.
func (node *Node) JoinShard(leader p2p.Peer) {
// try to join the shard, send ping message every 1 second, with a 10 minutes time-out
tick := time.NewTicker(1 * time.Second)
timeout := time.NewTicker(10 * time.Minute)
defer tick.Stop()
defer timeout.Stop()
for {
select {
case <-tick.C:
ping := proto_discovery.NewPingMessage(node.SelfPeer)
if node.Client != nil { // assume this is the client node
ping.Node.Role = proto_node.ClientRole
}
buffer := ping.ConstructPingMessage()
// Talk to leader.
node.SendMessage(leader, buffer)
case <-timeout.C:
utils.GetLogInstance().Info("JoinShard timeout")
return
case <-node.StopPing:
utils.GetLogInstance().Info("Stopping JoinShard")
return
}
}
}
// CalculateResponse implements DownloadInterface on Node object.
func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*downloader_pb.DownloaderResponse, error) {
response := &downloader_pb.DownloaderResponse{}

@ -29,29 +29,6 @@ const (
MaxNumberOfTransactionsPerBlock = 8000
)
// MaybeBroadcastAsValidator returns if the node is a validator node.
func (node *Node) MaybeBroadcastAsValidator(content []byte) {
// TODO: this is tree-based broadcasting. this needs to be replaced by p2p gossiping.
if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID <= host.MaxBroadCast {
go host.BroadcastMessageFromValidator(node.host, node.SelfPeer, node.Consensus.GetValidatorPeers(), content)
}
}
// StreamHandler handles a new incoming network message.
func (node *Node) StreamHandler(s p2p.Stream) {
defer s.Close()
// Read p2p message payload
content, err := p2p.ReadMessageContent(s)
if err != nil {
utils.GetLogInstance().Error("Read p2p data failed", "err", err, "node", node)
return
}
node.messageHandler(content, "")
}
// ReceiveGroupMessage use libp2p pubsub mechanism to receive broadcast messages
func (node *Node) ReceiveGroupMessage() {
ctx := context.Background()
@ -282,11 +259,7 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
if node.ClientPeer != nil {
utils.GetLogInstance().Debug("Sending new block to client", "client", node.ClientPeer)
if utils.UseLibP2P {
node.host.SendMessageToGroups([]p2p.GroupID{node.MyClientGroupID}, host.ConstructP2pMessage(byte(0), proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})))
} else {
node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))
}
}
}
@ -375,9 +348,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int {
// add to incoming peer list
//node.host.AddIncomingPeer(*peer)
if utils.UseLibP2P {
node.host.ConnectHostPeer(*peer)
}
if ping.Node.Role == proto_node.ClientRole {
utils.GetLogInstance().Info("Add Client Peer to Node", "Node", node.Consensus.GetNodeID(), "Client", peer)
@ -388,29 +359,6 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int {
// Add to Node's peer list anyway
node.AddPeers([]*p2p.Peer{peer})
// This is the old way of broadcasting pong message
if node.Consensus.IsLeader && !utils.UseLibP2P {
peers := node.Consensus.GetValidatorPeers()
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey())
buffer := pong.ConstructPongMessage()
// Send a Pong message directly to the sender
// This is necessary because the sender will need to get a ValidatorID
// Just broadcast won't work, some validators won't receive the latest
// PublicKeys as we rely on a valid ValidatorID to do broadcast.
// This is very buggy, but we will move to libp2p, hope the problem will
// be resolved then.
// However, I disable it for now as we are sending redundant PONG messages
// to all validators. This may not be needed. But it maybe add back.
// p2p.SendMessage(*peer, buffer)
// Broadcast the message to all validators, as publicKeys is updated
// FIXME: HAR-89 use a separate nodefind/neighbor message
host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers)
// utils.GetLogInstance().Info("PingMsgHandler send pong message")
}
return 1
}
@ -518,10 +466,6 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
if node.State == NodeWaitToJoin {
node.State = NodeReadyForConsensus
// Notify JoinShard to stop sending Ping messages
if node.StopPing != nil {
node.StopPing <- struct{}{}
}
}
// Stop discovery service after received pong message

@ -3,37 +3,12 @@ package node
import (
"testing"
"github.com/golang/mock/gomock"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
)
func TestNodeStreamHandler(t *testing.T) {
_, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey}
validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"}
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902")
host, err := p2pimpl.NewHost(&leader, priKey)
if err != nil {
t.Fatalf("newhost failure: %v", err)
}
consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader)
node := New(host, consensus, nil)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := p2p.NewMockStream(ctrl)
m.EXPECT().Read(gomock.Any()).AnyTimes()
m.EXPECT().SetReadDeadline(gomock.Any())
m.EXPECT().Close()
node.StreamHandler(m)
}
func TestAddNewBlock(t *testing.T) {
_, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "127.0.0.1", Port: "9882", PubKey: pubKey}

@ -120,13 +120,7 @@ func sendPingMessage(node *Node, leader p2p.Peer) {
}
ping1 := proto_discovery.NewPingMessage(p1)
buf1 := ping1.ConstructPingMessage()
fmt.Println("waiting for 5 seconds ...")
time.Sleep(5 * time.Second)
node.SendMessage(leader, buf1)
fmt.Println("sent ping message ...")
_ = ping1.ConstructPingMessage()
}
func sendPongMessage(node *Node, leader p2p.Peer) {
@ -147,13 +141,7 @@ func sendPongMessage(node *Node, leader p2p.Peer) {
leaderPubKey := pki.GetBLSPrivateKeyFromInt(888).GetPublicKey()
pong1 := proto_discovery.NewPongMessage([]p2p.Peer{p1, p2}, pubKeys, leaderPubKey)
buf1 := pong1.ConstructPongMessage()
fmt.Println("waiting for 10 seconds ...")
time.Sleep(10 * time.Second)
node.SendMessage(leader, buf1)
fmt.Println("sent pong message ...")
_ = pong1.ConstructPongMessage()
}
func exitServer() {

@ -2,19 +2,8 @@ package node
import (
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
)
// SendMessage sends data to ip, port
func (node *Node) SendMessage(p p2p.Peer, data []byte) {
host.SendMessage(node.host, p, data, nil)
}
// BroadcastMessage broadcasts message to peers
func (node *Node) BroadcastMessage(peers []p2p.Peer, data []byte) {
host.BroadcastMessage(node.host, peers, data, nil)
}
// GetHost returns the p2p host
func (node *Node) GetHost() p2p.Host {
return node.host

@ -10,8 +10,6 @@ import (
// Host is the client + server in p2p network.
type Host interface {
GetSelfPeer() Peer
SendMessage(Peer, []byte) error
BindHandlerAndServe(handler StreamHandler)
Close() error
AddPeer(*Peer) error
GetID() libp2p_peer.ID

@ -5,7 +5,6 @@ package hostv2
import (
"context"
"fmt"
"io"
"sync"
"github.com/ethereum/go-ethereum/log"
@ -16,10 +15,8 @@ import (
libp2p "github.com/libp2p/go-libp2p"
libp2p_crypto "github.com/libp2p/go-libp2p-crypto"
libp2p_host "github.com/libp2p/go-libp2p-host"
libp2p_net "github.com/libp2p/go-libp2p-net"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
libp2p_peerstore "github.com/libp2p/go-libp2p-peerstore"
libp2p_protocol "github.com/libp2p/go-libp2p-protocol"
libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub"
ma "github.com/multiformats/go-multiaddr"
)
@ -199,46 +196,6 @@ func (host *HostV2) GetSelfPeer() p2p.Peer {
return host.self
}
// BindHandlerAndServe bind a streamHandler to the harmony protocol.
func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) {
host.h.SetStreamHandler(libp2p_protocol.ID(ProtocolID), func(s libp2p_net.Stream) {
handler(s)
})
// Hang forever
<-make(chan struct{})
}
// SendMessage a p2p message sending function with signature compatible to p2pv1.
func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error {
logger := host.logger.New("from", host.self, "to", p, "PeerID", p.PeerID)
err := host.Peerstore().AddProtocols(p.PeerID, ProtocolID)
if err != nil {
logger.Error("AddProtocols() failed", "error", err)
return p2p.ErrAddProtocols
}
s, err := host.h.NewStream(context.Background(), p.PeerID, libp2p_protocol.ID(ProtocolID))
if err != nil {
logger.Error("NewStream() failed", "peerID", p.PeerID,
"protocolID", ProtocolID, "error", err)
return p2p.ErrNewStream
}
defer func() {
if err := s.Close(); err != nil {
logger.Warn("cannot close stream", "error", err)
}
}()
if nw, err := s.Write(message); err != nil {
logger.Error("Write() failed", "peerID", p.PeerID,
"protocolID", ProtocolID, "error", err)
return p2p.ErrMsgWrite
} else if nw < len(message) {
logger.Error("Short Write()", "expected", len(message), "actual", nw)
return io.ErrShortWrite
}
return nil
}
// Close closes the host
func (host *HostV2) Close() error {
return host.h.Close()

@ -2,52 +2,8 @@ package host
import (
"encoding/binary"
"net"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/harmony-one/harmony/p2p"
)
// SendMessage is to connect a socket given a port and send the given message.
// TODO(minhdoan, rj): need to check if a peer is reachable or not.
func SendMessage(host p2p.Host, p p2p.Peer, message []byte, lostPeer chan p2p.Peer) {
// Construct normal p2p message
content := ConstructP2pMessage(byte(0), message)
go send(host, p, content, lostPeer)
}
// BroadcastMessage sends the message to a list of peers
func BroadcastMessage(h p2p.Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) {
if len(peers) == 0 {
return
}
// Construct broadcast p2p message
content := ConstructP2pMessage(byte(17), msg)
length := len(content)
// log.Info("Start Broadcasting", "gomaxprocs", runtime.GOMAXPROCS(0), "Size", length)
// start := time.Now()
for _, peer := range peers {
peerCopy := peer
go send(h, peerCopy, content, lostPeer)
}
// log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds())
// Keep track of block propagation time
// Assume 1M message is for block propagation
if length > 1000000 {
log.Debug("NET: START BLOCK PROPAGATION", "Size", length)
}
}
// BroadcastMessageFromLeader sends the message to a list of peers from a leader.
func BroadcastMessageFromLeader(h p2p.Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) {
// TODO(minhdoan): Enable back for multicast.
peers = SelectMyPeers(peers, 1, MaxBroadCast)
BroadcastMessage(h, peers, msg, lostPeer)
}
// ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]
func ConstructP2pMessage(msgType byte, content []byte) []byte {
message := make([]byte, 5+len(content))
@ -56,58 +12,3 @@ func ConstructP2pMessage(msgType byte, content []byte) []byte {
copy(message[5:], content)
return message
}
// BroadcastMessageFromValidator sends the message to a list of peers from a validator.
func BroadcastMessageFromValidator(h p2p.Host, selfPeer p2p.Peer, peers []p2p.Peer, msg []byte) {
peers = SelectMyPeers(peers, selfPeer.ValidatorID*MaxBroadCast+1, (selfPeer.ValidatorID+1)*MaxBroadCast)
BroadcastMessage(h, peers, msg, nil)
}
// MaxBroadCast is the maximum number of neighbors to broadcast
const MaxBroadCast = 20
// SelectMyPeers chooses a list of peers based on the range of ValidatorID
// This is a quick hack of the current p2p networking model
func SelectMyPeers(peers []p2p.Peer, min int, max int) []p2p.Peer {
res := []p2p.Peer{}
for _, peer := range peers {
if peer.ValidatorID >= min && peer.ValidatorID <= max {
res = append(res, peer)
}
}
return res
}
// Send a message to another node with given port.
func send(h p2p.Host, peer p2p.Peer, message []byte, lostPeer chan p2p.Peer) {
// Add attack code here.
//attack.GetInstance().Run()
backoff := p2p.NewExpBackoff(250*time.Millisecond, 5*time.Second, 2)
for trial := 0; trial < 3; trial++ {
err := h.SendMessage(peer, message)
// No need to retry if new stream error or no error
if err == nil || err == p2p.ErrNewStream {
if trial > 0 {
log.Warn("retry send", "rety", trial)
}
return
}
log.Info("sleeping before trying to send again",
"duration", backoff.Cur, "addr", net.JoinHostPort(peer.IP, peer.Port))
backoff.Sleep()
}
log.Error("gave up sending a message", "addr", net.JoinHostPort(peer.IP, peer.Port))
if lostPeer != nil {
// Notify lostPeer channel
lostPeer <- peer
}
}
// DialWithSocketClient joins host port and establishes connection
func DialWithSocketClient(ip, port string) (conn net.Conn, err error) {
addr := net.JoinHostPort(ip, port)
conn, err = net.Dial("tcp", addr)
return
}

@ -1,59 +0,0 @@
package host
import (
"fmt"
"reflect"
"testing"
"time"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
)
func TestSendMessage(test *testing.T) {
peer1 := p2p.Peer{IP: "127.0.0.1", Port: "9000"}
selfAddr1, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", peer1.Port))
peer1.Addrs = append(peer1.Addrs, selfAddr1)
priKey1, pubKey1, _ := utils.GenKeyP2P(peer1.IP, peer1.Port)
peerID1, _ := libp2p_peer.IDFromPublicKey(pubKey1)
peer1.PeerID = peerID1
host1, _ := p2pimpl.NewHost(&peer1, priKey1)
peer2 := p2p.Peer{IP: "127.0.0.1", Port: "9001"}
selfAddr2, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", peer2.Port))
peer2.Addrs = append(peer2.Addrs, selfAddr2)
priKey2, pubKey2, _ := utils.GenKeyP2P(peer2.IP, peer2.Port)
peerID2, _ := libp2p_peer.IDFromPublicKey(pubKey2)
peer2.PeerID = peerID2
host2, _ := p2pimpl.NewHost(&peer2, priKey2)
msg := []byte{0x00, 0x01, 0x02, 0x03, 0x04}
if err := host1.AddPeer(&peer2); err != nil {
test.Fatalf("cannot add peer2 to host1: %v", err)
}
go host2.BindHandlerAndServe(handler)
SendMessage(host1, peer2, msg, nil)
time.Sleep(3 * time.Second)
}
func handler(s p2p.Stream) {
defer func() {
if err := s.Close(); err != nil {
panic(fmt.Sprintf("Close(%v) failed: %v", s, err))
}
}()
content, err := p2p.ReadMessageContent(s)
if err != nil {
panic("Read p2p data failed")
}
golden := []byte{0x00, 0x01, 0x02, 0x03, 0x04}
if !reflect.DeepEqual(content, golden) {
panic("received message not equal original message")
}
}

@ -49,32 +49,6 @@ func (mr *MockHostMockRecorder) GetSelfPeer() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSelfPeer", reflect.TypeOf((*MockHost)(nil).GetSelfPeer))
}
// SendMessage mocks base method
func (m *MockHost) SendMessage(arg0 p2p.Peer, arg1 []byte) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendMessage", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SendMessage indicates an expected call of SendMessage
func (mr *MockHostMockRecorder) SendMessage(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessage", reflect.TypeOf((*MockHost)(nil).SendMessage), arg0, arg1)
}
// BindHandlerAndServe mocks base method
func (m *MockHost) BindHandlerAndServe(handler p2p.StreamHandler) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BindHandlerAndServe", handler)
}
// BindHandlerAndServe indicates an expected call of BindHandlerAndServe
func (mr *MockHostMockRecorder) BindHandlerAndServe(handler interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BindHandlerAndServe", reflect.TypeOf((*MockHost)(nil).BindHandlerAndServe), handler)
}
// Close mocks base method
func (m *MockHost) Close() error {
m.ctrl.T.Helper()

@ -5,8 +5,8 @@ export GO111MODULE=on
declare -A SRC
SRC[harmony]=cmd/harmony.go
SRC[txgen]=cmd/client/txgen/main.go
SRC[wallet]=cmd/client/wallet/main.go
SRC[bootnode]=cmd/bootnode/main.go
# SRC[wallet]=cmd/client/wallet/main.go
BINDIR=bin
BUCKET=unique-bucket-bin

@ -19,7 +19,7 @@ function check_result() {
}
function cleanup() {
for pid in `/bin/ps -fu $USER| grep "harmony\|txgen\|soldier\|commander\|profiler\|beacon\|bootnode" | grep -v "grep" | grep -v "vi" | awk '{print $2}'`;
for pid in `/bin/ps -fu $USER| grep "harmony\|txgen\|soldier\|commander\|profiler\|bootnode" | grep -v "grep" | grep -v "vi" | awk '{print $2}'`;
do
echo 'Killed process: '$pid
$DRYRUN kill -9 $pid 2> /dev/null
@ -57,7 +57,6 @@ USAGE: $ME [OPTIONS] config_file_name
-k nodeport kill the node with specified port number (default: $KILLPORT)
-n dryrun mode (default: $DRYRUN)
-S enable sync test (default: $SYNC)
-P enable libp2p peer discovery test (default: $P2P)
This script will build all the binaries and start harmony and txgen based on the configuration file.
@ -78,9 +77,8 @@ SHARDS=2
KILLPORT=9004
SYNC=true
DRYRUN=
P2P=false
while getopts "hdtD:m:s:k:nSP" option; do
while getopts "hdtD:m:s:k:nS" option; do
case $option in
h) usage ;;
d) DB='-db_supported' ;;
@ -91,7 +89,6 @@ while getopts "hdtD:m:s:k:nSP" option; do
k) KILLPORT=$OPTARG ;;
n) DRYRUN=echo ;;
S) SYNC=true ;;
P) P2P=true ;;
esac
done
@ -128,25 +125,12 @@ log_folder="tmp_log/log-$t"
mkdir -p $log_folder
LOG_FILE=$log_folder/r.log
HMY_OPT=
HMY_OPT2=
HMY_OPT3=
if [ "$P2P" == "false" ]; then
echo "launching beacon chain ..."
$DRYRUN $ROOT/bin/beacon -numShards $SHARDS > $log_folder/beacon.log 2>&1 | tee -a $LOG_FILE &
sleep 1 #waiting for beaconchain
BC_MA=$(grep "Beacon Chain Started" $log_folder/beacon.log | awk -F: ' { print $2 } ')
HMY_OPT=" -bc_addr $BC_MA"
else
echo "launching boot node ..."
$DRYRUN $ROOT/bin/bootnode -port 19876 > $log_folder/bootnode.log 2>&1 | tee -a $LOG_FILE &
sleep 1
BN_MA=$(grep "BN_MA" $log_folder/bootnode.log | awk -F\= ' { print $2 } ')
HMY_OPT2=" -bootnodes $BN_MA"
HMY_OPT2+=" -libp2p_pd"
HMY_OPT3=" -is_beacon"
fi
NUM_NN=0
@ -154,15 +138,15 @@ NUM_NN=0
while IFS='' read -r line || [[ -n "$line" ]]; do
IFS=' ' read ip port mode shardID <<< $line
if [ "$mode" == "leader" ]; then
$DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key -is_leader 2>&1 | tee -a $LOG_FILE &
$DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key -is_leader 2>&1 | tee -a $LOG_FILE &
fi
if [ "$mode" == "validator" ]; then
$DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE &
$DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE &
fi
sleep 0.5
if [[ "$mode" == "newnode" && "$SYNC" == "true" ]]; then
(( NUM_NN += 35 ))
(sleep $NUM_NN; $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE ) &
(sleep $NUM_NN; $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE ) &
fi
done < $config
@ -177,7 +161,7 @@ if [ "$TXGEN" == "true" ]; then
line=$(grep client $config)
IFS=' ' read ip port mode shardID <<< $line
if [ "$mode" == "client" ]; then
$DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port $HMY_OPT $HMY_OPT2 2>&1 | tee -a $LOG_FILE
$DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port $HMY_OPT2 2>&1 | tee -a $LOG_FILE
fi
else
sleep $DURATION

Loading…
Cancel
Save