Merge branch 'master' of github.com:harmony-one/harmony into rj_branch

pull/99/head
Rongjian Lan 6 years ago
commit af14350fa1
  1. 12
      blockchain/blockchain.go
  2. 2
      client/client.go
  3. 11
      node/node.go
  4. 22
      node/node_handler.go
  5. 8
      proto/client/client.go
  6. 8
      proto/common.go
  7. 36
      proto/node/node.go
  8. 4
      proto/node/pingpong.go
  9. 76
      syncing/downloader/client.go
  10. 8
      syncing/downloader/errors.go
  11. 12
      syncing/downloader/interface.go
  12. 240
      syncing/downloader/proto/downloader.pb.go
  13. 29
      syncing/downloader/proto/downloader.proto
  14. 47
      syncing/downloader/server.go
  15. 109
      syncing/downloader/server_test.go
  16. 8
      syncing/errors.go
  17. 109
      syncing/syncing.go

@ -233,6 +233,18 @@ func CreateBlockchain(address [20]byte, shardID uint32) *Blockchain {
return &bc return &bc
} }
// CreateBlockchainWithMoreBlocks ...
func CreateBlockchainWithMoreBlocks(addresses [][20]byte, shardID uint32) *Blockchain {
blocks := make([]*Block, 0)
for _, address := range addresses {
cbtx := NewCoinbaseTX(address, genesisCoinbaseData, shardID)
blocks = append(blocks, NewGenesisBlock(cbtx, shardID))
}
bc := Blockchain{blocks}
return &bc
}
// CreateStateBlock creates state block based on the utxos. // CreateStateBlock creates state block based on the utxos.
func (bc *Blockchain) CreateStateBlock(utxoPool *UTXOPool) *Block { func (bc *Blockchain) CreateStateBlock(utxoPool *UTXOPool) *Block {
var numBlocks int32 var numBlocks int32

@ -25,7 +25,7 @@ type Client struct {
log log.Logger // Log utility log log.Logger // Log utility
} }
// The message handler for CLIENT/Transaction messages. // The message handler for Client/Transaction messages.
func (client *Client) TransactionMessageHandler(msgPayload []byte) { func (client *Client) TransactionMessageHandler(msgPayload []byte) {
messageType := client_proto.TransactionMessageType(msgPayload[0]) messageType := client_proto.TransactionMessageType(msgPayload[0])
switch messageType { switch messageType {

@ -5,17 +5,18 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"math/big"
"net"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm" "github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/node/worker"
"math/big"
"net"
"strings"
"sync"
"time"
"github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/blockchain"
"github.com/harmony-one/harmony/client" "github.com/harmony-one/harmony/client"

@ -29,7 +29,7 @@ const (
// MinNumberOfTransactionsPerBlock is the min number of transaction per a block. // MinNumberOfTransactionsPerBlock is the min number of transaction per a block.
MinNumberOfTransactionsPerBlock = 6000 MinNumberOfTransactionsPerBlock = 6000
// MaxNumberOfTransactionsPerBlock is the max number of transaction per a block. // MaxNumberOfTransactionsPerBlock is the max number of transaction per a block.
MaxNumberOfTransactionsPerBlock = 20000 MaxNumberOfTransactionsPerBlock = 8000
// NumBlocksBeforeStateBlock is the number of blocks allowed before generating state block // NumBlocksBeforeStateBlock is the number of blocks allowed before generating state block
NumBlocksBeforeStateBlock = 1000 NumBlocksBeforeStateBlock = 1000
) )
@ -103,14 +103,14 @@ func (node *Node) NodeHandler(conn net.Conn) {
consensusObj.ProcessMessageValidator(msgPayload) consensusObj.ProcessMessageValidator(msgPayload)
} }
} }
case proto.NODE: case proto.Node:
actionType := proto_node.NodeMessageType(msgType) actionType := proto_node.NodeMessageType(msgType)
switch actionType { switch actionType {
case proto_node.Transaction: case proto_node.Transaction:
node.log.Info("NET: received message: Node/Transaction") node.log.Info("NET: received message: Node/Transaction")
node.transactionMessageHandler(msgPayload) node.transactionMessageHandler(msgPayload)
case proto_node.BLOCK: case proto_node.Block:
node.log.Info("NET: received message: Node/BLOCK") node.log.Info("NET: received message: Node/Block")
blockMsgType := proto_node.BlockMessageType(msgPayload[0]) blockMsgType := proto_node.BlockMessageType(msgPayload[0])
switch blockMsgType { switch blockMsgType {
case proto_node.Sync: case proto_node.Sync:
@ -124,8 +124,8 @@ func (node *Node) NodeHandler(conn net.Conn) {
case proto_node.BlockchainSync: case proto_node.BlockchainSync:
node.log.Info("NET: received message: Node/BlockchainSync") node.log.Info("NET: received message: Node/BlockchainSync")
node.handleBlockchainSync(msgPayload, conn) node.handleBlockchainSync(msgPayload, conn)
case proto_node.CLIENT: case proto_node.Client:
node.log.Info("NET: received message: Node/CLIENT") node.log.Info("NET: received message: Node/Client")
clientMsgType := proto_node.ClientMessageType(msgPayload[0]) clientMsgType := proto_node.ClientMessageType(msgPayload[0])
switch clientMsgType { switch clientMsgType {
case proto_node.LookupUtxo: case proto_node.LookupUtxo:
@ -138,8 +138,8 @@ func (node *Node) NodeHandler(conn net.Conn) {
p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID))
} }
case proto_node.CONTROL: case proto_node.Control:
node.log.Info("NET: received message: Node/CONTROL") node.log.Info("NET: received message: Node/Control")
controlType := msgPayload[0] controlType := msgPayload[0]
if proto_node.ControlMessageType(controlType) == proto_node.STOP { if proto_node.ControlMessageType(controlType) == proto_node.STOP {
node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain())
@ -191,9 +191,9 @@ func (node *Node) NodeHandler(conn net.Conn) {
node.log.Info("NET: received message: PONG") node.log.Info("NET: received message: PONG")
node.pongMessageHandler(msgPayload) node.pongMessageHandler(msgPayload)
} }
case proto.CLIENT: case proto.Client:
actionType := client.ClientMessageType(msgType) actionType := client.ClientMessageType(msgType)
node.log.Info("NET: received message: CLIENT/Transaction") node.log.Info("NET: received message: Client/Transaction")
switch actionType { switch actionType {
case client.Transaction: case client.Transaction:
if node.Client != nil { if node.Client != nil {
@ -235,7 +235,7 @@ FOR_LOOP:
} }
msgCategory, _ := proto.GetMessageCategory(content) msgCategory, _ := proto.GetMessageCategory(content)
if err != nil || msgCategory != proto.NODE { if err != nil || msgCategory != proto.Node {
node.log.Error("Failed in reading message category from syncing node", err) node.log.Error("Failed in reading message category from syncing node", err)
return return
} }

@ -8,7 +8,7 @@ import (
"github.com/harmony-one/harmony/proto" "github.com/harmony-one/harmony/proto"
) )
// The specific types of message under CLIENT category // The specific types of message under Client category
type ClientMessageType byte type ClientMessageType byte
const ( const (
@ -16,7 +16,7 @@ const (
// TODO: add more types // TODO: add more types
) )
// The types of messages used for CLIENT/Transaction // The types of messages used for Client/Transaction
type TransactionMessageType int type TransactionMessageType int
const ( const (
@ -31,7 +31,7 @@ type FetchUtxoResponseMessage struct {
// [leader] Constructs the proof of accept or reject message that will be sent to client // [leader] Constructs the proof of accept or reject message that will be sent to client
func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof) []byte { func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.CLIENT)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Client)})
byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Transaction))
byteBuffer.WriteByte(byte(ProofOfLock)) byteBuffer.WriteByte(byte(ProofOfLock))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
@ -42,7 +42,7 @@ func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof
// Constructs the response message to fetch utxo message // Constructs the response message to fetch utxo message
func ConstructFetchUtxoResponseMessage(utxoMap *blockchain.UtxoMap, shardID uint32) []byte { func ConstructFetchUtxoResponseMessage(utxoMap *blockchain.UtxoMap, shardID uint32) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.CLIENT)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Client)})
byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Transaction))
byteBuffer.WriteByte(byte(UtxoResponse)) byteBuffer.WriteByte(byte(UtxoResponse))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)

@ -10,12 +10,12 @@ The message structure of any message in Harmony network
---- content start ----- ---- content start -----
1 byte - message category 1 byte - message category
0x00: Consensus 0x00: Consensus
0x01: NODE... 0x01: Node...
1 byte - message type 1 byte - message type
- for Consensus category - for Consensus category
0x00: consensus 0x00: consensus
0x01: sharding ... 0x01: sharding ...
- for NODE category - for Node category
0x00: transaction ... 0x00: transaction ...
n - 2 bytes - actual message payload n - 2 bytes - actual message payload
---- content end ----- ---- content end -----
@ -27,8 +27,8 @@ type MessageCategory byte
//Consensus and other message categories //Consensus and other message categories
const ( const (
Consensus MessageCategory = iota Consensus MessageCategory = iota
NODE Node
CLIENT Client
Identity Identity
// TODO: add more types // TODO: add more types
) )

@ -10,7 +10,7 @@ import (
"github.com/harmony-one/harmony/proto" "github.com/harmony-one/harmony/proto"
) )
// NodeMessageType is to indicate the specific type of message under NODE category // NodeMessageType is to indicate the specific type of message under Node category
type NodeMessageType byte type NodeMessageType byte
const ( const (
@ -19,9 +19,9 @@ const (
const ( const (
Transaction NodeMessageType = iota Transaction NodeMessageType = iota
BLOCK Block
CLIENT Client
CONTROL Control
BlockchainSync BlockchainSync
PING // node send ip/pki to register with leader PING // node send ip/pki to register with leader
PONG // node broadcast pubK PONG // node broadcast pubK
@ -43,7 +43,7 @@ const (
GetBlock GetBlock
) )
// TransactionMessageType representa the types of messages used for NODE/Transaction // TransactionMessageType representa the types of messages used for Node/Transaction
type TransactionMessageType int type TransactionMessageType int
const ( const (
@ -52,21 +52,21 @@ const (
Unlock Unlock
) )
// BlockMessageType represents the types of messages used for NODE/BLOCK // BlockMessageType represents the types of messages used for Node/Block
type BlockMessageType int type BlockMessageType int
const ( const (
Sync BlockMessageType = iota Sync BlockMessageType = iota
) )
// The types of messages used for NODE/BLOCK // The types of messages used for Node/Block
type ClientMessageType int type ClientMessageType int
const ( const (
LookupUtxo ClientMessageType = iota LookupUtxo ClientMessageType = iota
) )
// The types of messages used for NODE/CONTROL // The types of messages used for Node/Control
type ControlMessageType int type ControlMessageType int
// ControlMessageType // ControlMessageType
@ -105,7 +105,7 @@ func DeserializeBlockchainSyncMessage(d []byte) (*BlockchainSyncMessage, error)
// ConstructUnlockToCommitOrAbortMessage constructs the unlock to commit or abort message that will be sent to leaders. // ConstructUnlockToCommitOrAbortMessage constructs the unlock to commit or abort message that will be sent to leaders.
// This is for client. // This is for client.
func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []*blockchain.Transaction) []byte { func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []*blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Transaction))
byteBuffer.WriteByte(byte(Unlock)) byteBuffer.WriteByte(byte(Unlock))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
@ -116,8 +116,8 @@ func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []*blockchain.Transactio
// ConstructFetchUtxoMessage constructs the fetch utxo message that will be sent to Harmony network. // ConstructFetchUtxoMessage constructs the fetch utxo message that will be sent to Harmony network.
// this is for client. // this is for client.
func ConstructFetchUtxoMessage(sender p2p.Peer, addresses [][20]byte) []byte { func ConstructFetchUtxoMessage(sender p2p.Peer, addresses [][20]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(CLIENT)) byteBuffer.WriteByte(byte(Client))
byteBuffer.WriteByte(byte(LookupUtxo)) byteBuffer.WriteByte(byte(LookupUtxo))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
@ -128,7 +128,7 @@ func ConstructFetchUtxoMessage(sender p2p.Peer, addresses [][20]byte) []byte {
// ConstructTransactionListMessage constructs serialized transactions // ConstructTransactionListMessage constructs serialized transactions
func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte { func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Transaction))
byteBuffer.WriteByte(byte(Send)) byteBuffer.WriteByte(byte(Send))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
@ -146,7 +146,7 @@ func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []b
// ConstructBlockchainSyncMessage constructs Blockchain Sync Message. // ConstructBlockchainSyncMessage constructs Blockchain Sync Message.
func ConstructBlockchainSyncMessage(msgType BlockchainSyncMessageType, blockHash [32]byte) []byte { func ConstructBlockchainSyncMessage(msgType BlockchainSyncMessageType, blockHash [32]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(BlockchainSync)) byteBuffer.WriteByte(byte(BlockchainSync))
byteBuffer.WriteByte(byte(msgType)) byteBuffer.WriteByte(byte(msgType))
if msgType != GetLastBlockHashes { if msgType != GetLastBlockHashes {
@ -165,7 +165,7 @@ func GenerateBlockchainSyncMessage(payload []byte) *BlockchainSyncMessage {
// ConstructRequestTransactionsMessage constructs serialized transactions // ConstructRequestTransactionsMessage constructs serialized transactions
func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Transaction))
byteBuffer.WriteByte(byte(Request)) byteBuffer.WriteByte(byte(Request))
for _, txID := range transactionIds { for _, txID := range transactionIds {
@ -176,16 +176,16 @@ func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte {
// ConstructStopMessage constructs STOP message for node to stop // ConstructStopMessage constructs STOP message for node to stop
func ConstructStopMessage() []byte { func ConstructStopMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(CONTROL)) byteBuffer.WriteByte(byte(Control))
byteBuffer.WriteByte(byte(STOP)) byteBuffer.WriteByte(byte(STOP))
return byteBuffer.Bytes() return byteBuffer.Bytes()
} }
// ConstructBlocksSyncMessage constructs blocks sync message to send blocks to other nodes // ConstructBlocksSyncMessage constructs blocks sync message to send blocks to other nodes
func ConstructBlocksSyncMessage(blocks []blockchain.Block) []byte { func ConstructBlocksSyncMessage(blocks []blockchain.Block) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(BLOCK)) byteBuffer.WriteByte(byte(Block))
byteBuffer.WriteByte(byte(Sync)) byteBuffer.WriteByte(byte(Sync))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)

@ -124,7 +124,7 @@ func GetPongMessage(payload []byte) (*PongMessageType, error) {
// ConstructPingMessage contructs ping message from node to leader // ConstructPingMessage contructs ping message from node to leader
func (ping PingMessageType) ConstructPingMessage() []byte { func (ping PingMessageType) ConstructPingMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(PING)) byteBuffer.WriteByte(byte(PING))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
@ -138,7 +138,7 @@ func (ping PingMessageType) ConstructPingMessage() []byte {
// ConstructPongMessage contructs pong message from leader to node // ConstructPongMessage contructs pong message from leader to node
func (pong PongMessageType) ConstructPongMessage() []byte { func (pong PongMessageType) ConstructPongMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(PONG)) byteBuffer.WriteByte(byte(PONG))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)

@ -0,0 +1,76 @@
package downloader
import (
"context"
"fmt"
"log"
"time"
pb "github.com/harmony-one/harmony/syncing/downloader/proto"
"google.golang.org/grpc"
)
// PrintResult ...
func PrintResult(client *Client) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER}
response, err := client.dlClient.Query(ctx, request)
if err != nil {
log.Fatalf("Error")
}
log.Println(response)
}
// Client ...
type Client struct {
dlClient pb.DownloaderClient
opts []grpc.DialOption
conn *grpc.ClientConn
}
// ClientSetup ...
func ClientSetup(ip, port string) *Client {
client := Client{}
client.opts = append(client.opts, grpc.WithInsecure())
var err error
client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
return nil
}
client.dlClient = pb.NewDownloaderClient(client.conn)
return &client
}
// Close ...
func (client *Client) Close() {
client.conn.Close()
}
// GetBlockHashes ...
func (client *Client) GetBlockHashes() *pb.DownloaderResponse {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER}
response, err := client.dlClient.Query(ctx, request)
if err != nil {
log.Fatalf("Error")
}
return response
}
// GetBlocks ...
func (client *Client) GetBlocks(heights []int32) *pb.DownloaderResponse {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_BLOCK}
request.Height = make([]int32, len(heights))
copy(request.Height, heights)
response, err := client.dlClient.Query(ctx, request)
if err != nil {
log.Fatalf("Error")
}
return response
}

@ -0,0 +1,8 @@
package downloader
import "errors"
// Errors ...
var (
ErrDownloaderWithNoNode = errors.New("no node attached")
)

@ -0,0 +1,12 @@
package downloader
import (
pb "github.com/harmony-one/harmony/syncing/downloader/proto"
)
// DownloadInterface ...
type DownloadInterface interface {
// Syncing blockchain from other peers.
// The returned channel is the signal of syncing finish.
CalculateResponse(request *pb.DownloaderRequest) (*pb.DownloaderResponse, error)
}

@ -0,0 +1,240 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: downloader.proto
package downloader
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type DownloaderRequest_RequestType int32
const (
DownloaderRequest_HEADER DownloaderRequest_RequestType = 0
DownloaderRequest_BLOCK DownloaderRequest_RequestType = 1
DownloaderRequest_UNKOWN DownloaderRequest_RequestType = 2
)
var DownloaderRequest_RequestType_name = map[int32]string{
0: "HEADER",
1: "BLOCK",
2: "UNKOWN",
}
var DownloaderRequest_RequestType_value = map[string]int32{
"HEADER": 0,
"BLOCK": 1,
"UNKOWN": 2,
}
func (x DownloaderRequest_RequestType) String() string {
return proto.EnumName(DownloaderRequest_RequestType_name, int32(x))
}
func (DownloaderRequest_RequestType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_6a99ec95c7ab1ff1, []int{0, 0}
}
// DownloaderRequest is the generic download request.
type DownloaderRequest struct {
// Request type.
Type DownloaderRequest_RequestType `protobuf:"varint,1,opt,name=type,proto3,enum=downloader.DownloaderRequest_RequestType" json:"type,omitempty"`
// The array of ids or heights of the blocks we want to download.
Height []int32 `protobuf:"varint,2,rep,packed,name=height,proto3" json:"height,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DownloaderRequest) Reset() { *m = DownloaderRequest{} }
func (m *DownloaderRequest) String() string { return proto.CompactTextString(m) }
func (*DownloaderRequest) ProtoMessage() {}
func (*DownloaderRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_6a99ec95c7ab1ff1, []int{0}
}
func (m *DownloaderRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DownloaderRequest.Unmarshal(m, b)
}
func (m *DownloaderRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DownloaderRequest.Marshal(b, m, deterministic)
}
func (m *DownloaderRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_DownloaderRequest.Merge(m, src)
}
func (m *DownloaderRequest) XXX_Size() int {
return xxx_messageInfo_DownloaderRequest.Size(m)
}
func (m *DownloaderRequest) XXX_DiscardUnknown() {
xxx_messageInfo_DownloaderRequest.DiscardUnknown(m)
}
var xxx_messageInfo_DownloaderRequest proto.InternalMessageInfo
func (m *DownloaderRequest) GetType() DownloaderRequest_RequestType {
if m != nil {
return m.Type
}
return DownloaderRequest_HEADER
}
func (m *DownloaderRequest) GetHeight() []int32 {
if m != nil {
return m.Height
}
return nil
}
// DownloaderResponse is the generic response of DownloaderRequest.
type DownloaderResponse struct {
// payload of Block.
Payload [][]byte `protobuf:"bytes,1,rep,name=payload,proto3" json:"payload,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DownloaderResponse) Reset() { *m = DownloaderResponse{} }
func (m *DownloaderResponse) String() string { return proto.CompactTextString(m) }
func (*DownloaderResponse) ProtoMessage() {}
func (*DownloaderResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_6a99ec95c7ab1ff1, []int{1}
}
func (m *DownloaderResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DownloaderResponse.Unmarshal(m, b)
}
func (m *DownloaderResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DownloaderResponse.Marshal(b, m, deterministic)
}
func (m *DownloaderResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_DownloaderResponse.Merge(m, src)
}
func (m *DownloaderResponse) XXX_Size() int {
return xxx_messageInfo_DownloaderResponse.Size(m)
}
func (m *DownloaderResponse) XXX_DiscardUnknown() {
xxx_messageInfo_DownloaderResponse.DiscardUnknown(m)
}
var xxx_messageInfo_DownloaderResponse proto.InternalMessageInfo
func (m *DownloaderResponse) GetPayload() [][]byte {
if m != nil {
return m.Payload
}
return nil
}
func init() {
proto.RegisterEnum("downloader.DownloaderRequest_RequestType", DownloaderRequest_RequestType_name, DownloaderRequest_RequestType_value)
proto.RegisterType((*DownloaderRequest)(nil), "downloader.DownloaderRequest")
proto.RegisterType((*DownloaderResponse)(nil), "downloader.DownloaderResponse")
}
func init() { proto.RegisterFile("downloader.proto", fileDescriptor_6a99ec95c7ab1ff1) }
var fileDescriptor_6a99ec95c7ab1ff1 = []byte{
// 213 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x48, 0xc9, 0x2f, 0xcf,
0xcb, 0xc9, 0x4f, 0x4c, 0x49, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88,
0x28, 0xcd, 0x61, 0xe4, 0x12, 0x74, 0x81, 0x73, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84,
0x6c, 0xb9, 0x58, 0x4a, 0x2a, 0x0b, 0x52, 0x25, 0x18, 0x15, 0x18, 0x35, 0xf8, 0x8c, 0x34, 0xf5,
0x90, 0x8c, 0xc0, 0x50, 0xac, 0x07, 0xa5, 0x43, 0x2a, 0x0b, 0x52, 0x83, 0xc0, 0xda, 0x84, 0xc4,
0xb8, 0xd8, 0x32, 0x52, 0x33, 0xd3, 0x33, 0x4a, 0x24, 0x98, 0x14, 0x98, 0x35, 0x58, 0x83, 0xa0,
0x3c, 0x25, 0x03, 0x2e, 0x6e, 0x24, 0xc5, 0x42, 0x5c, 0x5c, 0x6c, 0x1e, 0xae, 0x8e, 0x2e, 0xae,
0x41, 0x02, 0x0c, 0x42, 0x9c, 0x5c, 0xac, 0x4e, 0x3e, 0xfe, 0xce, 0xde, 0x02, 0x8c, 0x20, 0xe1,
0x50, 0x3f, 0x6f, 0xff, 0x70, 0x3f, 0x01, 0x26, 0x25, 0x3d, 0x2e, 0x21, 0x64, 0x0b, 0x8b, 0x0b,
0xf2, 0xf3, 0x8a, 0x53, 0x85, 0x24, 0xb8, 0xd8, 0x0b, 0x12, 0x2b, 0x41, 0x82, 0x12, 0x8c, 0x0a,
0xcc, 0x1a, 0x3c, 0x41, 0x30, 0xae, 0x51, 0x18, 0x17, 0x17, 0x42, 0xbd, 0x90, 0x07, 0x17, 0x6b,
0x60, 0x69, 0x6a, 0x51, 0xa5, 0x90, 0x2c, 0x5e, 0x1f, 0x48, 0xc9, 0xe1, 0x92, 0x86, 0xd8, 0xa7,
0xc4, 0x90, 0xc4, 0x06, 0x0e, 0x39, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xdd, 0x6d, 0x18,
0x54, 0x4d, 0x01, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// DownloaderClient is the client API for Downloader service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type DownloaderClient interface {
Query(ctx context.Context, in *DownloaderRequest, opts ...grpc.CallOption) (*DownloaderResponse, error)
}
type downloaderClient struct {
cc *grpc.ClientConn
}
func NewDownloaderClient(cc *grpc.ClientConn) DownloaderClient {
return &downloaderClient{cc}
}
func (c *downloaderClient) Query(ctx context.Context, in *DownloaderRequest, opts ...grpc.CallOption) (*DownloaderResponse, error) {
out := new(DownloaderResponse)
err := c.cc.Invoke(ctx, "/downloader.Downloader/Query", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// DownloaderServer is the server API for Downloader service.
type DownloaderServer interface {
Query(context.Context, *DownloaderRequest) (*DownloaderResponse, error)
}
func RegisterDownloaderServer(s *grpc.Server, srv DownloaderServer) {
s.RegisterService(&_Downloader_serviceDesc, srv)
}
func _Downloader_Query_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DownloaderRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DownloaderServer).Query(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/downloader.Downloader/Query",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DownloaderServer).Query(ctx, req.(*DownloaderRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Downloader_serviceDesc = grpc.ServiceDesc{
ServiceName: "downloader.Downloader",
HandlerType: (*DownloaderServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Query",
Handler: _Downloader_Query_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "downloader.proto",
}

@ -0,0 +1,29 @@
syntax = "proto3";
package downloader;
// Downloader is the service used for downloading/sycning blocks.
service Downloader {
rpc Query(DownloaderRequest) returns (DownloaderResponse) {}
}
// DownloaderRequest is the generic download request.
message DownloaderRequest {
enum RequestType {
HEADER = 0;
BLOCK = 1;
UNKOWN = 2;
}
// Request type.
RequestType type = 1;
// The array of ids or heights of the blocks we want to download.
repeated int32 height = 2;
}
// DownloaderResponse is the generic response of DownloaderRequest.
message DownloaderResponse {
// payload of Block.
repeated bytes payload = 1;
}

@ -0,0 +1,47 @@
package downloader
import (
"context"
"fmt"
"log"
"net"
"google.golang.org/grpc"
pb "github.com/harmony-one/harmony/syncing/downloader/proto"
)
// Server ...
type Server struct {
downloadInterface DownloadInterface
}
// Query returns the feature at the given point.
func (s *Server) Query(ctx context.Context, request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) {
response, err := s.downloadInterface.CalculateResponse(request)
if err != nil {
return nil, err
}
// response := pb.DownloaderResponse{}
// response.Payload = [][]byte{{0, 0, 2}}
return response, nil
}
// Start ...
func (s *Server) Start(ip, port string) (*grpc.Server, error) {
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", ip, port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
pb.RegisterDownloaderServer(grpcServer, s)
go grpcServer.Serve(lis)
return grpcServer, nil
}
// NewServer ...
func NewServer(dlInterface DownloadInterface) *Server {
s := &Server{downloadInterface: dlInterface}
return s
}

@ -0,0 +1,109 @@
package downloader_test
import (
"fmt"
"reflect"
"testing"
bc "github.com/harmony-one/harmony/blockchain"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/syncing/downloader"
pb "github.com/harmony-one/harmony/syncing/downloader/proto"
)
const (
serverPort = "9997"
serverIP = "127.0.0.1"
clientPort = "9999"
)
var (
PriIntOne = 111
PriIntTwo = 222
TestAddressOne = pki.GetAddressFromInt(PriIntOne)
TestAddressTwo = pki.GetAddressFromInt(PriIntTwo)
ShardID = uint32(0)
)
type FakeNode struct {
bc *bc.Blockchain
}
// GetBlockHashes used for state download.
func (node *FakeNode) GetBlockHashes() [][]byte {
res := [][]byte{}
for _, block := range node.bc.Blocks {
res = append(res, block.Hash[:])
}
return res
}
// GetBlocks used for state download.
func (node *FakeNode) GetBlocks() [][]byte {
res := [][]byte{}
for _, block := range node.bc.Blocks {
res = append(res, block.Serialize())
}
return res
}
// SetBlockchain is used for testing
func (node *FakeNode) Init() {
addresses := [][20]byte{TestAddressOne, TestAddressTwo}
node.bc = bc.CreateBlockchainWithMoreBlocks(addresses, ShardID)
}
func (node *FakeNode) CalculateResponse(request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) {
response := &pb.DownloaderResponse{}
if request.Type == pb.DownloaderRequest_HEADER {
for _, block := range node.bc.Blocks {
response.Payload = append(response.Payload, block.Hash[:])
}
} else {
for _, id := range request.Height {
response.Payload = append(response.Payload, node.bc.Blocks[id].Serialize())
}
}
return response, nil
}
func TestGetBlockHashes(t *testing.T) {
fakeNode := &FakeNode{}
fakeNode.Init()
s := downloader.NewServer(fakeNode)
grcpServer, err := s.Start(serverIP, serverPort)
if err != nil {
t.Error(err)
}
defer grcpServer.Stop()
client := downloader.ClientSetup(serverIP, serverPort)
defer client.Close()
response := client.GetBlockHashes()
if !reflect.DeepEqual(response.Payload, fakeNode.GetBlockHashes()) {
t.Error("not equal")
}
}
func TestGetBlocks(t *testing.T) {
fakeNode := &FakeNode{}
fakeNode.Init()
s := downloader.NewServer(fakeNode)
grcpServer, err := s.Start(serverIP, serverPort)
if err != nil {
t.Error(err)
}
defer grcpServer.Stop()
client := downloader.ClientSetup(serverIP, serverPort)
defer client.Close()
response := client.GetBlockHashes()
if !reflect.DeepEqual(response.Payload, fakeNode.GetBlockHashes()) {
t.Error("not equal")
}
response = client.GetBlocks([]int32{0, 1})
fmt.Println(len(response.Payload))
if !reflect.DeepEqual(response.Payload, fakeNode.GetBlocks()) {
t.Error("not equal")
}
}

@ -0,0 +1,8 @@
package syncing
import "errors"
// Errors ...
var (
ErrSyncPeerConfigClientNotReady = errors.New("client is not ready")
)

@ -1,8 +1,6 @@
package syncing package syncing
import ( import (
"bufio"
"net"
"reflect" "reflect"
"sync" "sync"
"time" "time"
@ -10,23 +8,21 @@ import (
"github.com/Workiva/go-datastructures/queue" "github.com/Workiva/go-datastructures/queue"
"github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/blockchain"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
proto_node "github.com/harmony-one/harmony/proto/node" "github.com/harmony-one/harmony/syncing/downloader"
) )
// SyncPeerConfig is peer config to sync. // SyncPeerConfig is peer config to sync.
type SyncPeerConfig struct { type SyncPeerConfig struct {
peer p2p.Peer ip string
conn net.Conn port string
w *bufio.Writer client *downloader.Client
err error blockHashes [][]byte
trusted bool
blockHashes [][32]byte
} }
// SyncBlockTask is the task struct to sync a specific block. // SyncBlockTask is the task struct to sync a specific block.
type SyncBlockTask struct { type SyncBlockTask struct {
index int index int
blockHash [32]byte blockHash []byte
} }
// SyncConfig contains an array of SyncPeerConfig. // SyncConfig contains an array of SyncPeerConfig.
@ -48,6 +44,29 @@ type StateSync struct {
stateSyncTaskQueue *queue.Queue stateSyncTaskQueue *queue.Queue
} }
// GetBlockHashes ...
func (peerConfig *SyncPeerConfig) GetBlockHashes() error {
if peerConfig.client == nil {
return ErrSyncPeerConfigClientNotReady
}
response := peerConfig.client.GetBlockHashes()
peerConfig.blockHashes = make([][]byte, len(response.Payload))
for i := range response.Payload {
peerConfig.blockHashes[i] = make([]byte, len(response.Payload[i]))
copy(peerConfig.blockHashes[i], response.Payload[i])
}
return nil
}
// GetBlocks ...
func (peerConfig *SyncPeerConfig) GetBlocks(heights []int32) ([][]byte, error) {
if peerConfig.client == nil {
return nil, ErrSyncPeerConfigClientNotReady
}
response := peerConfig.client.GetBlocks(heights)
return response.Payload, nil
}
// ProcessStateSyncFromPeers used to do state sync. // ProcessStateSyncFromPeers used to do state sync.
func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) { func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.Blockchain) (chan struct{}, error) {
// TODO: Validate peers. // TODO: Validate peers.
@ -71,8 +90,10 @@ func (ss *StateSync) createSyncConfig(peers []p2p.Peer) {
peers: make([]SyncPeerConfig, ss.peerNumber), peers: make([]SyncPeerConfig, ss.peerNumber),
} }
for id := range ss.syncConfig.peers { for id := range ss.syncConfig.peers {
ss.syncConfig.peers[id].peer = peers[id] ss.syncConfig.peers[id] = SyncPeerConfig{
ss.syncConfig.peers[id].trusted = false ip: peers[id].Ip,
port: peers[id].Port,
}
} }
} }
@ -83,29 +104,27 @@ func (ss *StateSync) makeConnectionToPeers() {
for _, synPeerConfig := range ss.syncConfig.peers { for _, synPeerConfig := range ss.syncConfig.peers {
go func(peerConfig *SyncPeerConfig) { go func(peerConfig *SyncPeerConfig) {
defer wg.Done() defer wg.Done()
peerConfig.conn, peerConfig.err = p2p.DialWithSocketClient(peerConfig.peer.Ip, peerConfig.peer.Port) peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port)
}(&synPeerConfig) }(&synPeerConfig)
} }
wg.Wait() wg.Wait()
ss.activePeerNumber = 0 ss.activePeerNumber = 0
for _, configPeer := range ss.syncConfig.peers { for _, configPeer := range ss.syncConfig.peers {
if configPeer.err == nil { if configPeer.client != nil {
ss.activePeerNumber++ ss.activePeerNumber++
configPeer.w = bufio.NewWriter(configPeer.conn)
configPeer.trusted = true
} }
} }
} }
// areConsensusHashesEqual chesk if all consensus hashes are equal. // areConsensusHashesEqual chesk if all consensus hashes are equal.
func (ss *StateSync) areConsensusHashesEqual() bool { func (ss *StateSync) areConsensusHashesEqual() bool {
var fixedPeer *SyncPeerConfig var firstPeer *SyncPeerConfig
for _, configPeer := range ss.syncConfig.peers { for _, configPeer := range ss.syncConfig.peers {
if configPeer.trusted { if configPeer.client != nil {
if fixedPeer == nil { if firstPeer == nil {
fixedPeer = &configPeer firstPeer = &configPeer
} }
if !reflect.DeepEqual(configPeer.blockHashes, fixedPeer) { if !reflect.DeepEqual(configPeer.blockHashes, firstPeer.blockHashes) {
return false return false
} }
} }
@ -120,29 +139,15 @@ func (ss *StateSync) getConsensusHashes() {
wg.Add(ss.activePeerNumber) wg.Add(ss.activePeerNumber)
for _, configPeer := range ss.syncConfig.peers { for _, configPeer := range ss.syncConfig.peers {
if configPeer.err != nil { if configPeer.client == nil {
continue continue
} }
go func(peerConfig *SyncPeerConfig) { go func(peerConfig *SyncPeerConfig) {
defer wg.Done() defer wg.Done()
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GetLastBlockHashes, [32]byte{}) peerConfig.client.GetBlockHashes()
peerConfig.w.Write(msg)
peerConfig.w.Flush()
var content []byte
content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
var blockchainSyncMessage *proto_node.BlockchainSyncMessage
blockchainSyncMessage, peerConfig.err = proto_node.DeserializeBlockchainSyncMessage(content)
if peerConfig.err != nil {
peerConfig.trusted = false
return
}
peerConfig.blockHashes = blockchainSyncMessage.BlockHashes
}(&configPeer) }(&configPeer)
} }
wg.Wait()
if ss.areConsensusHashesEqual() { if ss.areConsensusHashesEqual() {
break break
} }
@ -153,11 +158,12 @@ func (ss *StateSync) getConsensusHashes() {
func (ss *StateSync) generateStateSyncTaskQueue() { func (ss *StateSync) generateStateSyncTaskQueue() {
ss.stateSyncTaskQueue = queue.New(0) ss.stateSyncTaskQueue = queue.New(0)
for _, configPeer := range ss.syncConfig.peers { for _, configPeer := range ss.syncConfig.peers {
if configPeer.trusted { if configPeer.client != nil {
for id, blockHash := range configPeer.blockHashes { for id, blockHash := range configPeer.blockHashes {
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}) ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
} }
ss.blockHeight = len(configPeer.blockHashes) ss.blockHeight = len(configPeer.blockHashes)
break
} }
} }
} }
@ -169,7 +175,7 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(int(ss.stateSyncTaskQueue.Len())) wg.Add(int(ss.stateSyncTaskQueue.Len()))
for _, configPeer := range ss.syncConfig.peers { for _, configPeer := range ss.syncConfig.peers {
if configPeer.err != nil { if configPeer.client == nil {
continue continue
} }
go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *blockchain.Blockchain) { go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *blockchain.Blockchain) {
@ -180,18 +186,15 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
break break
} }
syncTask := task[0].(SyncBlockTask) syncTask := task[0].(SyncBlockTask)
msg := proto_node.ConstructBlockchainSyncMessage(proto_node.GetBlock, syncTask.blockHash) for {
peerConfig.w.Write(msg) id := syncTask.index
peerConfig.w.Flush() heights := []int32{int32(id)}
var content []byte payload, err := peerConfig.GetBlocks(heights)
content, peerConfig.err = p2p.ReadMessageContent(peerConfig.conn) if err != nil {
if peerConfig.err != nil { // Write log
peerConfig.trusted = false } else {
return bc.Blocks[id], err = blockchain.DeserializeBlock(payload[0])
} }
block, err := blockchain.DeserializeBlock(content)
if err == nil {
bc.Blocks[syncTask.index] = block
} }
} }
}(&configPeer, ss.stateSyncTaskQueue, bc) }(&configPeer, ss.stateSyncTaskQueue, bc)
@ -215,7 +218,3 @@ func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain)
ss.downloadBlocks(bc) ss.downloadBlocks(bc)
} }
func getConsensus(syncConfig *SyncConfig) bool {
return true
}

Loading…
Cancel
Save