state syncing integration (#283)

* state syncing integration

* fix grpc too many sockets issue; fix last mile catch up issue

* fix in sync message not updated; modify sync_test with new flag; pass
golint test

* address issues in the pull request
pull/293/head
chaosma 6 years ago committed by GitHub
parent a1c0df83af
commit 3161a42e49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 58
      api/services/syncing/downloader/client.go
  2. 128
      api/services/syncing/downloader/proto/downloader.pb.go
  3. 14
      api/services/syncing/downloader/proto/downloader.proto
  4. 3
      api/services/syncing/downloader/server.go
  5. 5
      api/services/syncing/errors.go
  6. 2
      api/services/syncing/interface.go
  7. 370
      api/services/syncing/syncing.go
  8. 12
      api/services/syncing/syncing.md
  9. 3
      cmd/client/txgen/main.go
  10. 19
      consensus/consensus.go
  11. 7
      consensus/consensus_leader.go
  12. 17
      consensus/consensus_test.go
  13. 14
      consensus/consensus_validator.go
  14. 20
      crypto/cosi.go
  15. 222
      node/node.go
  16. 11
      node/node_handler.go
  17. 12
      test/configs/local_config2.txt
  18. 33
      test/deploy.sh

@ -2,12 +2,14 @@ package downloader
import (
"context"
"encoding/binary"
"fmt"
"log"
"time"
pb "github.com/harmony-one/harmony/api/services/syncing/downloader/proto"
"google.golang.org/grpc"
pb "github.com/harmony-one/harmony/api/services/syncing/downloader/proto"
)
// Client is the client model for downloader package.
@ -24,7 +26,7 @@ func ClientSetup(ip, port string) *Client {
var err error
client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
log.Printf("client.go:ClientSetup fail to dial: %v", err)
return nil
}
@ -34,17 +36,20 @@ func ClientSetup(ip, port string) *Client {
// Close closes the Client.
func (client *Client) Close() {
client.conn.Close()
err := client.conn.Close()
if err != nil {
log.Printf("unable to close connection %v with error %v ", client.conn, err)
}
}
// GetBlockHashes gets block hashes from all the peers by calling grpc request.
func (client *Client) GetBlockHashes() *pb.DownloaderResponse {
func (client *Client) GetBlockHashes(startHash []byte) *pb.DownloaderResponse {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER}
request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER, BlockHash: startHash}
response, err := client.dlClient.Query(ctx, request)
if err != nil {
log.Fatalf("Error")
log.Printf("[SYNC] GetBlockHashes query failed with error %v", err)
}
return response
}
@ -61,7 +66,46 @@ func (client *Client) GetBlocks(hashes [][]byte) *pb.DownloaderResponse {
}
response, err := client.dlClient.Query(ctx, request)
if err != nil {
log.Fatalf("Error")
log.Printf("[SYNC] downloader/client.go:GetBlocks query failed with error %v", err)
}
return response
}
// Register registers this node's ip/port information with a peer so that the
// peer will push newly created blocks to this node in the future.
// hash is the bytes of the "ip:port" string representation.
// Returns the peer's response, which may be nil if the grpc query failed.
func (client *Client) Register(hash []byte) *pb.DownloaderResponse {
	// registration should be quick; use a short timeout
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_REGISTER}
	// copy the hash so the request does not alias the caller's slice
	request.PeerHash = make([]byte, len(hash))
	copy(request.PeerHash, hash)
	response, err := client.dlClient.Query(ctx, request)
	if err != nil {
		log.Printf("[SYNC] client.go:Register failed with code %v", err)
	}
	return response
}
// PushNewBlock will send the latest verified block to registered nodes.
// peerID identifies the recipient; its 4-byte big-endian encoding is sent as
// the request's peer hash. When timeout is true the request type is switched
// to REGISTERTIMEOUT instead of NEWBLOCK.
// NOTE(review): REGISTERTIMEOUT presumably tells the receiver its
// registration has expired — confirm against the server handler.
// Returns the peer's response, which may be nil if the grpc query failed.
func (client *Client) PushNewBlock(peerID uint32, blockHash []byte, timeout bool) *pb.DownloaderResponse {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	peerHash := make([]byte, 4)
	binary.BigEndian.PutUint32(peerHash, peerID)
	request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_NEWBLOCK}
	// copy both hashes so the request does not alias caller-owned slices
	request.BlockHash = make([]byte, len(blockHash))
	copy(request.BlockHash, blockHash)
	request.PeerHash = make([]byte, len(peerHash))
	copy(request.PeerHash, peerHash)
	if timeout {
		request.Type = pb.DownloaderRequest_REGISTERTIMEOUT
	}
	response, err := client.dlClient.Query(ctx, request)
	if err != nil {
		log.Printf("[SYNC] unable to send new block to unsync node with error: %v", err)
	}
	return response
}

@ -6,10 +6,9 @@ package downloader
import (
context "context"
fmt "fmt"
math "math"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
@ -21,26 +20,35 @@ var _ = math.Inf
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type DownloaderRequest_RequestType int32
const (
DownloaderRequest_HEADER DownloaderRequest_RequestType = 0
DownloaderRequest_BLOCK DownloaderRequest_RequestType = 1
DownloaderRequest_UNKOWN DownloaderRequest_RequestType = 2
DownloaderRequest_HEADER DownloaderRequest_RequestType = 0
DownloaderRequest_BLOCK DownloaderRequest_RequestType = 1
DownloaderRequest_NEWBLOCK DownloaderRequest_RequestType = 2
DownloaderRequest_REGISTER DownloaderRequest_RequestType = 3
DownloaderRequest_REGISTERTIMEOUT DownloaderRequest_RequestType = 4
DownloaderRequest_UNKNOWN DownloaderRequest_RequestType = 5
)
var DownloaderRequest_RequestType_name = map[int32]string{
0: "HEADER",
1: "BLOCK",
2: "UNKOWN",
2: "NEWBLOCK",
3: "REGISTER",
4: "REGISTERTIMEOUT",
5: "UNKNOWN",
}
var DownloaderRequest_RequestType_value = map[string]int32{
"HEADER": 0,
"BLOCK": 1,
"UNKOWN": 2,
"HEADER": 0,
"BLOCK": 1,
"NEWBLOCK": 2,
"REGISTER": 3,
"REGISTERTIMEOUT": 4,
"UNKNOWN": 5,
}
func (x DownloaderRequest_RequestType) String() string {
@ -51,12 +59,42 @@ func (DownloaderRequest_RequestType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_6a99ec95c7ab1ff1, []int{0, 0}
}
type DownloaderResponse_RegisterResponseType int32
const (
DownloaderResponse_SUCCESS DownloaderResponse_RegisterResponseType = 0
DownloaderResponse_FAIL DownloaderResponse_RegisterResponseType = 1
DownloaderResponse_INSYNC DownloaderResponse_RegisterResponseType = 2
)
var DownloaderResponse_RegisterResponseType_name = map[int32]string{
0: "SUCCESS",
1: "FAIL",
2: "INSYNC",
}
var DownloaderResponse_RegisterResponseType_value = map[string]int32{
"SUCCESS": 0,
"FAIL": 1,
"INSYNC": 2,
}
func (x DownloaderResponse_RegisterResponseType) String() string {
return proto.EnumName(DownloaderResponse_RegisterResponseType_name, int32(x))
}
func (DownloaderResponse_RegisterResponseType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_6a99ec95c7ab1ff1, []int{1, 0}
}
// DownloaderRequest is the generic download request.
type DownloaderRequest struct {
// Request type.
Type DownloaderRequest_RequestType `protobuf:"varint,1,opt,name=type,proto3,enum=downloader.DownloaderRequest_RequestType" json:"type,omitempty"`
// The hashes of the blocks we want to download.
Hashes [][]byte `protobuf:"bytes,2,rep,name=hashes,proto3" json:"hashes,omitempty"`
PeerHash []byte `protobuf:"bytes,3,opt,name=peerHash,proto3" json:"peerHash,omitempty"`
BlockHash []byte `protobuf:"bytes,4,opt,name=blockHash,proto3" json:"blockHash,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -101,13 +139,29 @@ func (m *DownloaderRequest) GetHashes() [][]byte {
return nil
}
func (m *DownloaderRequest) GetPeerHash() []byte {
if m != nil {
return m.PeerHash
}
return nil
}
func (m *DownloaderRequest) GetBlockHash() []byte {
if m != nil {
return m.BlockHash
}
return nil
}
// DownloaderResponse is the generic response of DownloaderRequest.
type DownloaderResponse struct {
// payload of Block.
Payload [][]byte `protobuf:"bytes,1,rep,name=payload,proto3" json:"payload,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Payload [][]byte `protobuf:"bytes,1,rep,name=payload,proto3" json:"payload,omitempty"`
// response of registration request
Type DownloaderResponse_RegisterResponseType `protobuf:"varint,2,opt,name=type,proto3,enum=downloader.DownloaderResponse_RegisterResponseType" json:"type,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DownloaderResponse) Reset() { *m = DownloaderResponse{} }
@ -142,8 +196,16 @@ func (m *DownloaderResponse) GetPayload() [][]byte {
return nil
}
func (m *DownloaderResponse) GetType() DownloaderResponse_RegisterResponseType {
if m != nil {
return m.Type
}
return DownloaderResponse_SUCCESS
}
func init() {
proto.RegisterEnum("downloader.DownloaderRequest_RequestType", DownloaderRequest_RequestType_name, DownloaderRequest_RequestType_value)
proto.RegisterEnum("downloader.DownloaderResponse_RegisterResponseType", DownloaderResponse_RegisterResponseType_name, DownloaderResponse_RegisterResponseType_value)
proto.RegisterType((*DownloaderRequest)(nil), "downloader.DownloaderRequest")
proto.RegisterType((*DownloaderResponse)(nil), "downloader.DownloaderResponse")
}
@ -151,21 +213,29 @@ func init() {
func init() { proto.RegisterFile("downloader.proto", fileDescriptor_6a99ec95c7ab1ff1) }
var fileDescriptor_6a99ec95c7ab1ff1 = []byte{
// 210 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x48, 0xc9, 0x2f, 0xcf,
0xcb, 0xc9, 0x4f, 0x4c, 0x49, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88,
0x28, 0xcd, 0x61, 0xe4, 0x12, 0x74, 0x81, 0x73, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84,
0x6c, 0xb9, 0x58, 0x4a, 0x2a, 0x0b, 0x52, 0x25, 0x18, 0x15, 0x18, 0x35, 0xf8, 0x8c, 0x34, 0xf5,
0x90, 0x8c, 0xc0, 0x50, 0xac, 0x07, 0xa5, 0x43, 0x2a, 0x0b, 0x52, 0x83, 0xc0, 0xda, 0x84, 0xc4,
0xb8, 0xd8, 0x32, 0x12, 0x8b, 0x33, 0x52, 0x8b, 0x25, 0x98, 0x14, 0x98, 0x35, 0x78, 0x82, 0xa0,
0x3c, 0x25, 0x03, 0x2e, 0x6e, 0x24, 0xc5, 0x42, 0x5c, 0x5c, 0x6c, 0x1e, 0xae, 0x8e, 0x2e, 0xae,
0x41, 0x02, 0x0c, 0x42, 0x9c, 0x5c, 0xac, 0x4e, 0x3e, 0xfe, 0xce, 0xde, 0x02, 0x8c, 0x20, 0xe1,
0x50, 0x3f, 0x6f, 0xff, 0x70, 0x3f, 0x01, 0x26, 0x25, 0x3d, 0x2e, 0x21, 0x64, 0x0b, 0x8b, 0x0b,
0xf2, 0xf3, 0x8a, 0x53, 0x85, 0x24, 0xb8, 0xd8, 0x0b, 0x12, 0x2b, 0x41, 0x82, 0x12, 0x8c, 0x60,
0x0b, 0x60, 0x5c, 0xa3, 0x30, 0x2e, 0x2e, 0x84, 0x7a, 0x21, 0x0f, 0x2e, 0xd6, 0xc0, 0xd2, 0xd4,
0xa2, 0x4a, 0x21, 0x59, 0xbc, 0x3e, 0x90, 0x92, 0xc3, 0x25, 0x0d, 0xb1, 0x4f, 0x89, 0x21, 0x89,
0x0d, 0x1c, 0x72, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xab, 0xfd, 0xd9, 0xfa, 0x4d, 0x01,
0x00, 0x00,
// 337 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xcf, 0x4e, 0xc2, 0x40,
0x10, 0xc6, 0xd9, 0x52, 0xfe, 0x0d, 0x44, 0xd7, 0xd1, 0x98, 0x86, 0xa8, 0x21, 0x3d, 0xe1, 0xa5,
0x07, 0x38, 0x79, 0xf0, 0x80, 0x65, 0x85, 0x06, 0x2c, 0x71, 0x5b, 0x24, 0x1e, 0x8b, 0x6c, 0xc4,
0x48, 0x68, 0xed, 0x96, 0x98, 0xbe, 0x81, 0xcf, 0xe3, 0x13, 0x9a, 0x96, 0x3f, 0x25, 0x51, 0x39,
0x35, 0xbf, 0x6f, 0xba, 0x33, 0xf3, 0x7d, 0xbb, 0x40, 0x67, 0xfe, 0xe7, 0x72, 0xe1, 0x7b, 0x33,
0x11, 0x1a, 0x41, 0xe8, 0x47, 0x3e, 0x42, 0xa6, 0xe8, 0x5f, 0x0a, 0x9c, 0x74, 0x77, 0xc8, 0xc5,
0xc7, 0x4a, 0xc8, 0x08, 0x6f, 0x41, 0x8d, 0xe2, 0x40, 0x68, 0xa4, 0x41, 0x9a, 0x47, 0xad, 0x6b,
0x63, 0xaf, 0xc5, 0xaf, 0x9f, 0x8d, 0xcd, 0xd7, 0x8d, 0x03, 0xc1, 0xd3, 0x63, 0x78, 0x0e, 0xc5,
0xb9, 0x27, 0xe7, 0x42, 0x6a, 0x4a, 0x23, 0xdf, 0xac, 0xf1, 0x0d, 0x61, 0x1d, 0xca, 0x81, 0x10,
0x61, 0xdf, 0x93, 0x73, 0x2d, 0xdf, 0x20, 0xcd, 0x1a, 0xdf, 0x31, 0x5e, 0x40, 0x65, 0xba, 0xf0,
0x5f, 0xde, 0xd3, 0xa2, 0x9a, 0x16, 0x33, 0x41, 0x9f, 0x42, 0x75, 0x6f, 0x0c, 0x02, 0x14, 0xfb,
0xac, 0xd3, 0x65, 0x9c, 0xe6, 0xb0, 0x02, 0x85, 0xbb, 0xe1, 0xc8, 0x1c, 0x50, 0x82, 0x35, 0x28,
0xdb, 0x6c, 0xb2, 0x26, 0x25, 0x21, 0xce, 0x7a, 0x96, 0xe3, 0x32, 0x4e, 0xf3, 0x78, 0x0a, 0xc7,
0x5b, 0x72, 0xad, 0x07, 0x36, 0x1a, 0xbb, 0x54, 0xc5, 0x2a, 0x94, 0xc6, 0xf6, 0xc0, 0x1e, 0x4d,
0x6c, 0x5a, 0xd0, 0xbf, 0x09, 0xe0, 0xbe, 0x3b, 0x19, 0xf8, 0x4b, 0x29, 0x50, 0x83, 0x52, 0xe0,
0xc5, 0x89, 0xa8, 0x91, 0xd4, 0xcd, 0x16, 0xb1, 0xb7, 0x49, 0x49, 0x49, 0x53, 0x6a, 0xff, 0x97,
0xd2, 0xba, 0x8f, 0xc1, 0xc5, 0xeb, 0x9b, 0x8c, 0x32, 0x21, 0xcb, 0x4b, 0xbf, 0x81, 0xb3, 0xbf,
0xaa, 0xc9, 0x7a, 0xce, 0xd8, 0x34, 0x99, 0xe3, 0xd0, 0x1c, 0x96, 0x41, 0xbd, 0xef, 0x58, 0x43,
0x4a, 0x12, 0xf7, 0x96, 0xed, 0x3c, 0xdb, 0x26, 0x55, 0x5a, 0x4f, 0x00, 0xd9, 0x2c, 0xec, 0x43,
0xe1, 0x71, 0x25, 0xc2, 0x18, 0x2f, 0x0f, 0x5e, 0x59, 0xfd, 0xea, 0xf0, 0xae, 0x7a, 0x6e, 0x5a,
0x4c, 0x9f, 0x4a, 0xfb, 0x27, 0x00, 0x00, 0xff, 0xff, 0xf9, 0xfb, 0x70, 0x9d, 0x3e, 0x02, 0x00,
0x00,
}
// Reference imports to suppress errors if they are not otherwise used.

@ -12,7 +12,10 @@ message DownloaderRequest {
enum RequestType {
HEADER = 0;
BLOCK = 1;
UNKOWN = 2;
NEWBLOCK = 2;
REGISTER = 3;
REGISTERTIMEOUT = 4;
UNKNOWN = 5;
}
// Request type.
@ -20,10 +23,19 @@ message DownloaderRequest {
// The hashes of the blocks we want to download.
repeated bytes hashes = 2;
bytes peerHash = 3;
bytes blockHash = 4;
}
// DownloaderResponse is the generic response of DownloaderRequest.
message DownloaderResponse {
enum RegisterResponseType {
SUCCESS = 0;
FAIL = 1;
INSYNC = 2; // node is now in sync, remove it from the broadcast list
}
// payload of Block.
repeated bytes payload = 1;
// response of registration request
RegisterResponseType type = 2;
}

@ -5,9 +5,8 @@ import (
"log"
"net"
"google.golang.org/grpc"
pb "github.com/harmony-one/harmony/api/services/syncing/downloader/proto"
"google.golang.org/grpc"
)
// Constants for downloader server.

@ -4,5 +4,8 @@ import "errors"
// Errors ...
var (
ErrSyncPeerConfigClientNotReady = errors.New("client is not ready")
ErrSyncPeerConfigClientNotReady = errors.New("[SYNC]: client is not ready")
ErrRegistrationFail = errors.New("[SYNC]: registration failed")
ErrGetBlock = errors.New("[SYNC]: get block failed")
ErrGetBlockHash = errors.New("[SYNC]: get blockhash failed")
)

@ -9,5 +9,5 @@ import (
type StateSyncInterface interface {
// Syncing blockchain from other peers.
// The returned channel is the signal of syncing finish.
ProcessStateSyncFromPeers(peers []p2p.Peer, bc *core.BlockChain) (chan struct{}, error)
ProcessStateSyncFromPeers(startHash []byte, peers []p2p.Peer, bc *core.BlockChain) (chan struct{}, error)
}

@ -2,16 +2,25 @@ package syncing
import (
"bytes"
"encoding/binary"
"fmt"
"reflect"
"sort"
"strconv"
"sync"
"time"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/Workiva/go-datastructures/queue"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/api/services/syncing/downloader"
pb "github.com/harmony-one/harmony/api/services/syncing/downloader/proto"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/p2p"
)
@ -20,6 +29,8 @@ const (
ConsensusRatio = float64(0.66)
SleepTimeAfterNonConsensusBlockHashes = time.Second * 30
TimesToFail = 5
RegistrationNumber = 3
SyncingPortDifference = 3000
)
// SyncPeerConfig is peer config to sync.
@ -27,7 +38,9 @@ type SyncPeerConfig struct {
ip string
port string
client *downloader.Client
blockHashes [][]byte
blockHashes [][]byte // block hashes before node doing sync
newBlocks []*types.Block // blocks after node doing sync
mux sync.Mutex
}
// GetClient returns client pointer of downloader.Client
@ -49,18 +62,67 @@ type SyncConfig struct {
peers []*SyncPeerConfig
}
// GetStateSync returns the implementation of StateSyncInterface interface.
func GetStateSync() *StateSync {
return &StateSync{}
// CreateStateSync returns the implementation of StateSyncInterface interface.
func CreateStateSync(ip string, port string) *StateSync {
	return &StateSync{
		selfip:         ip,
		selfport:       port,
		commonBlocks:   make(map[int]*types.Block),
		lastMileBlocks: []*types.Block{},
	}
}
// StateSync is the struct that implements StateSyncInterface.
type StateSync struct {
	selfip             string // this node's own IP, used to derive its peer hash
	selfport           string // this node's own port, used to derive its peer hash
	peerNumber         int    // total number of peers in the sync config
	activePeerNumber   int    // number of peers with a live grpc client
	blockHeight        int    // number of block hashes reported by a peer
	commonBlocks       map[int]*types.Block // blocks downloaded before node started sync, keyed by task index
	lastMileBlocks     []*types.Block       // last mile blocks to catch up with the consensus
	syncConfig         *SyncConfig          // per-peer sync state and grpc clients
	stateSyncTaskQueue *queue.Queue         // queue of per-block-hash download tasks
	syncMux            sync.Mutex           // guards commonBlocks, peers' newBlocks reset, and worker updates
}
// AddLastMileBlock appends one of the latest few blocks to the list used to
// catch up with the consensus ("last mile" catch-up).
// NOTE(review): not guarded by syncMux — confirm callers serialize access.
func (ss *StateSync) AddLastMileBlock(block *types.Block) {
	ss.lastMileBlocks = append(ss.lastMileBlocks, block)
}
// CloseConnections close grpc connections for state sync clients
func (ss *StateSync) CloseConnections() {
	for _, peerConfig := range ss.syncConfig.peers {
		// peers without an established grpc client have nothing to close
		if peerConfig.client == nil {
			continue
		}
		peerConfig.client.Close()
	}
}
// GetServicePort returns the service port from syncing port
// TODO: really need use a unique ID instead of ip/port
func GetServicePort(nodePort string) string {
if port, err := strconv.Atoi(nodePort); err == nil {
return fmt.Sprintf("%d", port+SyncingPortDifference)
}
Log.Warn("unable to get service port")
return ""
}
// AddNewBlock will add newly received block into state syncing queue.
// peerHash is the 4-byte big-endian unique ID of the sending peer; the block
// is appended to the newBlocks list of the matching peer config.
func (ss *StateSync) AddNewBlock(peerHash []byte, block *types.Block) {
	for i, pc := range ss.syncConfig.peers {
		// recompute each peer's hash from its ip and syncing service port
		pid := utils.GetUniqueIDFromIPPort(pc.ip, GetServicePort(pc.port))
		ph := make([]byte, 4)
		binary.BigEndian.PutUint32(ph, pid)
		// bytes.Equal is the idiomatic equality test (was bytes.Compare != 0)
		if !bytes.Equal(ph, peerHash) {
			continue
		}
		pc.mux.Lock()
		pc.newBlocks = append(pc.newBlocks, block)
		pc.mux.Unlock()
		Log.Debug("[SYNC] new block received", "total", len(ss.syncConfig.peers[i].newBlocks), "blockHeight", block.NumberU64())
	}
}
// CreateTestSyncPeerConfig used for testing.
@ -87,46 +149,23 @@ func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) in
return 0
}
// GetBlockHashes gets block hashes by calling grpc request to the corresponding peer.
func (peerConfig *SyncPeerConfig) GetBlockHashes() error {
if peerConfig.client == nil {
return ErrSyncPeerConfigClientNotReady
}
response := peerConfig.client.GetBlockHashes()
peerConfig.blockHashes = make([][]byte, len(response.Payload))
for i := range response.Payload {
peerConfig.blockHashes[i] = make([]byte, len(response.Payload[i]))
copy(peerConfig.blockHashes[i], response.Payload[i])
}
return nil
}
// GetBlocks gets blocks by calling grpc request to the corresponding peer.
func (peerConfig *SyncPeerConfig) GetBlocks(hashes [][]byte) ([][]byte, error) {
if peerConfig.client == nil {
return nil, ErrSyncPeerConfigClientNotReady
}
response := peerConfig.client.GetBlocks(hashes)
if response == nil {
return nil, ErrGetBlock
}
return response.Payload, nil
}
// ProcessStateSyncFromPeers used to do state sync.
func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *core.BlockChain) (chan struct{}, error) {
// TODO: Validate peers.
done := make(chan struct{})
go func() {
ss.StartStateSync(peers, bc)
done <- struct{}{}
}()
return done, nil
}
// CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) {
Log.Debug("CreateSyncConfig: len of peers", "len", len(peers))
Log.Debug("CreateSyncConfig: len of peers", "peers", peers)
ss.peerNumber = len(peers)
Log.Debug("CreateSyncConfig: hello")
ss.syncConfig = &SyncConfig{
peers: make([]*SyncPeerConfig, ss.peerNumber),
}
@ -135,9 +174,9 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) {
ip: peers[id].IP,
port: peers[id].Port,
}
Log.Debug("CreateSyncConfig: peer port to connect", "port", peers[id].Port)
Log.Debug("[SYNC] CreateSyncConfig: peer port to connect", "port", peers[id].Port)
}
Log.Info("syncing: Finished creating SyncConfig.")
Log.Info("[SYNC] syncing: Finished creating SyncConfig.")
}
// MakeConnectionToPeers makes grpc connection to all peers.
@ -229,19 +268,21 @@ func (ss *StateSync) GetBlockHashesConsensusAndCleanUp() bool {
}
// GetConsensusHashes gets all hashes needed to download.
func (ss *StateSync) GetConsensusHashes() bool {
func (ss *StateSync) GetConsensusHashes(startHash []byte) bool {
count := 0
for {
var wg sync.WaitGroup
wg.Add(ss.activePeerNumber)
for id := range ss.syncConfig.peers {
if ss.syncConfig.peers[id].client == nil {
continue
}
wg.Add(1)
go func(peerConfig *SyncPeerConfig) {
defer wg.Done()
response := peerConfig.client.GetBlockHashes()
response := peerConfig.client.GetBlockHashes(startHash)
if response == nil {
return
}
peerConfig.blockHashes = response.Payload
}(ss.syncConfig.peers[id])
}
@ -260,25 +301,17 @@ func (ss *StateSync) GetConsensusHashes() bool {
return true
}
// getConsensusHashes gets all hashes needed to download.
func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
ss.stateSyncTaskQueue = queue.New(0)
for _, configPeer := range ss.syncConfig.peers {
if configPeer.client != nil {
ss.blockHeight = len(configPeer.blockHashes)
// TODO (minh) rework the syncing for account model.
//bc.Blocks = append(bc.Blocks, make([]*blockchain.Block, ss.blockHeight-len(bc.Blocks))...)
//for id, blockHash := range configPeer.blockHashes {
// if bc.Blocks[id] == nil || !reflect.DeepEqual(bc.Blocks[id].Hash[:], blockHash) {
// ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
// // TODO(minhdoan): Check error
// }
//}
for id, blockHash := range configPeer.blockHashes {
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
}
break
}
}
Log.Info("syncing: Finished generateStateSyncTaskQueue.")
Log.Info("syncing: Finished generateStateSyncTaskQueue", "length", ss.stateSyncTaskQueue.Len())
}
// downloadBlocks downloads blocks from state sync task queue.
@ -286,6 +319,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
// Initialize blockchain
var wg sync.WaitGroup
wg.Add(ss.activePeerNumber)
count := 0
for i := range ss.syncConfig.peers {
if ss.syncConfig.peers[i].client == nil {
continue
@ -293,45 +327,241 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
go func(peerConfig *SyncPeerConfig, stateSyncTaskQueue *queue.Queue, bc *core.BlockChain) {
defer wg.Done()
for !stateSyncTaskQueue.Empty() {
task, err := stateSyncTaskQueue.Poll(1, time.Millisecond)
task, err := ss.stateSyncTaskQueue.Poll(1, time.Millisecond)
if err == queue.ErrTimeout {
Log.Debug("[SYNC] ss.stateSyncTaskQueue poll timeout", "error", err)
break
}
syncTask := task[0].(SyncBlockTask)
for {
//id := syncTask.index
_, err := peerConfig.GetBlocks([][]byte{syncTask.blockHash})
if err == nil {
// As of now, only send and ask for one block.
// TODO (minh) rework the syncing for account model.
//bc.Blocks[id], err = blockchain.DeserializeBlock(payload[0])
//_, err = blockchain.DeserializeBlock(payload[0])
if err == nil {
break
}
//id := syncTask.index
payload, err := peerConfig.GetBlocks([][]byte{syncTask.blockHash})
if err != nil {
count++
Log.Debug("[SYNC] GetBlocks failed", "failNumber", count)
if count > TimesToFail {
break
}
ss.stateSyncTaskQueue.Put(syncTask)
continue
}
var blockObj types.Block
// currently only send one block a time
err = rlp.DecodeBytes(payload[0], &blockObj)
if err != nil {
count++
Log.Debug("[SYNC] downloadBlocks: failed to DecodeBytes from received new block")
if count > TimesToFail {
break
}
ss.stateSyncTaskQueue.Put(syncTask)
continue
}
ss.syncMux.Lock()
ss.commonBlocks[syncTask.index] = &blockObj
ss.syncMux.Unlock()
}
}(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc)
}
wg.Wait()
Log.Info("syncing: Finished downloadBlocks.")
Log.Info("[SYNC] Finished downloadBlocks.")
}
// CompareBlockByHash compares two block by hash, it will be used in sort the blocks
func CompareBlockByHash(a *types.Block, b *types.Block) int {
	hashA := a.Hash()
	hashB := b.Hash()
	return bytes.Compare(hashA[:], hashB[:])
}
// GetHowManyMaxConsensus will get the most common blocks and the first such blockID
func GetHowManyMaxConsensus(blocks []*types.Block) (int, int) {
	// As all peers are sorted by their blockHashes, all equal blockHashes
	// should come together and consecutively; scan runs of equal blocks.
	maxFirstID, maxCount := -1, 0
	runFirstID, runCount := -1, 0
	for i := range blocks {
		startsNewRun := runFirstID == -1 || CompareBlockByHash(blocks[runFirstID], blocks[i]) != 0
		if startsNewRun {
			runFirstID, runCount = i, 1
		} else {
			runCount++
		}
		if runCount > maxCount {
			maxFirstID, maxCount = runFirstID, runCount
		}
	}
	return maxFirstID, maxCount
}
// getMaxConsensusBlockFromParentHash collects, from every peer's newBlocks
// list, at most one block whose parent hash matches parentHash, then returns
// the block agreed upon by the most peers. Returns nil when no peer has a
// matching block.
func (ss *StateSync) getMaxConsensusBlockFromParentHash(parentHash common.Hash) *types.Block {
	candidateBlocks := []*types.Block{}
	ss.syncMux.Lock()
	for id := range ss.syncConfig.peers {
		peerConfig := ss.syncConfig.peers[id]
		for _, block := range peerConfig.newBlocks {
			// common.Hash is a fixed-size array, so direct == comparison is
			// equivalent to bytes.Compare(...) == 0 and clearer
			if block.ParentHash() == parentHash {
				candidateBlocks = append(candidateBlocks, block)
				break
			}
		}
	}
	ss.syncMux.Unlock()
	if len(candidateBlocks) == 0 {
		return nil
	}
	// Sort by blockHashes so equal blocks are consecutive for the max-run scan.
	sort.Slice(candidateBlocks, func(i, j int) bool {
		return CompareBlockByHash(candidateBlocks[i], candidateBlocks[j]) == -1
	})
	maxFirstID, maxCount := GetHowManyMaxConsensus(candidateBlocks)
	Log.Debug("[SYNC] Find block with matching parenthash", "parentHash", parentHash, "hash", candidateBlocks[maxFirstID].Hash(), "maxCount", maxCount)
	return candidateBlocks[maxFirstID]
}
// getBlockFromOldBlocksByParentHash returns a downloaded (pre-sync) block
// whose parent hash matches parentHash, or nil if none is found.
func (ss *StateSync) getBlockFromOldBlocksByParentHash(parentHash common.Hash) *types.Block {
	for _, block := range ss.commonBlocks {
		// common.Hash is a fixed-size array, so direct == comparison is
		// equivalent to bytes.Compare(...) == 0 and clearer
		if block.ParentHash() == parentHash {
			return block
		}
	}
	return nil
}
// getBlockFromLastMileBlocksByParentHash returns a last-mile block whose
// parent hash matches parentHash, or nil if none is found.
func (ss *StateSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Hash) *types.Block {
	for _, block := range ss.lastMileBlocks {
		// common.Hash is a fixed-size array, so direct == comparison is
		// equivalent to bytes.Compare(...) == 0 and clearer
		if block.ParentHash() == parentHash {
			return block
		}
	}
	return nil
}
// updateBlockAndStatus inserts a single downloaded block into the blockchain
// and refreshes the worker's current state. Returns false when the chain
// insert fails, true on success.
func (ss *StateSync) updateBlockAndStatus(block *types.Block, bc *core.BlockChain, worker *worker.Worker) bool {
	Log.Info("[SYNC] Current Block", "blockHex", bc.CurrentBlock().Hash().Hex())
	_, err := bc.InsertChain([]*types.Block{block})
	if err != nil {
		Log.Debug("Error adding new block to blockchain", "Error", err)
		return false
	}
	Log.Info("[SYNC] new block added to blockchain", "blockHeight", bc.CurrentBlock().NumberU64(), "blockHex", bc.CurrentBlock().Hash().Hex(), "parentHex", bc.CurrentBlock().ParentHash().Hex())
	// serialize the worker update against other sync bookkeeping
	ss.syncMux.Lock()
	worker.UpdateCurrent()
	ss.syncMux.Unlock()
	return true
}
// applyBlocksByParentHash repeatedly fetches (via fetch) the block whose
// parent is the current chain head and inserts it, advancing until no
// matching block is found or an insert fails.
func (ss *StateSync) applyBlocksByParentHash(bc *core.BlockChain, worker *worker.Worker, fetch func(common.Hash) *types.Block) {
	parentHash := bc.CurrentBlock().Hash()
	for {
		block := fetch(parentHash)
		if block == nil {
			return
		}
		if !ss.updateBlockAndStatus(block, bc, worker) {
			return
		}
		parentHash = block.Hash()
	}
}

// generateNewState will construct most recent state from downloaded blocks.
// It applies, in order: blocks downloaded before the node started syncing,
// blocks broadcast to it while syncing, and finally any last-mile blocks.
func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker) {
	// update blocks created before node start sync
	ss.applyBlocksByParentHash(bc, worker, ss.getBlockFromOldBlocksByParentHash)

	ss.syncMux.Lock()
	ss.commonBlocks = make(map[int]*types.Block)
	ss.syncMux.Unlock()

	// update blocks after node start sync
	ss.applyBlocksByParentHash(bc, worker, ss.getMaxConsensusBlockFromParentHash)

	ss.syncMux.Lock()
	for id := range ss.syncConfig.peers {
		ss.syncConfig.peers[id].newBlocks = []*types.Block{}
	}
	ss.syncMux.Unlock()

	// update last mile blocks if any
	ss.applyBlocksByParentHash(bc, worker, ss.getBlockFromLastMileBlocksByParentHash)
}
// StartStateSync starts state sync.
func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *core.BlockChain) bool {
// Creates sync config.
ss.CreateSyncConfig(peers)
// Makes connections to peers.
ss.MakeConnectionToPeers()
func (ss *StateSync) StartStateSync(startHash []byte, bc *core.BlockChain, worker *worker.Worker) {
ss.RegisterNodeInfo()
// Gets consensus hashes.
if !ss.GetConsensusHashes() {
return false
if !ss.GetConsensusHashes(startHash) {
Log.Debug("[SYNC] StartStateSync unable to reach consensus on ss.GetConsensusHashes")
return
}
Log.Debug("[SYNC] StartStateSync reach consensus on ss.GetConsensusHashes")
ss.generateStateSyncTaskQueue(bc)
// Download blocks.
if ss.stateSyncTaskQueue.Len() > 0 {
ss.downloadBlocks(bc)
}
return true
ss.generateNewState(bc, worker)
}
// registerToBroadcast sends this node's peer hash to the peer so the peer
// will broadcast new blocks to it. Returns ErrRegistrationFail unless the
// peer explicitly responds SUCCESS (nil, FAIL, and any other response type
// all count as failure, matching the original branch-by-branch behavior).
func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte) error {
	response := peerConfig.client.Register(peerHash)
	if response == nil || response.Type != pb.DownloaderResponse_SUCCESS {
		return ErrRegistrationFail
	}
	return nil
}
// RegisterNodeInfo will register node to peers to accept future new block broadcasting.
// Registration stops after RegistrationNumber successes.
// Returns the number of successful registrations.
func (ss *StateSync) RegisterNodeInfo() int {
	ss.CleanUpNilPeers()
	registrationNumber := RegistrationNumber
	Log.Debug("[SYNC] node registration to peers", "registrationNumber", registrationNumber, "activePeerNumber", ss.activePeerNumber)

	// Encode this node's ip/port-derived unique ID as a 4-byte big-endian peer hash.
	peerID := utils.GetUniqueIDFromIPPort(ss.selfip, ss.selfport)
	peerHash := make([]byte, 4)
	binary.BigEndian.PutUint32(peerHash, peerID)

	count := 0
	for id := range ss.syncConfig.peers {
		peerConfig := ss.syncConfig.peers[id]
		if count >= registrationNumber {
			break
		}
		if peerConfig.client == nil {
			continue
		}
		err := peerConfig.registerToBroadcast(peerHash)
		if err != nil {
			Log.Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "peerHash", peerHash)
			continue
		}
		Log.Debug("[SYNC] register success", "ip", peerConfig.ip, "port", peerConfig.port)
		count++
	}
	return count
}

@ -0,0 +1,12 @@
### Full state syncing
A node downloads all the missing blocks until it catches up with the block that is in the process of consensus.
### Node states
The states of a node have the following options:
NodeInit, NodeWaitToJoin, NodeNotInSync, NodeOffline, NodeReadyForConsensus, NodeDoingConsensus
When any node joins the network, it will join the shard and try to participate in the consensus process. It will assume its status is NodeReadyForConsensus until it finds it is not able to verify the new block. Then it will move its status into NodeNotInSync. After finishing the syncing process, its status becomes NodeReadyForConsensus again. Simply speaking, most of the time, its status alternates between these two states.
### Doing syncing
The syncing process consists of 3 parts: download the old blocks that have timestamps before the state syncing beginning time; register to a few peers and accept new blocks that have timestamps after the state syncing beginning time; catch the last mile blocks from the consensus process when its latest block is only 1~2 blocks behind the current consensus block.

@ -165,11 +165,12 @@ func main() {
log.Debug("Client Join Shard", "leader", leader)
clientNode.GetHost().AddPeer(&leader)
go clientNode.JoinShard(leader)
clientNode.State = node.NodeReadyForConsensus
}
// wait for 1 seconds for client to send ping message to leader
time.Sleep(time.Second)
clientNode.StopPing <- struct{}{}
clientNode.State = node.NodeJoinedShard
clientNode.State = node.NodeReadyForConsensus
// Transaction generation process
time.Sleep(2 * time.Second) // wait for nodes to be ready

@ -97,6 +97,11 @@ type Consensus struct {
// Called when consensus on a new block is done
OnConsensusDone func(*types.Block)
// current consensus block to check if out of sync
ConsensusBlock chan *types.Block
// verified block to state sync broadcast
VerifiedNewBlock chan *types.Block
Log log.Logger
uniqueIDInstance *utils.UniqueValidatorID
@ -280,6 +285,7 @@ func (consensus *Consensus) AddPeers(peers []*p2p.Peer) int {
consensus.pubKeyLock.Lock()
consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey)
consensus.pubKeyLock.Unlock()
consensus.Log.Debug("[SYNC] new peer added", "pubKey", peer.PubKey, "ip", peer.IP, "port", peer.Port)
}
count++
}
@ -481,6 +487,19 @@ func (consensus *Consensus) GetNodeID() uint32 {
return consensus.nodeID
}
// GetPeerFromID looks up the peer registered under peerID in the validator map.
// The second return value reports success: it is false when no entry exists for
// the ID, or when the stored value is not a p2p.Peer; in either failure case the
// zero-valued p2p.Peer is returned.
func (consensus *Consensus) GetPeerFromID(peerID uint32) (p2p.Peer, bool) {
	raw, found := consensus.validators.Load(peerID)
	if !found {
		return p2p.Peer{}, false
	}
	// Comma-ok assertion yields the zero Peer and false on a type mismatch,
	// which is exactly the failure contract callers expect.
	peer, ok := raw.(p2p.Peer)
	return peer, ok
}
// SendMessage sends message thru p2p host to peer.
func (consensus *Consensus) SendMessage(peer p2p.Peer, message []byte) {
host.SendMessage(consensus.host, peer, message, nil)

@ -176,6 +176,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message
if len((*commitments)) >= ((len(consensus.PublicKeys)*2)/3 + 1) {
shouldProcess = false
}
if shouldProcess {
point := crypto.Ed25519Curve.Point()
point.UnmarshalBinary(commitment)
@ -379,6 +380,12 @@ func (consensus *Consensus) processResponseMessage(message consensus_proto.Messa
copy(blockObj.Header().Bitmap[:], bitmap)
consensus.OnConsensusDone(&blockObj)
select {
case consensus.VerifiedNewBlock <- &blockObj:
default:
consensus.Log.Info("[SYNC] consensus verified block send to chan failed", "blockHash", blockObj.Hash())
}
consensus.reportMetrics(blockObj)
// Dump new block into level db.

@ -57,3 +57,20 @@ func TestRemovePeers(t *testing.T) {
consensus.DebugPrintPublicKeys()
}
}
// TestGetPeerFromID verifies that peers supplied at consensus creation can be
// retrieved again via the unique ID derived from their IP/port, and that the
// returned peer carries the expected address fields.
func TestGetPeerFromID(t *testing.T) {
	leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"}
	validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"}
	host := p2pimpl.NewHost(leader)
	consensus := New(host, "0", []p2p.Peer{leader, validator}, leader)
	leaderID := utils.GetUniqueIDFromIPPort(leader.IP, leader.Port)
	validatorID := utils.GetUniqueIDFromIPPort(validator.IP, validator.Port)
	// Check the ok flag explicitly: a lookup miss would otherwise surface as a
	// confusing field mismatch on a zero-valued Peer.
	l, ok := consensus.GetPeerFromID(leaderID)
	if !ok {
		t.Fatalf("leader not found by ID %d", leaderID)
	}
	v, ok := consensus.GetPeerFromID(validatorID)
	if !ok {
		t.Fatalf("validator not found by ID %d", validatorID)
	}
	if l.IP != leader.IP || l.Port != leader.Port {
		t.Errorf("leader IP not equal")
	}
	if v.IP != validator.IP || v.Port != validator.Port {
		t.Errorf("validator IP not equal")
	}
}

@ -21,7 +21,6 @@ func (consensus *Consensus) ProcessMessageValidator(payload []byte) {
if err != nil {
consensus.Log.Error("Failed to unmarshal message payload.", "err", err, "consensus", consensus)
}
switch message.Type {
case consensus_proto.MessageType_ANNOUNCE:
consensus.processAnnounceMessage(message)
@ -74,6 +73,7 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa
consensus.Log.Warn("Unparseable block header data", "error", err)
return
}
consensus.block = block
// Add block to received block cache
@ -81,7 +81,7 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa
consensus.blocksReceived[consensusID] = &BlockConsensusStatus{block, consensus.state}
consensus.mutex.Unlock()
// Add attack model of IncorrectResponse.
// Add attack model of IncorrectResponse
if attack.GetInstance().IncorrectResponse() {
consensus.Log.Warn("IncorrectResponse attacked")
return
@ -243,6 +243,14 @@ func (consensus *Consensus) processChallengeMessage(message consensus_proto.Mess
}
consensus.Log.Info("Finished Response. Adding block to chain", "numTx", len(blockObj.Transactions()))
consensus.OnConsensusDone(&blockObj)
select {
case consensus.VerifiedNewBlock <- &blockObj:
default:
consensus.Log.Info("[SYNC] consensus verified block send to chan failed", "blockHash", blockObj.Hash())
continue
}
} else {
break
}
@ -298,7 +306,9 @@ func (consensus *Consensus) processCollectiveSigMessage(message consensus_proto.
// check consensus Id
if consensusID != consensus.consensusID {
// hack for new node state syncing
consensus.Log.Warn("Received message with wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
consensus.consensusID = consensusID
return
}

@ -316,6 +316,26 @@ func (m *Mask) SetBit(i int, enable bool) error {
return nil
}
// GetPubKeyFromMask returns the public keys whose mask bit matches flag:
// with flag true it returns the keys of signers that are set in the mask
// (i.e. have signed), with flag false the keys that are not set.
// It is used to show which signers did or did not sign in the cosign message.
func (m *Mask) GetPubKeyFromMask(flag bool) []kyber.Point {
	// Keep the non-nil empty slice of the original implementation.
	pubKeys := []kyber.Point{}
	for i := range m.publics {
		byt := i >> 3              // byte index within the mask
		msk := byte(1) << uint(i&7) // bit position within that byte
		// Select the key when its mask bit equals the requested flag;
		// this replaces the former `if flag == true` branch pair.
		if ((m.mask[byt] & msk) != 0) == flag {
			pubKeys = append(pubKeys, m.publics[i])
		}
	}
	return pubKeys
}
// IndexEnabled checks whether the given index is enabled in the mask or not.
func (m *Mask) IndexEnabled(i int) (bool, error) {
if i >= len(m.publics) {

@ -3,6 +3,7 @@ package node
import (
"bytes"
"crypto/ecdsa"
"encoding/binary"
"encoding/gob"
"encoding/hex"
"fmt"
@ -11,7 +12,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@ -32,6 +32,7 @@ import (
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
@ -44,7 +45,7 @@ type State byte
const (
NodeInit State = iota // Node just started, before contacting BeaconChain
NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard
NodeJoinedShard // Node joined Shard, ready for consensus
NodeNotInSync // Node out of sync, might be just joined Shard or offline for a period of time
NodeOffline // Node is offline
NodeReadyForConsensus // Node is ready for doing consensus
NodeDoingConsensus // Node is already doing consensus
@ -57,8 +58,8 @@ func (state State) String() string {
return "NodeInit"
case NodeWaitToJoin:
return "NodeWaitToJoin"
case NodeJoinedShard:
return "NodeJoinedShard"
case NodeNotInSync:
return "NodeNotInSync"
case NodeOffline:
return "NodeOffline"
case NodeReadyForConsensus:
@ -73,18 +74,25 @@ func (state State) String() string {
// Constants related to doing syncing.
const (
NotDoingSyncing uint32 = iota
DoingSyncing
lastMileThreshold = 4
inSyncThreshold = 2
)
const (
syncingPortDifference = 3000
waitBeforeJoinShard = time.Second * 3
timeOutToJoinShard = time.Minute * 10
waitBeforeJoinShard = time.Second * 3
timeOutToJoinShard = time.Minute * 10
// ClientServicePortDiff is the positive port diff for client service
ClientServicePortDiff = 5555
ClientServicePortDiff = 5555
maxBroadcastNodes = 10 // broadcast at most maxBroadcastNodes peers that need in sync
broadcastTimeout int64 = 3 * 60 * 1000000000 // 3 mins
)
// use to push new block to outofsync node
type syncConfig struct {
timestamp int64
client *downloader.Client
}
// NetworkNode ...
type NetworkNode struct {
SelfPeer p2p.Peer
@ -109,8 +117,9 @@ type Node struct {
SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
IDCPeer p2p.Peer
Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
State State // State of the Node
Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
State State // State of the Node
stateMutex sync.Mutex // mutex for change node state
TxPool *core.TxPool
Worker *worker.Worker
@ -119,9 +128,9 @@ type Node struct {
clientServer *clientService.Server
// Syncing component.
downloaderServer *downloader.Server
stateSync *syncing.StateSync
syncingState uint32
downloaderServer *downloader.Server
stateSync *syncing.StateSync
peerRegistrationRecord map[uint32]*syncConfig // record registration time (unixtime) of peers begin in syncing
// The p2p host used to send/receive p2p messages
host host.Host
@ -263,8 +272,10 @@ func New(host host.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
}
// Setup initial state of syncing.
node.syncingState = NotDoingSyncing
node.StopPing = make(chan struct{})
node.Consensus.ConsensusBlock = make(chan *types.Block)
node.Consensus.VerifiedNewBlock = make(chan *types.Block)
node.peerRegistrationRecord = make(map[uint32]*syncConfig)
node.OfflinePeers = make(chan p2p.Peer)
go node.RemovePeersHandler()
@ -272,23 +283,58 @@ func New(host host.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
return &node
}
// DoSyncing starts syncing.
func (node *Node) DoSyncing() {
// If this node is currently doing sync, another call for syncing will be returned immediately.
if !atomic.CompareAndSwapUint32(&node.syncingState, NotDoingSyncing, DoingSyncing) {
return
// IsOutOfSync checks whether the node is out of sync by comparing latest block with consensus block
func (node *Node) IsOutOfSync(consensusBlock *types.Block) bool {
myHeight := node.blockchain.CurrentBlock().NumberU64()
newHeight := consensusBlock.NumberU64()
node.log.Debug("[SYNC]", "myHeight", myHeight, "newHeight", newHeight)
if newHeight-myHeight <= inSyncThreshold {
return false
}
defer atomic.StoreUint32(&node.syncingState, NotDoingSyncing)
if node.stateSync == nil {
node.stateSync = syncing.GetStateSync()
// cache latest blocks for last mile catch up
if newHeight-myHeight <= lastMileThreshold && node.stateSync != nil {
node.stateSync.AddLastMileBlock(consensusBlock)
}
if node.stateSync.StartStateSync(node.GetSyncingPeers(), node.blockchain) {
node.log.Debug("DoSyncing: successfully sync")
if node.State == NodeJoinedShard {
return true
}
// DoSyncing wait for check status and starts syncing if out of sync
func (node *Node) DoSyncing() {
for {
select {
// in current implementation logic, timeout means in sync
case <-time.After(5 * time.Second):
node.stateMutex.Lock()
node.log.Info("[SYNC] Node is now IN SYNC!")
node.State = NodeReadyForConsensus
node.stateMutex.Unlock()
continue
case consensusBlock := <-node.Consensus.ConsensusBlock:
if !node.IsOutOfSync(consensusBlock) {
if node.State == NodeNotInSync {
node.log.Info("[SYNC] Node is now IN SYNC!")
node.stateSync.CloseConnections()
node.stateSync = nil
}
node.stateMutex.Lock()
node.State = NodeReadyForConsensus
node.stateMutex.Unlock()
continue
} else {
node.log.Debug("[SYNC] node is out of sync")
node.stateMutex.Lock()
node.State = NodeNotInSync
node.stateMutex.Unlock()
}
if node.stateSync == nil {
node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port)
node.stateSync.CreateSyncConfig(node.GetSyncingPeers())
node.stateSync.MakeConnectionToPeers()
}
startHash := node.blockchain.CurrentBlock().Hash()
node.stateSync.StartStateSync(startHash[:], node.blockchain, node.Worker)
}
} else {
node.log.Debug("DoSyncing: failed to sync")
}
}
@ -318,28 +364,29 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int {
// GetSyncingPort returns the syncing port.
func GetSyncingPort(nodePort string) string {
if port, err := strconv.Atoi(nodePort); err == nil {
return fmt.Sprintf("%d", port-syncingPortDifference)
return fmt.Sprintf("%d", port-syncing.SyncingPortDifference)
}
os.Exit(1)
return ""
}
// GetSyncingPeers returns list of peers.
// Right now, the list length is only 1 for testing.
func (node *Node) GetSyncingPeers() []p2p.Peer {
res := []p2p.Peer{}
node.Neighbors.Range(func(k, v interface{}) bool {
node.log.Debug("GetSyncingPeers-Range: ", "k", k, "v", v)
if len(res) == 0 {
res = append(res, v.(p2p.Peer))
}
res = append(res, v.(p2p.Peer))
return true
})
removeID := -1
for i := range res {
if res[i].Port == node.SelfPeer.Port {
removeID = i
}
res[i].Port = GetSyncingPort(res[i].Port)
}
node.log.Debug("GetSyncingPeers: ", "res", res)
res = append(res[:removeID], res[removeID+1:]...)
node.log.Debug("GetSyncingPeers: ", "res", res, "self", node.SelfPeer)
return res
}
@ -436,6 +483,8 @@ func (node *Node) StartClientServer() {
func (node *Node) SupportSyncing() {
node.InitSyncingServer()
node.StartSyncingServer()
go node.DoSyncing()
go node.SendNewBlockToUnsync()
}
// InitSyncingServer starts downloader server.
@ -453,26 +502,119 @@ func (node *Node) StartSyncingServer() {
// CalculateResponse implements DownloadInterface on Node object.
func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*downloader_pb.DownloaderResponse, error) {
response := &downloader_pb.DownloaderResponse{}
if request.Type == downloader_pb.DownloaderRequest_HEADER {
switch request.Type {
case downloader_pb.DownloaderRequest_HEADER:
node.log.Debug("[SYNC] CalculateResponse DownloaderRequest_HEADER", "request.BlockHash", request.BlockHash)
var startHeaderHash []byte
if request.BlockHash == nil {
tmp := node.blockchain.Genesis().Hash()
startHeaderHash = tmp[:]
} else {
startHeaderHash = request.BlockHash
}
for block := node.blockchain.CurrentBlock(); block != nil; block = node.blockchain.GetBlockByHash(block.Header().ParentHash) {
blockHash := block.Hash()
if bytes.Compare(blockHash[:], startHeaderHash) == 0 {
break
}
response.Payload = append(response.Payload, blockHash[:])
}
} else {
case downloader_pb.DownloaderRequest_BLOCK:
for _, bytes := range request.Hashes {
var hash common.Hash
hash.SetBytes(bytes)
block := node.blockchain.GetBlockByHash(hash)
encodedBlock, err := rlp.EncodeToBytes(block)
if err != nil {
if err == nil {
response.Payload = append(response.Payload, encodedBlock)
}
}
case downloader_pb.DownloaderRequest_NEWBLOCK:
if node.State != NodeNotInSync {
node.log.Debug("[SYNC] new block received, but state is", "state", node.State.String())
response.Type = downloader_pb.DownloaderResponse_INSYNC
return response, nil
}
var blockObj types.Block
err := rlp.DecodeBytes(request.BlockHash, &blockObj)
if err != nil {
node.log.Warn("[SYNC] unable to decode received new block")
return response, err
}
node.stateSync.AddNewBlock(request.PeerHash, &blockObj)
case downloader_pb.DownloaderRequest_REGISTER:
peerID := binary.BigEndian.Uint32(request.PeerHash)
if _, ok := node.peerRegistrationRecord[peerID]; ok {
response.Type = downloader_pb.DownloaderResponse_FAIL
return response, nil
} else if len(node.peerRegistrationRecord) >= maxBroadcastNodes {
response.Type = downloader_pb.DownloaderResponse_FAIL
return response, nil
} else {
peer, ok := node.Consensus.GetPeerFromID(peerID)
if !ok {
node.log.Warn("[SYNC] unable to get peer from peerID", "peerID", peerID)
}
client := downloader.ClientSetup(peer.IP, GetSyncingPort(peer.Port))
if client == nil {
node.log.Warn("[SYNC] unable to setup client")
return response, nil
}
node.log.Debug("[SYNC] client setup correctly", "client", client)
config := &syncConfig{timestamp: time.Now().UnixNano(), client: client}
node.stateMutex.Lock()
node.peerRegistrationRecord[peerID] = config
node.stateMutex.Unlock()
node.log.Debug("[SYNC] register peerID success", "peerID", peerID)
response.Type = downloader_pb.DownloaderResponse_SUCCESS
}
case downloader_pb.DownloaderRequest_REGISTERTIMEOUT:
if node.State == NodeNotInSync {
count := node.stateSync.RegisterNodeInfo()
node.log.Debug("[SYNC] extra node registered", "number", count)
}
}
return response, nil
}
// SendNewBlockToUnsync sends the latest verified block to the out-of-sync,
// registered nodes. It blocks on the VerifiedNewBlock channel; for each block
// it pushes the RLP-encoded payload to every registered peer, closing and
// dropping registrations that have timed out or report themselves in sync.
func (node *Node) SendNewBlockToUnsync() {
	for {
		block := <-node.Consensus.VerifiedNewBlock
		blockBytes, err := rlp.EncodeToBytes(block)
		if err != nil {
			node.log.Warn("[SYNC] unable to encode block to hashes")
			continue
		}

		// really need to have a unique id independent of ip/port
		selfPeerID := utils.GetUniqueIDFromIPPort(node.SelfPeer.IP, node.SelfPeer.Port)

		// Snapshot the registration map under stateMutex: CalculateResponse
		// (REGISTER case) mutates it concurrently under the same mutex, so
		// ranging over the live map unlocked would be a data race.
		node.stateMutex.Lock()
		targets := make(map[uint32]*syncConfig, len(node.peerRegistrationRecord))
		for peerID, config := range node.peerRegistrationRecord {
			targets[peerID] = config
		}
		node.stateMutex.Unlock()
		node.log.Debug("[SYNC] peerRegistration Record", "peerID", selfPeerID, "number", len(targets))

		for peerID, config := range targets {
			elapseTime := time.Now().UnixNano() - config.timestamp
			if elapseTime > broadcastTimeout {
				node.log.Warn("[SYNC] SendNewBlockToUnsync to peer timeout", "peerID", peerID)
				// send last time and delete
				config.client.PushNewBlock(selfPeerID, blockBytes, true)
				node.stateMutex.Lock()
				config.client.Close()
				delete(node.peerRegistrationRecord, peerID)
				node.stateMutex.Unlock()
				continue
			}
			response := config.client.PushNewBlock(selfPeerID, blockBytes, false)
			// Peer reports itself in sync: release its client and registration.
			if response.Type == downloader_pb.DownloaderResponse_INSYNC {
				node.stateMutex.Lock()
				config.client.Close()
				delete(node.peerRegistrationRecord, peerID)
				node.stateMutex.Unlock()
			}
		}
	}
}
// RemovePeersHandler is a goroutine to wait on the OfflinePeers channel
// and remove the peers from validator list
func (node *Node) RemovePeersHandler() {

@ -253,8 +253,17 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) bool {
err := node.blockchain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey))
if err != nil {
node.log.Debug("Failed verifying new block", "Error", err, "tx", newBlock.Transactions()[0])
// send consensus block to state syncing
select {
case node.Consensus.ConsensusBlock <- newBlock:
default:
node.log.Warn("consensus block unable to sent to state sync", "height", newBlock.NumberU64(), "blockHash", newBlock.Hash().Hex())
}
return false
}
return true
}
@ -378,7 +387,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
}
if node.State == NodeWaitToJoin {
node.State = NodeJoinedShard
node.State = NodeReadyForConsensus
// Notify JoinShard to stop sending Ping messages
if node.StopPing != nil {
node.StopPing <- struct{}{}

@ -0,0 +1,12 @@
127.0.0.1 9000 leader 0
127.0.0.1 9001 validator 0
127.0.0.1 9002 validator 0
127.0.0.1 9003 validator 0
127.0.0.1 9004 validator 0
127.0.0.1 9006 newnode 0
127.0.0.1 9007 newnode 0
127.0.0.1 9008 newnode 0
127.0.0.1 9009 newnode 0
127.0.0.1 9010 newnode 0
127.0.0.1 9011 newnode 0
127.0.0.1 19999 client 0

@ -54,6 +54,7 @@ USAGE: $ME [OPTIONS] config_file_name
-s shards number of shards (default: $SHARDS)
-k nodeport kill the node with specified port number (default: $KILLPORT)
-n dryrun mode (default: $DRYRUN)
-S enable sync test (default: $SYNC)
This script will build all the binaries and start harmony and txgen based on the configuration file.
@ -72,9 +73,10 @@ DURATION=90
MIN=5
SHARDS=2
KILLPORT=9004
SYNC=false
DRYRUN=
while getopts "hdtD:m:s:k:n" option; do
while getopts "hdtD:m:s:k:nS" option; do
case $option in
h) usage ;;
d) DB='-db_supported' ;;
@ -84,6 +86,7 @@ while getopts "hdtD:m:s:k:n" option; do
s) SHARDS=$OPTARG ;;
k) KILLPORT=$OPTARG ;;
n) DRYRUN=echo ;;
S) SYNC=true ;;
esac
done
@ -94,6 +97,10 @@ if [ -z "$config" ]; then
usage
fi
if [ "$SYNC" == "true" ]; then
DURATION=300
SHARDS=1
fi
# Kill nodes if any
cleanup
@ -130,23 +137,41 @@ fi
while IFS='' read -r line || [[ -n "$line" ]]; do
IFS=' ' read ip port mode shardID <<< $line
#echo $ip $port $mode
if [ "$mode" != "client" ]; then
if [[ "$mode" == "leader" || "$mode" == "validator" ]]; then
$DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT 2>&1 | tee -a $LOG_FILE &
sleep 0.5
fi
done < $config
# Emulate node offline
(sleep 45; killnode $KILLPORT) &
if [ "$SYNC" == "false" ]; then
(sleep 45; killnode $KILLPORT) &
fi
if [ "$TXGEN" == "true" ]; then
echo "launching txgen ..."
line=$(grep client $config)
IFS=' ' read ip port mode shardID <<< $line
if [ "$mode" == "client" ]; then
$DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port $HMY_OPT 2>&1 | tee -a $LOG_FILE
$DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port $HMY_OPT 2>&1 | tee -a $LOG_FILE &
fi
fi
# sleep enough time before consensus reached then add new node for state syncing
if [ "$SYNC" == "true" ]; then
sleep 45
echo "launching new node..."
while IFS='' read -r line || [[ -n "$line" ]]; do
IFS=' ' read ip port mode shardID <<< $line
echo launching newnode $ip $port $mode
if [ "$mode" == "newnode" ]; then
$DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -min_peers $MIN $HMY_OPT 2>&1 | tee -a $LOG_FILE &
sleep 25
fi
done < $config
fi
wait
cleanup
check_result

Loading…
Cancel
Save