diff --git a/api/client/service/client.go b/api/client/service/client.go index 726a33381..59c7b9658 100644 --- a/api/client/service/client.go +++ b/api/client/service/client.go @@ -62,3 +62,15 @@ func (client *Client) GetFreeToken(address common.Address) *proto.GetFreeTokenRe } return response } + +// GetStakingContractInfo gets necessary info for staking. +func (client *Client) GetStakingContractInfo(address common.Address) *proto.StakingContractInfoResponse { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + request := &proto.StakingContractInfoRequest{Address: address.Bytes()} + response, err := client.clientServiceClient.GetStakingContractInfo(ctx, request) + if err != nil { + log.Fatalf("Error getting free token: %s", err) + } + return response +} diff --git a/api/client/service/gen.sh b/api/client/service/gen.sh new file mode 100755 index 000000000..ccaa5fb75 --- /dev/null +++ b/api/client/service/gen.sh @@ -0,0 +1 @@ +protoc -I proto/ proto/client.proto --go_out=plugins=grpc:proto diff --git a/api/client/service/proto/client.pb.go b/api/client/service/proto/client.pb.go index cc2b02400..5df844c57 100644 --- a/api/client/service/proto/client.pb.go +++ b/api/client/service/proto/client.pb.go @@ -195,32 +195,138 @@ func (m *GetFreeTokenResponse) GetTxId() []byte { return nil } +// StakingContractInfoRequest is the request to necessary info for stkaing. +type StakingContractInfoRequest struct { + // The account address + Address []byte `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StakingContractInfoRequest) Reset() { *m = StakingContractInfoRequest{} } +func (m *StakingContractInfoRequest) String() string { return proto.CompactTextString(m) } +func (*StakingContractInfoRequest) ProtoMessage() {} +func (*StakingContractInfoRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_014de31d7ac8c57c, []int{4} +} + +func (m *StakingContractInfoRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StakingContractInfoRequest.Unmarshal(m, b) +} +func (m *StakingContractInfoRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StakingContractInfoRequest.Marshal(b, m, deterministic) +} +func (m *StakingContractInfoRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_StakingContractInfoRequest.Merge(m, src) +} +func (m *StakingContractInfoRequest) XXX_Size() int { + return xxx_messageInfo_StakingContractInfoRequest.Size(m) +} +func (m *StakingContractInfoRequest) XXX_DiscardUnknown() { + xxx_messageInfo_StakingContractInfoRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_StakingContractInfoRequest proto.InternalMessageInfo + +func (m *StakingContractInfoRequest) GetAddress() []byte { + if m != nil { + return m.Address + } + return nil +} + +// StakingContractInfoResponse is the response of GetStakingContractInfo. +type StakingContractInfoResponse struct { + // Contract address. + ContractAddress string `protobuf:"bytes,1,opt,name=contract_address,json=contractAddress,proto3" json:"contract_address,omitempty"` + // The balance of the staking account. + Balance []byte `protobuf:"bytes,2,opt,name=balance,proto3" json:"balance,omitempty"` + // The nonce of the staking account. + Nonce uint64 `protobuf:"varint,3,opt,name=nonce,proto3" json:"nonce,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StakingContractInfoResponse) Reset() { *m = StakingContractInfoResponse{} } +func (m *StakingContractInfoResponse) String() string { return proto.CompactTextString(m) } +func (*StakingContractInfoResponse) ProtoMessage() {} +func (*StakingContractInfoResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_014de31d7ac8c57c, []int{5} +} + +func (m *StakingContractInfoResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StakingContractInfoResponse.Unmarshal(m, b) +} +func (m *StakingContractInfoResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StakingContractInfoResponse.Marshal(b, m, deterministic) +} +func (m *StakingContractInfoResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_StakingContractInfoResponse.Merge(m, src) +} +func (m *StakingContractInfoResponse) XXX_Size() int { + return xxx_messageInfo_StakingContractInfoResponse.Size(m) +} +func (m *StakingContractInfoResponse) XXX_DiscardUnknown() { + xxx_messageInfo_StakingContractInfoResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_StakingContractInfoResponse proto.InternalMessageInfo + +func (m *StakingContractInfoResponse) GetContractAddress() string { + if m != nil { + return m.ContractAddress + } + return "" +} + +func (m *StakingContractInfoResponse) GetBalance() []byte { + if m != nil { + return m.Balance + } + return nil +} + +func (m *StakingContractInfoResponse) GetNonce() uint64 { + if m != nil { + return m.Nonce + } + return 0 +} + func init() { proto.RegisterType((*FetchAccountStateRequest)(nil), "client.FetchAccountStateRequest") proto.RegisterType((*FetchAccountStateResponse)(nil), "client.FetchAccountStateResponse") proto.RegisterType((*GetFreeTokenRequest)(nil), "client.GetFreeTokenRequest") proto.RegisterType((*GetFreeTokenResponse)(nil), "client.GetFreeTokenResponse") + proto.RegisterType((*StakingContractInfoRequest)(nil), "client.StakingContractInfoRequest") + proto.RegisterType((*StakingContractInfoResponse)(nil), "client.StakingContractInfoResponse") } func init() { proto.RegisterFile("client.proto", fileDescriptor_014de31d7ac8c57c) } var fileDescriptor_014de31d7ac8c57c = []byte{ - // 229 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0xce, 0xc9, 0x4c, - 0xcd, 0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x83, 0xf0, 0x94, 0x4c, 0xb8, 0x24, - 0xdc, 0x52, 0x4b, 0x92, 0x33, 0x1c, 0x93, 0x93, 0xf3, 0x4b, 0xf3, 0x4a, 0x82, 0x4b, 0x12, 0x4b, - 0x52, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0x24, 0xb8, 0xd8, 0x13, 0x53, 0x52, 0x8a, - 0x52, 0x8b, 0x8b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x78, 0x82, 0x60, 0x5c, 0x25, 0x6f, 0x2e, 0x49, - 0x2c, 0xba, 0x8a, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x41, 0xda, 0x92, 0x12, 0x73, 0x12, 0xf3, 0x92, - 0x53, 0x61, 0xda, 0xa0, 0x5c, 0x21, 0x11, 0x2e, 0xd6, 0xbc, 0x7c, 0x90, 0x38, 0x93, 0x02, 0xa3, - 0x06, 0x4b, 0x10, 0x84, 0xa3, 0xa4, 0xcf, 0x25, 0xec, 0x9e, 0x5a, 0xe2, 0x56, 0x94, 0x9a, 0x1a, - 0x92, 0x9f, 0x9d, 0x9a, 0x47, 0xd8, 0x76, 0x2d, 0x2e, 0x11, 0x54, 0x0d, 0x50, 0x8b, 0x85, 0xb8, - 0x58, 0x4a, 0x2a, 0x3c, 0x53, 0xa0, 0xca, 0xc1, 0x6c, 0xa3, 0x1d, 0x8c, 0x5c, 0xbc, 0xce, 0x60, - 0xaf, 0x06, 0xa7, 0x16, 0x95, 0x65, 0x26, 0xa7, 0x0a, 0x45, 0x71, 0x09, 0x62, 0xb8, 0x5d, 0x48, - 0x41, 0x0f, 0x1a, 0x3a, 0xb8, 0x02, 0x43, 0x4a, 0x11, 0x8f, 0x0a, 0x88, 0xfd, 0x4a, 0x0c, 0x42, - 0xde, 0x5c, 0x3c, 0xc8, 0x2e, 0x13, 0x92, 0x86, 0x69, 0xc2, 0xe2, 0x41, 0x29, 0x19, 0xec, 0x92, - 0x30, 0xc3, 0x92, 0xd8, 0xc0, 0x31, 0x65, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0xc6, 0xd9, 0x35, - 0x0c, 0xb9, 0x01, 0x00, 0x00, + // 302 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x52, 0x41, 0x4f, 0xf2, 0x40, + 0x14, 0xfc, 0xda, 0x0f, 0x31, 0xbe, 0xd4, 0xa8, 0x2b, 0x31, 0xb5, 0x78, 0xa8, 0xeb, 0x05, 0x3d, + 0x60, 0xa2, 0xc6, 0x3b, 0x21, 0x81, 0x10, 0x6e, 0xad, 0x27, 0x2f, 0x66, 0xd9, 0x3e, 0xb5, 0x81, + 0xec, 0x62, 0xfb, 0x30, 0xfc, 0x19, 0xff, 0xab, 0xa1, 0xdd, 0x35, 0x35, 0xb6, 0x70, 0xdb, 0x99, + 0x7d, 0xf3, 0x26, 0x3b, 0xb3, 0xe0, 0xc9, 0x45, 0x8a, 0x8a, 0xfa, 0xcb, 0x4c, 0x93, 0x66, 0xed, + 0x12, 0xf1, 0x07, 0xf0, 0x47, 0x48, 0xf2, 0x7d, 0x20, 0xa5, 0x5e, 0x29, 0x8a, 0x49, 0x10, 0x46, + 0xf8, 0xb1, 0xc2, 0x9c, 0x98, 0x0f, 0xfb, 0x22, 0x49, 0x32, 0xcc, 0x73, 0xdf, 0x09, 0x9d, 0x9e, + 0x17, 0x59, 0xc8, 0xa7, 0x70, 0x5e, 0xa3, 0xca, 0x97, 0x5a, 0xe5, 0xb8, 0x91, 0xcd, 0xc4, 0x42, + 0x28, 0x89, 0x56, 0x66, 0x20, 0xeb, 0xc0, 0x9e, 0xd2, 0x1b, 0xde, 0x0d, 0x9d, 0x5e, 0x2b, 0x2a, + 0x01, 0xbf, 0x85, 0xd3, 0x31, 0xd2, 0x28, 0x43, 0x7c, 0xd2, 0x73, 0x54, 0xbb, 0xdd, 0x6f, 0xa0, + 0xf3, 0x5b, 0x60, 0x8c, 0x19, 0xb4, 0x68, 0x3d, 0x49, 0xcc, 0x78, 0x71, 0xe6, 0x8f, 0x10, 0xc4, + 0x24, 0xe6, 0xa9, 0x7a, 0x1b, 0x6a, 0x45, 0x99, 0x90, 0x34, 0x51, 0xaf, 0x7a, 0xb7, 0xc7, 0x1a, + 0xba, 0xb5, 0x3a, 0x63, 0x75, 0x0d, 0xc7, 0xd2, 0xf0, 0x2f, 0xd5, 0x0d, 0x07, 0xd1, 0x91, 0xe5, + 0x07, 0x25, 0x5d, 0x8d, 0xc3, 0x6d, 0x88, 0xe3, 0x7f, 0x25, 0x8e, 0xbb, 0x2f, 0x17, 0x0e, 0x87, + 0x45, 0x39, 0x31, 0x66, 0x9f, 0xa9, 0x44, 0xf6, 0x0c, 0x27, 0x7f, 0xd2, 0x66, 0x61, 0xdf, 0xf4, + 0xd9, 0x54, 0x5f, 0x70, 0xb9, 0x65, 0xa2, 0x7c, 0x06, 0xff, 0xc7, 0xa6, 0xe0, 0x55, 0xb3, 0x64, + 0x5d, 0x2b, 0xaa, 0xa9, 0x24, 0xb8, 0xa8, 0xbf, 0xfc, 0x59, 0x26, 0xe1, 0x6c, 0x8c, 0x54, 0x93, + 0x1b, 0xe3, 0x56, 0xd9, 0x5c, 0x46, 0x70, 0xb5, 0x75, 0xc6, 0x9a, 0xcc, 0xda, 0xc5, 0x07, 0xbe, + 0xff, 0x0e, 0x00, 0x00, 0xff, 0xff, 0xe9, 0x3e, 0x0b, 0x3e, 0xd0, 0x02, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -237,6 +343,7 @@ const _ = grpc.SupportPackageIsVersion4 type ClientServiceClient interface { FetchAccountState(ctx context.Context, in *FetchAccountStateRequest, opts ...grpc.CallOption) (*FetchAccountStateResponse, error) GetFreeToken(ctx context.Context, in *GetFreeTokenRequest, opts ...grpc.CallOption) (*GetFreeTokenResponse, error) + GetStakingContractInfo(ctx context.Context, in *StakingContractInfoRequest, opts ...grpc.CallOption) (*StakingContractInfoResponse, error) } type clientServiceClient struct { @@ -265,10 +372,20 @@ func (c *clientServiceClient) GetFreeToken(ctx context.Context, in *GetFreeToken return out, nil } +func (c *clientServiceClient) GetStakingContractInfo(ctx context.Context, in *StakingContractInfoRequest, opts ...grpc.CallOption) (*StakingContractInfoResponse, error) { + out := new(StakingContractInfoResponse) + err := c.cc.Invoke(ctx, "/client.ClientService/GetStakingContractInfo", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ClientServiceServer is the server API for ClientService service. type ClientServiceServer interface { FetchAccountState(context.Context, *FetchAccountStateRequest) (*FetchAccountStateResponse, error) GetFreeToken(context.Context, *GetFreeTokenRequest) (*GetFreeTokenResponse, error) + GetStakingContractInfo(context.Context, *StakingContractInfoRequest) (*StakingContractInfoResponse, error) } func RegisterClientServiceServer(s *grpc.Server, srv ClientServiceServer) { @@ -311,6 +428,24 @@ func _ClientService_GetFreeToken_Handler(srv interface{}, ctx context.Context, d return interceptor(ctx, in, info, handler) } +func _ClientService_GetStakingContractInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StakingContractInfoRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ClientServiceServer).GetStakingContractInfo(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/client.ClientService/GetStakingContractInfo", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ClientServiceServer).GetStakingContractInfo(ctx, req.(*StakingContractInfoRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _ClientService_serviceDesc = grpc.ServiceDesc{ ServiceName: "client.ClientService", HandlerType: (*ClientServiceServer)(nil), @@ -323,6 +458,10 @@ var _ClientService_serviceDesc = grpc.ServiceDesc{ MethodName: "GetFreeToken", Handler: _ClientService_GetFreeToken_Handler, }, + { + MethodName: "GetStakingContractInfo", + Handler: _ClientService_GetStakingContractInfo_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "client.proto", diff --git a/api/client/service/proto/client.proto b/api/client/service/proto/client.proto index da466c34a..2620f8e8c 100644 --- a/api/client/service/proto/client.proto +++ b/api/client/service/proto/client.proto @@ -6,6 +6,7 @@ package client; service ClientService { rpc FetchAccountState(FetchAccountStateRequest) returns (FetchAccountStateResponse) {} rpc GetFreeToken(GetFreeTokenRequest) returns (GetFreeTokenResponse) {} + rpc GetStakingContractInfo(StakingContractInfoRequest) returns (StakingContractInfoResponse) {} } // FetchAccountStateRequest is the request to fetch an account's balance and nonce. @@ -33,3 +34,20 @@ message GetFreeTokenResponse { // The transaction Id that requests free token from the faucet. bytes txId = 1; } + +// StakingContractInfoRequest is the request to necessary info for stkaing. +message StakingContractInfoRequest { + // The account address + bytes address = 1; +} + +// StakingContractInfoResponse is the response of GetStakingContractInfo. +message StakingContractInfoResponse { + // Contract address. + string contract_address = 1; + // The balance of the staking account. + bytes balance = 2; + // The nonce of the staking account. + uint64 nonce = 3; +} + diff --git a/api/client/service/server.go b/api/client/service/server.go index 0453fe2f7..cab06cec3 100644 --- a/api/client/service/server.go +++ b/api/client/service/server.go @@ -6,17 +6,16 @@ import ( "net" "github.com/ethereum/go-ethereum/common" + proto "github.com/harmony-one/harmony/api/client/service/proto" "github.com/harmony-one/harmony/core/state" - "google.golang.org/grpc" - - proto "github.com/harmony-one/harmony/api/client/service/proto" ) // Server is the Server struct for client service package. type Server struct { - stateReader func() (*state.DB, error) - callFaucetContract func(common.Address) common.Hash + stateReader func() (*state.DB, error) + callFaucetContract func(common.Address) common.Hash + getDeployedStakingContractAddress func() common.Address } // FetchAccountState implements the FetchAccountState interface to return account state. @@ -39,6 +38,21 @@ func (s *Server) GetFreeToken(ctx context.Context, request *proto.GetFreeTokenRe return &proto.GetFreeTokenResponse{TxId: s.callFaucetContract(address).Bytes()}, nil } +// GetStakingContractInfo implements the GetStakingContractInfo interface to return necessary info for staking. +func (s *Server) GetStakingContractInfo(ctx context.Context, request *proto.StakingContractInfoRequest) (*proto.StakingContractInfoResponse, error) { + var address common.Address + address.SetBytes(request.Address) + state, err := s.stateReader() + if err != nil { + return nil, err + } + return &proto.StakingContractInfoResponse{ + ContractAddress: s.getDeployedStakingContractAddress().Hex(), + Balance: state.GetBalance(address).Bytes(), + Nonce: state.GetNonce(address), + }, nil +} + // Start starts the Server on given ip and port. func (s *Server) Start(ip, port string) (*grpc.Server, error) { // TODO(minhdoan): Currently not using ip. Fix it later. @@ -55,7 +69,14 @@ func (s *Server) Start(ip, port string) (*grpc.Server, error) { } // NewServer creates new Server which implements ClientServiceServer interface. -func NewServer(stateReader func() (*state.DB, error), callFaucetContract func(common.Address) common.Hash) *Server { - s := &Server{stateReader: stateReader, callFaucetContract: callFaucetContract} +func NewServer( + stateReader func() (*state.DB, error), + callFaucetContract func(common.Address) common.Hash, + getDeployedStakingContractAddress func() common.Address) *Server { + s := &Server{ + stateReader: stateReader, + callFaucetContract: callFaucetContract, + getDeployedStakingContractAddress: getDeployedStakingContractAddress, + } return s } diff --git a/api/client/service/server_test.go b/api/client/service/server_test.go index edb7a6398..d8ce892f3 100644 --- a/api/client/service/server_test.go +++ b/api/client/service/server_test.go @@ -33,7 +33,7 @@ func TestGetFreeToken(test *testing.T) { return nil, nil }, func(common.Address) common.Hash { return hash - }) + }, nil) testBankKey, _ := crypto.GenerateKey() testBankAddress := crypto.PubkeyToAddress(testBankKey.PublicKey) @@ -67,7 +67,7 @@ func TestFetchAccountState(test *testing.T) { return chain.State() }, func(common.Address) common.Hash { return hash - }) + }, nil) response, err := server.FetchAccountState(nil, &client.FetchAccountStateRequest{Address: testBankAddress.Bytes()}) diff --git a/api/drand/drand.pb.go b/api/drand/drand.pb.go new file mode 100644 index 000000000..3585b13ea --- /dev/null +++ b/api/drand/drand.pb.go @@ -0,0 +1,145 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: drand.proto + +package drand + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type MessageType int32 + +const ( + MessageType_UNKNOWN MessageType = 0 + MessageType_INIT MessageType = 1 + MessageType_COMMIT MessageType = 2 +) + +var MessageType_name = map[int32]string{ + 0: "UNKNOWN", + 1: "INIT", + 2: "COMMIT", +} + +var MessageType_value = map[string]int32{ + "UNKNOWN": 0, + "INIT": 1, + "COMMIT": 2, +} + +func (x MessageType) String() string { + return proto.EnumName(MessageType_name, int32(x)) +} + +func (MessageType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_1d855c36cf2c0c50, []int{0} +} + +type Message struct { + Type MessageType `protobuf:"varint,1,opt,name=type,proto3,enum=drand.MessageType" json:"type,omitempty"` + SenderId uint32 `protobuf:"varint,3,opt,name=sender_id,json=senderId,proto3" json:"sender_id,omitempty"` + BlockHash []byte `protobuf:"bytes,4,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"` + Payload []byte `protobuf:"bytes,5,opt,name=payload,proto3" json:"payload,omitempty"` + Signature []byte `protobuf:"bytes,6,opt,name=signature,proto3" json:"signature,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_1d855c36cf2c0c50, []int{0} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetType() MessageType { + if m != nil { + return m.Type + } + return MessageType_UNKNOWN +} + +func (m *Message) GetSenderId() uint32 { + if m != nil { + return m.SenderId + } + return 0 +} + +func (m *Message) GetBlockHash() []byte { + if m != nil { + return m.BlockHash + } + return nil +} + +func (m *Message) GetPayload() []byte { + if m != nil { + return m.Payload + } + return nil +} + +func (m *Message) GetSignature() []byte { + if m != nil { + return m.Signature + } + return nil +} + +func init() { + proto.RegisterEnum("drand.MessageType", MessageType_name, MessageType_value) + proto.RegisterType((*Message)(nil), "drand.Message") +} + +func init() { proto.RegisterFile("drand.proto", fileDescriptor_1d855c36cf2c0c50) } + +var fileDescriptor_1d855c36cf2c0c50 = []byte{ + // 214 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x8f, 0xcf, 0x4a, 0x87, 0x40, + 0x10, 0x80, 0xdb, 0xf2, 0xe7, 0x9f, 0xb1, 0x42, 0xe6, 0xb4, 0x50, 0x81, 0x74, 0x08, 0xe9, 0x20, + 0x51, 0x8f, 0xd0, 0xa5, 0x25, 0x5c, 0x41, 0x8c, 0x8e, 0xb2, 0xb6, 0x8b, 0x4a, 0xe2, 0x2e, 0xbb, + 0x76, 0xf0, 0x81, 0x7a, 0xcf, 0x60, 0xad, 0xf8, 0x1d, 0xbf, 0xef, 0x9b, 0x81, 0x19, 0x48, 0xa5, + 0x15, 0x8b, 0x2c, 0x8d, 0xd5, 0xab, 0xc6, 0x83, 0x87, 0xdb, 0x6f, 0x02, 0x51, 0xa5, 0x9c, 0x13, + 0x83, 0xc2, 0x3b, 0x08, 0xd6, 0xcd, 0x28, 0x4a, 0x72, 0x52, 0x5c, 0x3e, 0x62, 0xb9, 0x8f, 0xff, + 0xd6, 0x76, 0x33, 0xaa, 0xf1, 0x1d, 0xaf, 0x20, 0x71, 0x6a, 0x91, 0xca, 0x76, 0x93, 0xa4, 0x67, + 0x39, 0x29, 0x2e, 0x9a, 0x78, 0x17, 0x4c, 0xe2, 0x0d, 0x40, 0x3f, 0xeb, 0x8f, 0xcf, 0x6e, 0x14, + 0x6e, 0xa4, 0x41, 0x4e, 0x8a, 0xf3, 0x26, 0xf1, 0xe6, 0x45, 0xb8, 0x11, 0x29, 0x44, 0x46, 0x6c, + 0xb3, 0x16, 0x92, 0x1e, 0x7c, 0xfb, 0x43, 0xbc, 0x86, 0xc4, 0x4d, 0xc3, 0x22, 0xd6, 0x2f, 0xab, + 0x68, 0xb8, 0xef, 0xfd, 0x8b, 0xfb, 0x07, 0x48, 0x8f, 0x0e, 0xc1, 0x14, 0xa2, 0x37, 0xfe, 0xca, + 0xeb, 0x77, 0x9e, 0x9d, 0x60, 0x0c, 0x01, 0xe3, 0xac, 0xcd, 0x08, 0x02, 0x84, 0xcf, 0x75, 0x55, + 0xb1, 0x36, 0x3b, 0xed, 0x43, 0xff, 0xe7, 0xd3, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x79, 0xfa, + 0xf8, 0x57, 0xf6, 0x00, 0x00, 0x00, +} diff --git a/api/drand/drand.proto b/api/drand/drand.proto new file mode 100644 index 000000000..1689c335b --- /dev/null +++ b/api/drand/drand.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package drand; + +enum MessageType { + UNKNOWN = 0; + INIT = 1; + COMMIT = 2; +} + +message Message { + MessageType type = 1; + uint32 sender_id = 3; // TODO: make it public key + bytes block_hash = 4; + bytes payload = 5; + bytes signature = 6; +} diff --git a/api/proto/common.go b/api/proto/common.go index 94c921e87..b0cb59be4 100644 --- a/api/proto/common.go +++ b/api/proto/common.go @@ -31,6 +31,7 @@ const ( Node Client Identity + DRand // TODO: add more types ) @@ -81,3 +82,10 @@ func ConstructConsensusMessage(payload []byte) []byte { byteBuffer.Write(payload) return byteBuffer.Bytes() } + +// ConstructDRandMessage creates a message with the payload and returns as byte array. +func ConstructDRandMessage(payload []byte) []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(DRand)}) + byteBuffer.Write(payload) + return byteBuffer.Bytes() +} diff --git a/api/proto/discovery/pingpong.go b/api/proto/discovery/pingpong.go index 9c645bf8d..e91d94c8d 100644 --- a/api/proto/discovery/pingpong.go +++ b/api/proto/discovery/pingpong.go @@ -19,6 +19,7 @@ import ( "github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/harmony/api/proto" "github.com/harmony-one/harmony/api/proto/node" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" ) @@ -88,6 +89,8 @@ func NewPongMessage(peers []p2p.Peer, pubKeys []*bls.PublicKey) *PongMessageType pong.PubKeys = append(pong.PubKeys, key) } + utils.GetLogInstance().Info("[pong message]", "keys", len(pong.PubKeys), "peers", len(pong.Peers)) + return pong } diff --git a/api/service/clientsupport/service.go b/api/service/clientsupport/service.go index 20f07bacd..db378f57e 100644 --- a/api/service/clientsupport/service.go +++ b/api/service/clientsupport/service.go @@ -23,9 +23,15 @@ type Service struct { } // New returns new client support service. -func New(stateReader func() (*state.DB, error), callFaucetContract func(common.Address) common.Hash, ip, nodePort string) *Service { +func New(stateReader func() (*state.DB, error), + callFaucetContract func(common.Address) common.Hash, + getDeployedStakingContract func() common.Address, + ip, nodePort string) *Service { port, _ := strconv.Atoi(nodePort) - return &Service{server: clientService.NewServer(stateReader, callFaucetContract), ip: ip, port: strconv.Itoa(port + ClientServicePortDiff)} + return &Service{ + server: clientService.NewServer(stateReader, callFaucetContract, getDeployedStakingContract), + ip: ip, + port: strconv.Itoa(port + ClientServicePortDiff)} } // StartService starts client support service. diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index d00c56663..93f2169c9 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -7,15 +7,9 @@ import ( "github.com/harmony-one/harmony/p2p/host" ) -// Constants for discovery service. -const ( - numIncoming = 128 - numOutgoing = 16 -) - // Service is the struct for discovery service. type Service struct { - Host p2p.Host + host p2p.Host Rendezvous string peerChan chan p2p.Peer stakingChan chan p2p.Peer @@ -27,7 +21,7 @@ type Service struct { // r is the rendezvous string, we use shardID to start (TODO: leo, build two overlays of network) func New(h p2p.Host, r string, peerChan chan p2p.Peer, stakingChan chan p2p.Peer) *Service { return &Service{ - Host: h, + host: h, Rendezvous: r, peerChan: peerChan, stakingChan: stakingChan, @@ -62,8 +56,10 @@ func (s *Service) contactP2pPeers() { log.Debug("end of info", "peer", peer.PeerID) return } - log.Debug("[DISCOVERY]", "peer", peer) - s.Host.AddPeer(&peer) + s.host.AddPeer(&peer) + // Add to outgoing peer list + s.host.AddOutgoingPeer(peer) + log.Debug("[DISCOVERY]", "add outgoing peer", peer) // TODO: stop ping if pinged before // TODO: call staking servcie here if it is a new node s.pingPeer(peer) @@ -79,12 +75,17 @@ func (s *Service) Init() { } func (s *Service) pingPeer(peer p2p.Peer) { - ping := proto_discovery.NewPingMessage(s.Host.GetSelfPeer()) + ping := proto_discovery.NewPingMessage(s.host.GetSelfPeer()) buffer := ping.ConstructPingMessage() - log.Debug("Sending Ping Message to", "peer", peer) content := host.ConstructP2pMessage(byte(0), buffer) - s.Host.SendMessage(peer, content) - log.Debug("Sent Ping Message to", "peer", peer) + // s.host.SendMessage(peer, content) + // log.Debug("Sent Ping Message via unicast to", "peer", peer) + err := s.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content) + if err != nil { + log.Error("Failed to send ping message", "group", p2p.GroupIDBeacon) + } else { + log.Debug("[PING] sent Ping Message via group send to", "peer", peer) + } if s.stakingChan != nil { s.stakingChan <- peer } diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index d3d2cd70f..92a48ad5f 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -28,6 +28,7 @@ type Service struct { peerChan chan p2p.Peer peerInfo <-chan peerstore.PeerInfo discovery *libp2pdis.RoutingDiscovery + lock sync.Mutex } // New returns role conversion service. @@ -115,7 +116,15 @@ func (s *Service) DoService() { return } if peer.ID != s.Host.GetP2PHost().ID() && len(peer.ID) > 0 { - utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs) + utils.GetLogInstance().Info("Found Peer", "peer", peer.ID, "addr", peer.Addrs, "my ID", s.Host.GetP2PHost().ID()) + s.lock.Lock() + if err := s.Host.GetP2PHost().Connect(s.ctx, peer); err != nil { + utils.GetLogInstance().Warn("can't connect to peer node", "error", err) + } else { + utils.GetLogInstance().Info("connected to peer node", "peer", peer) + } + s.lock.Unlock() + // figure out the public ip/port ip := "127.0.0.1" var port string for _, addr := range peer.Addrs { diff --git a/api/service/staking/service.go b/api/service/staking/service.go index 0b2c0496f..002e274a5 100644 --- a/api/service/staking/service.go +++ b/api/service/staking/service.go @@ -4,11 +4,10 @@ import ( "crypto/ecdsa" "math/big" - "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" client "github.com/harmony-one/harmony/api/client/service" proto "github.com/harmony-one/harmony/api/client/service/proto" "github.com/harmony-one/harmony/api/proto/message" @@ -76,17 +75,17 @@ func (s *Service) DoService(peer p2p.Peer) { // See below of how to create a staking message. } -func (s *Service) getAccountState(beaconPeer p2p.Peer) *proto.FetchAccountStateResponse { +func (s *Service) getStakingInfo(beaconPeer p2p.Peer) *proto.StakingContractInfoResponse { client := client.NewClient(beaconPeer.IP, beaconPeer.Port) defer client.Close() - return client.GetBalance(crypto.PubkeyToAddress(s.accountKey.PublicKey)) + return client.GetStakingContractInfo(crypto.PubkeyToAddress(s.accountKey.PublicKey)) } func (s *Service) createStakingMessage(beaconPeer p2p.Peer) *message.Message { - accountState := s.getAccountState(beaconPeer) - toAddress := common.HexToAddress("0x4592d8f8d7b001e72cb26a73e4fa1806a51ac79d") + stakingInfo := s.getStakingInfo(beaconPeer) + toAddress := common.HexToAddress(stakingInfo.ContractAddress) tx := types.NewTransaction( - accountState.Nonce, + stakingInfo.Nonce, toAddress, 0, // beacon chain. big.NewInt(s.stakingAmount), diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index de1368a17..c248c8f61 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -64,7 +64,7 @@ func main() { // Init logging. loggingInit(*logFolder, *ip, *port) - privKey, err := utils.LoadKeyFromFile(*keyFile) + privKey, _, err := utils.LoadKeyFromFile(*keyFile) if err != nil { panic(err) } diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index 8f444f1aa..44be6f380 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -72,7 +72,7 @@ func main() { var bcPeer *p2p.Peer var shardIDLeaderMap map[uint32]p2p.Peer - priKey, err := utils.LoadKeyFromFile(*keyFile) + priKey, _, err := utils.LoadKeyFromFile(*keyFile) if err != nil { panic(err) } diff --git a/cmd/client/wallet/main.go b/cmd/client/wallet/main.go index e9a984c3a..a53c5089e 100644 --- a/cmd/client/wallet/main.go +++ b/cmd/client/wallet/main.go @@ -129,14 +129,14 @@ func processNewCommnad() { panic("Failed to generate the private key") } storePrivateKey(crypto2.FromECDSA(priKey)) - fmt.Printf("New account created with address:\n {%s}\n", crypto2.PubkeyToAddress(priKey.PublicKey).Hex()) - fmt.Printf("Please keep a copy of the private key:\n {%s}\n", hex.EncodeToString(crypto2.FromECDSA(priKey))) + fmt.Printf("New account created with address:{%s}\n", crypto2.PubkeyToAddress(priKey.PublicKey).Hex()) + fmt.Printf("Please keep a copy of the private key:{%s}\n", hex.EncodeToString(crypto2.FromECDSA(priKey))) } func processListCommand() { for i, key := range readPrivateKeys() { - fmt.Printf("Account %d:\n {%s}\n", i, crypto2.PubkeyToAddress(key.PublicKey).Hex()) - fmt.Printf(" PrivateKey: {%s}\n", hex.EncodeToString(key.D.Bytes())) + fmt.Printf("Account %d:{%s}\n", i, crypto2.PubkeyToAddress(key.PublicKey).Hex()) + fmt.Printf(" PrivateKey:{%s}\n", hex.EncodeToString(key.D.Bytes())) } } diff --git a/cmd/harmony.go b/cmd/harmony.go index 91b7de3b2..b5c0c9753 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -11,6 +11,9 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + peerstore "github.com/libp2p/go-libp2p-peerstore" + multiaddr "github.com/multiformats/go-multiaddr" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/internal/attack" pkg_newnode "github.com/harmony-one/harmony/internal/newnode" @@ -19,8 +22,6 @@ import ( "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/p2pimpl" - peerstore "github.com/libp2p/go-libp2p-peerstore" - multiaddr "github.com/multiformats/go-multiaddr" ) var ( @@ -108,6 +109,12 @@ func main() { // isBeacon indicates this node is a beacon chain node isBeacon := flag.Bool("is_beacon", false, "true means this node is a beacon chain node") + // isLeader indicates this node is a beacon chain leader node during the bootstrap process + isLeader := flag.Bool("is_leader", false, "true means this node is a beacon chain leader node") + + // logConn logs incoming/outgoing connections + logConn := flag.Bool("log_conn", false, "log incoming/outgoing connections") + flag.Parse() if *versionFlag { @@ -131,56 +138,72 @@ func main() { utils.BootNodes = bootNodeAddrs } - var shardID string + var shardID = "0" var peers []p2p.Peer var leader p2p.Peer var selfPeer p2p.Peer var clientPeer *p2p.Peer var BCPeer *p2p.Peer - priKey, err := utils.LoadKeyFromFile(*keyFile) + var role string + + nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile) if err != nil { panic(err) } - if *bcAddr != "" { - // Turn the destination into a multiaddr. - maddr, err := multiaddr.NewMultiaddr(*bcAddr) - if err != nil { - panic(err) + peerPriKey, peerPubKey := utils.GenKey(*ip, *port) + if peerPriKey == nil || peerPubKey == nil { + panic(fmt.Errorf("generate key error")) + } + selfPeer = p2p.Peer{IP: *ip, Port: *port, ValidatorID: -1, PubKey: peerPubKey} + + if !*libp2pPD { + if *bcAddr != "" { + // Turn the destination into a multiaddr. + maddr, err := multiaddr.NewMultiaddr(*bcAddr) + if err != nil { + panic(err) + } + + // Extract the peer ID from the multiaddr. + info, err := peerstore.InfoFromP2pAddr(maddr) + if err != nil { + panic(err) + } + + BCPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort, Addrs: info.Addrs, PeerID: info.ID} + } else { + BCPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort} } - // Extract the peer ID from the multiaddr. - info, err := peerstore.InfoFromP2pAddr(maddr) - if err != nil { - panic(err) - } + //Use Peer Discovery to get shard/leader/peer/... + candidateNode := pkg_newnode.New(*ip, *port, nodePriKey) + candidateNode.AddPeer(BCPeer) + candidateNode.ContactBeaconChain(*BCPeer) - BCPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort, Addrs: info.Addrs, PeerID: info.ID} - } else { - BCPeer = &p2p.Peer{IP: *bcIP, Port: *bcPort} - } + shardID = candidateNode.GetShardID() + leader = candidateNode.GetLeader() + selfPeer = candidateNode.GetSelfPeer() + clientPeer = candidateNode.GetClientPeer() + selfPeer.PubKey = candidateNode.PubK - //Use Peer Discovery to get shard/leader/peer/... - candidateNode := pkg_newnode.New(*ip, *port, priKey) - candidateNode.AddPeer(BCPeer) - candidateNode.ContactBeaconChain(*BCPeer) - - shardID = candidateNode.GetShardID() - leader = candidateNode.GetLeader() - selfPeer = candidateNode.GetSelfPeer() - clientPeer = candidateNode.GetClientPeer() - selfPeer.PubKey = candidateNode.PubK + if leader.IP == *ip && leader.Port == *port { + role = "leader" + } else { + role = "validator" + } - var role string - if leader.IP == *ip && leader.Port == *port { - role = "leader" + if role == "validator" { + // Attack determination. + attack.GetInstance().SetAttackEnabled(attackDetermination(*attackedMode)) + } } else { - role = "validator" - } - - if role == "validator" { - // Attack determination. - attack.GetInstance().SetAttackEnabled(attackDetermination(*attackedMode)) + if *isLeader { + role = "leader" + leader = selfPeer + } else { + role = "validator" + } } // Init logging. loggingInit(*logFolder, role, *ip, *port, *onlyLogTps) @@ -191,7 +214,10 @@ func main() { ldb, _ = InitLDBDatabase(*ip, *port, *freshDB) } - host, err := p2pimpl.NewHost(&selfPeer, priKey) + host, err := p2pimpl.NewHost(&selfPeer, nodePriKey) + if *logConn { + host.GetP2PHost().Network().Notify(utils.ConnLogger) + } if err != nil { panic("unable to new host in harmony") } @@ -201,6 +227,7 @@ func main() { host.AddPeer(&leader) // Consensus object. + // TODO: consensus object shouldn't start here consensus := consensus.New(host, shardID, peers, leader) consensus.MinPeers = *minPeers @@ -216,15 +243,17 @@ func main() { // Current node. currentNode := node.New(host, consensus, ldb) currentNode.Consensus.OfflinePeers = currentNode.OfflinePeers - if role == "leader" { - if *isBeacon { + currentNode.Role = node.NewNode + + if *isBeacon { + if role == "leader" { currentNode.Role = node.BeaconLeader } else { - currentNode.Role = node.ShardLeader + currentNode.Role = node.BeaconValidator } } else { - if *isBeacon { - currentNode.Role = node.BeaconValidator + if role == "leader" { + currentNode.Role = node.ShardLeader } else { currentNode.Role = node.ShardValidator } @@ -240,14 +269,14 @@ func main() { consensus.OnConsensusDone = currentNode.PostConsensusProcessing currentNode.State = node.NodeWaitToJoin - if *libp2pPD { - currentNode.Role = node.NewNode - } else { + if !*libp2pPD { if consensus.IsLeader { currentNode.State = node.NodeLeader } else { go currentNode.JoinShard(leader) } + } else { + currentNode.UseLibP2P = true } go currentNode.SupportSyncing() diff --git a/consensus/consensus.go b/consensus/consensus.go index 4e2c74ec7..60d067955 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -213,8 +213,7 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons return &consensus } -// Checks the basic meta of a consensus message. -// +// Checks the basic meta of a consensus message, including the signature. func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Message, publicKey *bls.PublicKey) error { consensusID := message.ConsensusId blockHash := message.BlockHash @@ -380,7 +379,7 @@ func (consensus *Consensus) AddPeers(peers []*p2p.Peer) int { consensus.pubKeyLock.Lock() consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey) consensus.pubKeyLock.Unlock() - utils.GetLogInstance().Debug("[SYNC] new peer added") + utils.GetLogInstance().Debug("[SYNC]", "new peer added", peer) } count++ } diff --git a/drand/drand.go b/drand/drand.go new file mode 100644 index 000000000..ab878a705 --- /dev/null +++ b/drand/drand.go @@ -0,0 +1,194 @@ +package drand + +import ( + "crypto/sha256" + "encoding/binary" + "errors" + "strconv" + "sync" + + protobuf "github.com/golang/protobuf/proto" + "github.com/harmony-one/bls/ffi/go/bls" + drand_proto "github.com/harmony-one/harmony/api/drand" + bls_cosi "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" +) + +// DRand is the main struct which contains state for the distributed randomness protocol. +type DRand struct { + vrfs *map[uint32][]byte + bitmap *bls_cosi.Mask + pRand *[32]byte + rand *[32]byte + + // map of nodeID to validator Peer object + // FIXME: should use PubKey of p2p.Peer as the hashkey + validators sync.Map // key is uint16, value is p2p.Peer + + // Leader's address + leader p2p.Peer + + // Public keys of the committee including leader and validators + PublicKeys []*bls.PublicKey + + // private/public keys of current node + priKey *bls.SecretKey + pubKey *bls.PublicKey + + // Whether I am leader. False means I am validator + IsLeader bool + + // Leader or validator Id - 4 byte + nodeID uint32 + + // The p2p host used to send/receive p2p messages + host p2p.Host + + // Shard Id which this node belongs to + ShardID uint32 + + // Blockhash - 32 byte + blockHash [32]byte +} + +// New creates a new dRand object +func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *DRand { + dRand := DRand{} + dRand.host = host + + selfPeer := host.GetSelfPeer() + if leader.Port == selfPeer.Port && leader.IP == selfPeer.IP { + dRand.IsLeader = true + } else { + dRand.IsLeader = false + } + + dRand.leader = leader + for _, peer := range peers { + dRand.validators.Store(utils.GetUniqueIDFromPeer(peer), peer) + } + + dRand.vrfs = &map[uint32][]byte{} + + // Initialize cosign bitmap + allPublicKeys := make([]*bls.PublicKey, 0) + for _, validatorPeer := range peers { + allPublicKeys = append(allPublicKeys, validatorPeer.PubKey) + } + allPublicKeys = append(allPublicKeys, leader.PubKey) + + dRand.PublicKeys = allPublicKeys + + bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.leader.PubKey) + dRand.bitmap = bitmap + + dRand.pRand = nil + dRand.rand = nil + + // For now use socket address as ID + // TODO: populate Id derived from address + dRand.nodeID = utils.GetUniqueIDFromPeer(selfPeer) + + // Set private key for myself so that I can sign messages. + nodeIDBytes := make([]byte, 32) + binary.LittleEndian.PutUint32(nodeIDBytes, dRand.nodeID) + privateKey := bls.SecretKey{} + err := privateKey.SetLittleEndian(nodeIDBytes) + dRand.priKey = &privateKey + dRand.pubKey = privateKey.GetPublicKey() + + myShardID, err := strconv.Atoi(ShardID) + if err != nil { + panic("Unparseable shard Id" + ShardID) + } + dRand.ShardID = uint32(myShardID) + + return &dRand +} + +// Sign on the drand message signature field. +func (dRand *DRand) signDRandMessage(message *drand_proto.Message) error { + message.Signature = nil + // TODO: use custom serialization method rather than protobuf + marshaledMessage, err := protobuf.Marshal(message) + if err != nil { + return err + } + // 64 byte of signature on previous data + hash := sha256.Sum256(marshaledMessage) + signature := dRand.priKey.SignHash(hash[:]) + + message.Signature = signature.Serialize() + return nil +} + +// Signs the drand message and returns the marshaled message. +func (dRand *DRand) signAndMarshalDRandMessage(message *drand_proto.Message) ([]byte, error) { + err := dRand.signDRandMessage(message) + if err != nil { + return []byte{}, err + } + + marshaledMessage, err := protobuf.Marshal(message) + if err != nil { + return []byte{}, err + } + return marshaledMessage, nil +} + +func (dRand *DRand) vrf(blockHash [32]byte) (rand [32]byte, proof []byte) { + // TODO: implement vrf + return [32]byte{}, []byte{} +} + +// GetValidatorPeers returns list of validator peers. +func (dRand *DRand) GetValidatorPeers() []p2p.Peer { + validatorPeers := make([]p2p.Peer, 0) + + dRand.validators.Range(func(k, v interface{}) bool { + if peer, ok := v.(p2p.Peer); ok { + validatorPeers = append(validatorPeers, peer) + return true + } + return false + }) + + return validatorPeers +} + +// Verify the signature of the message are valid from the signer's public key. +func verifyMessageSig(signerPubKey *bls.PublicKey, message drand_proto.Message) error { + signature := message.Signature + message.Signature = nil + messageBytes, err := protobuf.Marshal(&message) + if err != nil { + return err + } + + msgSig := bls.Sign{} + err = msgSig.Deserialize(signature) + if err != nil { + return err + } + msgHash := sha256.Sum256(messageBytes) + if !msgSig.VerifyHash(signerPubKey, msgHash[:]) { + return errors.New("failed to verify the signature") + } + return nil +} + +// Gets the validator peer based on validator ID. +func (dRand *DRand) getValidatorPeerByID(validatorID uint32) *p2p.Peer { + v, ok := dRand.validators.Load(validatorID) + if !ok { + utils.GetLogInstance().Warn("Unrecognized validator", "validatorID", validatorID, "dRand", dRand) + return nil + } + value, ok := v.(p2p.Peer) + if !ok { + utils.GetLogInstance().Warn("Invalid validator", "validatorID", validatorID, "dRand", dRand) + return nil + } + return &value +} diff --git a/drand/drand_leader.go b/drand/drand_leader.go new file mode 100644 index 000000000..561309775 --- /dev/null +++ b/drand/drand_leader.go @@ -0,0 +1,93 @@ +package drand + +import ( + protobuf "github.com/golang/protobuf/proto" + drand_proto "github.com/harmony-one/harmony/api/drand" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p/host" +) + +// WaitForEpochBlock waits for the first epoch block to run DRG on +func (dRand *DRand) WaitForEpochBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}) { + go func() { + defer close(stoppedChan) + for { + select { + default: + // keep waiting for new blocks + newBlock := <-blockChannel + // TODO: think about potential race condition + + dRand.init(newBlock) + case <-stopChan: + return + } + } + }() +} + +func (dRand *DRand) init(epochBlock *types.Block) { + // Copy over block hash and block header data + blockHash := epochBlock.Hash() + copy(dRand.blockHash[:], blockHash[:]) + + msgToSend := dRand.constructInitMessage() + + // Leader commit vrf itself + rand, proof := dRand.vrf(dRand.blockHash) + + (*dRand.vrfs)[dRand.nodeID] = append(rand[:], proof...) + + host.BroadcastMessageFromLeader(dRand.host, dRand.GetValidatorPeers(), msgToSend, nil) +} + +// ProcessMessageLeader dispatches messages for the leader to corresponding processors. +func (dRand *DRand) ProcessMessageLeader(payload []byte) { + message := drand_proto.Message{} + err := protobuf.Unmarshal(payload, &message) + + if err != nil { + utils.GetLogInstance().Error("Failed to unmarshal message payload.", "err", err, "dRand", dRand) + } + + switch message.Type { + case drand_proto.MessageType_COMMIT: + dRand.processCommitMessage(message) + default: + utils.GetLogInstance().Error("Unexpected message type", "msgType", message.Type, "dRand", dRand) + } +} + +// ProcessMessageValidator dispatches validator's consensus message. +func (dRand *DRand) processCommitMessage(message drand_proto.Message) { + if message.Type != drand_proto.MessageType_COMMIT { + utils.GetLogInstance().Error("Wrong message type received", "expected", drand_proto.MessageType_COMMIT, "got", message.Type) + return + } + + // Verify message signature + err := verifyMessageSig(dRand.leader.PubKey, message) + if err != nil { + utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) + return + } + + rand := message.Payload[:32] + proof := message.Payload[32:] + _ = rand + _ = proof + // TODO: check the validity of the vrf commit + + validatorID := message.SenderId + validatorPeer := dRand.getValidatorPeerByID(validatorID) + vrfs := dRand.vrfs + utils.GetLogInstance().Debug("Received new prepare signature", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys)) + + (*vrfs)[validatorID] = message.Payload + dRand.bitmap.SetKey(validatorPeer.PubKey, true) // Set the bitmap indicating that this validator signed. + + if len((*vrfs)) >= ((len(dRand.PublicKeys))/3 + 1) { + // Construct pRand and initiate consensus on it + } +} diff --git a/drand/drand_leader_msg.go b/drand/drand_leader_msg.go new file mode 100644 index 000000000..48564fadc --- /dev/null +++ b/drand/drand_leader_msg.go @@ -0,0 +1,21 @@ +package drand + +import ( + drand_proto "github.com/harmony-one/harmony/api/drand" + "github.com/harmony-one/harmony/api/proto" + "github.com/harmony-one/harmony/internal/utils" +) + +// Constructs the init message +func (drand *DRand) constructInitMessage() []byte { + message := drand_proto.Message{} + message.Type = drand_proto.MessageType_INIT + + message.BlockHash = drand.blockHash[:] + // Don't need the payload in init message + marshaledMessage, err := drand.signAndMarshalDRandMessage(&message) + if err != nil { + utils.GetLogInstance().Error("Failed to sign and marshal the init message", "error", err) + } + return proto.ConstructDRandMessage(marshaledMessage) +} diff --git a/drand/drand_leader_msg_test.go b/drand/drand_leader_msg_test.go new file mode 100644 index 000000000..93b7256d5 --- /dev/null +++ b/drand/drand_leader_msg_test.go @@ -0,0 +1,26 @@ +package drand + +import ( + "testing" + + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/p2pimpl" +) + +func TestConstructInitMessage(test *testing.T) { + leader := p2p.Peer{IP: "127.0.0.1", Port: "19999"} + validator := p2p.Peer{IP: "127.0.0.1", Port: "55555"} + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + test.Fatalf("newhost failure: %v", err) + } + dRand := New(host, "0", []p2p.Peer{leader, validator}, leader) + dRand.blockHash = [32]byte{} + msg := dRand.constructInitMessage() + + if len(msg) != 87 { + test.Errorf("Init message is not constructed in the correct size: %d", len(msg)) + } +} diff --git a/drand/drand_test.go b/drand/drand_test.go new file mode 100644 index 000000000..b089221a9 --- /dev/null +++ b/drand/drand_test.go @@ -0,0 +1,24 @@ +package drand + +import ( + "testing" + + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/p2pimpl" +) + +func TestNew(test *testing.T) { + leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"} + validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"} + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + test.Fatalf("newhost failure: %v", err) + } + dRand := New(host, "0", []p2p.Peer{leader, validator}, leader) + + if !dRand.IsLeader { + test.Error("dRand should belong to a leader") + } +} diff --git a/drand/drand_validator.go b/drand/drand_validator.go new file mode 100644 index 000000000..151becd01 --- /dev/null +++ b/drand/drand_validator.go @@ -0,0 +1,52 @@ +package drand + +import ( + protobuf "github.com/golang/protobuf/proto" + drand_proto "github.com/harmony-one/harmony/api/drand" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p/host" +) + +// ProcessMessageValidator dispatches messages for the validator to corresponding processors. +func (dRand *DRand) ProcessMessageValidator(payload []byte) { + message := drand_proto.Message{} + err := protobuf.Unmarshal(payload, &message) + + if err != nil { + utils.GetLogInstance().Error("Failed to unmarshal message payload.", "err", err, "dRand", dRand) + } + + switch message.Type { + case drand_proto.MessageType_COMMIT: + dRand.processInitMessage(message) + default: + utils.GetLogInstance().Error("Unexpected message type", "msgType", message.Type, "dRand", dRand) + } +} + +// ProcessMessageValidator dispatches validator's consensus message. +func (dRand *DRand) processInitMessage(message drand_proto.Message) { + if message.Type != drand_proto.MessageType_INIT { + utils.GetLogInstance().Error("Wrong message type received", "expected", drand_proto.MessageType_INIT, "got", message.Type) + return + } + + blockHash := message.BlockHash + + // Verify message signature + err := verifyMessageSig(dRand.leader.PubKey, message) + if err != nil { + utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) + return + } + + // TODO: check the blockHash is the block hash of last block of last epoch. + copy(dRand.blockHash[:], blockHash[:]) + + rand, proof := dRand.vrf(dRand.blockHash) + + msgToSend := dRand.constructCommitMessage(rand, proof) + + // Send the commit message back to leader + host.SendMessage(dRand.host, dRand.leader, msgToSend, nil) +} diff --git a/drand/drand_validator_msg.go b/drand/drand_validator_msg.go new file mode 100644 index 000000000..d4fca412f --- /dev/null +++ b/drand/drand_validator_msg.go @@ -0,0 +1,22 @@ +package drand + +import ( + drand_proto "github.com/harmony-one/harmony/api/drand" + "github.com/harmony-one/harmony/api/proto" + "github.com/harmony-one/harmony/internal/utils" +) + +// Constructs the init message +func (drand *DRand) constructCommitMessage(vrf [32]byte, proof []byte) []byte { + message := drand_proto.Message{} + message.Type = drand_proto.MessageType_COMMIT + + message.BlockHash = drand.blockHash[:] + message.Payload = append(vrf[:], proof...) + + marshaledMessage, err := drand.signAndMarshalDRandMessage(&message) + if err != nil { + utils.GetLogInstance().Error("Failed to sign and marshal the commit message", "error", err) + } + return proto.ConstructDRandMessage(marshaledMessage) +} diff --git a/drand/drand_validator_msg_test.go b/drand/drand_validator_msg_test.go new file mode 100644 index 000000000..aa5e86e8f --- /dev/null +++ b/drand/drand_validator_msg_test.go @@ -0,0 +1,26 @@ +package drand + +import ( + "testing" + + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2p/p2pimpl" +) + +func TestConstructCommitMessage(test *testing.T) { + leader := p2p.Peer{IP: "127.0.0.1", Port: "19999"} + validator := p2p.Peer{IP: "127.0.0.1", Port: "55555"} + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2pimpl.NewHost(&leader, priKey) + if err != nil { + test.Fatalf("newhost failure: %v", err) + } + dRand := New(host, "0", []p2p.Peer{leader, validator}, leader) + dRand.blockHash = [32]byte{} + msg := dRand.constructCommitMessage([32]byte{}, []byte{}) + + if len(msg) != 121 { + test.Errorf("Commit message is not constructed in the correct size: %d", len(msg)) + } +} diff --git a/internal/utils/connlogger.go b/internal/utils/connlogger.go new file mode 100644 index 000000000..d283867a3 --- /dev/null +++ b/internal/utils/connlogger.go @@ -0,0 +1,57 @@ +package utils + +import ( + "github.com/ethereum/go-ethereum/log" + net "github.com/libp2p/go-libp2p-net" + ma "github.com/multiformats/go-multiaddr" +) + +type connLogger struct{} + +func (connLogger) Listen(net net.Network, ma ma.Multiaddr) { + log.Debug("[CONNECTIONS] Listener starting", "net", net, "addr", ma) +} + +func (connLogger) ListenClose(net net.Network, ma ma.Multiaddr) { + log.Debug("[CONNECTIONS] Listener closing", "net", net, "addr", ma) +} + +func (connLogger) Connected(net net.Network, conn net.Conn) { + log.Debug("[CONNECTIONS] Connected", "net", net, + "localPeer", conn.LocalPeer(), "localAddr", conn.LocalMultiaddr(), + "remotePeer", conn.RemotePeer(), "remoteAddr", conn.RemoteMultiaddr(), + ) +} + +func (connLogger) Disconnected(net net.Network, conn net.Conn) { + log.Debug("[CONNECTIONS] Disconnected", "net", net, + "localPeer", conn.LocalPeer(), "localAddr", conn.LocalMultiaddr(), + "remotePeer", conn.RemotePeer(), "remoteAddr", conn.RemoteMultiaddr(), + ) +} + +func (connLogger) OpenedStream(net net.Network, stream net.Stream) { + conn := stream.Conn() + log.Debug("[CONNECTIONS] Stream opened", "net", net, + "localPeer", conn.LocalPeer(), "localAddr", conn.LocalMultiaddr(), + "remotePeer", conn.RemotePeer(), "remoteAddr", conn.RemoteMultiaddr(), + "protocol", stream.Protocol(), + ) +} + +func (connLogger) ClosedStream(net net.Network, stream net.Stream) { + conn := stream.Conn() + log.Debug("[CONNECTIONS] Stream closed", "net", net, + "localPeer", conn.LocalPeer(), "localAddr", conn.LocalMultiaddr(), + "remotePeer", conn.RemotePeer(), "remoteAddr", conn.RemoteMultiaddr(), + "protocol", stream.Protocol(), + ) +} + +// ConnLogger is a LibP2P connection logger. +// Add on a LibP2P host by calling: +// +// host.Network().Notify(utils.ConnLogger) +// +// It logs all listener/connection/stream open/close activities at debug level. +var ConnLogger connLogger diff --git a/internal/utils/flags.go b/internal/utils/flags.go index 8c206e18c..fff38b228 100644 --- a/internal/utils/flags.go +++ b/internal/utils/flags.go @@ -48,6 +48,7 @@ func StringsToAddrs(addrStrings []string) (maddrs []ma.Multiaddr, err error) { // DefaultBootNodeAddrStrings is a list of Harmony bootnodes address. Used to find other peers in the network. var DefaultBootNodeAddrStrings = []string{ + // FIXME: (leo) this is a bootnode I used for local test, change it to long running ones later "/ip4/127.0.0.1/tcp/9876/p2p/QmayB8NwxmfGE4Usb4H61M8uwbfc7LRbmXb3ChseJgbVuf", } diff --git a/internal/utils/utils.go b/internal/utils/utils.go index bc103a694..73fc96c71 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -150,19 +150,20 @@ func Load(path string, v interface{}) error { } // LoadPrivateKey parses the key string in base64 format and return PrivKey -func LoadPrivateKey(key string) (p2p_crypto.PrivKey, error) { +func LoadPrivateKey(key string) (p2p_crypto.PrivKey, p2p_crypto.PubKey, error) { if key != "" { k1, err := p2p_crypto.ConfigDecodeKey(key) if err != nil { - return nil, fmt.Errorf("failed to decode key: %v", err) + return nil, nil, fmt.Errorf("failed to decode key: %v", err) } priKey, err := p2p_crypto.UnmarshalPrivateKey(k1) if err != nil { - return nil, fmt.Errorf("failed to unmarshal private key: %v", err) + return nil, nil, fmt.Errorf("failed to unmarshal private key: %v", err) } - return priKey, nil + pubKey := priKey.GetPublic() + return priKey, pubKey, nil } - return nil, fmt.Errorf("empty key string") + return nil, nil, fmt.Errorf("empty key string") } // SavePrivateKey convert the PrivKey to base64 format and return string @@ -194,13 +195,13 @@ func SaveKeyToFile(keyfile string, key p2p_crypto.PrivKey) (err error) { // LoadKeyFromFile load private key from keyfile // If the private key is not loadable or no file, it will generate // a new random private key -func LoadKeyFromFile(keyfile string) (key p2p_crypto.PrivKey, err error) { +func LoadKeyFromFile(keyfile string) (key p2p_crypto.PrivKey, pk p2p_crypto.PubKey, err error) { var keyStruct PrivKeyStore err = Load(keyfile, &keyStruct) if err != nil { log.Print("No priviate key can be loaded from file", "keyfile", keyfile) log.Print("Using random private key") - key, _, err = GenKeyP2PRand() + key, pk, err = GenKeyP2PRand() if err != nil { log.Panic("LoadKeyFromFile", "GenKeyP2PRand Error", err) } @@ -208,8 +209,8 @@ func LoadKeyFromFile(keyfile string) (key p2p_crypto.PrivKey, err error) { if err != nil { log.Print("LoadKeyFromFile", "failed to save key to keyfile", err) } - return key, nil + return key, pk, nil } - key, err = LoadPrivateKey(keyStruct.Key) - return key, err + key, pk, err = LoadPrivateKey(keyStruct.Key) + return key, pk, err } diff --git a/internal/utils/utils_test.go b/internal/utils/utils_test.go index 4a813a2ef..f0027df6b 100644 --- a/internal/utils/utils_test.go +++ b/internal/utils/utils_test.go @@ -107,7 +107,7 @@ func TestSaveLoadPrivateKey(t *testing.T) { t.Fatalf("failed to save private key: %v", err) } - pk1, err := LoadPrivateKey(str) + pk1, _, err := LoadPrivateKey(str) if err != nil { t.Fatalf("failed to load key: %v", err) } @@ -135,7 +135,7 @@ func TestSaveLoadKeyFile(t *testing.T) { t.Fatalf("failed to save key to file: %v", err) } - key1, err := LoadKeyFromFile(filename) + key1, _, err := LoadKeyFromFile(filename) if err != nil { t.Fatalf("failed to load key from file (%s): %v", filename, err) } @@ -144,7 +144,7 @@ func TestSaveLoadKeyFile(t *testing.T) { t.Fatalf("loaded key is not equal to the saved one") } - key2, err := LoadKeyFromFile(nonexist) + key2, _, err := LoadKeyFromFile(nonexist) if err != nil { t.Fatalf("failed to load key from non-exist file: %v", err) diff --git a/node/node.go b/node/node.go index 0512d064f..ea21e326a 100644 --- a/node/node.go +++ b/node/node.go @@ -129,6 +129,7 @@ type Node struct { SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. BCPeers []p2p.Peer // list of Beacon Chain Peers. This is needed by all nodes. + // TODO: Neighbors should store only neighbor nodes in the same shard Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer State State // State of the Node stateMutex sync.Mutex // mutex for change node state @@ -172,6 +173,13 @@ type Node struct { TestBankKeys []*ecdsa.PrivateKey ContractKeys []*ecdsa.PrivateKey ContractAddresses []common.Address + + // Group Message Receiver + groupReceiver p2p.GroupReceiver + + // fully integrate with libp2p for networking + // FIXME: this is temporary hack until we can fully replace the old one + UseLibP2P bool } // Blockchain returns the blockchain from node @@ -203,7 +211,11 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) types.Transactions { // StartServer starts a server and process the requests by a handler. func (node *Node) StartServer() { - node.host.BindHandlerAndServe(node.StreamHandler) + if node.UseLibP2P { + select {} + } else { + node.host.BindHandlerAndServe(node.StreamHandler) + } } // Count the total number of transactions in the blockchain @@ -282,6 +294,9 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { node.OfflinePeers = make(chan p2p.Peer) go node.RemovePeersHandler() + // start the goroutine to receive group message + go node.ReceiveGroupMessage() + return &node } @@ -343,14 +358,14 @@ func (node *Node) DoSyncing() { continue case consensusBlockInfo := <-node.Consensus.ConsensusBlock: if !node.IsOutOfSync(consensusBlockInfo) { + startHash := node.blockchain.CurrentBlock().Hash() + node.stateSync.StartStateSync(startHash[:], node.blockchain, node.Worker) if node.State == NodeNotInSync { utils.GetLogInstance().Info("[SYNC] Node is now IN SYNC!") } node.stateMutex.Lock() node.State = NodeReadyForConsensus node.stateMutex.Unlock() - // wait for last mile block finish; think a better way - time.Sleep(200 * time.Millisecond) node.stateSync.CloseConnections() node.stateSync = nil continue @@ -657,7 +672,7 @@ func (node *Node) setupForShardLeader() { // Register new block service. node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady)) // Register client support service. - node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.SelfPeer.IP, node.SelfPeer.Port)) + node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) } func (node *Node) setupForShardValidator() { @@ -666,6 +681,13 @@ func (node *Node) setupForShardValidator() { func (node *Node) setupForBeaconLeader() { chanPeer := make(chan p2p.Peer) + var err error + node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) + if err != nil { + utils.GetLogInstance().Error("create group receiver error", "msg", err) + return + } + // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil)) // Register networkinfo service. "0" is the beacon shard ID @@ -676,12 +698,19 @@ func (node *Node) setupForBeaconLeader() { // Register new block service. node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady)) // Register client support service. - node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.SelfPeer.IP, node.SelfPeer.Port)) + node.serviceManager.RegisterService(service_manager.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) } func (node *Node) setupForBeaconValidator() { chanPeer := make(chan p2p.Peer) + var err error + node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) + if err != nil { + utils.GetLogInstance().Error("create group receiver error", "msg", err) + return + } + // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil)) // Register networkinfo service. "0" is the beacon shard ID @@ -692,6 +721,13 @@ func (node *Node) setupForNewNode() { chanPeer := make(chan p2p.Peer) stakingPeer := make(chan p2p.Peer) + var err error + node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) + if err != nil { + utils.GetLogInstance().Error("create group receiver error", "msg", err) + return + } + // Register staking service. node.serviceManager.RegisterService(service_manager.Staking, staking.New(node.AccountKey, 0, stakingPeer)) // Register peer discovery service. "0" is the beacon shard ID diff --git a/node/node_handler.go b/node/node_handler.go index 007eded21..88e7979a5 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -2,8 +2,10 @@ package node import ( "bytes" + "context" "fmt" "os" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" @@ -43,9 +45,29 @@ func (node *Node) StreamHandler(s p2p.Stream) { utils.GetLogInstance().Error("Read p2p data failed", "err", err, "node", node) return } + node.messageHandler(content) } +// ReceiveGroupMessage use libp2p pubsub mechanism to receive broadcast messages +func (node *Node) ReceiveGroupMessage() { + ctx := context.Background() + for { + if node.groupReceiver == nil { + time.Sleep(100 * time.Millisecond) + continue + } + msg, sender, err := node.groupReceiver.Receive(ctx) + if sender != node.host.GetID() { + utils.GetLogInstance().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender) + if err == nil { + // skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size + node.messageHandler(msg[5:]) + } + } + } +} + // messageHandler parses the message and dispatch the actions func (node *Node) messageHandler(content []byte) { node.MaybeBroadcastAsValidator(content) @@ -239,7 +261,6 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int { utils.GetLogInstance().Error("Can't get Ping Message") return -1 } - // utils.GetLogInstance().Info("Ping", "Msg", ping) peer := new(p2p.Peer) peer.IP = ping.Node.IP @@ -254,6 +275,12 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int { return -1 } + utils.GetLogInstance().Debug("[pingMessageHandler]", "incoming peer", peer) + + // add to incoming peer list + node.host.AddIncomingPeer(*peer) + node.host.ConnectHostPeer(*peer) + if ping.Node.Role == proto_node.ClientRole { utils.GetLogInstance().Info("Add Client Peer to Node", "Node", node.Consensus.GetNodeID(), "Client", peer) node.ClientPeer = peer @@ -263,25 +290,39 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int { // Add to Node's peer list anyway node.AddPeers([]*p2p.Peer{peer}) - peers := node.Consensus.GetValidatorPeers() - pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys) - buffer := pong.ConstructPongMessage() - - // Send a Pong message directly to the sender - // This is necessary because the sender will need to get a ValidatorID - // Just broadcast won't work, some validators won't receive the latest - // PublicKeys as we rely on a valid ValidatorID to do broadcast. - // This is very buggy, but we will move to libp2p, hope the problem will - // be resolved then. - // However, I disable it for now as we are sending redundant PONG messages - // to all validators. This may not be needed. But it maybe add back. - // p2p.SendMessage(*peer, buffer) - - // Broadcast the message to all validators, as publicKeys is updated - // FIXME: HAR-89 use a separate nodefind/neighbor message - host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers) - - return len(peers) + if node.Consensus.IsLeader { + peers := node.Consensus.GetValidatorPeers() + pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys) + buffer := pong.ConstructPongMessage() + + // Send a Pong message directly to the sender + // This is necessary because the sender will need to get a ValidatorID + // Just broadcast won't work, some validators won't receive the latest + // PublicKeys as we rely on a valid ValidatorID to do broadcast. + // This is very buggy, but we will move to libp2p, hope the problem will + // be resolved then. + // However, I disable it for now as we are sending redundant PONG messages + // to all validators. This may not be needed. But it maybe add back. + // p2p.SendMessage(*peer, buffer) + + // Broadcast the message to all validators, as publicKeys is updated + // FIXME: HAR-89 use a separate nodefind/neighbor message + + if node.UseLibP2P { + content := host.ConstructP2pMessage(byte(0), buffer) + err := node.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content) + if err != nil { + utils.GetLogInstance().Error("[PONG] failed to send pong message", "group", p2p.GroupIDBeacon) + } else { + utils.GetLogInstance().Debug("[PONG] sent Pong Message via group send", "group", p2p.GroupIDBeacon) + } + } else { + host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers) + utils.GetLogInstance().Info("PingMsgHandler send pong message") + } + } + + return 1 } func (node *Node) pongMessageHandler(msgPayload []byte) int { @@ -291,8 +332,6 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { return -1 } - // utils.GetLogInstance().Debug("pongMessageHandler", "pong", pong, "nodeID", node.Consensus.GetNodeID()) - peers := make([]*p2p.Peer, 0) for _, p := range pong.Peers { @@ -311,6 +350,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { peers = append(peers, peer) } + utils.GetLogInstance().Debug("[pongMessageHandler]", "received msg #peers", len(peers)) + if len(peers) > 0 { node.AddPeers(peers) } @@ -331,6 +372,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { publicKeys = append(publicKeys, &key) } + utils.GetLogInstance().Debug("[pongMessageHandler]", "received msg #keys", len(publicKeys)) + if node.State == NodeWaitToJoin { node.State = NodeReadyForConsensus // Notify JoinShard to stop sending Ping messages diff --git a/p2p/group.go b/p2p/group.go index 291d19a2f..5fcf6c690 100644 --- a/p2p/group.go +++ b/p2p/group.go @@ -22,6 +22,12 @@ func (id GroupID) String() string { return fmt.Sprintf("%x", string(id)) } +// Const of group ID +const ( + GroupIDBeacon GroupID = "harmony/0.0.1/beacon" + GroupIDGlobal GroupID = "harmony/0.0.1/global" +) + // GroupReceiver is a multicast group message receiver interface. type GroupReceiver interface { // Close closes this receiver. diff --git a/p2p/host.go b/p2p/host.go index aa8248d41..db39f50c1 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -17,6 +17,10 @@ type Host interface { GetID() peer.ID GetP2PHost() p2p_host.Host + AddIncomingPeer(Peer) + AddOutgoingPeer(Peer) + ConnectHostPeer(Peer) + // SendMessageToGroups sends a message to one or more multicast groups. SendMessageToGroups(groups []GroupID, msg []byte) error diff --git a/p2p/host/hostv2/hostv2.go b/p2p/host/hostv2/hostv2.go index 7717757b9..d37519c9e 100644 --- a/p2p/host/hostv2/hostv2.go +++ b/p2p/host/hostv2/hostv2.go @@ -4,8 +4,10 @@ import ( "context" "fmt" "io" + "sync" "github.com/ethereum/go-ethereum/log" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" libp2p "github.com/libp2p/go-libp2p" @@ -25,6 +27,10 @@ const ( BatchSizeInByte = 1 << 16 // ProtocolID The ID of protocol used in stream handling. ProtocolID = "/harmony/0.0.1" + + // Constants for discovery service. + numIncoming = 128 + numOutgoing = 16 ) // PubSub captures the pubsub interface we expect from libp2p. @@ -39,6 +45,10 @@ type HostV2 struct { pubsub PubSub self p2p.Peer priKey p2p_crypto.PrivKey + lock sync.Mutex + + incomingPeers []p2p.Peer // list of incoming Peers. TODO: fixed number incoming + outgoingPeers []p2p.Peer // list of outgoing Peers. TODO: fixed number of outgoing } // SendMessageToGroups sends a message to one or more multicast groups. @@ -124,6 +134,16 @@ func (host *HostV2) AddPeer(p *p2p.Peer) error { return nil } +// AddIncomingPeer add peer to incoming peer list +func (host *HostV2) AddIncomingPeer(peer p2p.Peer) { + host.incomingPeers = append(host.incomingPeers, peer) +} + +// AddOutgoingPeer add peer to outgoing peer list +func (host *HostV2) AddOutgoingPeer(peer p2p.Peer) { + host.outgoingPeers = append(host.outgoingPeers, peer) +} + // Peerstore returns the peer store func (host *HostV2) Peerstore() peerstore.Peerstore { return host.h.Peerstore() @@ -142,7 +162,8 @@ func New(self *p2p.Peer, priKey p2p_crypto.PrivKey, opts ...p2p_config.Option) * append(opts, libp2p.ListenAddrs(listenAddr), libp2p.Identity(priKey))..., ) catchError(err) - pubsub, err := pubsub.NewGossipSub(ctx, p2pHost) + // pubsub, err := pubsub.NewGossipSub(ctx, p2pHost) + pubsub, err := pubsub.NewFloodSub(ctx, p2pHost) catchError(err) self.PeerID = p2pHost.ID() @@ -210,3 +231,26 @@ func (host *HostV2) Close() error { func (host *HostV2) GetP2PHost() p2p_host.Host { return host.h } + +// ConnectHostPeer connects to peer host +func (host *HostV2) ConnectHostPeer(peer p2p.Peer) { + ctx := context.Background() + addr := fmt.Sprintf("/ip4/%s/tcp/%s/ipfs/%s", peer.IP, peer.Port, peer.PeerID.Pretty()) + peerAddr, err := ma.NewMultiaddr(addr) + if err != nil { + utils.GetLogInstance().Error("ConnectHostPeer", "new ma error", err, "peer", peer) + return + } + peerInfo, err := peerstore.InfoFromP2pAddr(peerAddr) + if err != nil { + utils.GetLogInstance().Error("ConnectHostPeer", "new peerinfo error", err, "peer", peer) + return + } + host.lock.Lock() + defer host.lock.Unlock() + if err := host.h.Connect(ctx, *peerInfo); err != nil { + utils.GetLogInstance().Warn("can't connect to peer", "error", err, "peer", peer) + } else { + utils.GetLogInstance().Info("connected to peer host", "node", *peerInfo) + } +} diff --git a/p2p/host/mock/host_mock.go b/p2p/host/mock/host_mock.go index 694459f0c..1d00a3df3 100644 --- a/p2p/host/mock/host_mock.go +++ b/p2p/host/mock/host_mock.go @@ -5,11 +5,12 @@ package mock_p2p import ( + reflect "reflect" + gomock "github.com/golang/mock/gomock" p2p "github.com/harmony-one/harmony/p2p" go_libp2p_host "github.com/libp2p/go-libp2p-host" go_libp2p_peer "github.com/libp2p/go-libp2p-peer" - reflect "reflect" ) // MockHost is a mock of Host interface @@ -141,3 +142,33 @@ func (m *MockHost) GroupReceiver(arg0 p2p.GroupID) (p2p.GroupReceiver, error) { func (mr *MockHostMockRecorder) GroupReceiver(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GroupReceiver", reflect.TypeOf((*MockHost)(nil).GroupReceiver), arg0) } + +// AddIncomingPeer mocks base method +func (m *MockHost) AddIncomingPeer(peer p2p.Peer) { + m.ctrl.Call(m, "AddIncomingPeer", peer) +} + +// AddIncomingPeer indicates an expected call of AddIncomingPeer +func (mr *MockHostMockRecorder) AddIncomingPeer(groups, msg interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddIncomingPeer", reflect.TypeOf((*MockHost)(nil).AddIncomingPeer), groups, msg) +} + +// AddOutgoingPeer mocks base method +func (m *MockHost) AddOutgoingPeer(peer p2p.Peer) { + m.ctrl.Call(m, "AddOutgoingPeer", peer) +} + +// AddOutgoingPeer indicates an expected call of AddOutgoingPeer +func (mr *MockHostMockRecorder) AddOutgoingPeer(groups, msg interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddOutgoingPeer", reflect.TypeOf((*MockHost)(nil).AddOutgoingPeer), groups, msg) +} + +// ConnectHostPeer mocks base method +func (m *MockHost) ConnectHostPeer(peer p2p.Peer) { + m.ctrl.Call(m, "ConnectHostPeer", peer) +} + +// ConnectHostPeer indicates an expected call of ConnectHostPeer +func (mr *MockHostMockRecorder) ConnectHostPeer(groups, msg interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConnectHostPeer", reflect.TypeOf((*MockHost)(nil).ConnectHostPeer), groups, msg) +} diff --git a/test/chain/main.go b/test/chain/main.go index b401fbe2d..362304ef7 100644 --- a/test/chain/main.go +++ b/test/chain/main.go @@ -96,7 +96,7 @@ func main() { txs[i] = tx } //Add a contract deployment transaction. - contractData := "0x60806040526802b5e3af16b188000060015560028054600160a060020a031916331790556101aa806100326000396000f3fe608060405260043610610045577c0100000000000000000000000000000000000000000000000000000000600035046327c78c42811461004a5780634ddd108a1461008c575b600080fd5b34801561005657600080fd5b5061008a6004803603602081101561006d57600080fd5b503573ffffffffffffffffffffffffffffffffffffffff166100b3565b005b34801561009857600080fd5b506100a1610179565b60408051918252519081900360200190f35b60025473ffffffffffffffffffffffffffffffffffffffff1633146100d757600080fd5b600154303110156100e757600080fd5b73ffffffffffffffffffffffffffffffffffffffff811660009081526020819052604090205460ff161561011a57600080fd5b73ffffffffffffffffffffffffffffffffffffffff8116600081815260208190526040808220805460ff1916600190811790915554905181156108fc0292818181858888f19350505050158015610175573d6000803e3d6000fd5b5050565b30319056fea165627a7a7230582003d799bcee73e96e0f40ca432d9c3d2aa9c00a1eba8d00877114a0d7234790ce0029" + contractData := "0x60806040526706f05b59d3b2000060015560028054600160a060020a031916331790556101aa806100316000396000f3fe608060405260043610610045577c0100000000000000000000000000000000000000000000000000000000600035046327c78c42811461004a578063b69ef8a81461008c575b600080fd5b34801561005657600080fd5b5061008a6004803603602081101561006d57600080fd5b503573ffffffffffffffffffffffffffffffffffffffff166100b3565b005b34801561009857600080fd5b506100a1610179565b60408051918252519081900360200190f35b60025473ffffffffffffffffffffffffffffffffffffffff1633146100d757600080fd5b600154303110156100e757600080fd5b73ffffffffffffffffffffffffffffffffffffffff811660009081526020819052604090205460ff161561011a57600080fd5b73ffffffffffffffffffffffffffffffffffffffff8116600081815260208190526040808220805460ff1916600190811790915554905181156108fc0292818181858888f19350505050158015610175573d6000803e3d6000fd5b5050565b30319056fea165627a7a723058206b894c1f3badf3b26a7a2768ab8141b1e6fa1c1ddc4622f4f44a7d5041edc9350029" _ = contractData dataEnc := common.FromHex(contractData) diff --git a/test/deploy.sh b/test/deploy.sh index 0b7904f3b..3e3f67e9a 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -84,7 +84,7 @@ while getopts "hdtD:m:s:k:nSP" option; do case $option in h) usage ;; d) DB='-db_supported' ;; - t) TXGEN=$OPTARG ;; + t) TXGEN=false ;; D) DURATION=$OPTARG ;; m) MIN=$OPTARG ;; s) SHARDS=$OPTARG ;; @@ -130,28 +130,23 @@ log_folder="tmp_log/log-$t" mkdir -p $log_folder LOG_FILE=$log_folder/r.log -echo "launching beacon chain ..." -$DRYRUN $ROOT/bin/beacon -numShards $SHARDS > $log_folder/beacon.log 2>&1 | tee -a $LOG_FILE & -sleep 1 #waiting for beaconchain -BC_MA=$(grep "Beacon Chain Started" $log_folder/beacon.log | awk -F: ' { print $2 } ') - -echo "launching boot node ..." -$DRYRUN $ROOT/bin/bootnode > $log_folder/bootnode.log 2>&1 | tee -a $LOG_FILE & -sleep 1 -BN_MA=$(grep "BN_MA" $log_folder/bootnode.log | awk -F\= ' { print $2 } ') - HMY_OPT= HMY_OPT2= -if [ -n "$BC_MA" ]; then - HMY_OPT=" -bc_addr $BC_MA" -fi -if [ -n "$BN_MA" ]; then +if [ "$P2P" == "false" ]; then + echo "launching beacon chain ..." + $DRYRUN $ROOT/bin/beacon -numShards $SHARDS > $log_folder/beacon.log 2>&1 | tee -a $LOG_FILE & + sleep 1 #waiting for beaconchain + BC_MA=$(grep "Beacon Chain Started" $log_folder/beacon.log | awk -F: ' { print $2 } ') + HMY_OPT=" -bc_addr $BC_MA" +else + echo "launching boot node ..." + $DRYRUN $ROOT/bin/bootnode > $log_folder/bootnode.log 2>&1 | tee -a $LOG_FILE & + sleep 1 + BN_MA=$(grep "BN_MA" $log_folder/bootnode.log | awk -F\= ' { print $2 } ') HMY_OPT2=" -bootnodes $BN_MA" -fi - -if [ "$P2P" == "true" ]; then - HMY_OPT2+=" -libp2p_pd" + HMY_OPT2+=" -libp2p_pd -is_beacon" + TXGEN=false fi NUM_NN=0 @@ -159,10 +154,13 @@ NUM_NN=0 # Start nodes while IFS='' read -r line || [[ -n "$line" ]]; do IFS=' ' read ip port mode shardID <<< $line - if [[ "$mode" == "leader" || "$mode" == "validator" ]]; then + if [ "$mode" == "leader" ]; then + $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT $HMY_OPT2 -key /tmp/$ip-$port.key -is_leader 2>&1 | tee -a $LOG_FILE & + fi + if [ "$mode" == "validator" ]; then $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT $HMY_OPT2 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE & - sleep 0.5 fi + sleep 0.5 if [[ "$mode" == "newnode" && "$SYNC" == "true" ]]; then (( NUM_NN += 35 )) (sleep $NUM_NN; $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT $HMY_OPT2 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE ) & @@ -182,10 +180,12 @@ if [ "$TXGEN" == "true" ]; then if [ "$mode" == "client" ]; then $DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port $HMY_OPT 2>&1 | tee -a $LOG_FILE fi +else + sleep $DURATION fi # save bc_config.json -cp -f bc_config.json $log_folder +[ -e bc_config.json ] && cp -f bc_config.json $log_folder cleanup check_result diff --git a/test/kill_node.sh b/test/kill_node.sh index ca8164b9d..7891e3138 100755 --- a/test/kill_node.sh +++ b/test/kill_node.sh @@ -1,6 +1,6 @@ #!/bin/bash -for pid in `/bin/ps -fu $USER| grep "harmony\|txgen\|soldier\|commander\|profiler\|beacon" | grep -v "grep" | grep -v "vi" | awk '{print $2}'`; +for pid in `/bin/ps -fu $USER| grep "harmony\|txgen\|soldier\|commander\|profiler\|beacon\|bootnode" | grep -v "grep" | grep -v "vi" | awk '{print $2}'`; do echo 'Killed process: '$pid kill -9 $pid