From fe6fc151bff7cc5b9b9ad77039c104878ccd988f Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Fri, 23 Nov 2018 11:30:44 -0800 Subject: [PATCH 01/12] change constants to gofmt style --- client/client.go | 2 +- node/node_handler.go | 22 +++++++++++----------- proto/client/client.go | 8 ++++---- proto/common.go | 8 ++++---- proto/node/node.go | 36 ++++++++++++++++++------------------ proto/node/pingpong.go | 4 ++-- 6 files changed, 40 insertions(+), 40 deletions(-) diff --git a/client/client.go b/client/client.go index 7bfa94966..5522f2a4e 100644 --- a/client/client.go +++ b/client/client.go @@ -25,7 +25,7 @@ type Client struct { log log.Logger // Log utility } -// The message handler for CLIENT/Transaction messages. +// The message handler for Client/Transaction messages. func (client *Client) TransactionMessageHandler(msgPayload []byte) { messageType := client_proto.TransactionMessageType(msgPayload[0]) switch messageType { diff --git a/node/node_handler.go b/node/node_handler.go index fd1c711cf..9c36242a8 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -29,7 +29,7 @@ const ( // MinNumberOfTransactionsPerBlock is the min number of transaction per a block. MinNumberOfTransactionsPerBlock = 6000 // MaxNumberOfTransactionsPerBlock is the max number of transaction per a block. - MaxNumberOfTransactionsPerBlock = 20000 + MaxNumberOfTransactionsPerBlock = 8000 // NumBlocksBeforeStateBlock is the number of blocks allowed before generating state block NumBlocksBeforeStateBlock = 1000 ) @@ -103,14 +103,14 @@ func (node *Node) NodeHandler(conn net.Conn) { consensusObj.ProcessMessageValidator(msgPayload) } } - case proto.NODE: + case proto.Node: actionType := proto_node.NodeMessageType(msgType) switch actionType { case proto_node.Transaction: node.log.Info("NET: received message: Node/Transaction") node.transactionMessageHandler(msgPayload) - case proto_node.BLOCK: - node.log.Info("NET: received message: Node/BLOCK") + case proto_node.Block: + node.log.Info("NET: received message: Node/Block") blockMsgType := proto_node.BlockMessageType(msgPayload[0]) switch blockMsgType { case proto_node.Sync: @@ -124,8 +124,8 @@ func (node *Node) NodeHandler(conn net.Conn) { case proto_node.BlockchainSync: node.log.Info("NET: received message: Node/BlockchainSync") node.handleBlockchainSync(msgPayload, conn) - case proto_node.CLIENT: - node.log.Info("NET: received message: Node/CLIENT") + case proto_node.Client: + node.log.Info("NET: received message: Node/Client") clientMsgType := proto_node.ClientMessageType(msgPayload[0]) switch clientMsgType { case proto_node.LookupUtxo: @@ -138,8 +138,8 @@ func (node *Node) NodeHandler(conn net.Conn) { p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) } - case proto_node.CONTROL: - node.log.Info("NET: received message: Node/CONTROL") + case proto_node.Control: + node.log.Info("NET: received message: Node/Control") controlType := msgPayload[0] if proto_node.ControlMessageType(controlType) == proto_node.STOP { node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) @@ -191,9 +191,9 @@ func (node *Node) NodeHandler(conn net.Conn) { node.log.Info("NET: received message: PONG") node.pongMessageHandler(msgPayload) } - case proto.CLIENT: + case proto.Client: actionType := client.ClientMessageType(msgType) - node.log.Info("NET: received message: CLIENT/Transaction") + node.log.Info("NET: received message: Client/Transaction") switch actionType { case client.Transaction: if node.Client != nil { @@ -235,7 +235,7 @@ FOR_LOOP: } msgCategory, _ := proto.GetMessageCategory(content) - if err != nil || msgCategory != proto.NODE { + if err != nil || msgCategory != proto.Node { node.log.Error("Failed in reading message category from syncing node", err) return } diff --git a/proto/client/client.go b/proto/client/client.go index f06d6ebd7..5bcc08177 100644 --- a/proto/client/client.go +++ b/proto/client/client.go @@ -8,7 +8,7 @@ import ( "github.com/harmony-one/harmony/proto" ) -// The specific types of message under CLIENT category +// The specific types of message under Client category type ClientMessageType byte const ( @@ -16,7 +16,7 @@ const ( // TODO: add more types ) -// The types of messages used for CLIENT/Transaction +// The types of messages used for Client/Transaction type TransactionMessageType int const ( @@ -31,7 +31,7 @@ type FetchUtxoResponseMessage struct { // [leader] Constructs the proof of accept or reject message that will be sent to client func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.CLIENT)}) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Client)}) byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(ProofOfLock)) encoder := gob.NewEncoder(byteBuffer) @@ -42,7 +42,7 @@ func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof // Constructs the response message to fetch utxo message func ConstructFetchUtxoResponseMessage(utxoMap *blockchain.UtxoMap, shardID uint32) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.CLIENT)}) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Client)}) byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(UtxoResponse)) encoder := gob.NewEncoder(byteBuffer) diff --git a/proto/common.go b/proto/common.go index 3a7f8b81f..be8100e0f 100644 --- a/proto/common.go +++ b/proto/common.go @@ -10,12 +10,12 @@ The message structure of any message in Harmony network ---- content start ----- 1 byte - message category 0x00: Consensus - 0x01: NODE... + 0x01: Node... 1 byte - message type - for Consensus category 0x00: consensus 0x01: sharding ... - - for NODE category + - for Node category 0x00: transaction ... n - 2 bytes - actual message payload ---- content end ----- @@ -27,8 +27,8 @@ type MessageCategory byte //Consensus and other message categories const ( Consensus MessageCategory = iota - NODE - CLIENT + Node + Client Identity // TODO: add more types ) diff --git a/proto/node/node.go b/proto/node/node.go index 6ce5f462d..5e2f86ced 100644 --- a/proto/node/node.go +++ b/proto/node/node.go @@ -10,7 +10,7 @@ import ( "github.com/harmony-one/harmony/proto" ) -// NodeMessageType is to indicate the specific type of message under NODE category +// NodeMessageType is to indicate the specific type of message under Node category type NodeMessageType byte const ( @@ -19,9 +19,9 @@ const ( const ( Transaction NodeMessageType = iota - BLOCK - CLIENT - CONTROL + Block + Client + Control BlockchainSync PING // node send ip/pki to register with leader PONG // node broadcast pubK @@ -43,7 +43,7 @@ const ( GetBlock ) -// TransactionMessageType representa the types of messages used for NODE/Transaction +// TransactionMessageType representa the types of messages used for Node/Transaction type TransactionMessageType int const ( @@ -52,21 +52,21 @@ const ( Unlock ) -// BlockMessageType represents the types of messages used for NODE/BLOCK +// BlockMessageType represents the types of messages used for Node/Block type BlockMessageType int const ( Sync BlockMessageType = iota ) -// The types of messages used for NODE/BLOCK +// The types of messages used for Node/Block type ClientMessageType int const ( LookupUtxo ClientMessageType = iota ) -// The types of messages used for NODE/CONTROL +// The types of messages used for Node/Control type ControlMessageType int // ControlMessageType @@ -105,7 +105,7 @@ func DeserializeBlockchainSyncMessage(d []byte) (*BlockchainSyncMessage, error) // ConstructUnlockToCommitOrAbortMessage constructs the unlock to commit or abort message that will be sent to leaders. // This is for client. func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []*blockchain.Transaction) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Unlock)) encoder := gob.NewEncoder(byteBuffer) @@ -116,8 +116,8 @@ func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []*blockchain.Transactio // ConstructFetchUtxoMessage constructs the fetch utxo message that will be sent to Harmony network. // this is for client. func ConstructFetchUtxoMessage(sender p2p.Peer, addresses [][20]byte) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) - byteBuffer.WriteByte(byte(CLIENT)) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) + byteBuffer.WriteByte(byte(Client)) byteBuffer.WriteByte(byte(LookupUtxo)) encoder := gob.NewEncoder(byteBuffer) @@ -128,7 +128,7 @@ func ConstructFetchUtxoMessage(sender p2p.Peer, addresses [][20]byte) []byte { // ConstructTransactionListMessage constructs serialized transactions func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Send)) encoder := gob.NewEncoder(byteBuffer) @@ -146,7 +146,7 @@ func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []b // ConstructBlockchainSyncMessage constructs Blockchain Sync Message. func ConstructBlockchainSyncMessage(msgType BlockchainSyncMessageType, blockHash [32]byte) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) byteBuffer.WriteByte(byte(BlockchainSync)) byteBuffer.WriteByte(byte(msgType)) if msgType != GetLastBlockHashes { @@ -165,7 +165,7 @@ func GenerateBlockchainSyncMessage(payload []byte) *BlockchainSyncMessage { // ConstructRequestTransactionsMessage constructs serialized transactions func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Request)) for _, txID := range transactionIds { @@ -176,16 +176,16 @@ func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { // ConstructStopMessage constructs STOP message for node to stop func ConstructStopMessage() []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) - byteBuffer.WriteByte(byte(CONTROL)) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) + byteBuffer.WriteByte(byte(Control)) byteBuffer.WriteByte(byte(STOP)) return byteBuffer.Bytes() } // ConstructBlocksSyncMessage constructs blocks sync message to send blocks to other nodes func ConstructBlocksSyncMessage(blocks []blockchain.Block) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) - byteBuffer.WriteByte(byte(BLOCK)) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) + byteBuffer.WriteByte(byte(Block)) byteBuffer.WriteByte(byte(Sync)) encoder := gob.NewEncoder(byteBuffer) diff --git a/proto/node/pingpong.go b/proto/node/pingpong.go index c652c3045..bd2e09a83 100644 --- a/proto/node/pingpong.go +++ b/proto/node/pingpong.go @@ -124,7 +124,7 @@ func GetPongMessage(payload []byte) (*PongMessageType, error) { // ConstructPingMessage contructs ping message from node to leader func (ping PingMessageType) ConstructPingMessage() []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) byteBuffer.WriteByte(byte(PING)) encoder := gob.NewEncoder(byteBuffer) @@ -138,7 +138,7 @@ func (ping PingMessageType) ConstructPingMessage() []byte { // ConstructPongMessage contructs pong message from leader to node func (pong PongMessageType) ConstructPongMessage() []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) byteBuffer.WriteByte(byte(PONG)) encoder := gob.NewEncoder(byteBuffer) From 38a690bfc20a25230d66713f40f3f0587ade785f Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 25 Nov 2018 11:16:29 -0800 Subject: [PATCH 02/12] add func for download and testing --- node/node.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/node/node.go b/node/node.go index fee218829..e26450151 100644 --- a/node/node.go +++ b/node/node.go @@ -80,6 +80,16 @@ type Node struct { testBankKey *ecdsa.PrivateKey } +// GetBlockHashes used for state download. +func (node *Node) GetBlockHashes() [][32]byte { + return node.blockchain.GetBlockHashes() +} + +// SetBlockchain is used for testing +func (node *Node) SetBlockchain(bc *blockchain.Blockchain) { + node.blockchain = bc +} + // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client func (node *Node) addCrossTxsToReturn(crossTxs []*blockchain.CrossShardTxAndProof) { node.crossTxToReturnMutex.Lock() From ec51aa168a797b3df2eb92cfcd0bd7944454bcdc Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 25 Nov 2018 11:17:38 -0800 Subject: [PATCH 03/12] add download proto --- syncing/downloader/proto/downloader.pb.go | 240 ++++++++++++++++++++++ syncing/downloader/proto/downloader.proto | 29 +++ 2 files changed, 269 insertions(+) create mode 100644 syncing/downloader/proto/downloader.pb.go create mode 100644 syncing/downloader/proto/downloader.proto diff --git a/syncing/downloader/proto/downloader.pb.go b/syncing/downloader/proto/downloader.pb.go new file mode 100644 index 000000000..f5610f28d --- /dev/null +++ b/syncing/downloader/proto/downloader.pb.go @@ -0,0 +1,240 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: downloader.proto + +package downloader + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type DownloaderRequest_RequestType int32 + +const ( + DownloaderRequest_HEADER DownloaderRequest_RequestType = 0 + DownloaderRequest_BLOCK DownloaderRequest_RequestType = 1 + DownloaderRequest_UNKOWN DownloaderRequest_RequestType = 2 +) + +var DownloaderRequest_RequestType_name = map[int32]string{ + 0: "HEADER", + 1: "BLOCK", + 2: "UNKOWN", +} + +var DownloaderRequest_RequestType_value = map[string]int32{ + "HEADER": 0, + "BLOCK": 1, + "UNKOWN": 2, +} + +func (x DownloaderRequest_RequestType) String() string { + return proto.EnumName(DownloaderRequest_RequestType_name, int32(x)) +} + +func (DownloaderRequest_RequestType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_6a99ec95c7ab1ff1, []int{0, 0} +} + +// DownloaderRequest is the generic download request. +type DownloaderRequest struct { + // Request type. + Type DownloaderRequest_RequestType `protobuf:"varint,1,opt,name=type,proto3,enum=downloader.DownloaderRequest_RequestType" json:"type,omitempty"` + // The array of ids or heights of the blocks we want to download. + Height []int32 `protobuf:"varint,2,rep,packed,name=height,proto3" json:"height,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DownloaderRequest) Reset() { *m = DownloaderRequest{} } +func (m *DownloaderRequest) String() string { return proto.CompactTextString(m) } +func (*DownloaderRequest) ProtoMessage() {} +func (*DownloaderRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_6a99ec95c7ab1ff1, []int{0} +} + +func (m *DownloaderRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DownloaderRequest.Unmarshal(m, b) +} +func (m *DownloaderRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DownloaderRequest.Marshal(b, m, deterministic) +} +func (m *DownloaderRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_DownloaderRequest.Merge(m, src) +} +func (m *DownloaderRequest) XXX_Size() int { + return xxx_messageInfo_DownloaderRequest.Size(m) +} +func (m *DownloaderRequest) XXX_DiscardUnknown() { + xxx_messageInfo_DownloaderRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_DownloaderRequest proto.InternalMessageInfo + +func (m *DownloaderRequest) GetType() DownloaderRequest_RequestType { + if m != nil { + return m.Type + } + return DownloaderRequest_HEADER +} + +func (m *DownloaderRequest) GetHeight() []int32 { + if m != nil { + return m.Height + } + return nil +} + +// DownloaderResponse is the generic response of DownloaderRequest. +type DownloaderResponse struct { + // payload of Block. + Payload [][]byte `protobuf:"bytes,1,rep,name=payload,proto3" json:"payload,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DownloaderResponse) Reset() { *m = DownloaderResponse{} } +func (m *DownloaderResponse) String() string { return proto.CompactTextString(m) } +func (*DownloaderResponse) ProtoMessage() {} +func (*DownloaderResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_6a99ec95c7ab1ff1, []int{1} +} + +func (m *DownloaderResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DownloaderResponse.Unmarshal(m, b) +} +func (m *DownloaderResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DownloaderResponse.Marshal(b, m, deterministic) +} +func (m *DownloaderResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_DownloaderResponse.Merge(m, src) +} +func (m *DownloaderResponse) XXX_Size() int { + return xxx_messageInfo_DownloaderResponse.Size(m) +} +func (m *DownloaderResponse) XXX_DiscardUnknown() { + xxx_messageInfo_DownloaderResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_DownloaderResponse proto.InternalMessageInfo + +func (m *DownloaderResponse) GetPayload() [][]byte { + if m != nil { + return m.Payload + } + return nil +} + +func init() { + proto.RegisterEnum("downloader.DownloaderRequest_RequestType", DownloaderRequest_RequestType_name, DownloaderRequest_RequestType_value) + proto.RegisterType((*DownloaderRequest)(nil), "downloader.DownloaderRequest") + proto.RegisterType((*DownloaderResponse)(nil), "downloader.DownloaderResponse") +} + +func init() { proto.RegisterFile("downloader.proto", fileDescriptor_6a99ec95c7ab1ff1) } + +var fileDescriptor_6a99ec95c7ab1ff1 = []byte{ + // 213 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x48, 0xc9, 0x2f, 0xcf, + 0xcb, 0xc9, 0x4f, 0x4c, 0x49, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88, + 0x28, 0xcd, 0x61, 0xe4, 0x12, 0x74, 0x81, 0x73, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, + 0x6c, 0xb9, 0x58, 0x4a, 0x2a, 0x0b, 0x52, 0x25, 0x18, 0x15, 0x18, 0x35, 0xf8, 0x8c, 0x34, 0xf5, + 0x90, 0x8c, 0xc0, 0x50, 0xac, 0x07, 0xa5, 0x43, 0x2a, 0x0b, 0x52, 0x83, 0xc0, 0xda, 0x84, 0xc4, + 0xb8, 0xd8, 0x32, 0x52, 0x33, 0xd3, 0x33, 0x4a, 0x24, 0x98, 0x14, 0x98, 0x35, 0x58, 0x83, 0xa0, + 0x3c, 0x25, 0x03, 0x2e, 0x6e, 0x24, 0xc5, 0x42, 0x5c, 0x5c, 0x6c, 0x1e, 0xae, 0x8e, 0x2e, 0xae, + 0x41, 0x02, 0x0c, 0x42, 0x9c, 0x5c, 0xac, 0x4e, 0x3e, 0xfe, 0xce, 0xde, 0x02, 0x8c, 0x20, 0xe1, + 0x50, 0x3f, 0x6f, 0xff, 0x70, 0x3f, 0x01, 0x26, 0x25, 0x3d, 0x2e, 0x21, 0x64, 0x0b, 0x8b, 0x0b, + 0xf2, 0xf3, 0x8a, 0x53, 0x85, 0x24, 0xb8, 0xd8, 0x0b, 0x12, 0x2b, 0x41, 0x82, 0x12, 0x8c, 0x0a, + 0xcc, 0x1a, 0x3c, 0x41, 0x30, 0xae, 0x51, 0x18, 0x17, 0x17, 0x42, 0xbd, 0x90, 0x07, 0x17, 0x6b, + 0x60, 0x69, 0x6a, 0x51, 0xa5, 0x90, 0x2c, 0x5e, 0x1f, 0x48, 0xc9, 0xe1, 0x92, 0x86, 0xd8, 0xa7, + 0xc4, 0x90, 0xc4, 0x06, 0x0e, 0x39, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xdd, 0x6d, 0x18, + 0x54, 0x4d, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// DownloaderClient is the client API for Downloader service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type DownloaderClient interface { + Query(ctx context.Context, in *DownloaderRequest, opts ...grpc.CallOption) (*DownloaderResponse, error) +} + +type downloaderClient struct { + cc *grpc.ClientConn +} + +func NewDownloaderClient(cc *grpc.ClientConn) DownloaderClient { + return &downloaderClient{cc} +} + +func (c *downloaderClient) Query(ctx context.Context, in *DownloaderRequest, opts ...grpc.CallOption) (*DownloaderResponse, error) { + out := new(DownloaderResponse) + err := c.cc.Invoke(ctx, "/downloader.Downloader/Query", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// DownloaderServer is the server API for Downloader service. +type DownloaderServer interface { + Query(context.Context, *DownloaderRequest) (*DownloaderResponse, error) +} + +func RegisterDownloaderServer(s *grpc.Server, srv DownloaderServer) { + s.RegisterService(&_Downloader_serviceDesc, srv) +} + +func _Downloader_Query_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DownloaderRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DownloaderServer).Query(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/downloader.Downloader/Query", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DownloaderServer).Query(ctx, req.(*DownloaderRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Downloader_serviceDesc = grpc.ServiceDesc{ + ServiceName: "downloader.Downloader", + HandlerType: (*DownloaderServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Query", + Handler: _Downloader_Query_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "downloader.proto", +} diff --git a/syncing/downloader/proto/downloader.proto b/syncing/downloader/proto/downloader.proto new file mode 100644 index 000000000..3b8b4658b --- /dev/null +++ b/syncing/downloader/proto/downloader.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package downloader; + +// Downloader is the service used for downloading/sycning blocks. +service Downloader { + rpc Query(DownloaderRequest) returns (DownloaderResponse) {} +} + +// DownloaderRequest is the generic download request. +message DownloaderRequest { + enum RequestType { + HEADER = 0; + BLOCK = 1; + UNKOWN = 2; + } + + // Request type. + RequestType type = 1; + + // The array of ids or heights of the blocks we want to download. + repeated int32 height = 2; +} + +// DownloaderResponse is the generic response of DownloaderRequest. +message DownloaderResponse { + // payload of Block. + repeated bytes payload = 1; +} From c43378352dc28cbebf18a1cc728efed3caffa4dd Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 25 Nov 2018 11:18:06 -0800 Subject: [PATCH 04/12] add server api for downloader --- syncing/downloader/server.go | 48 ++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 syncing/downloader/server.go diff --git a/syncing/downloader/server.go b/syncing/downloader/server.go new file mode 100644 index 000000000..532371a99 --- /dev/null +++ b/syncing/downloader/server.go @@ -0,0 +1,48 @@ +package downloader + +import ( + "context" + "fmt" + "net" + + "github.com/harmony-one/harmony/node" + pb "github.com/harmony-one/harmony/syncing/downloader/proto" + "google.golang.org/grpc" +) + +// Server ... +type Server struct { + node *node.Node +} + +// Query returns the feature at the given point. +func (s *Server) Query(ctx context.Context, request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) { + res := &pb.DownloaderResponse{} + if request.Type == pb.DownloaderRequest_HEADER { + } else { + res.Payload = append(res.Payload, []byte{1}) + } + return res, nil +} + +// Start ... +func (s *Server) Start(port string) error { + if s.node == nil { + return ErrDownloaderWithNoNode + } + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%s", port)) + if err != nil { + + } + var opts []grpc.ServerOption + grpcServer := grpc.NewServer(opts...) + pb.RegisterDownloaderServer(grpcServer, s) + grpcServer.Serve(lis) + return nil +} + +// NewServer ... +func NewServer(node *node.Node) *Server { + s := &Server{node: node} + return s +} From 0d0f1fd5c982e03bf01e21f8eb5bc06e8aa810ff Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 25 Nov 2018 11:18:30 -0800 Subject: [PATCH 05/12] add client and server_test --- syncing/downloader/client.go | 48 +++++++++++++++++++++++++++++++ syncing/downloader/errors.go | 8 ++++++ syncing/downloader/server_test.go | 36 +++++++++++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 syncing/downloader/client.go create mode 100644 syncing/downloader/errors.go create mode 100644 syncing/downloader/server_test.go diff --git a/syncing/downloader/client.go b/syncing/downloader/client.go new file mode 100644 index 000000000..5b85898d2 --- /dev/null +++ b/syncing/downloader/client.go @@ -0,0 +1,48 @@ +package downloader + +import ( + "context" + "fmt" + "log" + "time" + + pb "github.com/harmony-one/harmony/syncing/downloader/proto" + "google.golang.org/grpc" +) + +// Client ... +type Client struct { + dlClient pb.DownloaderClient + opts []grpc.DialOption + conn *grpc.ClientConn +} + +// GetHeaders ... +func (client *Client) GetHeaders() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} + + result, err := client.dlClient.Query(ctx, request) + if err != nil { + log.Fatalf("%v.GetFeatures(_) = _, %v: ", client, err) + } + log.Println(result) +} + +// Close ... +func (client *Client) Close() { + client.conn.Close() +} + +// ClientSetUp ... +func ClientSetUp(ip, port string) *Client { + client := Client{} + var err error + client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...) + if err != nil { + log.Fatalf("fail to dial: %v", err) + } + client.dlClient = pb.NewDownloaderClient(client.conn) + return &client +} diff --git a/syncing/downloader/errors.go b/syncing/downloader/errors.go new file mode 100644 index 000000000..b5aacc029 --- /dev/null +++ b/syncing/downloader/errors.go @@ -0,0 +1,8 @@ +package downloader + +import "errors" + +// Errors ... +var ( + ErrDownloaderWithNoNode = errors.New("no node attached") +) diff --git a/syncing/downloader/server_test.go b/syncing/downloader/server_test.go new file mode 100644 index 000000000..8a4e13654 --- /dev/null +++ b/syncing/downloader/server_test.go @@ -0,0 +1,36 @@ +package downloader + +import ( + "testing" + + "github.com/harmony-one/harmony/blockchain" + "github.com/harmony-one/harmony/crypto/pki" + "github.com/harmony-one/harmony/node" +) + +const ( + serverPort = "9997" + serverIP = "127.0.0.1" + clientPort = "9999" +) + +var ( + PriIntOne = 111 + TestAddressOne = pki.GetAddressFromInt(PriIntOne) +) + +func setupServer() *Server { + bc := blockchain.CreateBlockchain(TestAddressOne, 0) + node := &node.Node{} + node.SetBlockchain(bc) + server := NewServer(node) + return server +} + +func TestGetBlockHashes(t *testing.T) { + server := setupServer() + server.Start(serverIP) + + // client := ClientSetUp(serverIP, serverPort) + // client.GetHeaders() +} From 36382f50744d39dacef272c1b368602a7247c405 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 25 Nov 2018 12:28:13 -0800 Subject: [PATCH 06/12] add working version of client and server --- blockchain/blockchain.go | 12 ++++++++++ syncing/downloader/client/client.go | 37 +++++++++++++++++++++++++++++ syncing/downloader/server/server.go | 37 +++++++++++++++++++++++++++++ syncing/downloader/server_test.go | 6 ++++- 4 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 syncing/downloader/client/client.go create mode 100644 syncing/downloader/server/server.go diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 02a40668d..70e076ab2 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -233,6 +233,18 @@ func CreateBlockchain(address [20]byte, shardID uint32) *Blockchain { return &bc } +// CreateBlockchainWithMoreBlocks ... +func CreateBlockchainWithMoreBlocks(addresses [][20]byte, shardID uint32) *Blockchain { + blocks := make([]*Block, 0) + for _, address := range addresses { + cbtx := NewCoinbaseTX(address, genesisCoinbaseData, shardID) + blocks = append(blocks, NewGenesisBlock(cbtx, shardID)) + } + + bc := Blockchain{blocks} + return &bc +} + // CreateStateBlock creates state block based on the utxos. func (bc *Blockchain) CreateStateBlock(utxoPool *UTXOPool) *Block { var numBlocks int32 diff --git a/syncing/downloader/client/client.go b/syncing/downloader/client/client.go new file mode 100644 index 000000000..f34b5d648 --- /dev/null +++ b/syncing/downloader/client/client.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + "flag" + "log" + "time" + + pb "github.com/harmony-one/harmony/syncing/downloader/proto" + "google.golang.org/grpc" +) + +// printResult ... +func printResult(client pb.DownloaderClient) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} + response, err := client.Query(ctx, request) + if err != nil { + log.Fatalf("Error") + } + log.Println(response) +} + +func main() { + flag.Parse() + var opts []grpc.DialOption + opts = append(opts, grpc.WithInsecure()) + conn, err := grpc.Dial("localhost:9999", opts...) + if err != nil { + log.Fatalf("fail to dial: %v", err) + } + defer conn.Close() + client := pb.NewDownloaderClient(conn) + + printResult(client) +} diff --git a/syncing/downloader/server/server.go b/syncing/downloader/server/server.go new file mode 100644 index 000000000..811225117 --- /dev/null +++ b/syncing/downloader/server/server.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + "log" + "net" + + "google.golang.org/grpc" + + pb "github.com/harmony-one/harmony/syncing/downloader/proto" +) + +type downloaderServer struct { +} + +// GetFeature returns the feature at the given point. +func (s *downloaderServer) Query(ctx context.Context, request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) { + response := pb.DownloaderResponse{} + response.Payload = [][]byte{{0, 0, 2}} + return &response, nil +} + +func newServer() *downloaderServer { + s := &downloaderServer{} + return s +} + +func main() { + lis, err := net.Listen("tcp", "localhost:9999") + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + var opts []grpc.ServerOption + grpcServer := grpc.NewServer(opts...) + pb.RegisterDownloaderServer(grpcServer, newServer()) + grpcServer.Serve(lis) +} diff --git a/syncing/downloader/server_test.go b/syncing/downloader/server_test.go index 8a4e13654..1af1509a7 100644 --- a/syncing/downloader/server_test.go +++ b/syncing/downloader/server_test.go @@ -1,6 +1,7 @@ package downloader import ( + "fmt" "testing" "github.com/harmony-one/harmony/blockchain" @@ -16,14 +17,17 @@ const ( var ( PriIntOne = 111 + PriIntTwo = 222 TestAddressOne = pki.GetAddressFromInt(PriIntOne) + TestAddressTwo = pki.GetAddressFromInt(PriIntTwo) ) func setupServer() *Server { - bc := blockchain.CreateBlockchain(TestAddressOne, 0) + bc := blockchain.CreateBlockchainWithMoreBlocks([][20]byte{TestAddressOne, TestAddressTwo}, 0) node := &node.Node{} node.SetBlockchain(bc) server := NewServer(node) + fmt.Println("minh ", len(bc.Blocks)) return server } From 0fcc95d695432560b6390dfc3d04a26484e2b5fa Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 25 Nov 2018 13:14:23 -0800 Subject: [PATCH 07/12] convert working client/server into apis --- syncing/downloader/client/client.go | 54 ++++++++++++++++++++++------- syncing/downloader/server/server.go | 37 ++++++++++++++------ 2 files changed, 68 insertions(+), 23 deletions(-) diff --git a/syncing/downloader/client/client.go b/syncing/downloader/client/client.go index f34b5d648..d2f06d75a 100644 --- a/syncing/downloader/client/client.go +++ b/syncing/downloader/client/client.go @@ -2,7 +2,7 @@ package main import ( "context" - "flag" + "fmt" "log" "time" @@ -10,28 +10,58 @@ import ( "google.golang.org/grpc" ) -// printResult ... -func printResult(client pb.DownloaderClient) { +// PrintResult ... +func PrintResult(client *Client) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} - response, err := client.Query(ctx, request) + response, err := client.dlClient.Query(ctx, request) if err != nil { log.Fatalf("Error") } log.Println(response) } -func main() { - flag.Parse() - var opts []grpc.DialOption - opts = append(opts, grpc.WithInsecure()) - conn, err := grpc.Dial("localhost:9999", opts...) +// Client ... +type Client struct { + dlClient pb.DownloaderClient + opts []grpc.DialOption + conn *grpc.ClientConn +} + +// ClientSetup ... +func ClientSetup(ip, port string) *Client { + client := Client{} + client.opts = append(client.opts, grpc.WithInsecure()) + var err error + client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...) if err != nil { log.Fatalf("fail to dial: %v", err) } - defer conn.Close() - client := pb.NewDownloaderClient(conn) - printResult(client) + client.dlClient = pb.NewDownloaderClient(client.conn) + return &client +} + +// Close ... +func (client *Client) Close() { + client.conn.Close() +} + +// GetHeaders ... +func (client *Client) GetHeaders() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} + response, err := client.dlClient.Query(ctx, request) + if err != nil { + log.Fatalf("Error") + } + log.Println(response) +} + +func main() { + client := ClientSetup("localhost", "9999") + client.GetHeaders() + defer client.Close() } diff --git a/syncing/downloader/server/server.go b/syncing/downloader/server/server.go index 811225117..f788b8f26 100644 --- a/syncing/downloader/server/server.go +++ b/syncing/downloader/server/server.go @@ -2,36 +2,51 @@ package main import ( "context" + "fmt" "log" "net" "google.golang.org/grpc" + "github.com/harmony-one/harmony/node" pb "github.com/harmony-one/harmony/syncing/downloader/proto" ) -type downloaderServer struct { +// Server ... +type Server struct { + node *node.Node } -// GetFeature returns the feature at the given point. -func (s *downloaderServer) Query(ctx context.Context, request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) { +// Query returns the feature at the given point. +func (s *Server) Query(ctx context.Context, request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) { response := pb.DownloaderResponse{} response.Payload = [][]byte{{0, 0, 2}} return &response, nil } -func newServer() *downloaderServer { - s := &downloaderServer{} - return s -} - -func main() { - lis, err := net.Listen("tcp", "localhost:9999") +// Start ... +func (s *Server) Start(ip, port string) error { + // if s.node == nil { + // return ErrDownloaderWithNoNode + // } + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", ip, port)) if err != nil { log.Fatalf("failed to listen: %v", err) } var opts []grpc.ServerOption grpcServer := grpc.NewServer(opts...) - pb.RegisterDownloaderServer(grpcServer, newServer()) + pb.RegisterDownloaderServer(grpcServer, s) grpcServer.Serve(lis) + return nil +} + +// NewServer ... +func NewServer(node *node.Node) *Server { + s := &Server{node: node} + return s +} + +func main() { + s := NewServer(nil) + s.Start("localhost", "9999") } From 7e575ba4f10a7d2d706406abb9049d3828911d1f Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 25 Nov 2018 16:09:26 -0800 Subject: [PATCH 08/12] add very simple test --- syncing/downloader/client/client.go | 6 +++--- syncing/downloader/server/server.go | 8 ++++---- syncing/downloader/server_test.go | 32 ++++++++++++++--------------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/syncing/downloader/client/client.go b/syncing/downloader/client/client.go index d2f06d75a..61b1c7730 100644 --- a/syncing/downloader/client/client.go +++ b/syncing/downloader/client/client.go @@ -1,4 +1,4 @@ -package main +package downloader import ( "context" @@ -49,7 +49,7 @@ func (client *Client) Close() { } // GetHeaders ... -func (client *Client) GetHeaders() { +func (client *Client) GetHeaders() []byte { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} @@ -57,7 +57,7 @@ func (client *Client) GetHeaders() { if err != nil { log.Fatalf("Error") } - log.Println(response) + return response.Payload[0] } func main() { diff --git a/syncing/downloader/server/server.go b/syncing/downloader/server/server.go index f788b8f26..dc139af13 100644 --- a/syncing/downloader/server/server.go +++ b/syncing/downloader/server/server.go @@ -1,4 +1,4 @@ -package main +package downloader import ( "context" @@ -25,7 +25,7 @@ func (s *Server) Query(ctx context.Context, request *pb.DownloaderRequest) (*pb. } // Start ... -func (s *Server) Start(ip, port string) error { +func (s *Server) Start(ip, port string) (*grpc.Server, error) { // if s.node == nil { // return ErrDownloaderWithNoNode // } @@ -36,8 +36,8 @@ func (s *Server) Start(ip, port string) error { var opts []grpc.ServerOption grpcServer := grpc.NewServer(opts...) pb.RegisterDownloaderServer(grpcServer, s) - grpcServer.Serve(lis) - return nil + go grpcServer.Serve(lis) + return grpcServer, nil } // NewServer ... diff --git a/syncing/downloader/server_test.go b/syncing/downloader/server_test.go index 1af1509a7..83a74ae5b 100644 --- a/syncing/downloader/server_test.go +++ b/syncing/downloader/server_test.go @@ -1,12 +1,11 @@ package downloader import ( - "fmt" "testing" - "github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/crypto/pki" - "github.com/harmony-one/harmony/node" + client "github.com/harmony-one/harmony/syncing/downloader/client" + server "github.com/harmony-one/harmony/syncing/downloader/server" ) const ( @@ -22,19 +21,20 @@ var ( TestAddressTwo = pki.GetAddressFromInt(PriIntTwo) ) -func setupServer() *Server { - bc := blockchain.CreateBlockchainWithMoreBlocks([][20]byte{TestAddressOne, TestAddressTwo}, 0) - node := &node.Node{} - node.SetBlockchain(bc) - server := NewServer(node) - fmt.Println("minh ", len(bc.Blocks)) - return server -} - func TestGetBlockHashes(t *testing.T) { - server := setupServer() - server.Start(serverIP) + s := server.NewServer(nil) + grcpServer, err := s.Start(serverIP, serverPort) + if err != nil { + t.Error(err) + } + defer grcpServer.Stop() + + client := client.ClientSetup(serverIP, serverPort) + payload := client.GetHeaders() + if payload[2] != 2 { + t.Error("minh") + } - // client := ClientSetUp(serverIP, serverPort) - // client.GetHeaders() + defer client.Close() + client.GetHeaders() } From 8369b89cfd519ff849a95ed874c0cf613bf68435 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 25 Nov 2018 16:12:38 -0800 Subject: [PATCH 09/12] clean-up client/server. Move client/server to parent folder --- syncing/downloader/client.go | 47 ++++++++++++-------- syncing/downloader/client/client.go | 67 ----------------------------- syncing/downloader/server.go | 26 +++++------ syncing/downloader/server/server.go | 52 ---------------------- syncing/downloader/server_test.go | 6 +-- 5 files changed, 43 insertions(+), 155 deletions(-) delete mode 100644 syncing/downloader/client/client.go delete mode 100644 syncing/downloader/server/server.go diff --git a/syncing/downloader/client.go b/syncing/downloader/client.go index 5b85898d2..575356143 100644 --- a/syncing/downloader/client.go +++ b/syncing/downloader/client.go @@ -10,6 +10,18 @@ import ( "google.golang.org/grpc" ) +// PrintResult ... +func PrintResult(client *Client) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} + response, err := client.dlClient.Query(ctx, request) + if err != nil { + log.Fatalf("Error") + } + log.Println(response) +} + // Client ... type Client struct { dlClient pb.DownloaderClient @@ -17,17 +29,18 @@ type Client struct { conn *grpc.ClientConn } -// GetHeaders ... -func (client *Client) GetHeaders() { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} - - result, err := client.dlClient.Query(ctx, request) +// ClientSetup ... +func ClientSetup(ip, port string) *Client { + client := Client{} + client.opts = append(client.opts, grpc.WithInsecure()) + var err error + client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...) if err != nil { - log.Fatalf("%v.GetFeatures(_) = _, %v: ", client, err) + log.Fatalf("fail to dial: %v", err) } - log.Println(result) + + client.dlClient = pb.NewDownloaderClient(client.conn) + return &client } // Close ... @@ -35,14 +48,14 @@ func (client *Client) Close() { client.conn.Close() } -// ClientSetUp ... -func ClientSetUp(ip, port string) *Client { - client := Client{} - var err error - client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...) +// GetHeaders ... +func (client *Client) GetHeaders() []byte { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} + response, err := client.dlClient.Query(ctx, request) if err != nil { - log.Fatalf("fail to dial: %v", err) + log.Fatalf("Error") } - client.dlClient = pb.NewDownloaderClient(client.conn) - return &client + return response.Payload[0] } diff --git a/syncing/downloader/client/client.go b/syncing/downloader/client/client.go deleted file mode 100644 index 61b1c7730..000000000 --- a/syncing/downloader/client/client.go +++ /dev/null @@ -1,67 +0,0 @@ -package downloader - -import ( - "context" - "fmt" - "log" - "time" - - pb "github.com/harmony-one/harmony/syncing/downloader/proto" - "google.golang.org/grpc" -) - -// PrintResult ... -func PrintResult(client *Client) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} - response, err := client.dlClient.Query(ctx, request) - if err != nil { - log.Fatalf("Error") - } - log.Println(response) -} - -// Client ... -type Client struct { - dlClient pb.DownloaderClient - opts []grpc.DialOption - conn *grpc.ClientConn -} - -// ClientSetup ... -func ClientSetup(ip, port string) *Client { - client := Client{} - client.opts = append(client.opts, grpc.WithInsecure()) - var err error - client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...) - if err != nil { - log.Fatalf("fail to dial: %v", err) - } - - client.dlClient = pb.NewDownloaderClient(client.conn) - return &client -} - -// Close ... -func (client *Client) Close() { - client.conn.Close() -} - -// GetHeaders ... -func (client *Client) GetHeaders() []byte { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} - response, err := client.dlClient.Query(ctx, request) - if err != nil { - log.Fatalf("Error") - } - return response.Payload[0] -} - -func main() { - client := ClientSetup("localhost", "9999") - client.GetHeaders() - defer client.Close() -} diff --git a/syncing/downloader/server.go b/syncing/downloader/server.go index 532371a99..7a18673fd 100644 --- a/syncing/downloader/server.go +++ b/syncing/downloader/server.go @@ -3,11 +3,13 @@ package downloader import ( "context" "fmt" + "log" "net" + "google.golang.org/grpc" + "github.com/harmony-one/harmony/node" pb "github.com/harmony-one/harmony/syncing/downloader/proto" - "google.golang.org/grpc" ) // Server ... @@ -17,28 +19,22 @@ type Server struct { // Query returns the feature at the given point. func (s *Server) Query(ctx context.Context, request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) { - res := &pb.DownloaderResponse{} - if request.Type == pb.DownloaderRequest_HEADER { - } else { - res.Payload = append(res.Payload, []byte{1}) - } - return res, nil + response := pb.DownloaderResponse{} + response.Payload = [][]byte{{0, 0, 2}} + return &response, nil } // Start ... -func (s *Server) Start(port string) error { - if s.node == nil { - return ErrDownloaderWithNoNode - } - lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%s", port)) +func (s *Server) Start(ip, port string) (*grpc.Server, error) { + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", ip, port)) if err != nil { - + log.Fatalf("failed to listen: %v", err) } var opts []grpc.ServerOption grpcServer := grpc.NewServer(opts...) pb.RegisterDownloaderServer(grpcServer, s) - grpcServer.Serve(lis) - return nil + go grpcServer.Serve(lis) + return grpcServer, nil } // NewServer ... diff --git a/syncing/downloader/server/server.go b/syncing/downloader/server/server.go deleted file mode 100644 index dc139af13..000000000 --- a/syncing/downloader/server/server.go +++ /dev/null @@ -1,52 +0,0 @@ -package downloader - -import ( - "context" - "fmt" - "log" - "net" - - "google.golang.org/grpc" - - "github.com/harmony-one/harmony/node" - pb "github.com/harmony-one/harmony/syncing/downloader/proto" -) - -// Server ... -type Server struct { - node *node.Node -} - -// Query returns the feature at the given point. -func (s *Server) Query(ctx context.Context, request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) { - response := pb.DownloaderResponse{} - response.Payload = [][]byte{{0, 0, 2}} - return &response, nil -} - -// Start ... -func (s *Server) Start(ip, port string) (*grpc.Server, error) { - // if s.node == nil { - // return ErrDownloaderWithNoNode - // } - lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", ip, port)) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - var opts []grpc.ServerOption - grpcServer := grpc.NewServer(opts...) - pb.RegisterDownloaderServer(grpcServer, s) - go grpcServer.Serve(lis) - return grpcServer, nil -} - -// NewServer ... -func NewServer(node *node.Node) *Server { - s := &Server{node: node} - return s -} - -func main() { - s := NewServer(nil) - s.Start("localhost", "9999") -} diff --git a/syncing/downloader/server_test.go b/syncing/downloader/server_test.go index 83a74ae5b..52d222469 100644 --- a/syncing/downloader/server_test.go +++ b/syncing/downloader/server_test.go @@ -4,8 +4,6 @@ import ( "testing" "github.com/harmony-one/harmony/crypto/pki" - client "github.com/harmony-one/harmony/syncing/downloader/client" - server "github.com/harmony-one/harmony/syncing/downloader/server" ) const ( @@ -22,14 +20,14 @@ var ( ) func TestGetBlockHashes(t *testing.T) { - s := server.NewServer(nil) + s := NewServer(nil) grcpServer, err := s.Start(serverIP, serverPort) if err != nil { t.Error(err) } defer grcpServer.Stop() - client := client.ClientSetup(serverIP, serverPort) + client := ClientSetup(serverIP, serverPort) payload := client.GetHeaders() if payload[2] != 2 { t.Error("minh") From 48b40bc88c2642806ee417173065dcc0c422ebca Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 25 Nov 2018 22:10:44 -0800 Subject: [PATCH 10/12] add TestGetBlockHashes and refactor related code --- node/node.go | 21 ++++---------- syncing/downloader/client.go | 6 ++-- syncing/downloader/interface.go | 12 ++++++++ syncing/downloader/server.go | 17 ++++++----- syncing/downloader/server_test.go | 48 +++++++++++++++++++++++++++---- 5 files changed, 74 insertions(+), 30 deletions(-) create mode 100644 syncing/downloader/interface.go diff --git a/node/node.go b/node/node.go index e26450151..cae0b69b5 100644 --- a/node/node.go +++ b/node/node.go @@ -5,17 +5,18 @@ import ( "crypto/ecdsa" "encoding/gob" "fmt" + "math/big" + "net" + "strings" + "sync" + "time" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" "github.com/harmony-one/harmony/node/worker" - "math/big" - "net" - "strings" - "sync" - "time" "github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/client" @@ -80,16 +81,6 @@ type Node struct { testBankKey *ecdsa.PrivateKey } -// GetBlockHashes used for state download. -func (node *Node) GetBlockHashes() [][32]byte { - return node.blockchain.GetBlockHashes() -} - -// SetBlockchain is used for testing -func (node *Node) SetBlockchain(bc *blockchain.Blockchain) { - node.blockchain = bc -} - // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client func (node *Node) addCrossTxsToReturn(crossTxs []*blockchain.CrossShardTxAndProof) { node.crossTxToReturnMutex.Lock() diff --git a/syncing/downloader/client.go b/syncing/downloader/client.go index 575356143..d4418182c 100644 --- a/syncing/downloader/client.go +++ b/syncing/downloader/client.go @@ -48,8 +48,8 @@ func (client *Client) Close() { client.conn.Close() } -// GetHeaders ... -func (client *Client) GetHeaders() []byte { +// GetBlockHashes ... +func (client *Client) GetBlockHashes() *pb.DownloaderResponse { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER} @@ -57,5 +57,5 @@ func (client *Client) GetHeaders() []byte { if err != nil { log.Fatalf("Error") } - return response.Payload[0] + return response } diff --git a/syncing/downloader/interface.go b/syncing/downloader/interface.go new file mode 100644 index 000000000..3acde71bf --- /dev/null +++ b/syncing/downloader/interface.go @@ -0,0 +1,12 @@ +package downloader + +import ( + pb "github.com/harmony-one/harmony/syncing/downloader/proto" +) + +// DownloadInterface ... +type DownloadInterface interface { + // Syncing blockchain from other peers. + // The returned channel is the signal of syncing finish. + CalculateResponse(request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) +} diff --git a/syncing/downloader/server.go b/syncing/downloader/server.go index 7a18673fd..c3d0a5f70 100644 --- a/syncing/downloader/server.go +++ b/syncing/downloader/server.go @@ -8,20 +8,23 @@ import ( "google.golang.org/grpc" - "github.com/harmony-one/harmony/node" pb "github.com/harmony-one/harmony/syncing/downloader/proto" ) // Server ... type Server struct { - node *node.Node + downloadInterface DownloadInterface } // Query returns the feature at the given point. func (s *Server) Query(ctx context.Context, request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) { - response := pb.DownloaderResponse{} - response.Payload = [][]byte{{0, 0, 2}} - return &response, nil + response, err := s.downloadInterface.CalculateResponse(request) + if err != nil { + return nil, err + } + // response := pb.DownloaderResponse{} + // response.Payload = [][]byte{{0, 0, 2}} + return response, nil } // Start ... @@ -38,7 +41,7 @@ func (s *Server) Start(ip, port string) (*grpc.Server, error) { } // NewServer ... -func NewServer(node *node.Node) *Server { - s := &Server{node: node} +func NewServer(dlInterface DownloadInterface) *Server { + s := &Server{downloadInterface: dlInterface} return s } diff --git a/syncing/downloader/server_test.go b/syncing/downloader/server_test.go index 52d222469..0e80af69f 100644 --- a/syncing/downloader/server_test.go +++ b/syncing/downloader/server_test.go @@ -1,9 +1,13 @@ package downloader import ( + "fmt" + "reflect" "testing" + bc "github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/crypto/pki" + pb "github.com/harmony-one/harmony/syncing/downloader/proto" ) const ( @@ -17,10 +21,45 @@ var ( PriIntTwo = 222 TestAddressOne = pki.GetAddressFromInt(PriIntOne) TestAddressTwo = pki.GetAddressFromInt(PriIntTwo) + ShardID = uint32(0) ) +type FakeNode struct { + bc *bc.Blockchain +} + +// GetBlockHashes used for state download. +func (node *FakeNode) GetBlockHashes() [][]byte { + res := [][]byte{} + for _, block := range node.bc.Blocks { + res = append(res, block.Hash[:]) + } + return res +} + +// SetBlockchain is used for testing +func (node *FakeNode) Init() { + addresses := [][20]byte{TestAddressOne, TestAddressTwo} + node.bc = bc.CreateBlockchainWithMoreBlocks(addresses, ShardID) +} + +func (node *FakeNode) CalculateResponse(request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) { + response := &pb.DownloaderResponse{} + if request.Type == pb.DownloaderRequest_HEADER { + fmt.Println("minh ", len(node.bc.Blocks)) + for _, block := range node.bc.Blocks { + response.Payload = append(response.Payload, block.Hash[:]) + } + } else { + + } + return response, nil +} + func TestGetBlockHashes(t *testing.T) { - s := NewServer(nil) + fakeNode := &FakeNode{} + fakeNode.Init() + s := NewServer(fakeNode) grcpServer, err := s.Start(serverIP, serverPort) if err != nil { t.Error(err) @@ -28,11 +67,10 @@ func TestGetBlockHashes(t *testing.T) { defer grcpServer.Stop() client := ClientSetup(serverIP, serverPort) - payload := client.GetHeaders() - if payload[2] != 2 { - t.Error("minh") + response := client.GetBlockHashes() + if !reflect.DeepEqual(response.Payload, fakeNode.GetBlockHashes()) { + t.Error("not equal") } defer client.Close() - client.GetHeaders() } From f6c514f802effbc7dd9955e38abd45c7e0ea91b8 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Sun, 25 Nov 2018 23:29:34 -0800 Subject: [PATCH 11/12] add test for GetBlocks --- syncing/downloader/client.go | 14 ++++++++++ syncing/downloader/server_test.go | 43 +++++++++++++++++++++++++++---- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/syncing/downloader/client.go b/syncing/downloader/client.go index d4418182c..976452af4 100644 --- a/syncing/downloader/client.go +++ b/syncing/downloader/client.go @@ -59,3 +59,17 @@ func (client *Client) GetBlockHashes() *pb.DownloaderResponse { } return response } + +// GetBlocks ... +func (client *Client) GetBlocks(heights []int32) *pb.DownloaderResponse { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_BLOCK} + request.Height = make([]int32, len(heights)) + copy(request.Height, heights) + response, err := client.dlClient.Query(ctx, request) + if err != nil { + log.Fatalf("Error") + } + return response +} diff --git a/syncing/downloader/server_test.go b/syncing/downloader/server_test.go index 0e80af69f..098abe9c8 100644 --- a/syncing/downloader/server_test.go +++ b/syncing/downloader/server_test.go @@ -1,4 +1,4 @@ -package downloader +package downloader_test import ( "fmt" @@ -7,6 +7,7 @@ import ( bc "github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/crypto/pki" + "github.com/harmony-one/harmony/syncing/downloader" pb "github.com/harmony-one/harmony/syncing/downloader/proto" ) @@ -37,6 +38,15 @@ func (node *FakeNode) GetBlockHashes() [][]byte { return res } +// GetBlocks used for state download. +func (node *FakeNode) GetBlocks() [][]byte { + res := [][]byte{} + for _, block := range node.bc.Blocks { + res = append(res, block.Serialize()) + } + return res +} + // SetBlockchain is used for testing func (node *FakeNode) Init() { addresses := [][20]byte{TestAddressOne, TestAddressTwo} @@ -46,12 +56,13 @@ func (node *FakeNode) Init() { func (node *FakeNode) CalculateResponse(request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) { response := &pb.DownloaderResponse{} if request.Type == pb.DownloaderRequest_HEADER { - fmt.Println("minh ", len(node.bc.Blocks)) for _, block := range node.bc.Blocks { response.Payload = append(response.Payload, block.Hash[:]) } } else { - + for _, id := range request.Height { + response.Payload = append(response.Payload, node.bc.Blocks[id].Serialize()) + } } return response, nil } @@ -59,18 +70,40 @@ func (node *FakeNode) CalculateResponse(request *pb.DownloaderRequest) (*pb.Down func TestGetBlockHashes(t *testing.T) { fakeNode := &FakeNode{} fakeNode.Init() - s := NewServer(fakeNode) + s := downloader.NewServer(fakeNode) grcpServer, err := s.Start(serverIP, serverPort) if err != nil { t.Error(err) } defer grcpServer.Stop() - client := ClientSetup(serverIP, serverPort) + client := downloader.ClientSetup(serverIP, serverPort) + defer client.Close() response := client.GetBlockHashes() if !reflect.DeepEqual(response.Payload, fakeNode.GetBlockHashes()) { t.Error("not equal") } +} + +func TestGetBlocks(t *testing.T) { + fakeNode := &FakeNode{} + fakeNode.Init() + s := downloader.NewServer(fakeNode) + grcpServer, err := s.Start(serverIP, serverPort) + if err != nil { + t.Error(err) + } + defer grcpServer.Stop() + client := downloader.ClientSetup(serverIP, serverPort) defer client.Close() + response := client.GetBlockHashes() + if !reflect.DeepEqual(response.Payload, fakeNode.GetBlockHashes()) { + t.Error("not equal") + } + response = client.GetBlocks([]int32{0, 1}) + fmt.Println(len(response.Payload)) + if !reflect.DeepEqual(response.Payload, fakeNode.GetBlocks()) { + t.Error("not equal") + } } From 4bff45145fea94a5edd46bc28270f716750e0418 Mon Sep 17 00:00:00 2001 From: Minh Doan Date: Mon, 26 Nov 2018 10:28:06 -0800 Subject: [PATCH 12/12] update syncing implementation with new downloader api --- syncing/downloader/client.go | 1 + syncing/errors.go | 8 +++ syncing/syncing.go | 111 +++++++++++++++++------------------ 3 files changed, 64 insertions(+), 56 deletions(-) create mode 100644 syncing/errors.go diff --git a/syncing/downloader/client.go b/syncing/downloader/client.go index 976452af4..12b469c26 100644 --- a/syncing/downloader/client.go +++ b/syncing/downloader/client.go @@ -37,6 +37,7 @@ func ClientSetup(ip, port string) *Client { client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...) if err != nil { log.Fatalf("fail to dial: %v", err) + return nil } client.dlClient = pb.NewDownloaderClient(client.conn) diff --git a/syncing/errors.go b/syncing/errors.go new file mode 100644 index 000000000..ada2f731d --- /dev/null +++ b/syncing/errors.go @@ -0,0 +1,8 @@ +package syncing + +import "errors" + +// Errors ... +var ( + ErrSyncPeerConfigClientNotReady = errors.New("client is not ready") +) diff --git a/syncing/syncing.go b/syncing/syncing.go index b56d40d10..ae04a57f0 100644 --- a/syncing/syncing.go +++ b/syncing/syncing.go @@ -1,8 +1,6 @@ package syncing import ( - "bufio" - "net" "reflect" "sync" "time" @@ -10,23 +8,21 @@ import ( "github.com/Workiva/go-datastructures/queue" "github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/p2p" - proto_node "github.com/harmony-one/harmony/proto/node" + "github.com/harmony-one/harmony/syncing/downloader" ) // SyncPeerConfig is peer config to sync. type SyncPeerConfig struct { - peer p2p.Peer - conn net.Conn - w *bufio.Writer - err error - trusted bool - blockHashes [][32]byte + ip string + port string + client *downloader.Client + blockHashes [][]byte } // SyncBlockTask is the task struct to sync a specific block. type SyncBlockTask struct { index int - blockHash [32]byte + blockHash []byte } // SyncConfig contains an array of SyncPeerConfig. @@ -48,6 +44,29 @@ type StateSync struct { stateSyncTaskQueue *queue.Queue } +// GetBlockHashes ... +func (peerConfig *SyncPeerConfig) GetBlockHashes() error { + if peerConfig.client == nil { + return ErrSyncPeerConfigClientNotReady + } + response := peerConfig.client.GetBlockHashes() + peerConfig.blockHashes = make([][]byte, len(response.Payload)) + for i := range response.Payload { + peerConfig.blockHashes[i] = make([]byte, len(response.Payload[i])) + copy(peerConfig.blockHashes[i], response.Payload[i]) + } + return nil +} + +// GetBlocks ... +func (peerConfig *SyncPeerConfig) GetBlocks(heights []int32) ([][]byte, error) { + if peerConfig.client == nil { + return nil, ErrSyncPeerConfigClientNotReady + } + response := peerConfig.client.GetBlocks(heights) + return response.Payload, nil +} + // ProcessStateSyncFromPeers used to do state sync. func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) { // TODO: Validate peers. @@ -71,8 +90,10 @@ func (ss *StateSync) createSyncConfig(peers []p2p.Peer) { peers: make([]SyncPeerConfig, ss.peerNumber), } for id := range ss.syncConfig.peers { - ss.syncConfig.peers[id].peer = peers[id] - ss.syncConfig.peers[id].trusted = false + ss.syncConfig.peers[id] = SyncPeerConfig{ + ip: peers[id].Ip, + port: peers[id].Port, + } } } @@ -83,29 +104,27 @@ func (ss *StateSync) makeConnectionToPeers() { for _, synPeerConfig := range ss.syncConfig.peers { go func(peerConfig *SyncPeerConfig) { defer wg.Done() - peerConfig.conn, peerConfig.err = p2p.DialWithSocketClient(peerConfig.peer.Ip, peerConfig.peer.Port) + peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port) }(&synPeerConfig) } wg.Wait() ss.activePeerNumber = 0 for _, configPeer := range ss.syncConfig.peers { - if configPeer.err == nil { + if configPeer.client != nil { ss.activePeerNumber++ - configPeer.w = bufio.NewWriter(configPeer.conn) - configPeer.trusted = true } } } // areConsensusHashesEqual chesk if all consensus hashes are equal. func (ss *StateSync) areConsensusHashesEqual() bool { - var fixedPeer *SyncPeerConfig + var firstPeer *SyncPeerConfig for _, configPeer := range ss.syncConfig.peers { - if configPeer.trusted { - if fixedPeer == nil { - fixedPeer = &configPeer + if configPeer.client != nil { + if firstPeer == nil { + firstPeer = &configPeer } - if !reflect.DeepEqual(configPeer.blockHashes, fixedPeer) { + if !reflect.DeepEqual(configPeer.blockHashes, firstPeer.blockHashes) { return false } } @@ -120,29 +139,15 @@ func (ss *StateSync) getConsensusHashes() { wg.Add(ss.activePeerNumber) for _, configPeer := range ss.syncConfig.peers { - if configPeer.err != nil { + if configPeer.client == nil { continue } go func(peerConfig *SyncPeerConfig) { defer wg.Done() - msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GetLastBlockHashes, [32]byte{}) - peerConfig.w.Write(msg) - peerConfig.w.Flush() - var content []byte - content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn) - if peerConfig.err != nil { - peerConfig.trusted = false - return - } - var blockchainSyncMessage *proto_node.BlockchainSyncMessage - blockchainSyncMessage, peerConfig.err = proto_node.DeserializeBlockchainSyncMessage(content) - if peerConfig.err != nil { - peerConfig.trusted = false - return - } - peerConfig.blockHashes = blockchainSyncMessage.BlockHashes + peerConfig.client.GetBlockHashes() }(&configPeer) } + wg.Wait() if ss.areConsensusHashesEqual() { break } @@ -153,11 +158,12 @@ func (ss *StateSync) getConsensusHashes() { func (ss *StateSync) generateStateSyncTaskQueue() { ss.stateSyncTaskQueue = queue.New(0) for _, configPeer := range ss.syncConfig.peers { - if configPeer.trusted { + if configPeer.client != nil { for id, blockHash := range configPeer.blockHashes { ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) } ss.blockHeight = len(configPeer.blockHashes) + break } } } @@ -169,7 +175,7 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) { var wg sync.WaitGroup wg.Add(int(ss.stateSyncTaskQueue.Len())) for _, configPeer := range ss.syncConfig.peers { - if configPeer.err != nil { + if configPeer.client == nil { continue } go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *blockchain.Blockchain) { @@ -180,18 +186,15 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) { break } syncTask := task[0].(SyncBlockTask) - msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GetBlock, syncTask.blockHash) - peerConfig.w.Write(msg) - peerConfig.w.Flush() - var content []byte - content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn) - if peerConfig.err != nil { - peerConfig.trusted = false - return - } - block, err := blockchain.DeserializeBlock(content) - if err == nil { - bc.Blocks[syncTask.index] = block + for { + id := syncTask.index + heights := []int32{int32(id)} + payload, err := peerConfig.GetBlocks(heights) + if err != nil { + // Write log + } else { + bc.Blocks[id], err = blockchain.DeserializeBlock(payload[0]) + } } } }(&configPeer, ss.stateSyncTaskQueue, bc) @@ -215,7 +218,3 @@ func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) ss.downloadBlocks(bc) } - -func getConsensus(syncConfig *SyncConfig) bool { - return true -}