diff --git a/api/beaconchain/beaconchain.go b/api/beaconchain/beaconchain.go deleted file mode 100644 index fb775e204..000000000 --- a/api/beaconchain/beaconchain.go +++ /dev/null @@ -1,3 +0,0 @@ -package beaconchain - -//go:generate protoc beaconchain.proto --go_out=plugins=grpc:. diff --git a/api/beaconchain/beaconchain.pb.go b/api/beaconchain/beaconchain.pb.go deleted file mode 100644 index 1f7e5003c..000000000 --- a/api/beaconchain/beaconchain.pb.go +++ /dev/null @@ -1,256 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: beaconchain.proto - -package beaconchain - -import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -// FetchLeadersRequest is the request to fetch the current leaders. -type FetchLeadersRequest struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *FetchLeadersRequest) Reset() { *m = FetchLeadersRequest{} } -func (m *FetchLeadersRequest) String() string { return proto.CompactTextString(m) } -func (*FetchLeadersRequest) ProtoMessage() {} -func (*FetchLeadersRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_474fd8061d1037cf, []int{0} -} - -func (m *FetchLeadersRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_FetchLeadersRequest.Unmarshal(m, b) -} -func (m *FetchLeadersRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_FetchLeadersRequest.Marshal(b, m, deterministic) -} -func (m *FetchLeadersRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_FetchLeadersRequest.Merge(m, src) -} -func (m *FetchLeadersRequest) XXX_Size() int { - return xxx_messageInfo_FetchLeadersRequest.Size(m) -} -func (m *FetchLeadersRequest) XXX_DiscardUnknown() { - xxx_messageInfo_FetchLeadersRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_FetchLeadersRequest proto.InternalMessageInfo - -// FetchLeadersResponse is the response of FetchLeadersRequest. -type FetchLeadersResponse struct { - Leaders []*FetchLeadersResponse_Leader `protobuf:"bytes,1,rep,name=leaders,proto3" json:"leaders,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *FetchLeadersResponse) Reset() { *m = FetchLeadersResponse{} } -func (m *FetchLeadersResponse) String() string { return proto.CompactTextString(m) } -func (*FetchLeadersResponse) ProtoMessage() {} -func (*FetchLeadersResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_474fd8061d1037cf, []int{1} -} - -func (m *FetchLeadersResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_FetchLeadersResponse.Unmarshal(m, b) -} -func (m *FetchLeadersResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_FetchLeadersResponse.Marshal(b, m, deterministic) -} -func (m *FetchLeadersResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_FetchLeadersResponse.Merge(m, src) -} -func (m *FetchLeadersResponse) XXX_Size() int { - return xxx_messageInfo_FetchLeadersResponse.Size(m) -} -func (m *FetchLeadersResponse) XXX_DiscardUnknown() { - xxx_messageInfo_FetchLeadersResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_FetchLeadersResponse proto.InternalMessageInfo - -func (m *FetchLeadersResponse) GetLeaders() []*FetchLeadersResponse_Leader { - if m != nil { - return m.Leaders - } - return nil -} - -type FetchLeadersResponse_Leader struct { - Ip string `protobuf:"bytes,1,opt,name=ip,proto3" json:"ip,omitempty"` - Port string `protobuf:"bytes,2,opt,name=port,proto3" json:"port,omitempty"` - ShardId uint32 `protobuf:"varint,3,opt,name=shardId,proto3" json:"shardId,omitempty"` - PeerID string `protobuf:"bytes,4,opt,name=peerID,proto3" json:"peerID,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *FetchLeadersResponse_Leader) Reset() { *m = FetchLeadersResponse_Leader{} } -func (m *FetchLeadersResponse_Leader) String() string { return proto.CompactTextString(m) } -func (*FetchLeadersResponse_Leader) ProtoMessage() {} -func (*FetchLeadersResponse_Leader) Descriptor() ([]byte, []int) { - return fileDescriptor_474fd8061d1037cf, []int{1, 0} -} - -func (m *FetchLeadersResponse_Leader) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_FetchLeadersResponse_Leader.Unmarshal(m, b) -} -func (m *FetchLeadersResponse_Leader) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_FetchLeadersResponse_Leader.Marshal(b, m, deterministic) -} -func (m *FetchLeadersResponse_Leader) XXX_Merge(src proto.Message) { - xxx_messageInfo_FetchLeadersResponse_Leader.Merge(m, src) -} -func (m *FetchLeadersResponse_Leader) XXX_Size() int { - return xxx_messageInfo_FetchLeadersResponse_Leader.Size(m) -} -func (m *FetchLeadersResponse_Leader) XXX_DiscardUnknown() { - xxx_messageInfo_FetchLeadersResponse_Leader.DiscardUnknown(m) -} - -var xxx_messageInfo_FetchLeadersResponse_Leader proto.InternalMessageInfo - -func (m *FetchLeadersResponse_Leader) GetIp() string { - if m != nil { - return m.Ip - } - return "" -} - -func (m *FetchLeadersResponse_Leader) GetPort() string { - if m != nil { - return m.Port - } - return "" -} - -func (m *FetchLeadersResponse_Leader) GetShardId() uint32 { - if m != nil { - return m.ShardId - } - return 0 -} - -func (m *FetchLeadersResponse_Leader) GetPeerID() string { - if m != nil { - return m.PeerID - } - return "" -} - -func init() { - proto.RegisterType((*FetchLeadersRequest)(nil), "beaconchain.FetchLeadersRequest") - proto.RegisterType((*FetchLeadersResponse)(nil), "beaconchain.FetchLeadersResponse") - proto.RegisterType((*FetchLeadersResponse_Leader)(nil), "beaconchain.FetchLeadersResponse.Leader") -} - -func init() { proto.RegisterFile("beaconchain.proto", fileDescriptor_474fd8061d1037cf) } - -var fileDescriptor_474fd8061d1037cf = []byte{ - // 222 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x90, 0xcd, 0x4a, 0xc4, 0x30, - 0x14, 0x85, 0x4d, 0x67, 0xe8, 0xe0, 0x1d, 0x15, 0xbc, 0xfe, 0x10, 0x66, 0x15, 0xbb, 0xca, 0xaa, - 0x8b, 0xf1, 0x0d, 0xaa, 0x08, 0x05, 0x57, 0x11, 0xb7, 0x42, 0x9a, 0x5e, 0x68, 0x50, 0x9a, 0x98, - 0x44, 0x1f, 0xce, 0xa7, 0x13, 0x53, 0x0b, 0x15, 0x44, 0x77, 0x39, 0x1f, 0x39, 0xe4, 0xcb, 0x81, - 0xd3, 0x8e, 0xb4, 0x71, 0xa3, 0x19, 0xb4, 0x1d, 0x6b, 0x1f, 0x5c, 0x72, 0xb8, 0x5d, 0xa0, 0xea, - 0x02, 0xce, 0xee, 0x28, 0x99, 0xe1, 0x9e, 0x74, 0x4f, 0x21, 0x2a, 0x7a, 0x7d, 0xa3, 0x98, 0xaa, - 0x0f, 0x06, 0xe7, 0x3f, 0x79, 0xf4, 0x6e, 0x8c, 0x84, 0x0d, 0x6c, 0x5e, 0x26, 0xc4, 0x99, 0x58, - 0xc9, 0xed, 0x5e, 0xd6, 0xcb, 0x17, 0x7e, 0xeb, 0xd4, 0x53, 0x56, 0x73, 0x71, 0xf7, 0x04, 0xe5, - 0x84, 0xf0, 0x04, 0x0a, 0xeb, 0x39, 0x13, 0x4c, 0x1e, 0xaa, 0xc2, 0x7a, 0x44, 0x58, 0x7b, 0x17, - 0x12, 0x2f, 0x32, 0xc9, 0x67, 0xe4, 0xb0, 0x89, 0x83, 0x0e, 0x7d, 0xdb, 0xf3, 0x95, 0x60, 0xf2, - 0x58, 0xcd, 0x11, 0x2f, 0xa1, 0xf4, 0x44, 0xa1, 0xbd, 0xe5, 0xeb, 0x7c, 0xff, 0x3b, 0xed, 0x9f, - 0x01, 0x9b, 0xec, 0x74, 0xf3, 0xe5, 0xf4, 0x40, 0xe1, 0xdd, 0x1a, 0xc2, 0x47, 0x38, 0x5a, 0xda, - 0xa1, 0xf8, 0x43, 0x3c, 0x8f, 0xb0, 0xbb, 0xfa, 0xf7, 0x6b, 0xd5, 0x41, 0x57, 0xe6, 0x51, 0xaf, - 0x3f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x35, 0x50, 0x26, 0x86, 0x69, 0x01, 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// BeaconChainServiceClient is the client API for BeaconChainService service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type BeaconChainServiceClient interface { - FetchLeaders(ctx context.Context, in *FetchLeadersRequest, opts ...grpc.CallOption) (*FetchLeadersResponse, error) -} - -type beaconChainServiceClient struct { - cc *grpc.ClientConn -} - -func NewBeaconChainServiceClient(cc *grpc.ClientConn) BeaconChainServiceClient { - return &beaconChainServiceClient{cc} -} - -func (c *beaconChainServiceClient) FetchLeaders(ctx context.Context, in *FetchLeadersRequest, opts ...grpc.CallOption) (*FetchLeadersResponse, error) { - out := new(FetchLeadersResponse) - err := c.cc.Invoke(ctx, "/beaconchain.BeaconChainService/FetchLeaders", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// BeaconChainServiceServer is the server API for BeaconChainService service. -type BeaconChainServiceServer interface { - FetchLeaders(context.Context, *FetchLeadersRequest) (*FetchLeadersResponse, error) -} - -func RegisterBeaconChainServiceServer(s *grpc.Server, srv BeaconChainServiceServer) { - s.RegisterService(&_BeaconChainService_serviceDesc, srv) -} - -func _BeaconChainService_FetchLeaders_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(FetchLeadersRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(BeaconChainServiceServer).FetchLeaders(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/beaconchain.BeaconChainService/FetchLeaders", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BeaconChainServiceServer).FetchLeaders(ctx, req.(*FetchLeadersRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _BeaconChainService_serviceDesc = grpc.ServiceDesc{ - ServiceName: "beaconchain.BeaconChainService", - HandlerType: (*BeaconChainServiceServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "FetchLeaders", - Handler: _BeaconChainService_FetchLeaders_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "beaconchain.proto", -} diff --git a/api/beaconchain/beaconchain.proto b/api/beaconchain/beaconchain.proto deleted file mode 100644 index f5f7bcf5e..000000000 --- a/api/beaconchain/beaconchain.proto +++ /dev/null @@ -1,23 +0,0 @@ -syntax = "proto3"; - -package beaconchain; - -// BeaconChainService is the service used for any beacon chain requests. -service BeaconChainService { - rpc FetchLeaders(FetchLeadersRequest) returns (FetchLeadersResponse) {} -} - -// FetchLeadersRequest is the request to fetch the current leaders. -message FetchLeadersRequest { -} - -// FetchLeadersResponse is the response of FetchLeadersRequest. -message FetchLeadersResponse { - message Leader { - string ip = 1; - string port = 2; - uint32 shardId = 3; - string peerID = 4; - } - repeated Leader leaders = 1; -} diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index e9ed3e44c..d2e41c96f 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -15,14 +15,11 @@ import ( "github.com/harmony-one/harmony/cmd/client/txgen/txgen" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/internal/newnode" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" p2p_host "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/p2pimpl" - peerstore "github.com/libp2p/go-libp2p-peerstore" - multiaddr "github.com/multiformats/go-multiaddr" ) var ( @@ -50,18 +47,10 @@ func main() { versionFlag := flag.Bool("version", false, "Output version info") crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") - bcIP := flag.String("bc", "127.0.0.1", "IP of the identity chain") - bcPort := flag.String("bc_port", "8081", "port of the identity chain") - - bcAddr := flag.String("bc_addr", "", "MultiAddr of the identity chain") - // Key file to store the private key keyFile := flag.String("key", "./.txgenkey", "the private key file of the txgen") flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddress") - // LibP2P peer discovery integration test - libp2pPD := flag.Bool("libp2p_pd", false, "enable libp2p based peer discovery") - flag.Parse() if *versionFlag { @@ -95,41 +84,9 @@ func main() { selfPeer := p2p.Peer{IP: *ip, Port: *port, ValidatorID: -1, PubKey: peerPubKey} - if !*libp2pPD { - var bcPeer *p2p.Peer - if *bcAddr != "" { - // Turn the destination into a multiaddr. - maddr, err := multiaddr.NewMultiaddr(*bcAddr) - if err != nil { - panic(err) - } - - // Extract the peer ID from the multiaddr. - info, err := peerstore.InfoFromP2pAddr(maddr) - if err != nil { - panic(err) - } - - bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort, Addrs: info.Addrs, PeerID: info.ID} - } else { - bcPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort} - } - - candidateNode := newnode.New(*ip, *port, nodePriKey) - candidateNode.AddPeer(bcPeer) - candidateNode.ContactBeaconChain(*bcPeer) - selfPeer := candidateNode.GetSelfPeer() - selfPeer.PubKey = candidateNode.PubK - - shardIDLeaderMap = candidateNode.Leaders - - debugPrintShardIDLeaderMap(shardIDLeaderMap) - } else { - // Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard - shardIDLeaderMap = make(map[uint32]p2p.Peer) - shardIDLeaderMap[0] = p2p.Peer{} - utils.UseLibP2P = true - } + // Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard + shardIDLeaderMap = make(map[uint32]p2p.Peer) + shardIDLeaderMap[0] = p2p.Peer{} // Do cross shard tx if there are more than one shard setting := txgen.Settings{ @@ -195,27 +152,14 @@ func main() { } clientNode.Client.UpdateBlocks = updateBlocksFunc - for _, leader := range shardIDLeaderMap { - if !*libp2pPD { - clientNode.GetHost().AddPeer(&leader) - utils.GetLogInstance().Debug("Client Join Shard", "leader", leader) - go clientNode.JoinShard(leader) - } - clientNode.State = node.NodeReadyForConsensus - } + clientNode.Role = node.ClientNode + clientNode.ServiceManagerSetup() + clientNode.RunServices() - if *libp2pPD { - clientNode.Role = node.ClientNode - clientNode.ServiceManagerSetup() - clientNode.RunServices() - go clientNode.StartServer() - } else { - // Start the client server to listen to leader's message - go clientNode.StartServer() - // wait for 1 seconds for client to send ping message to leader - time.Sleep(time.Second) - clientNode.StopPing <- struct{}{} - } + // Start the client server to listen to leader's message + go clientNode.StartServer() + // wait for 1 seconds for client to send ping message to leader + time.Sleep(time.Second) clientNode.State = node.NodeReadyForConsensus // Transaction generation process @@ -258,23 +202,17 @@ func main() { // Send a stop message to stop the nodes at the end msg := proto_node.ConstructStopMessage() - if utils.UseLibP2P { - clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) - } else { - clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg) - } - time.Sleep(3000 * time.Millisecond) + clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) + clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, p2p_host.ConstructP2pMessage(byte(0), msg)) + + time.Sleep(3 * time.Second) } // SendTxsToLeader sends txs to leader account. func SendTxsToLeader(clientNode *node.Node, leader p2p.Peer, txs types.Transactions) { utils.GetLogInstance().Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs)) msg := proto_node.ConstructTransactionListMessageAccount(txs) - if utils.UseLibP2P { - clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) - } else { - clientNode.SendMessage(leader, msg) - } + clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) } func debugPrintShardIDLeaderMap(leaderMap map[uint32]p2p.Peer) { diff --git a/cmd/client/wallet/lib/lib.go b/cmd/client/wallet/lib/lib.go index eeaee6899..b99b1b71a 100644 --- a/cmd/client/wallet/lib/lib.go +++ b/cmd/client/wallet/lib/lib.go @@ -2,28 +2,21 @@ package lib import ( "fmt" - "strconv" "time" "github.com/harmony-one/harmony/api/client" - proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/core/types" - libs "github.com/harmony-one/harmony/internal/beaconchain/libs" - beaconchain "github.com/harmony-one/harmony/internal/beaconchain/rpc" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/p2pimpl" - peer "github.com/libp2p/go-libp2p-peer" ) // CreateWalletNode creates wallet server node. func CreateWalletNode() *node.Node { shardIDLeaderMap := make(map[uint32]p2p.Peer) - port, _ := strconv.Atoi("9999") - bcClient := beaconchain.NewClient("54.183.5.66", strconv.Itoa(port+libs.BeaconchainServicePortDiff)) - response := bcClient.GetLeaders() + // port, _ := strconv.Atoi("9999") // dummy host for wallet self := p2p.Peer{IP: "127.0.0.1", Port: "6789"} @@ -33,16 +26,17 @@ func CreateWalletNode() *node.Node { panic(err) } - for _, leader := range response.Leaders { - peerID, err := peer.IDB58Decode(leader.PeerID) - if err != nil { - panic(err) + /* + for _, leader := range response.Leaders { + peerID, err := peer.IDB58Decode(leader.PeerID) + if err != nil { + panic(err) + } + leaderPeer := p2p.Peer{IP: leader.Ip, Port: leader.Port, PeerID: peerID} + shardIDLeaderMap[leader.ShardId] = leaderPeer + host.AddPeer(&leaderPeer) } - leaderPeer := p2p.Peer{IP: leader.Ip, Port: leader.Port, PeerID: peerID} - shardIDLeaderMap[leader.ShardId] = leaderPeer - host.AddPeer(&leaderPeer) - } - + */ walletNode := node.New(host, nil, nil) walletNode.Client = client.NewClient(walletNode.GetHost(), shardIDLeaderMap) return walletNode @@ -50,9 +44,9 @@ func CreateWalletNode() *node.Node { // SubmitTransaction submits the transaction to the Harmony network func SubmitTransaction(tx *types.Transaction, walletNode *node.Node, shardID uint32) error { - msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx}) - leader := walletNode.Client.Leaders[shardID] - walletNode.SendMessage(leader, msg) + // msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx}) + // leader := walletNode.Client.Leaders[shardID] + // walletNode.SendMessage(leader, msg) fmt.Printf("Transaction Id for shard %d: %s\n", int(shardID), tx.Hash().Hex()) time.Sleep(300 * time.Millisecond) return nil diff --git a/cmd/client/wallet/lib/lib_test.go b/cmd/client/wallet/lib/lib_test.go index e6710df1c..b92b28ccf 100644 --- a/cmd/client/wallet/lib/lib_test.go +++ b/cmd/client/wallet/lib/lib_test.go @@ -29,7 +29,6 @@ func TestSubmitTransaction(test *testing.T) { m := mock_host.NewMockHost(ctrl) m.EXPECT().GetSelfPeer().AnyTimes() - m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(1) walletNode := node.New(m, nil, nil) priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9990") diff --git a/cmd/harmony.go b/cmd/harmony.go index b6045fc14..f9956f664 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -101,9 +101,6 @@ func main() { keyFile := flag.String("key", "./.hmykey", "the private key file of the harmony node") flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddress") - // LibP2P peer discovery integration test - libp2pPD := flag.Bool("libp2p_pd", false, "enable libp2p based peer discovery") - // isBeacon indicates this node is a beacon chain node isBeacon := flag.Bool("is_beacon", false, "true means this node is a beacon chain node") @@ -165,7 +162,6 @@ func main() { } else { role = "validator" } - utils.UseLibP2P = true // Init logging. loggingInit(*logFolder, role, *ip, *port, *onlyLogTps) @@ -250,16 +246,8 @@ func main() { consensus.OnConsensusDone = currentNode.PostConsensusProcessing currentNode.State = node.NodeWaitToJoin - if !*libp2pPD { - if consensus.IsLeader { - currentNode.State = node.NodeLeader - } else { - go currentNode.JoinShard(leader) - } - } else { - if consensus.IsLeader { - go currentNode.SendPongMessage() - } + if consensus.IsLeader { + go currentNode.SendPongMessage() } go currentNode.SupportSyncing() diff --git a/consensus/consensus.go b/consensus/consensus.go index 498224558..3181240cd 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -472,11 +472,7 @@ func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int { pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys, consensus.leader.PubKey) buffer := pong.ConstructPongMessage() - if utils.UseLibP2P { - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), buffer)) - } else { - host.BroadcastMessageFromLeader(consensus.host, validators, buffer, consensus.OfflinePeers) - } + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), buffer)) } return count2 @@ -637,11 +633,6 @@ func (consensus *Consensus) GetPeerFromID(peerID uint32) (p2p.Peer, bool) { return value, true } -// SendMessage sends message thru p2p host to peer. -func (consensus *Consensus) SendMessage(peer p2p.Peer, message []byte) { - host.SendMessage(consensus.host, peer, message, nil) -} - // Populates the common basic fields for all consensus message. func (consensus *Consensus) populateMessageFields(request *msg_pb.ConsensusRequest) { // TODO(minhdoan): Maybe look into refactor this. diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index d8eb0932e..6b9833f9a 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -146,13 +146,9 @@ func (consensus *Consensus) startConsensus(newBlock *types.Block) { // Leader sign the block hash itself consensus.prepareSigs[consensus.nodeID] = consensus.priKey.SignHash(consensus.blockHash[:]) - if utils.UseLibP2P { - // Construct broadcast p2p message - utils.GetLogInstance().Warn("[Consensus]", "sent announce message", len(msgToSend)) - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) - } else { - host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) - } + // Construct broadcast p2p message + utils.GetLogInstance().Warn("[Consensus]", "sent announce message", len(msgToSend)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) } // processPrepareMessage processes the prepare message sent from validators @@ -217,12 +213,8 @@ func (consensus *Consensus) processPrepareMessage(message *msg_pb.Message) { msgToSend, aggSig := consensus.constructPreparedMessage() consensus.aggregatedPrepareSig = aggSig - if utils.UseLibP2P { - utils.GetLogInstance().Warn("[Consensus]", "sent prepared message", len(msgToSend)) - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) - } else { - host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) - } + utils.GetLogInstance().Warn("[Consensus]", "sent prepared message", len(msgToSend)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) // Set state to targetState consensus.state = targetState @@ -296,12 +288,8 @@ func (consensus *Consensus) processCommitMessage(message *msg_pb.Message) { msgToSend, aggSig := consensus.constructCommittedMessage() consensus.aggregatedCommitSig = aggSig - if utils.UseLibP2P { - utils.GetLogInstance().Warn("[Consensus]", "sent committed message", len(msgToSend)) - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) - } else { - host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) - } + utils.GetLogInstance().Warn("[Consensus]", "sent committed message", len(msgToSend)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) var blockObj types.Block err := rlp.DecodeBytes(consensus.block, &blockObj) diff --git a/consensus/consensus_leader_test.go b/consensus/consensus_leader_test.go index 37666d434..5d051b341 100644 --- a/consensus/consensus_leader_test.go +++ b/consensus/consensus_leader_test.go @@ -45,7 +45,7 @@ func TestProcessMessageLeaderPrepare(test *testing.T) { // Asserts that the first and only call to Bar() is passed 99. // Anything else will fail. m.EXPECT().GetSelfPeer().Return(leader) - m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(3) + m.EXPECT().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, gomock.Any()) consensusLeader := New(m, "0", validators, leader) consensusLeader.blockHash = blockHash @@ -91,7 +91,6 @@ func TestProcessMessageLeaderPrepareInvalidSignature(test *testing.T) { // Asserts that the first and only call to Bar() is passed 99. // Anything else will fail. m.EXPECT().GetSelfPeer().Return(leader) - m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(0) consensusLeader := New(m, "0", validators, leader) consensusLeader.blockHash = blockHash @@ -146,7 +145,7 @@ func TestProcessMessageLeaderCommit(test *testing.T) { // Asserts that the first and only call to Bar() is passed 99. // Anything else will fail. m.EXPECT().GetSelfPeer().Return(leader) - m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(3) + m.EXPECT().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, gomock.Any()) for i := 0; i < 3; i++ { priKey, _, _ := utils.GenKeyP2P(validators[i].IP, validators[i].Port) diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index a94e01bf9..4067d7f7c 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -122,12 +122,8 @@ func (consensus *Consensus) processAnnounceMessage(message *msg_pb.Message) { // Construct and send prepare message msgToSend := consensus.constructPrepareMessage() - if utils.UseLibP2P { - utils.GetLogInstance().Warn("[Consensus]", "sent prepare message", len(msgToSend)) - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) - } else { - consensus.SendMessage(consensus.leader, msgToSend) - } + utils.GetLogInstance().Warn("[Consensus]", "sent prepare message", len(msgToSend)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.state = PrepareDone } @@ -189,12 +185,8 @@ func (consensus *Consensus) processPreparedMessage(message *msg_pb.Message) { // Construct and send the commit message multiSigAndBitmap := append(multiSig, bitmap...) msgToSend := consensus.constructCommitMessage(multiSigAndBitmap) - if utils.UseLibP2P { - utils.GetLogInstance().Warn("[Consensus]", "sent commit message", len(msgToSend)) - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) - } else { - consensus.SendMessage(consensus.leader, msgToSend) - } + utils.GetLogInstance().Warn("[Consensus]", "sent commit message", len(msgToSend)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.state = CommitDone } diff --git a/consensus/consensus_validator_test.go b/consensus/consensus_validator_test.go index 6411c8108..588f1fb31 100644 --- a/consensus/consensus_validator_test.go +++ b/consensus/consensus_validator_test.go @@ -36,7 +36,7 @@ func TestProcessMessageValidatorAnnounce(test *testing.T) { // Asserts that the first and only call to Bar() is passed 99. // Anything else will fail. m.EXPECT().GetSelfPeer().Return(leader) - m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(1) + m.EXPECT().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, gomock.Any()) priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") host, err := p2pimpl.NewHost(&leader, priKey) @@ -92,7 +92,7 @@ func TestProcessMessageValidatorPrepared(test *testing.T) { // Asserts that the first and only call to Bar() is passed 99. // Anything else will fail. m.EXPECT().GetSelfPeer().Return(leader) - m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(2) + m.EXPECT().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, gomock.Any()).Times(2) priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") host, err := p2pimpl.NewHost(&leader, priKey) @@ -162,7 +162,7 @@ func TestProcessMessageValidatorCommitted(test *testing.T) { // Asserts that the first and only call to Bar() is passed 99. // Anything else will fail. m.EXPECT().GetSelfPeer().Return(leader) - m.EXPECT().SendMessage(gomock.Any(), gomock.Any()).Times(2) + m.EXPECT().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, gomock.Any()).Times(2) priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") host, err := p2pimpl.NewHost(&leader, priKey) diff --git a/drand/drand_leader.go b/drand/drand_leader.go index b9cd2b35c..646bdbf81 100644 --- a/drand/drand_leader.go +++ b/drand/drand_leader.go @@ -74,12 +74,8 @@ func (dRand *DRand) init(epochBlock *types.Block) { (*dRand.vrfs)[dRand.nodeID] = append(rand[:], proof...) - if utils.UseLibP2P { - utils.GetLogInstance().Info("[DRG] sent init", "msg", msgToSend, "leader.PubKey", dRand.leader.PubKey) - dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) - } else { - host.BroadcastMessageFromLeader(dRand.host, dRand.GetValidatorPeers(), msgToSend, nil) - } + utils.GetLogInstance().Info("[DRG] sent init", "msg", msgToSend, "leader.PubKey", dRand.leader.PubKey) + dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) } // ProcessMessageLeader dispatches messages for the leader to corresponding processors. diff --git a/drand/drand_validator.go b/drand/drand_validator.go index 23a2847ee..69cb79906 100644 --- a/drand/drand_validator.go +++ b/drand/drand_validator.go @@ -52,9 +52,5 @@ func (dRand *DRand) processInitMessage(message drand_proto.Message) { msgToSend := dRand.constructCommitMessage(rand, proof) // Send the commit message back to leader - if utils.UseLibP2P { - dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) - } else { - host.SendMessage(dRand.host, dRand.leader, msgToSend, nil) - } + dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) } diff --git a/internal/beaconchain/README.md b/internal/beaconchain/README.md deleted file mode 100644 index bb5016f53..000000000 --- a/internal/beaconchain/README.md +++ /dev/null @@ -1,3 +0,0 @@ -The beaconchain package currently is a centralized service that allocates every potential new node (uses newnode package) a specific shard. -If N is the number of shards, supplied as a parameter at bootup, then first N joining nodes are assigned to be the leaders of those N shards. The nodes that come after that are then assigned shards based on their order of entry. -In the future, the generation of randomness would be decentralized. Such randomness would be provided to a new node once its PoS has been verified and then the node would be able to calculate its own shard automatically. \ No newline at end of file diff --git a/internal/beaconchain/libs/beaconchain.go b/internal/beaconchain/libs/beaconchain.go deleted file mode 100644 index 5476e53a5..000000000 --- a/internal/beaconchain/libs/beaconchain.go +++ /dev/null @@ -1,200 +0,0 @@ -package beaconchain - -import ( - "math/rand" - "os" - "strconv" - "sync" - - "github.com/harmony-one/bls/ffi/go/bls" - "github.com/harmony-one/harmony/api/proto/bcconn" - proto_identity "github.com/harmony-one/harmony/api/proto/identity" - "github.com/harmony-one/harmony/api/proto/node" - "github.com/harmony-one/harmony/crypto/pki" - beaconchain "github.com/harmony-one/harmony/internal/beaconchain/rpc" - "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/p2p/host" - "github.com/harmony-one/harmony/p2p/p2pimpl" - p2p_crypto "github.com/libp2p/go-libp2p-crypto" - peer "github.com/libp2p/go-libp2p-peer" -) - -//BCState keeps track of the state the beaconchain is in -type BCState int - -var mutex sync.Mutex -var identityPerBlock = 100000 - -// BeaconchainServicePortDiff is the positive port diff from beacon chain's self port -const BeaconchainServicePortDiff = 4444 - -//BCInfo is the information that needs to be stored on the disk in order to allow for a restart. -type BCInfo struct { - Leaders []*node.Info `json:"leaders"` - ShardLeaderMap map[int]*node.Info `json:"shardLeaderMap"` - NumberOfShards int `json:"numShards"` - NumberOfNodesAdded int `json:"numNodesAdded"` - IP string `json:"ip"` - Port string `json:"port"` -} - -// BeaconChain (Blockchain) keeps Identities per epoch, currently centralized! -type BeaconChain struct { - BCInfo BCInfo - ShardLeaderMap map[int]*node.Info - PubKey *bls.PublicKey - host p2p.Host - state BCState - rpcServer *beaconchain.Server - Peer p2p.Peer - Self p2p.Peer // self Peer -} - -//SaveFile is to store the file in which beaconchain info will be stored. -var SaveFile string - -// Followings are the set of states of that beaconchain can be in. -const ( - NodeInfoReceived BCState = iota - RandomInfoSent -) - -// SupportRPC initializes and starts the rpc service -func (bc *BeaconChain) SupportRPC() { - bc.InitRPCServer() - bc.StartRPCServer() -} - -// InitRPCServer initializes Rpc server. -func (bc *BeaconChain) InitRPCServer() { - bc.rpcServer = beaconchain.NewServer(bc.GetShardLeaderMap) -} - -// StartRPCServer starts Rpc server. -func (bc *BeaconChain) StartRPCServer() { - port, err := strconv.Atoi(bc.BCInfo.Port) - if err != nil { - port = 0 - } - utils.GetLogInstance().Info("support_client: StartRpcServer on port:", "port", strconv.Itoa(port+BeaconchainServicePortDiff)) - bc.rpcServer.Start(bc.BCInfo.IP, strconv.Itoa(port+BeaconchainServicePortDiff)) -} - -// GetShardLeaderMap returns the map from shard id to leader. -func (bc *BeaconChain) GetShardLeaderMap() map[int]*node.Info { - result := make(map[int]*node.Info) - for i, leader := range bc.BCInfo.Leaders { - result[i] = leader - } - return result -} - -//New beaconchain initialization -func New(numShards int, ip, port string, key p2p_crypto.PrivKey) *BeaconChain { - bc := BeaconChain{} - bc.PubKey = generateBCKey() - bc.Self = p2p.Peer{IP: ip, Port: port} - bc.host, _ = p2pimpl.NewHost(&bc.Self, key) - bcinfo := &BCInfo{NumberOfShards: numShards, NumberOfNodesAdded: 0, - IP: ip, - Port: port, - ShardLeaderMap: make(map[int]*node.Info)} - bc.BCInfo = *bcinfo - return &bc -} - -func generateBCKey() *bls.PublicKey { - r := rand.Intn(1000) - priKey := pki.GetBLSPrivateKeyFromInt(r) - pubkey := priKey.GetPublicKey() - return pubkey -} - -//AcceptNodeInfo deserializes node information received via beaconchain handler -func (bc *BeaconChain) AcceptNodeInfo(b []byte) *node.Info { - Node := bcconn.DeserializeNodeInfo(b) - utils.GetLogInstance().Info("New Node Connection", "IP", Node.IP, "Port", Node.Port, "PeerID", Node.PeerID) - bc.Peer = p2p.Peer{IP: Node.IP, Port: Node.Port, PeerID: Node.PeerID} - bc.host.AddPeer(&bc.Peer) - - bc.BCInfo.NumberOfNodesAdded = bc.BCInfo.NumberOfNodesAdded + 1 - shardNum, isLeader := utils.AllocateShard(bc.BCInfo.NumberOfNodesAdded, bc.BCInfo.NumberOfShards) - if isLeader { - bc.BCInfo.Leaders = append(bc.BCInfo.Leaders, Node) - bc.BCInfo.ShardLeaderMap[shardNum] = Node - } - go SaveBeaconChainInfo(SaveFile, bc) - bc.state = NodeInfoReceived - return Node -} - -//RespondRandomness sends a randomness beacon to the node inorder for it process what shard it will be in -func (bc *BeaconChain) RespondRandomness(Node *node.Info) { - bci := bc.BCInfo - response := bcconn.ResponseRandomNumber{NumberOfShards: bci.NumberOfShards, NumberOfNodesAdded: bci.NumberOfNodesAdded, Leaders: bci.Leaders} - msg := bcconn.SerializeRandomInfo(response) - msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Acknowledge, msg) - utils.GetLogInstance().Info("Sent Out Msg", "# Nodes", response.NumberOfNodesAdded) - for i, n := range response.Leaders { - utils.GetLogInstance().Info("Sent Out Msg", "leader", i, "nodeInfo", n.PeerID) - } - host.SendMessage(bc.host, bc.Peer, msgToSend, nil) - bc.state = RandomInfoSent -} - -//AcceptConnections welcomes new connections -func (bc *BeaconChain) AcceptConnections(b []byte) { - node := bc.AcceptNodeInfo(b) - bc.RespondRandomness(node) -} - -//StartServer a server and process the request by a handler. -func (bc *BeaconChain) StartServer() { - bc.host.BindHandlerAndServe(bc.BeaconChainHandler) -} - -//SaveBeaconChainInfo to disk -func SaveBeaconChainInfo(filePath string, bc *BeaconChain) error { - bci := BCtoBCI(bc) - err := utils.Save(filePath, bci) - return err -} - -//LoadBeaconChainInfo from disk -func LoadBeaconChainInfo(path string) (*BeaconChain, error) { - bci := &BCInfo{} - var err error - if _, err := os.Stat(path); err != nil { - return nil, err - } - err = utils.Load(path, bci) - var bc *BeaconChain - if err != nil { - return nil, err - } - bc = BCItoBC(bci) - return bc, err -} - -// BCtoBCI converts beaconchain into beaconchaininfo -func BCtoBCI(bc *BeaconChain) *BCInfo { - bci := &BCInfo{Leaders: bc.BCInfo.Leaders, ShardLeaderMap: bc.BCInfo.ShardLeaderMap, NumberOfShards: bc.BCInfo.NumberOfShards, NumberOfNodesAdded: bc.BCInfo.NumberOfNodesAdded, IP: bc.BCInfo.IP, Port: bc.BCInfo.Port} - return bci -} - -//BCItoBC converts beconchaininfo to beaconchain -func BCItoBC(bci *BCInfo) *BeaconChain { - bc := &BeaconChain{BCInfo: *bci} - return bc -} - -//SetSaveFile sets the filepath where beaconchain will be saved -func SetSaveFile(path string) { - SaveFile = path -} - -//GetID return ID -func (bc *BeaconChain) GetID() peer.ID { - return bc.host.GetID() -} diff --git a/internal/beaconchain/libs/beaconchain_handler.go b/internal/beaconchain/libs/beaconchain_handler.go deleted file mode 100644 index b2dffc35f..000000000 --- a/internal/beaconchain/libs/beaconchain_handler.go +++ /dev/null @@ -1,59 +0,0 @@ -package beaconchain - -import ( - "github.com/harmony-one/harmony/api/proto" - proto_identity "github.com/harmony-one/harmony/api/proto/identity" - "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" -) - -// BeaconChainHandler handles registration of new Identities -func (bc *BeaconChain) BeaconChainHandler(s p2p.Stream) { - content, err := p2p.ReadMessageContent(s) - if err != nil { - utils.GetLogInstance().Error("Read p2p data failed") - return - } - msgCategory, err := proto.GetMessageCategory(content) - if err != nil { - utils.GetLogInstance().Error("Read message category failed", "err", err) - return - } - msgType, err := proto.GetMessageType(content) - if err != nil { - utils.GetLogInstance().Error("Read action type failed") - return - } - msgPayload, err := proto.GetMessagePayload(content) - if err != nil { - utils.GetLogInstance().Error("Read message payload failed") - return - } - identityMsgPayload, err := proto_identity.GetIdentityMessagePayload(msgPayload) - if err != nil { - utils.GetLogInstance().Error("Read message payload failed") - return - } - switch msgCategory { - case proto.Identity: - actionType := proto_identity.IDMessageType(msgType) - switch actionType { - case proto_identity.Identity: - utils.GetLogInstance().Info("Message category is of the type identity protocol, which is correct!") - idMsgType, err := proto_identity.GetIdentityMessageType(msgPayload) - if err != nil { - utils.GetLogInstance().Error("Error finding the identity message type") - } - switch idMsgType { - case proto_identity.Register: - utils.GetLogInstance().Info("Identity Message Type is of the type Register") - bc.AcceptConnections(identityMsgPayload) - default: - utils.GetLogInstance().Error("Unrecognized identity message type", "type", idMsgType) - } - default: - utils.GetLogInstance().Error("Unrecognized message category", "actionType", actionType) - } - - } -} diff --git a/internal/beaconchain/libs/beaconchain_test.go b/internal/beaconchain/libs/beaconchain_test.go deleted file mode 100644 index ebabfc95f..000000000 --- a/internal/beaconchain/libs/beaconchain_test.go +++ /dev/null @@ -1,150 +0,0 @@ -package beaconchain - -import ( - "log" - "os" - "reflect" - "strconv" - "testing" - - "github.com/harmony-one/harmony/api/proto/bcconn" - "github.com/harmony-one/harmony/api/proto/node" - beaconchain "github.com/harmony-one/harmony/internal/beaconchain/rpc" - "github.com/harmony-one/harmony/internal/utils" - "github.com/stretchr/testify/assert" -) - -var ( - leader1 = &node.Info{IP: "127.0.0.1", Port: "9981"} - leader2 = &node.Info{IP: "127.0.0.1", Port: "9982"} - leaders = []*node.Info{leader1, leader2} - shardLeaderMap = map[int]*node.Info{ - 0: leader1, - 1: leader2, - } -) - -func TestNewNode(t *testing.T) { - var ip, port string - ip = "127.0.0.1" - port = "7523" - numshards := 2 - priKey, _, _ := utils.GenKeyP2P(ip, port) - bc := New(numshards, ip, port, priKey) - - if bc.PubKey == nil { - t.Error("beacon chain public key not initialized") - } - - if bc.BCInfo.NumberOfNodesAdded != 0 { - t.Error("beacon chain number of nodes starting with is not zero! (should be zero)") - } - - if bc.BCInfo.NumberOfShards != numshards { - t.Error("beacon chain number of shards not initialized to given number of desired shards") - } -} - -func TestShardLeaderMap(t *testing.T) { - var ip string - ip = "127.0.0.1" - beaconport := "7523" - numshards := 1 - priKey, _, _ := utils.GenKeyP2P(ip, beaconport) - bc := New(numshards, ip, beaconport, priKey) - bc.BCInfo.Leaders = leaders - if !reflect.DeepEqual(bc.GetShardLeaderMap(), shardLeaderMap) { - t.Error("The function GetShardLeaderMap doesn't work well") - } - -} - -func TestFetchLeaders(t *testing.T) { - var ip string - ip = "127.0.0.1" - beaconport := "7523" - numshards := 1 - priKey, _, _ := utils.GenKeyP2P(ip, beaconport) - bc := New(numshards, ip, beaconport, priKey) - bc.BCInfo.Leaders = leaders - bc.rpcServer = beaconchain.NewServer(bc.GetShardLeaderMap) - bc.StartRPCServer() - port, _ := strconv.Atoi(beaconport) - bcClient := beaconchain.NewClient("127.0.0.1", strconv.Itoa(port+BeaconchainServicePortDiff)) - response := bcClient.GetLeaders() - retleaders := response.GetLeaders() - if !(retleaders[0].GetIp() == leaders[0].IP || retleaders[0].GetPort() == leaders[0].Port || retleaders[1].GetPort() == leaders[1].Port) { - t.Error("Fetch leaders response is not as expected") - } - -} - -func TestAcceptNodeInfo(t *testing.T) { - var ip string - ip = "127.0.0.1" - beaconport := "7523" - numshards := 1 - priKey, _, _ := utils.GenKeyP2P(ip, beaconport) - bc := New(numshards, ip, beaconport, priKey) - b := bcconn.SerializeNodeInfo(leader1) - node := bc.AcceptNodeInfo(b) - if !reflect.DeepEqual(node, leader1) { - t.Error("Beaconchain is unable to deserialize incoming node info") - } - if len(bc.BCInfo.Leaders) != 1 { - t.Error("Beaconchain was unable to update the leader array") - } - -} - -func TestRespondRandomness(t *testing.T) { - var ip string - ip = "127.0.0.1" - beaconport := "7523" - numshards := 1 - priKey, _, _ := utils.GenKeyP2P(ip, beaconport) - bc := New(numshards, ip, beaconport, priKey) - bc.RespondRandomness(leader1) - assert.Equal(t, RandomInfoSent, bc.state) -} - -func TestAcceptConnections(t *testing.T) { - var ip string - ip = "127.0.0.1" - beaconport := "7523" - numshards := 1 - priKey, _, _ := utils.GenKeyP2P(ip, beaconport) - bc := New(numshards, ip, beaconport, priKey) - b := bcconn.SerializeNodeInfo(leader1) - bc.AcceptConnections(b) - assert.Equal(t, RandomInfoSent, bc.state) -} - -func TestSaveBC(t *testing.T) { - var ip, port string - ip = "127.0.0.1" - port = "7523" - numshards := 2 - bci := &BCInfo{IP: ip, Port: port, NumberOfShards: numshards} - bc := &BeaconChain{BCInfo: *bci} - err := SaveBeaconChainInfo("test.json", bc) - if err != nil { - log.Fatalln(err) - } - bc2, err2 := LoadBeaconChainInfo("test.json") - if err2 != nil { - log.Fatalln(err2) - } - if !reflect.DeepEqual(bc, bc2) { - t.Error("beacon chain info objects are not same") - } - os.Remove("test.json") -} - -func TestSaveFile(t *testing.T) { - filepath := "test" - SetSaveFile(filepath) - if !reflect.DeepEqual(filepath, SaveFile) { - t.Error("Could not set savefile") - } -} diff --git a/internal/beaconchain/rpc/client.go b/internal/beaconchain/rpc/client.go deleted file mode 100644 index a8a4f1f57..000000000 --- a/internal/beaconchain/rpc/client.go +++ /dev/null @@ -1,51 +0,0 @@ -package beaconchain - -import ( - "context" - "fmt" - "log" - "time" - - proto "github.com/harmony-one/harmony/api/beaconchain" - - "google.golang.org/grpc" -) - -// Client is the client model for beaconchain service. -type Client struct { - beaconChainServiceClient proto.BeaconChainServiceClient - opts []grpc.DialOption - conn *grpc.ClientConn -} - -// NewClient setups a Client given ip and port. -func NewClient(ip, port string) *Client { - client := Client{} - client.opts = append(client.opts, grpc.WithInsecure()) - var err error - client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...) - if err != nil { - log.Fatalf("fail to dial: %v", err) - return nil - } - - client.beaconChainServiceClient = proto.NewBeaconChainServiceClient(client.conn) - return &client -} - -// Close closes the Client. -func (client *Client) Close() { - client.conn.Close() -} - -// GetLeaders gets current leaders from beacon chain -func (client *Client) GetLeaders() *proto.FetchLeadersResponse { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - request := &proto.FetchLeadersRequest{} - response, err := client.beaconChainServiceClient.FetchLeaders(ctx, request) - if err != nil { - log.Fatalf("Error fetching leaders from beacon chain: %s", err) - } - return response -} diff --git a/internal/beaconchain/rpc/server.go b/internal/beaconchain/rpc/server.go deleted file mode 100644 index 4542918f8..000000000 --- a/internal/beaconchain/rpc/server.go +++ /dev/null @@ -1,51 +0,0 @@ -package beaconchain - -import ( - "context" - "log" - "net" - - "github.com/harmony-one/harmony/api/proto/node" - - "google.golang.org/grpc" - - proto "github.com/harmony-one/harmony/api/beaconchain" -) - -// Server is the Server struct for beacon chain package. -type Server struct { - shardLeaderMap func() map[int]*node.Info -} - -// FetchLeaders implements the FetchLeaders interface to return current leaders. -func (s *Server) FetchLeaders(ctx context.Context, request *proto.FetchLeadersRequest) (*proto.FetchLeadersResponse, error) { - log.Println("Returning FetchLeadersResponse") - - leaders := []*proto.FetchLeadersResponse_Leader{} - for shardID, leader := range s.shardLeaderMap() { - leaders = append(leaders, &proto.FetchLeadersResponse_Leader{Ip: leader.IP, Port: leader.Port, ShardId: uint32(shardID), PeerID: leader.PeerID.Pretty()}) - } - log.Println(leaders) - return &proto.FetchLeadersResponse{Leaders: leaders}, nil -} - -// Start starts the Server on given ip and port. -func (s *Server) Start(ip, port string) (*grpc.Server, error) { - // TODO(minhdoan): Currently not using ip. Fix it later. - addr := net.JoinHostPort("", port) - lis, err := net.Listen("tcp", addr) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - var opts []grpc.ServerOption - grpcServer := grpc.NewServer(opts...) - proto.RegisterBeaconChainServiceServer(grpcServer, s) - go grpcServer.Serve(lis) - return grpcServer, nil -} - -// NewServer creates new Server which implements BeaconChainServiceServer interface. -func NewServer(shardLeaderMap func() map[int]*node.Info) *Server { - s := &Server{shardLeaderMap} - return s -} diff --git a/internal/newnode/README.md b/internal/newnode/README.md deleted file mode 100644 index e1b7061ea..000000000 --- a/internal/newnode/README.md +++ /dev/null @@ -1 +0,0 @@ -Newnode package is for handling the interactions of a new candidate node that wants to join the network. Such interaction at the moment is about contacting the beaconchain and getting assigned a shard and findingout the shardleader. In future this package will be merged into the node package. \ No newline at end of file diff --git a/internal/newnode/newnode.go b/internal/newnode/newnode.go deleted file mode 100644 index 564ded0fa..000000000 --- a/internal/newnode/newnode.go +++ /dev/null @@ -1,170 +0,0 @@ -package newnode - -import ( - "errors" - "fmt" - "os" - "strconv" - "time" - - "github.com/ethereum/go-ethereum/log" - "github.com/harmony-one/bls/ffi/go/bls" - "github.com/harmony-one/harmony/api/proto/bcconn" - proto_identity "github.com/harmony-one/harmony/api/proto/identity" - proto_node "github.com/harmony-one/harmony/api/proto/node" - "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/p2p/host" - "github.com/harmony-one/harmony/p2p/p2pimpl" - - p2p_crypto "github.com/libp2p/go-libp2p-crypto" - multiaddr "github.com/multiformats/go-multiaddr" -) - -//NewNode is the struct for a candidate node -type NewNode struct { - Role string - ShardID int - ValidatorID int // Validator ID in its shard. - leader p2p.Peer - isLeader bool - Self p2p.Peer - Leaders map[uint32]p2p.Peer - PubK *bls.PublicKey - priK *bls.SecretKey - log log.Logger - SetInfo chan bool - host p2p.Host -} - -// New candidate node initialization -func New(ip string, port string, nodePk p2p_crypto.PrivKey) *NewNode { - priKey, pubKey := utils.GenKey(ip, port) - var node NewNode - var err error - node.PubK = pubKey - node.priK = priKey - node.Self = p2p.Peer{IP: ip, Port: port, PubKey: pubKey, ValidatorID: -1} - node.log = utils.GetLogInstance() - node.SetInfo = make(chan bool) - node.host, err = p2pimpl.NewHost(&node.Self, nodePk) - if err != nil { - node.log.Error("failed to create new host", "msg", err) - return nil - } - node.Leaders = map[uint32]p2p.Peer{} - return &node -} - -type registerResponseRandomNumber struct { - NumberOfShards int - NumberOfNodesAdded int - Leaders []*proto_node.Info -} - -// ContactBeaconChain starts a newservice in the candidate node -func (node *NewNode) ContactBeaconChain(BCPeer p2p.Peer) error { - go node.host.BindHandlerAndServe(node.NodeHandler) - return node.requestBeaconChain(BCPeer) -} - -func (node NewNode) String() string { - return fmt.Sprintf("bc: %v:%v => %v", node.Self.IP, node.Self.Port, node.Self.PeerID) -} - -// RequestBeaconChain requests beacon chain for identity data -func (node *NewNode) requestBeaconChain(BCPeer p2p.Peer) (err error) { - node.log.Info("connecting to beacon chain now ...") - pubk := node.PubK.Serialize() - if err != nil { - node.log.Error("Could not Marshall public key into binary") - } - fmt.Printf("[New Node]: %v\n", *node) - nodeInfo := &proto_node.Info{IP: node.Self.IP, Port: node.Self.Port, PubKey: pubk, PeerID: node.Self.PeerID} - msg := bcconn.SerializeNodeInfo(nodeInfo) - msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Register, msg) - gotShardInfo := false - timeout := time.After(2 * time.Minute) - tick := time.Tick(3 * time.Second) -checkLoop: - for { - select { - case <-timeout: - gotShardInfo = false - break checkLoop - case <-tick: - select { - case setinfo := <-node.SetInfo: - if setinfo { - gotShardInfo = true - break checkLoop - } - default: - host.SendMessage(node.host, BCPeer, msgToSend, nil) - } - } - } - if !gotShardInfo { - err = errors.New("could not create connection") - node.log.Crit("Could not get sharding info after 2 minutes") - os.Exit(10) - } - return -} - -// ProcessShardInfo -func (node *NewNode) processShardInfo(msgPayload []byte) bool { - leadersInfo := bcconn.DeserializeRandomInfo(msgPayload) - leaders := leadersInfo.Leaders - shardNum, isLeader := utils.AllocateShard(leadersInfo.NumberOfNodesAdded, leadersInfo.NumberOfShards) - for n, v := range leaders { - leaderPeer := p2p.Peer{IP: v.IP, Port: v.Port, PeerID: v.PeerID} - - addr := fmt.Sprintf("/ip4/%s/tcp/%s", leaderPeer.IP, leaderPeer.Port) - targetAddr, err := multiaddr.NewMultiaddr(addr) - if err != nil { - log.Error("processShardInfo NewMultiaddr error", "error", err) - return false - } - leaderPeer.Addrs = append(leaderPeer.Addrs, targetAddr) - - leaderPeer.PubKey = &bls.PublicKey{} - err = leaderPeer.PubKey.Deserialize(v.PubKey[:]) - if err != nil { - node.log.Error("Could not unmarshall leaders public key from binary") - } - node.Leaders[uint32(n)] = leaderPeer - } - - node.leader = node.Leaders[uint32(shardNum-1)] - node.isLeader = isLeader - node.ShardID = shardNum - 1 //0 indexing. - node.SetInfo <- true - node.log.Info("Shard information obtained ..") - return true -} - -// GetShardID gives shardid of node -func (node *NewNode) GetShardID() string { - return strconv.Itoa(node.ShardID) -} - -// GetLeader gives the leader of the node -func (node *NewNode) GetLeader() p2p.Peer { - return node.leader -} - -// GetClientPeer gives the client of the node -func (node *NewNode) GetClientPeer() *p2p.Peer { - return nil -} - -// GetSelfPeer gives the peer part of the node's own struct -func (node *NewNode) GetSelfPeer() p2p.Peer { - return node.Self -} - -// AddPeer add new peer for newnode -func (node *NewNode) AddPeer(p *p2p.Peer) error { - return node.host.AddPeer(p) -} diff --git a/internal/newnode/newnode_handler.go b/internal/newnode/newnode_handler.go deleted file mode 100644 index 582a2eb93..000000000 --- a/internal/newnode/newnode_handler.go +++ /dev/null @@ -1,63 +0,0 @@ -package newnode - -import ( - "time" - - "github.com/harmony-one/harmony/api/proto" - proto_identity "github.com/harmony-one/harmony/api/proto/identity" - "github.com/harmony-one/harmony/p2p" -) - -// NodeHandler handles a new incoming connection. -func (node *NewNode) NodeHandler(s p2p.Stream) { - defer s.Close() - defer node.host.Close() - s.SetReadDeadline(time.Now().Add(1 * time.Second)) // This deadline is for 1 second to accept new connections. - content, err := p2p.ReadMessageContent(s) - if err != nil { - node.log.Error("Read p2p data failed", "err", err, "node", node) - return - } - - msgCategory, err := proto.GetMessageCategory(content) - if err != nil { - node.log.Error("Read node type failed", "err", err, "node", node) - return - } - - msgType, err := proto.GetMessageType(content) - if err != nil { - node.log.Error("Read action type failed", "err", err, "node", node) - return - } - - msgPayload, err := proto.GetMessagePayload(content) - if err != nil { - node.log.Error("Read message payload failed", "err", err, "node", node) - return - } - identityMsgPayload, err := proto_identity.GetIdentityMessagePayload(msgPayload) - if err != nil { - node.log.Error("Read message payload failed") - return - } - switch msgCategory { - case proto.Identity: - actionType := proto_identity.IDMessageType(msgType) - switch actionType { - case proto_identity.Identity: - idMsgType, err := proto_identity.GetIdentityMessageType(msgPayload) - if err != nil { - node.log.Error("Error finding the identity message type", err) - } - switch idMsgType { - case proto_identity.Acknowledge: - node.processShardInfo(identityMsgPayload) - default: - panic("The identity message type is wrong/missing and newnode does not handle this identity message type") - } - default: - panic("The msgCategory is wrong/missing and newnode does not handle this protocol message type") - } - } -} diff --git a/internal/newnode/newnode_test.go b/internal/newnode/newnode_test.go deleted file mode 100644 index 5f6ede7cb..000000000 --- a/internal/newnode/newnode_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package newnode - -import ( - "fmt" - "testing" - "time" - - beaconchain "github.com/harmony-one/harmony/internal/beaconchain/libs" - "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" - peerstore "github.com/libp2p/go-libp2p-peerstore" - multiaddr "github.com/multiformats/go-multiaddr" -) - -func TestNewNode(t *testing.T) { - var ip, port string - ip = "127.0.0.1" - port = "8088" - priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "8088") - nnode := New(ip, port, priKey) - - if nnode.PubK == nil { - t.Error("new node public key not initialized") - } -} - -func TestBeaconChainConnect(t *testing.T) { - var ip, beaconport, bcma, nodeport string - - ip = "127.0.0.1" - beaconport = "8081" - nodeport = "9081" - - priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9081") - nnode := New(ip, nodeport, priKey) - - priKey, _, _ = utils.GenKeyP2P("127.0.0.1", "8081") - bc := beaconchain.New(1, ip, beaconport, priKey) - - bcma = fmt.Sprintf("/ip4/%s/tcp/%s/ipfs/%s", bc.Self.IP, bc.Self.Port, bc.GetID().Pretty()) - - go bc.StartServer() - time.Sleep(3 * time.Second) - - maddr, err := multiaddr.NewMultiaddr(bcma) - if err != nil { - t.Errorf("new multiaddr error: %v", err) - } - - // Extract the peer ID from the multiaddr. - info, err2 := peerstore.InfoFromP2pAddr(maddr) - if err2 != nil { - t.Errorf("info from p2p addr error: %v", err2) - } - - BCPeer := &p2p.Peer{IP: ip, Port: beaconport, Addrs: info.Addrs, PeerID: info.ID} - - nnode.AddPeer(BCPeer) - - err3 := nnode.ContactBeaconChain(*BCPeer) - - if err3 != nil { - t.Errorf("could not read from connection: %v", err3) - } -} diff --git a/internal/utils/singleton.go b/internal/utils/singleton.go index 48a93a2fd..5735e7a7a 100644 --- a/internal/utils/singleton.go +++ b/internal/utils/singleton.go @@ -13,10 +13,6 @@ import ( var ( Port string IP string - // Global Variable to use libp2p for networking - // FIXME: this is a temporary hack, once we totally switch to libp2p - // this variable shouldn't be used - UseLibP2P bool ) // SetPortAndIP used to print out loggings of node with Port and IP. diff --git a/node/node.go b/node/node.go index 5c6312759..1140e654f 100644 --- a/node/node.go +++ b/node/node.go @@ -15,9 +15,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/api/client" clientService "github.com/harmony-one/harmony/api/client/service" - proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" msg_pb "github.com/harmony-one/harmony/api/proto/message" - proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/api/service/syncing" "github.com/harmony-one/harmony/api/service/syncing/downloader" @@ -140,9 +138,6 @@ type Node struct { // The p2p host used to send/receive p2p messages host p2p.Host - // Channel to stop sending ping message - StopPing chan struct{} - // Signal channel for lost validators OfflinePeers chan p2p.Peer @@ -218,11 +213,7 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions { // StartServer starts a server and process the requests by a handler. func (node *Node) StartServer() { - if utils.UseLibP2P { - select {} - } else { - node.host.BindHandlerAndServe(node.StreamHandler) - } + select {} } // Count the total number of transactions in the blockchain @@ -282,7 +273,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *N } // Setup initial state of syncing. - node.StopPing = make(chan struct{}) node.peerRegistrationRecord = make(map[uint32]*syncConfig) node.OfflinePeers = make(chan p2p.Peer) @@ -291,11 +281,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *N // start the goroutine to receive group message go node.ReceiveGroupMessage() - if utils.UseLibP2P { - node.startConsensus = make(chan struct{}) - } else { - node.startConsensus = nil - } + node.startConsensus = make(chan struct{}) return &node } @@ -325,35 +311,6 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int { return count } -// JoinShard helps a new node to join a shard. -func (node *Node) JoinShard(leader p2p.Peer) { - // try to join the shard, send ping message every 1 second, with a 10 minutes time-out - tick := time.NewTicker(1 * time.Second) - timeout := time.NewTicker(10 * time.Minute) - defer tick.Stop() - defer timeout.Stop() - - for { - select { - case <-tick.C: - ping := proto_discovery.NewPingMessage(node.SelfPeer) - if node.Client != nil { // assume this is the client node - ping.Node.Role = proto_node.ClientRole - } - buffer := ping.ConstructPingMessage() - - // Talk to leader. - node.SendMessage(leader, buffer) - case <-timeout.C: - utils.GetLogInstance().Info("JoinShard timeout") - return - case <-node.StopPing: - utils.GetLogInstance().Info("Stopping JoinShard") - return - } - } -} - // CalculateResponse implements DownloadInterface on Node object. func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*downloader_pb.DownloaderResponse, error) { response := &downloader_pb.DownloaderResponse{} diff --git a/node/node_handler.go b/node/node_handler.go index dffa607cb..57878e29e 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -29,29 +29,6 @@ const ( MaxNumberOfTransactionsPerBlock = 8000 ) -// MaybeBroadcastAsValidator returns if the node is a validator node. -func (node *Node) MaybeBroadcastAsValidator(content []byte) { - // TODO: this is tree-based broadcasting. this needs to be replaced by p2p gossiping. - if node.SelfPeer.ValidatorID > 0 && node.SelfPeer.ValidatorID <= host.MaxBroadCast { - go host.BroadcastMessageFromValidator(node.host, node.SelfPeer, node.Consensus.GetValidatorPeers(), content) - } -} - -// StreamHandler handles a new incoming network message. -func (node *Node) StreamHandler(s p2p.Stream) { - defer s.Close() - - // Read p2p message payload - content, err := p2p.ReadMessageContent(s) - - if err != nil { - utils.GetLogInstance().Error("Read p2p data failed", "err", err, "node", node) - return - } - - node.messageHandler(content, "") -} - // ReceiveGroupMessage use libp2p pubsub mechanism to receive broadcast messages func (node *Node) ReceiveGroupMessage() { ctx := context.Background() @@ -282,11 +259,7 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) { func (node *Node) BroadcastNewBlock(newBlock *types.Block) { if node.ClientPeer != nil { utils.GetLogInstance().Debug("Sending new block to client", "client", node.ClientPeer) - if utils.UseLibP2P { - node.host.SendMessageToGroups([]p2p.GroupID{node.MyClientGroupID}, host.ConstructP2pMessage(byte(0), proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))) - } else { - node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})) - } + node.host.SendMessageToGroups([]p2p.GroupID{node.MyClientGroupID}, host.ConstructP2pMessage(byte(0), proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))) } } @@ -375,9 +348,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int { // add to incoming peer list //node.host.AddIncomingPeer(*peer) - if utils.UseLibP2P { - node.host.ConnectHostPeer(*peer) - } + node.host.ConnectHostPeer(*peer) if ping.Node.Role == proto_node.ClientRole { utils.GetLogInstance().Info("Add Client Peer to Node", "Node", node.Consensus.GetNodeID(), "Client", peer) @@ -388,29 +359,6 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int { // Add to Node's peer list anyway node.AddPeers([]*p2p.Peer{peer}) - // This is the old way of broadcasting pong message - if node.Consensus.IsLeader && !utils.UseLibP2P { - peers := node.Consensus.GetValidatorPeers() - pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey()) - buffer := pong.ConstructPongMessage() - - // Send a Pong message directly to the sender - // This is necessary because the sender will need to get a ValidatorID - // Just broadcast won't work, some validators won't receive the latest - // PublicKeys as we rely on a valid ValidatorID to do broadcast. - // This is very buggy, but we will move to libp2p, hope the problem will - // be resolved then. - // However, I disable it for now as we are sending redundant PONG messages - // to all validators. This may not be needed. But it maybe add back. - // p2p.SendMessage(*peer, buffer) - - // Broadcast the message to all validators, as publicKeys is updated - // FIXME: HAR-89 use a separate nodefind/neighbor message - - host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers) - // utils.GetLogInstance().Info("PingMsgHandler send pong message") - } - return 1 } @@ -518,10 +466,6 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { if node.State == NodeWaitToJoin { node.State = NodeReadyForConsensus - // Notify JoinShard to stop sending Ping messages - if node.StopPing != nil { - node.StopPing <- struct{}{} - } } // Stop discovery service after received pong message diff --git a/node/node_handler_test.go b/node/node_handler_test.go index f24a0e6cb..0cd999108 100644 --- a/node/node_handler_test.go +++ b/node/node_handler_test.go @@ -3,37 +3,12 @@ package node import ( "testing" - "github.com/golang/mock/gomock" "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/p2pimpl" ) -func TestNodeStreamHandler(t *testing.T) { - _, pubKey := utils.GenKey("1", "2") - leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", PubKey: pubKey} - validator := p2p.Peer{IP: "127.0.0.1", Port: "8885"} - priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") - host, err := p2pimpl.NewHost(&leader, priKey) - if err != nil { - t.Fatalf("newhost failure: %v", err) - } - consensus := consensus.New(host, "0", []p2p.Peer{leader, validator}, leader) - node := New(host, consensus, nil) - - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - m := p2p.NewMockStream(ctrl) - - m.EXPECT().Read(gomock.Any()).AnyTimes() - m.EXPECT().SetReadDeadline(gomock.Any()) - m.EXPECT().Close() - - node.StreamHandler(m) -} - func TestAddNewBlock(t *testing.T) { _, pubKey := utils.GenKey("1", "2") leader := p2p.Peer{IP: "127.0.0.1", Port: "9882", PubKey: pubKey} diff --git a/node/node_test.go b/node/node_test.go index 23a828d35..deb221c38 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -120,13 +120,7 @@ func sendPingMessage(node *Node, leader p2p.Peer) { } ping1 := proto_discovery.NewPingMessage(p1) - buf1 := ping1.ConstructPingMessage() - - fmt.Println("waiting for 5 seconds ...") - time.Sleep(5 * time.Second) - - node.SendMessage(leader, buf1) - fmt.Println("sent ping message ...") + _ = ping1.ConstructPingMessage() } func sendPongMessage(node *Node, leader p2p.Peer) { @@ -147,13 +141,7 @@ func sendPongMessage(node *Node, leader p2p.Peer) { leaderPubKey := pki.GetBLSPrivateKeyFromInt(888).GetPublicKey() pong1 := proto_discovery.NewPongMessage([]p2p.Peer{p1, p2}, pubKeys, leaderPubKey) - buf1 := pong1.ConstructPongMessage() - - fmt.Println("waiting for 10 seconds ...") - time.Sleep(10 * time.Second) - - node.SendMessage(leader, buf1) - fmt.Println("sent pong message ...") + _ = pong1.ConstructPongMessage() } func exitServer() { diff --git a/node/p2p.go b/node/p2p.go index 1f38f0227..4d254a996 100644 --- a/node/p2p.go +++ b/node/p2p.go @@ -2,19 +2,8 @@ package node import ( "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/p2p/host" ) -// SendMessage sends data to ip, port -func (node *Node) SendMessage(p p2p.Peer, data []byte) { - host.SendMessage(node.host, p, data, nil) -} - -// BroadcastMessage broadcasts message to peers -func (node *Node) BroadcastMessage(peers []p2p.Peer, data []byte) { - host.BroadcastMessage(node.host, peers, data, nil) -} - // GetHost returns the p2p host func (node *Node) GetHost() p2p.Host { return node.host diff --git a/p2p/host.go b/p2p/host.go index 69d098947..9de308c1c 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -10,8 +10,6 @@ import ( // Host is the client + server in p2p network. type Host interface { GetSelfPeer() Peer - SendMessage(Peer, []byte) error - BindHandlerAndServe(handler StreamHandler) Close() error AddPeer(*Peer) error GetID() libp2p_peer.ID diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go index c531aa111..193e2d2ff 100644 --- a/p2p/host/hostv2/hostv2.go +++ b/p2p/host/hostv2/hostv2.go @@ -5,7 +5,6 @@ package hostv2 import ( "context" "fmt" - "io" "sync" "github.com/ethereum/go-ethereum/log" @@ -16,10 +15,8 @@ import ( libp2p "github.com/libp2p/go-libp2p" libp2p_crypto "github.com/libp2p/go-libp2p-crypto" libp2p_host "github.com/libp2p/go-libp2p-host" - libp2p_net "github.com/libp2p/go-libp2p-net" libp2p_peer "github.com/libp2p/go-libp2p-peer" libp2p_peerstore "github.com/libp2p/go-libp2p-peerstore" - libp2p_protocol "github.com/libp2p/go-libp2p-protocol" libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub" ma "github.com/multiformats/go-multiaddr" ) @@ -199,46 +196,6 @@ func (host *HostV2) GetSelfPeer() p2p.Peer { return host.self } -// BindHandlerAndServe bind a streamHandler to the harmony protocol. -func (host *HostV2) BindHandlerAndServe(handler p2p.StreamHandler) { - host.h.SetStreamHandler(libp2p_protocol.ID(ProtocolID), func(s libp2p_net.Stream) { - handler(s) - }) - // Hang forever - <-make(chan struct{}) -} - -// SendMessage a p2p message sending function with signature compatible to p2pv1. -func (host *HostV2) SendMessage(p p2p.Peer, message []byte) error { - logger := host.logger.New("from", host.self, "to", p, "PeerID", p.PeerID) - err := host.Peerstore().AddProtocols(p.PeerID, ProtocolID) - if err != nil { - logger.Error("AddProtocols() failed", "error", err) - return p2p.ErrAddProtocols - } - s, err := host.h.NewStream(context.Background(), p.PeerID, libp2p_protocol.ID(ProtocolID)) - if err != nil { - logger.Error("NewStream() failed", "peerID", p.PeerID, - "protocolID", ProtocolID, "error", err) - return p2p.ErrNewStream - } - defer func() { - if err := s.Close(); err != nil { - logger.Warn("cannot close stream", "error", err) - } - }() - if nw, err := s.Write(message); err != nil { - logger.Error("Write() failed", "peerID", p.PeerID, - "protocolID", ProtocolID, "error", err) - return p2p.ErrMsgWrite - } else if nw < len(message) { - logger.Error("Short Write()", "expected", len(message), "actual", nw) - return io.ErrShortWrite - } - - return nil -} - // Close closes the host func (host *HostV2) Close() error { return host.h.Close() diff --git a/p2p/host/message.go b/p2p/host/message.go index af90a81ce..aeb3b2913 100644 --- a/p2p/host/message.go +++ b/p2p/host/message.go @@ -2,52 +2,8 @@ package host import ( "encoding/binary" - "net" - "time" - - "github.com/ethereum/go-ethereum/log" - "github.com/harmony-one/harmony/p2p" ) -// SendMessage is to connect a socket given a port and send the given message. -// TODO(minhdoan, rj): need to check if a peer is reachable or not. -func SendMessage(host p2p.Host, p p2p.Peer, message []byte, lostPeer chan p2p.Peer) { - // Construct normal p2p message - content := ConstructP2pMessage(byte(0), message) - go send(host, p, content, lostPeer) -} - -// BroadcastMessage sends the message to a list of peers -func BroadcastMessage(h p2p.Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) { - if len(peers) == 0 { - return - } - // Construct broadcast p2p message - content := ConstructP2pMessage(byte(17), msg) - length := len(content) - - // log.Info("Start Broadcasting", "gomaxprocs", runtime.GOMAXPROCS(0), "Size", length) - // start := time.Now() - for _, peer := range peers { - peerCopy := peer - go send(h, peerCopy, content, lostPeer) - } - // log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds()) - - // Keep track of block propagation time - // Assume 1M message is for block propagation - if length > 1000000 { - log.Debug("NET: START BLOCK PROPAGATION", "Size", length) - } -} - -// BroadcastMessageFromLeader sends the message to a list of peers from a leader. -func BroadcastMessageFromLeader(h p2p.Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) { - // TODO(minhdoan): Enable back for multicast. - peers = SelectMyPeers(peers, 1, MaxBroadCast) - BroadcastMessage(h, peers, msg, lostPeer) -} - // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content] func ConstructP2pMessage(msgType byte, content []byte) []byte { message := make([]byte, 5+len(content)) @@ -56,58 +12,3 @@ func ConstructP2pMessage(msgType byte, content []byte) []byte { copy(message[5:], content) return message } - -// BroadcastMessageFromValidator sends the message to a list of peers from a validator. -func BroadcastMessageFromValidator(h p2p.Host, selfPeer p2p.Peer, peers []p2p.Peer, msg []byte) { - peers = SelectMyPeers(peers, selfPeer.ValidatorID*MaxBroadCast+1, (selfPeer.ValidatorID+1)*MaxBroadCast) - BroadcastMessage(h, peers, msg, nil) -} - -// MaxBroadCast is the maximum number of neighbors to broadcast -const MaxBroadCast = 20 - -// SelectMyPeers chooses a list of peers based on the range of ValidatorID -// This is a quick hack of the current p2p networking model -func SelectMyPeers(peers []p2p.Peer, min int, max int) []p2p.Peer { - res := []p2p.Peer{} - for _, peer := range peers { - if peer.ValidatorID >= min && peer.ValidatorID <= max { - res = append(res, peer) - } - } - return res -} - -// Send a message to another node with given port. -func send(h p2p.Host, peer p2p.Peer, message []byte, lostPeer chan p2p.Peer) { - // Add attack code here. - //attack.GetInstance().Run() - backoff := p2p.NewExpBackoff(250*time.Millisecond, 5*time.Second, 2) - - for trial := 0; trial < 3; trial++ { - err := h.SendMessage(peer, message) - // No need to retry if new stream error or no error - if err == nil || err == p2p.ErrNewStream { - if trial > 0 { - log.Warn("retry send", "rety", trial) - } - return - } - log.Info("sleeping before trying to send again", - "duration", backoff.Cur, "addr", net.JoinHostPort(peer.IP, peer.Port)) - backoff.Sleep() - } - log.Error("gave up sending a message", "addr", net.JoinHostPort(peer.IP, peer.Port)) - - if lostPeer != nil { - // Notify lostPeer channel - lostPeer <- peer - } -} - -// DialWithSocketClient joins host port and establishes connection -func DialWithSocketClient(ip, port string) (conn net.Conn, err error) { - addr := net.JoinHostPort(ip, port) - conn, err = net.Dial("tcp", addr) - return -} diff --git a/p2p/host/message_test.go b/p2p/host/message_test.go deleted file mode 100644 index 8ee2e4d44..000000000 --- a/p2p/host/message_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package host - -import ( - "fmt" - "reflect" - "testing" - "time" - - libp2p_peer "github.com/libp2p/go-libp2p-peer" - ma "github.com/multiformats/go-multiaddr" - - "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/p2p" - "github.com/harmony-one/harmony/p2p/p2pimpl" -) - -func TestSendMessage(test *testing.T) { - peer1 := p2p.Peer{IP: "127.0.0.1", Port: "9000"} - selfAddr1, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", peer1.Port)) - peer1.Addrs = append(peer1.Addrs, selfAddr1) - priKey1, pubKey1, _ := utils.GenKeyP2P(peer1.IP, peer1.Port) - peerID1, _ := libp2p_peer.IDFromPublicKey(pubKey1) - peer1.PeerID = peerID1 - host1, _ := p2pimpl.NewHost(&peer1, priKey1) - - peer2 := p2p.Peer{IP: "127.0.0.1", Port: "9001"} - selfAddr2, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", peer2.Port)) - peer2.Addrs = append(peer2.Addrs, selfAddr2) - priKey2, pubKey2, _ := utils.GenKeyP2P(peer2.IP, peer2.Port) - peerID2, _ := libp2p_peer.IDFromPublicKey(pubKey2) - peer2.PeerID = peerID2 - host2, _ := p2pimpl.NewHost(&peer2, priKey2) - - msg := []byte{0x00, 0x01, 0x02, 0x03, 0x04} - if err := host1.AddPeer(&peer2); err != nil { - test.Fatalf("cannot add peer2 to host1: %v", err) - } - - go host2.BindHandlerAndServe(handler) - SendMessage(host1, peer2, msg, nil) - time.Sleep(3 * time.Second) -} - -func handler(s p2p.Stream) { - defer func() { - if err := s.Close(); err != nil { - panic(fmt.Sprintf("Close(%v) failed: %v", s, err)) - } - }() - content, err := p2p.ReadMessageContent(s) - if err != nil { - panic("Read p2p data failed") - } - golden := []byte{0x00, 0x01, 0x02, 0x03, 0x04} - - if !reflect.DeepEqual(content, golden) { - panic("received message not equal original message") - } -} diff --git a/p2p/host/mock/host_mock.go b/p2p/host/mock/host_mock.go index ffb291064..bdf4b8720 100644 --- a/p2p/host/mock/host_mock.go +++ b/p2p/host/mock/host_mock.go @@ -49,32 +49,6 @@ func (mr *MockHostMockRecorder) GetSelfPeer() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSelfPeer", reflect.TypeOf((*MockHost)(nil).GetSelfPeer)) } -// SendMessage mocks base method -func (m *MockHost) SendMessage(arg0 p2p.Peer, arg1 []byte) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendMessage", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendMessage indicates an expected call of SendMessage -func (mr *MockHostMockRecorder) SendMessage(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessage", reflect.TypeOf((*MockHost)(nil).SendMessage), arg0, arg1) -} - -// BindHandlerAndServe mocks base method -func (m *MockHost) BindHandlerAndServe(handler p2p.StreamHandler) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "BindHandlerAndServe", handler) -} - -// BindHandlerAndServe indicates an expected call of BindHandlerAndServe -func (mr *MockHostMockRecorder) BindHandlerAndServe(handler interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BindHandlerAndServe", reflect.TypeOf((*MockHost)(nil).BindHandlerAndServe), handler) -} - // Close mocks base method func (m *MockHost) Close() error { m.ctrl.T.Helper() diff --git a/scripts/go_executable_build.sh b/scripts/go_executable_build.sh index 1911ce120..78e04f30d 100755 --- a/scripts/go_executable_build.sh +++ b/scripts/go_executable_build.sh @@ -5,8 +5,8 @@ export GO111MODULE=on declare -A SRC SRC[harmony]=cmd/harmony.go SRC[txgen]=cmd/client/txgen/main.go -SRC[wallet]=cmd/client/wallet/main.go SRC[bootnode]=cmd/bootnode/main.go +# SRC[wallet]=cmd/client/wallet/main.go BINDIR=bin BUCKET=unique-bucket-bin diff --git a/test/deploy.sh b/test/deploy.sh index 9f3064994..69fc24250 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -19,7 +19,7 @@ function check_result() { } function cleanup() { - for pid in `/bin/ps -fu $USER| grep "harmony\|txgen\|soldier\|commander\|profiler\|beacon\|bootnode" | grep -v "grep" | grep -v "vi" | awk '{print $2}'`; + for pid in `/bin/ps -fu $USER| grep "harmony\|txgen\|soldier\|commander\|profiler\|bootnode" | grep -v "grep" | grep -v "vi" | awk '{print $2}'`; do echo 'Killed process: '$pid $DRYRUN kill -9 $pid 2> /dev/null @@ -57,7 +57,6 @@ USAGE: $ME [OPTIONS] config_file_name -k nodeport kill the node with specified port number (default: $KILLPORT) -n dryrun mode (default: $DRYRUN) -S enable sync test (default: $SYNC) - -P enable libp2p peer discovery test (default: $P2P) This script will build all the binaries and start harmony and txgen based on the configuration file. @@ -78,9 +77,8 @@ SHARDS=2 KILLPORT=9004 SYNC=true DRYRUN= -P2P=false -while getopts "hdtD:m:s:k:nSP" option; do +while getopts "hdtD:m:s:k:nS" option; do case $option in h) usage ;; d) DB='-db_supported' ;; @@ -91,7 +89,6 @@ while getopts "hdtD:m:s:k:nSP" option; do k) KILLPORT=$OPTARG ;; n) DRYRUN=echo ;; S) SYNC=true ;; - P) P2P=true ;; esac done @@ -128,25 +125,12 @@ log_folder="tmp_log/log-$t" mkdir -p $log_folder LOG_FILE=$log_folder/r.log -HMY_OPT= -HMY_OPT2= -HMY_OPT3= - -if [ "$P2P" == "false" ]; then - echo "launching beacon chain ..." - $DRYRUN $ROOT/bin/beacon -numShards $SHARDS > $log_folder/beacon.log 2>&1 | tee -a $LOG_FILE & - sleep 1 #waiting for beaconchain - BC_MA=$(grep "Beacon Chain Started" $log_folder/beacon.log | awk -F: ' { print $2 } ') - HMY_OPT=" -bc_addr $BC_MA" -else - echo "launching boot node ..." - $DRYRUN $ROOT/bin/bootnode -port 19876 > $log_folder/bootnode.log 2>&1 | tee -a $LOG_FILE & - sleep 1 - BN_MA=$(grep "BN_MA" $log_folder/bootnode.log | awk -F\= ' { print $2 } ') - HMY_OPT2=" -bootnodes $BN_MA" - HMY_OPT2+=" -libp2p_pd" - HMY_OPT3=" -is_beacon" -fi +echo "launching boot node ..." +$DRYRUN $ROOT/bin/bootnode -port 19876 > $log_folder/bootnode.log 2>&1 | tee -a $LOG_FILE & +sleep 1 +BN_MA=$(grep "BN_MA" $log_folder/bootnode.log | awk -F\= ' { print $2 } ') +HMY_OPT2=" -bootnodes $BN_MA" +HMY_OPT3=" -is_beacon" NUM_NN=0 @@ -154,15 +138,15 @@ NUM_NN=0 while IFS='' read -r line || [[ -n "$line" ]]; do IFS=' ' read ip port mode shardID <<< $line if [ "$mode" == "leader" ]; then - $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key -is_leader 2>&1 | tee -a $LOG_FILE & + $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key -is_leader 2>&1 | tee -a $LOG_FILE & fi if [ "$mode" == "validator" ]; then - $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE & + $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE & fi sleep 0.5 if [[ "$mode" == "newnode" && "$SYNC" == "true" ]]; then (( NUM_NN += 35 )) - (sleep $NUM_NN; $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE ) & + (sleep $NUM_NN; $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE ) & fi done < $config @@ -177,7 +161,7 @@ if [ "$TXGEN" == "true" ]; then line=$(grep client $config) IFS=' ' read ip port mode shardID <<< $line if [ "$mode" == "client" ]; then - $DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port $HMY_OPT $HMY_OPT2 2>&1 | tee -a $LOG_FILE + $DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port $HMY_OPT2 2>&1 | tee -a $LOG_FILE fi else sleep $DURATION