commit ae94397fe0

@@ -1,97 +0,0 @@
package resharding

import (
	"time"

	"github.com/ethereum/go-ethereum/rpc"
	msg_pb "github.com/harmony-one/harmony/api/proto/message"
	"github.com/harmony-one/harmony/core"
	"github.com/harmony-one/harmony/internal/utils"
)

// Constants for resharding service.
const (
	ReshardingCheckTime = time.Second
)

// Service is the role conversion service.
type Service struct {
	stopChan    chan struct{}
	stoppedChan chan struct{}
	messageChan chan *msg_pb.Message
	beaconChain *core.BlockChain
}

// New returns the role conversion service.
func New(beaconChain *core.BlockChain) *Service {
	return &Service{beaconChain: beaconChain}
}

// StartService starts the role conversion service.
func (s *Service) StartService() {
	s.stopChan = make(chan struct{})
	s.stoppedChan = make(chan struct{})

	s.Init()
	s.Run(s.stopChan, s.stoppedChan)
}

// Init initializes the role conversion service.
func (s *Service) Init() {
}

// Run runs role conversion.
func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) {
	go func() {
		defer close(stoppedChan)
		for {
			select {
			default:
				utils.Logger().Info().Msg("Running role conversion")
				// TODO: Write some logic here.
				s.DoService()
			case <-stopChan:
				return
			}
		}
	}()
}

// DoService does role conversion.
func (s *Service) DoService() {
	tick := time.NewTicker(ReshardingCheckTime)
	// Get the current shard state hash.
	currentShardStateHash := s.beaconChain.CurrentBlock().Header().ShardStateHash()
	for {
		select {
		case <-tick.C:
			latestShardStateHash := s.beaconChain.CurrentBlock().Header().ShardStateHash()
			if currentShardStateHash != latestShardStateHash {
				// TODO(minhdoan): Add resharding logic later, after modifying the resharding func, as it currently doesn't calculate the role (leader/validator).
			}
		}
	}
}

// StopService stops the role conversion service.
func (s *Service) StopService() {
	utils.Logger().Info().Msg("Stopping role conversion service")
	s.stopChan <- struct{}{}
	<-s.stoppedChan
	utils.Logger().Info().Msg("Role conversion stopped")
}

// NotifyService notifies the service.
func (s *Service) NotifyService(params map[string]interface{}) {
}

// SetMessageChan sets up the message channel to the service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
	s.messageChan = messageChan
}

// APIs returns the RPC APIs of the service.
func (s *Service) APIs() []rpc.API {
	return nil
}
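
// Intended usage, sketched (illustrative only, not part of the commit;
// "beaconChain" is an assumed *core.BlockChain). The paired channels form a
// start/stop handshake:
//
//	svc := resharding.New(beaconChain)
//	svc.StartService() // spawns the Run goroutine
//	// ... later ...
//	svc.StopService()  // signals stopChan, then waits on stoppedChan
//
// Caveat: DoService as written loops forever, so the Run goroutine would
// observe the stop signal only before its first DoService call.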

@@ -1,16 +1,421 @@
package v3

import (
	"io"
	"math/big"
	"unsafe"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	ethtypes "github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/rs/zerolog"

	blockif "github.com/harmony-one/harmony/block/interface"
	"github.com/harmony-one/harmony/crypto/hash"
	"github.com/harmony-one/harmony/shard"
)

// Header is the V3 block header. It has the same structure as the V2 header
// and exists to identify the V3 body, which includes staking transactions.
// We copy the V2 code instead of embedding the V2 header, because with an
// embedded struct the type check in NewBodyForMatchingHeader would report
// the V2 header type instead of V3.
type Header struct {
	fields headerFields
}

// EncodeRLP encodes the header fields into RLP format.
func (h *Header) EncodeRLP(w io.Writer) error {
	return rlp.Encode(w, &h.fields)
}

// DecodeRLP decodes the given RLP decode stream into the header fields.
func (h *Header) DecodeRLP(s *rlp.Stream) error {
	return s.Decode(&h.fields)
}
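
// Round-trip sketch (illustrative; not part of the commit). Because *Header
// implements rlp.Encoder and rlp.Decoder, the generic helpers dispatch to
// the two methods above:
//
//	h := NewHeader()
//	h.SetShardID(3)
//	enc, err := rlp.EncodeToBytes(h) // calls (*Header).EncodeRLP
//	if err != nil {
//		panic(err)
//	}
//	dec := NewHeader()
//	if err := rlp.DecodeBytes(enc, dec); err != nil { // calls DecodeRLP
//		panic(err)
//	}
//	// dec.ShardID() == 3 and dec.Hash() == h.Hash()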

// NewHeader creates a new header object.
func NewHeader() *Header {
	return &Header{headerFields{
		Number: new(big.Int),
		Time:   new(big.Int),
		ViewID: new(big.Int),
		Epoch:  new(big.Int),
	}}
}
|
||||
type headerFields struct { |
||||
ParentHash common.Hash `json:"parentHash" gencodec:"required"` |
||||
Coinbase common.Address `json:"miner" gencodec:"required"` |
||||
Root common.Hash `json:"stateRoot" gencodec:"required"` |
||||
TxHash common.Hash `json:"transactionsRoot" gencodec:"required"` |
||||
ReceiptHash common.Hash `json:"receiptsRoot" gencodec:"required"` |
||||
OutgoingReceiptHash common.Hash `json:"outgoingReceiptsRoot" gencodec:"required"` |
||||
IncomingReceiptHash common.Hash `json:"incomingReceiptsRoot" gencodec:"required"` |
||||
Bloom ethtypes.Bloom `json:"logsBloom" gencodec:"required"` |
||||
Number *big.Int `json:"number" gencodec:"required"` |
||||
GasLimit uint64 `json:"gasLimit" gencodec:"required"` |
||||
GasUsed uint64 `json:"gasUsed" gencodec:"required"` |
||||
Time *big.Int `json:"timestamp" gencodec:"required"` |
||||
Extra []byte `json:"extraData" gencodec:"required"` |
||||
MixDigest common.Hash `json:"mixHash" gencodec:"required"` |
||||
// Additional Fields
|
||||
ViewID *big.Int `json:"viewID" gencodec:"required"` |
||||
Epoch *big.Int `json:"epoch" gencodec:"required"` |
||||
ShardID uint32 `json:"shardID" gencodec:"required"` |
||||
LastCommitSignature [96]byte `json:"lastCommitSignature" gencodec:"required"` |
||||
LastCommitBitmap []byte `json:"lastCommitBitmap" gencodec:"required"` // Contains which validator signed
|
||||
ShardStateHash common.Hash `json:"shardStateRoot"` |
||||
Vrf []byte `json:"vrf"` |
||||
Vdf []byte `json:"vdf"` |
||||
ShardState []byte `json:"shardState"` |
||||
CrossLinks []byte `json:"crossLink"` |
||||
} |
||||

// ParentHash is the header hash of the parent block. For the genesis block,
// which has no parent by definition, this field is zeroed out.
func (h *Header) ParentHash() common.Hash {
	return h.fields.ParentHash
}

// SetParentHash sets the parent hash field.
func (h *Header) SetParentHash(newParentHash common.Hash) {
	h.fields.ParentHash = newParentHash
}

// Coinbase is the address of the node that proposed this block and all
// transactions in it.
func (h *Header) Coinbase() common.Address {
	return h.fields.Coinbase
}

// SetCoinbase sets the coinbase address field.
func (h *Header) SetCoinbase(newCoinbase common.Address) {
	h.fields.Coinbase = newCoinbase
}

// Root is the state (account) trie root hash.
func (h *Header) Root() common.Hash {
	return h.fields.Root
}

// SetRoot sets the state trie root hash field.
func (h *Header) SetRoot(newRoot common.Hash) {
	h.fields.Root = newRoot
}

// TxHash is the transaction trie root hash.
func (h *Header) TxHash() common.Hash {
	return h.fields.TxHash
}

// SetTxHash sets the transaction trie root hash field.
func (h *Header) SetTxHash(newTxHash common.Hash) {
	h.fields.TxHash = newTxHash
}

// ReceiptHash is the same-shard transaction receipt trie hash.
func (h *Header) ReceiptHash() common.Hash {
	return h.fields.ReceiptHash
}

// SetReceiptHash sets the same-shard transaction receipt trie hash.
func (h *Header) SetReceiptHash(newReceiptHash common.Hash) {
	h.fields.ReceiptHash = newReceiptHash
}

// OutgoingReceiptHash is the egress transaction receipt trie hash.
func (h *Header) OutgoingReceiptHash() common.Hash {
	return h.fields.OutgoingReceiptHash
}

// SetOutgoingReceiptHash sets the egress transaction receipt trie hash.
func (h *Header) SetOutgoingReceiptHash(newOutgoingReceiptHash common.Hash) {
	h.fields.OutgoingReceiptHash = newOutgoingReceiptHash
}

// IncomingReceiptHash is the ingress transaction receipt trie hash.
func (h *Header) IncomingReceiptHash() common.Hash {
	return h.fields.IncomingReceiptHash
}

// SetIncomingReceiptHash sets the ingress transaction receipt trie hash.
func (h *Header) SetIncomingReceiptHash(newIncomingReceiptHash common.Hash) {
	h.fields.IncomingReceiptHash = newIncomingReceiptHash
}

// Bloom is the Bloom filter that indexes accounts and topics logged by smart
// contract transactions (executions) in this block.
func (h *Header) Bloom() ethtypes.Bloom {
	return h.fields.Bloom
}

// SetBloom sets the smart contract log Bloom filter for this block.
func (h *Header) SetBloom(newBloom ethtypes.Bloom) {
	h.fields.Bloom = newBloom
}

// Number is the block number.
//
// The returned instance is a copy; the caller may do anything with it.
func (h *Header) Number() *big.Int {
	return new(big.Int).Set(h.fields.Number)
}

// SetNumber sets the block number.
//
// It stores a copy; the caller may freely modify the original.
func (h *Header) SetNumber(newNumber *big.Int) {
	h.fields.Number = new(big.Int).Set(newNumber)
}
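
// Copy semantics of the big.Int accessors, illustrated (hypothetical
// snippet, assuming a *Header h). Both the getter and the setter copy the
// value, so neither side can mutate the header through a shared pointer:
//
//	n := big.NewInt(42)
//	h.SetNumber(n) // stores a copy of n
//	n.SetInt64(99) // does not affect the header
//	_ = h.Number() // still 42, and itself a fresh copy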

// GasLimit is the gas limit for transactions in this block.
func (h *Header) GasLimit() uint64 {
	return h.fields.GasLimit
}

// SetGasLimit sets the gas limit for transactions in this block.
func (h *Header) SetGasLimit(newGasLimit uint64) {
	h.fields.GasLimit = newGasLimit
}

// GasUsed is the amount of gas used by transactions in this block.
func (h *Header) GasUsed() uint64 {
	return h.fields.GasUsed
}

// SetGasUsed sets the amount of gas used by transactions in this block.
func (h *Header) SetGasUsed(newGasUsed uint64) {
	h.fields.GasUsed = newGasUsed
}

// Time is the UNIX timestamp of this block.
//
// The returned instance is a copy; the caller may do anything with it.
func (h *Header) Time() *big.Int {
	return new(big.Int).Set(h.fields.Time)
}

// SetTime sets the UNIX timestamp of this block.
//
// It stores a copy; the caller may freely modify the original.
func (h *Header) SetTime(newTime *big.Int) {
	h.fields.Time = new(big.Int).Set(newTime)
}

// Extra is the extra data field of this block.
//
// The returned slice is a copy; the caller may do anything with it.
func (h *Header) Extra() []byte {
	return append(h.fields.Extra[:0:0], h.fields.Extra...)
}

// SetExtra sets the extra data field of this block.
//
// It stores a copy; the caller may freely modify the original.
func (h *Header) SetExtra(newExtra []byte) {
	h.fields.Extra = append(newExtra[:0:0], newExtra...)
}
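
// The append(x[:0:0], x...) idiom above is a full-slice copy: x[:0:0] is a
// zero-length, zero-capacity slice, so append must allocate fresh backing
// storage instead of reusing x's array. Illustration (not from the commit):
//
//	src := []byte{1, 2, 3}
//	cpy := append(src[:0:0], src...)
//	src[0] = 9
//	// cpy is still {1, 2, 3}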

// MixDigest is the mixhash.
//
// This field is a remnant from Ethereum; Harmony does not use it and always
// zeroes it out.
func (h *Header) MixDigest() common.Hash {
	return h.fields.MixDigest
}

// SetMixDigest sets the mixhash of this block.
func (h *Header) SetMixDigest(newMixDigest common.Hash) {
	h.fields.MixDigest = newMixDigest
}

// ViewID is the ID of the view in which this block was originally proposed.
//
// It normally increases by one for each subsequent block, or by more than one
// if one or more PBFT/FBFT view changes have occurred.
//
// The returned instance is a copy; the caller may do anything with it.
func (h *Header) ViewID() *big.Int {
	return new(big.Int).Set(h.fields.ViewID)
}

// SetViewID sets the view ID in which the block was originally proposed.
//
// It stores a copy; the caller may freely modify the original.
func (h *Header) SetViewID(newViewID *big.Int) {
	h.fields.ViewID = new(big.Int).Set(newViewID)
}

// Epoch is the epoch number of this block.
//
// The returned instance is a copy; the caller may do anything with it.
func (h *Header) Epoch() *big.Int {
	return new(big.Int).Set(h.fields.Epoch)
}

// SetEpoch sets the epoch number of this block.
//
// It stores a copy; the caller may freely modify the original.
func (h *Header) SetEpoch(newEpoch *big.Int) {
	h.fields.Epoch = new(big.Int).Set(newEpoch)
}

// ShardID is the shard ID to which this block belongs.
func (h *Header) ShardID() uint32 {
	return h.fields.ShardID
}

// SetShardID sets the shard ID to which this block belongs.
func (h *Header) SetShardID(newShardID uint32) {
	h.fields.ShardID = newShardID
}

// LastCommitSignature is the FBFT commit group signature for the last block.
func (h *Header) LastCommitSignature() [96]byte {
	return h.fields.LastCommitSignature
}

// SetLastCommitSignature sets the FBFT commit group signature for the last
// block.
func (h *Header) SetLastCommitSignature(newLastCommitSignature [96]byte) {
	h.fields.LastCommitSignature = newLastCommitSignature
}

// LastCommitBitmap is the signatory bitmap of the previous block. Bit
// positions index into the committee member array.
//
// The returned slice is a copy; the caller may do anything with it.
func (h *Header) LastCommitBitmap() []byte {
	return append(h.fields.LastCommitBitmap[:0:0], h.fields.LastCommitBitmap...)
}

// SetLastCommitBitmap sets the signatory bitmap of the previous block.
//
// It stores a copy; the caller may freely modify the original.
func (h *Header) SetLastCommitBitmap(newLastCommitBitmap []byte) {
	h.fields.LastCommitBitmap = append(newLastCommitBitmap[:0:0], newLastCommitBitmap...)
}

// ShardStateHash is the shard state hash.
func (h *Header) ShardStateHash() common.Hash {
	return h.fields.ShardStateHash
}

// SetShardStateHash sets the shard state hash.
func (h *Header) SetShardStateHash(newShardStateHash common.Hash) {
	h.fields.ShardStateHash = newShardStateHash
}

// Vrf is the output of the VRF for the epoch.
//
// The returned slice is a copy; the caller may do anything with it.
func (h *Header) Vrf() []byte {
	return append(h.fields.Vrf[:0:0], h.fields.Vrf...)
}

// SetVrf sets the output of the VRF for the epoch.
//
// It stores a copy; the caller may freely modify the original.
func (h *Header) SetVrf(newVrf []byte) {
	h.fields.Vrf = append(newVrf[:0:0], newVrf...)
}

// Vdf is the output of the VDF for the epoch.
//
// The returned slice is a copy; the caller may do anything with it.
func (h *Header) Vdf() []byte {
	return append(h.fields.Vdf[:0:0], h.fields.Vdf...)
}

// SetVdf sets the output of the VDF for the epoch.
//
// It stores a copy; the caller may freely modify the original.
func (h *Header) SetVdf(newVdf []byte) {
	h.fields.Vdf = append(newVdf[:0:0], newVdf...)
}

// ShardState is the RLP-encoded form of the shard state (list of committees)
// for the next epoch.
//
// The returned slice is a copy; the caller may do anything with it.
func (h *Header) ShardState() []byte {
	return append(h.fields.ShardState[:0:0], h.fields.ShardState...)
}

// SetShardState sets the RLP-encoded form of the shard state.
//
// It stores a copy; the caller may freely modify the original.
func (h *Header) SetShardState(newShardState []byte) {
	h.fields.ShardState = append(newShardState[:0:0], newShardState...)
}

// CrossLinks is the RLP-encoded form of non-beacon block headers chosen to be
// canonical by the beacon committee. This field is present only on beacon
// chain block headers.
//
// The returned slice is a copy; the caller may do anything with it.
func (h *Header) CrossLinks() []byte {
	return append(h.fields.CrossLinks[:0:0], h.fields.CrossLinks...)
}

// SetCrossLinks sets the RLP-encoded form of non-beacon block headers chosen
// to be canonical by the beacon committee.
//
// It stores a copy; the caller may freely modify the original.
func (h *Header) SetCrossLinks(newCrossLinks []byte) {
	h.fields.CrossLinks = append(newCrossLinks[:0:0], newCrossLinks...)
}

// field type overrides for gencodec
type headerMarshaling struct {
	Difficulty *hexutil.Big
	Number     *hexutil.Big
	GasLimit   hexutil.Uint64
	GasUsed    hexutil.Uint64
	Time       *hexutil.Big
	Extra      hexutil.Bytes
	Hash       common.Hash `json:"hash"` // adds a call to Hash() in MarshalJSON
}

// Hash returns the block hash of the header, which is simply the keccak256
// hash of its RLP encoding.
func (h *Header) Hash() common.Hash {
	return hash.FromRLP(h)
}

// Size returns the approximate memory used by all internal contents. It is
// used to approximate and limit the memory consumption of various caches.
func (h *Header) Size() common.StorageSize {
	// TODO: update with new fields
	return common.StorageSize(unsafe.Sizeof(*h)) +
		common.StorageSize(len(h.Extra())+(h.Number().BitLen()+h.Time().BitLen())/8)
}

// Logger returns a sub-logger with block contexts added.
func (h *Header) Logger(logger *zerolog.Logger) *zerolog.Logger {
	nlogger := logger.
		With().
		Str("blockHash", h.Hash().Hex()).
		Uint32("blockShard", h.ShardID()).
		Uint64("blockEpoch", h.Epoch().Uint64()).
		Uint64("blockNumber", h.Number().Uint64()).
		Logger()
	return &nlogger
}

// GetShardState returns the deserialized shard state object.
func (h *Header) GetShardState() (shard.State, error) {
	shardState := shard.State{}
	err := rlp.DecodeBytes(h.ShardState(), &shardState)
	if err != nil {
		return nil, err
	}
	return shardState, nil
}

// Copy returns a copy of the given header.
func (h *Header) Copy() blockif.Header {
	cpy := *h
	return &cpy
}
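
// Note on Copy (illustrative; not part of the commit): the copy is shallow,
// so pointer-valued fields such as Number initially share their *big.Int
// values with the original. That stays safe only because every setter stores
// a fresh copy and every getter returns one:
//
//	a := NewHeader()
//	a.SetNumber(big.NewInt(7))
//	b := a.Copy().(*Header)
//	b.SetNumber(big.NewInt(8)) // replaces b's pointer; a.Number() is still 7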

@@ -1,329 +0,0 @@
package main

import (
	"flag"
	"fmt"
	"math/big"
	"math/rand"
	"os"
	"path"
	"sync"
	"time"

	"github.com/harmony-one/harmony/consensus"
	"github.com/harmony-one/harmony/consensus/quorum"
	"github.com/harmony-one/harmony/core"
	"github.com/harmony-one/harmony/internal/shardchain"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/log"
	bls2 "github.com/harmony-one/bls/ffi/go/bls"
	"github.com/harmony-one/harmony/internal/params"

	"github.com/harmony-one/harmony/api/client"
	proto_node "github.com/harmony-one/harmony/api/proto/node"
	"github.com/harmony-one/harmony/common/denominations"
	"github.com/harmony-one/harmony/core/types"
	"github.com/harmony-one/harmony/crypto/bls"
	nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
	"github.com/harmony-one/harmony/internal/genesis"
	"github.com/harmony-one/harmony/internal/utils"
	"github.com/harmony-one/harmony/node"
	"github.com/harmony-one/harmony/p2p"
	p2p_host "github.com/harmony-one/harmony/p2p/host"
	"github.com/harmony-one/harmony/p2p/p2pimpl"
)

var (
	version    string
	builtBy    string
	builtAt    string
	commit     string
	stateMutex sync.Mutex
)

const (
	// checkFrequency is how often (in seconds) to check whether the transaction
	// generator is ready to send the next batch of transactions.
	checkFrequency = 2
)

// Settings is the settings for TX generation. No cross-shard support!
type Settings struct {
	NumOfAddress      int
	MaxNumTxsPerBatch int
}

func printVersion(me string) {
	fmt.Fprintf(os.Stderr, "Harmony (C) 2019. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt)
	os.Exit(0)
}

// The main entrance for the transaction generator program, which simulates
// transactions and sends them to the network for processing.

var (
	ip              = flag.String("ip", "127.0.0.1", "IP of the node")
	port            = flag.String("port", "9999", "port of the node.")
	numTxns         = flag.Int("numTxns", 100, "number of transactions to send per message")
	logFolder       = flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
	duration        = flag.Int("duration", 30, "duration of the tx generation in seconds. If it's negative, the experiment runs forever.")
	versionFlag     = flag.Bool("version", false, "Output version info")
	crossShardRatio = flag.Int("cross_shard_ratio", 30, "The percentage of cross-shard transactions.") // kept for backward compatibility
	shardIDFlag     = flag.Int("shardID", 0, "The shardID the node belongs to.")
	// Key file to store the private key
	keyFile = flag.String("key", "./.txgenkey", "the private key file of the txgen")
	// logging verbosity
	verbosity = flag.Int("verbosity", 5, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: 5)")
)

func setUpTXGen() *node.Node {
	nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile)
	if err != nil {
		panic(err)
	}

	peerPubKey := bls.RandPrivateKey().GetPublicKey()
	if peerPubKey == nil {
		panic(fmt.Errorf("generate key error"))
	}
	shardID := *shardIDFlag
	selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: peerPubKey}

	// Nodes containing blockchain data to mirror the shards' data in the network
	myhost, err := p2pimpl.NewHost(&selfPeer, nodePriKey)
	if err != nil {
		panic("unable to new host in txgen")
	}
	decider := quorum.NewDecider(quorum.SuperMajorityVote)
	consensusObj, err := consensus.New(myhost, uint32(shardID), p2p.Peer{}, nil, decider)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error :%v \n", err)
		os.Exit(1)
	}
	chainDBFactory := &shardchain.MemDBFactory{}
	txGen := node.New(myhost, consensusObj, chainDBFactory, false) // no longer an archival node
	txGen.Client = client.NewClient(txGen.GetHost(), uint32(shardID))
	consensusObj.ChainReader = txGen.Blockchain()
	genesisShardingConfig := core.ShardingSchedule.InstanceForEpoch(big.NewInt(core.GenesisEpoch))
	startIdx := 0
	endIdx := startIdx + genesisShardingConfig.NumNodesPerShard()
	pubs := []*bls2.PublicKey{}
	for _, acct := range genesis.HarmonyAccounts[startIdx:endIdx] {
		pub := &bls2.PublicKey{}
		if err := pub.DeserializeHexStr(acct.BlsPublicKey); err != nil {
			fmt.Printf("Cannot deserialize public key. err: %v", err)
			os.Exit(1)
		}
		pubs = append(pubs, pub)
	}
	consensusObj.Decider.UpdateParticipants(pubs)
	txGen.NodeConfig.SetRole(nodeconfig.ClientNode)
	if shardID == 0 {
		txGen.NodeConfig.SetShardGroupID(nodeconfig.GroupIDBeacon)
	} else {
		txGen.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(shardID)))
	}

	txGen.NodeConfig.SetIsClient(true)

	return txGen
}

func main() {
	flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddresses")
	flag.Parse()
	if *versionFlag {
		printVersion(os.Args[0])
	}
	// Logging setup
	utils.SetLogContext(*port, *ip)
	utils.SetLogVerbosity(log.Lvl(*verbosity))
	if len(utils.BootNodes) == 0 {
		bootNodeAddrs, err := utils.StringsToAddrs(utils.DefaultBootNodeAddrStrings)
		if err != nil {
			panic(err)
		}
		utils.BootNodes = bootNodeAddrs
	}
	// Init with LibP2P enabled. FIXME: (leochen) right now we support only one shard
	setting := Settings{
		NumOfAddress:      10000,
		MaxNumTxsPerBatch: *numTxns,
	}
	shardID := *shardIDFlag
	utils.Logger().Debug().
		Int("cx ratio", *crossShardRatio).
		Msg("Cross Shard Ratio Is Set But Not Used")

	// TODO(Richard): refactor this chunk into a single method
	// Set up a logger to stdout and log file.
	logFileName := fmt.Sprintf("./%v/txgen.log", *logFolder)
	h := log.MultiHandler(
		log.StreamHandler(os.Stdout, log.TerminalFormat(false)),
		log.Must.FileHandler(logFileName, log.LogfmtFormat()), // Log to file
	)
	log.Root().SetHandler(h)
	txGen := setUpTXGen()
	txGen.ServiceManagerSetup()
	txGen.RunServices()
	start := time.Now()
	totalTime := float64(*duration)
	utils.Logger().Debug().
		Float64("totalTime", totalTime).
		Bool("RunForever", isDurationForever(totalTime)).
		Msg("Total Duration")
	ticker := time.NewTicker(checkFrequency * time.Second)
	txGen.DoSyncWithoutConsensus()
syncLoop:
	for {
		t := time.Now()
		if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
			utils.Logger().Debug().
				Int("duration", (int(t.Sub(start)))).
				Time("startTime", start).
				Float64("totalTime", totalTime).
				Msg("Generator timer ended in syncLoop.")
			break syncLoop
		}
		select {
		case <-ticker.C:
			if txGen.State.String() == "NodeReadyForConsensus" {
				utils.Logger().Debug().
					Str("txgen node", txGen.SelfPeer.String()).
					Str("Node State", txGen.State.String()).
					Msg("Generator is now in sync.")
				ticker.Stop()
				break syncLoop
			}
		}
	}
	readySignal := make(chan uint32)
	// This func is used to update the client's blockchain when new blocks are received from the leaders.
	updateBlocksFunc := func(blocks []*types.Block) {
		utils.Logger().Info().
			Uint64("block num", blocks[0].NumberU64()).
			Msg("[Txgen] Received new block")
		for _, block := range blocks {
			shardID := block.ShardID()
			if txGen.Consensus.ShardID == shardID {
				utils.Logger().Info().
					Int("txNum", len(block.Transactions())).
					Uint32("shardID", shardID).
					Str("preHash", block.ParentHash().Hex()).
					Uint64("currentBlock", txGen.Blockchain().CurrentBlock().NumberU64()).
					Uint64("incoming block", block.NumberU64()).
					Msg("Got block from leader")
				if block.NumberU64()-txGen.Blockchain().CurrentBlock().NumberU64() == 1 {
					if err := txGen.AddNewBlock(block); err != nil {
						utils.Logger().Error().
							Err(err).
							Msg("Error when adding new block")
					}
					stateMutex.Lock()
					if err := txGen.Worker.UpdateCurrent(block.Coinbase()); err != nil {
						utils.Logger().Warn().Err(err).Msg("(*Worker).UpdateCurrent failed")
					}
					stateMutex.Unlock()
					readySignal <- shardID
				}
			} else {
				continue
			}
		}
	}
	txGen.Client.UpdateBlocks = updateBlocksFunc
	// Start the client server to listen to the leader's messages.
	go func() {
		// Wait briefly for the client to send the ping message to the leader.
		// FIXME (leo): readySignal should be set once we have really sent the ping message to the leader
		time.Sleep(1 * time.Second) // wait for nodes to be ready
		readySignal <- uint32(shardID)
	}()
pushLoop:
	for {
		t := time.Now()
		utils.Logger().Debug().
			Float64("running time", t.Sub(start).Seconds()).
			Float64("totalTime", totalTime).
			Msg("Current running time")
		if !isDurationForever(totalTime) && t.Sub(start).Seconds() >= totalTime {
			utils.Logger().Debug().
				Int("duration", (int(t.Sub(start)))).
				Time("startTime", start).
				Float64("totalTime", totalTime).
				Msg("Generator timer ended.")
			break pushLoop
		}
		if shardID != 0 {
			if otherHeight, flag := txGen.IsSameHeight(); flag {
				if otherHeight >= 1 {
					go func() {
						readySignal <- uint32(shardID)
						utils.Logger().Debug().Msg("Same blockchain height, so readySignal generated")
						time.Sleep(3 * time.Second) // wait for nodes to be ready
					}()
				}
			}
		}
		select {
		case shardID := <-readySignal:
			lock := sync.Mutex{}
			txs, err := GenerateSimulatedTransactionsAccount(uint32(shardID), txGen, setting)
			if err != nil {
				utils.Logger().Debug().
					Err(err).
					Msg("Error in generating txns")
			}
			lock.Lock()
			SendTxsToShard(txGen, txs, uint32(shardID))
			lock.Unlock()
		case <-time.After(10 * time.Second):
			utils.Logger().Warn().Msg("No new block received so far")
		}
	}
}

// SendTxsToShard sends txs to a shard; currently just to the beacon shard.
func SendTxsToShard(clientNode *node.Node, txs types.Transactions, shardID uint32) {
	msg := proto_node.ConstructTransactionListMessageAccount(txs)
	var err error
	if shardID == 0 {
		err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
	} else {
		clientGroup := nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(shardID))
		err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
	}
	if err != nil {
		utils.Logger().Debug().
			Err(err).
			Msg("Error in sending txns")
	}
}

// GenerateSimulatedTransactionsAccount generates simulated transactions for the account model.
func GenerateSimulatedTransactionsAccount(shardID uint32, node *node.Node, setting Settings) (types.Transactions, error) {
	TxnsToGenerate := setting.MaxNumTxsPerBatch // TODO: make use of settings
	txs := make([]*types.Transaction, TxnsToGenerate)
	rounds := (TxnsToGenerate / 100)
	remainder := TxnsToGenerate % 100
	for i := 0; i < 100; i++ {
		baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey))
		for j := 0; j < rounds; j++ {
			randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
			randAmount := rand.Float32()
			tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, shardID, big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
			txs[100*j+i] = tx
		}
		if i < remainder {
			randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
			randAmount := rand.Float32()
			tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(rounds), randomUserAddress, shardID, big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
			txs[100*rounds+i] = tx
		}
	}
	return txs, nil
}
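
// Batch layout, illustrated (assuming the 100 TestBankKeys used above). Each
// of the 100 source accounts signs `rounds` transactions at column-major
// indices 100*j+i, plus one extra when i < remainder. For example, with
// MaxNumTxsPerBatch = 250:
//
//	rounds = 2, remainder = 50
//	account 0:  slots 0, 100, and 200 (extra, since 0 < 50)
//	account 49: slots 49, 149, and 249
//	account 50: slots 50 and 150 only (since 50 >= remainder)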

func isDurationForever(duration float64) bool {
	return duration <= 0
}

@@ -1,525 +0,0 @@
package main

import (
	"encoding/hex"
	"flag"
	"fmt"
	"math/big"
	"math/rand"
	"os"
	"path"
	"runtime"
	"strconv"
	"time"

	ethCommon "github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/harmony-one/bls/ffi/go/bls"

	"github.com/harmony-one/harmony/api/service/syncing"
	"github.com/harmony-one/harmony/consensus"
	"github.com/harmony-one/harmony/consensus/quorum"
	"github.com/harmony-one/harmony/core"
	"github.com/harmony-one/harmony/internal/blsgen"
	"github.com/harmony-one/harmony/internal/common"
	nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
	shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
	"github.com/harmony-one/harmony/internal/genesis"
	hmykey "github.com/harmony-one/harmony/internal/keystore"
	"github.com/harmony-one/harmony/internal/memprofiling"
	"github.com/harmony-one/harmony/internal/shardchain"
	"github.com/harmony-one/harmony/internal/utils"
	"github.com/harmony-one/harmony/node"
	"github.com/harmony-one/harmony/p2p"
	"github.com/harmony-one/harmony/p2p/p2pimpl"
)

// Version string variables
var (
	version string
	builtBy string
	builtAt string
	commit  string
)

// Host
var (
	myHost p2p.Host
)

// InitLDBDatabase initializes a LDBDatabase. isBeacon=true returns the beacon chain database, which normal shard nodes also keep.
func InitLDBDatabase(ip string, port string, freshDB bool, isBeacon bool) (*ethdb.LDBDatabase, error) {
	var dbFileName string
	if isBeacon {
		dbFileName = fmt.Sprintf("./db/harmony_beacon_%s_%s", ip, port)
	} else {
		dbFileName = fmt.Sprintf("./db/harmony_%s_%s", ip, port)
	}
	if freshDB {
		var err = os.RemoveAll(dbFileName)
		if err != nil {
			fmt.Println(err.Error())
		}
	}
	return ethdb.NewLDBDatabase(dbFileName, 0, 0)
}

func printVersion() {
	fmt.Fprintln(os.Stderr, nodeconfig.GetVersion())
	os.Exit(0)
}

// Flags
var (
	ip               = flag.String("ip", "127.0.0.1", "ip of the node")
	port             = flag.String("port", "9000", "port of the node.")
	logFolder        = flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
	logMaxSize       = flag.Int("log_max_size", 100, "the max size in megabytes of the log file before it gets rotated")
	freshDB          = flag.Bool("fresh_db", false, "true means the existing disk-based db will be removed")
	profile          = flag.Bool("profile", false, "Turn on profiling (CPU, Memory).")
	metricsReportURL = flag.String("metrics_report_url", "", "If set, reports metrics to this URL.")
	versionFlag      = flag.Bool("version", false, "Output version info")
	onlyLogTps       = flag.Bool("only_log_tps", false, "Only log TPS if true")
	dnsZone          = flag.String("dns_zone", "", "if given and not empty, use peers from the zone (default: use libp2p peer discovery instead)")
	dnsFlag          = flag.Bool("dns", true, "[deprecated] equivalent to -dns_zone t.hmny.io")
	// The leader needs a minimal number of peers to start consensus.
	minPeers = flag.Int("min_peers", 32, "Minimal number of Peers in shard")
	// Key file to store the private key
	keyFile = flag.String("key", "./.hmykey", "the p2p key file of the harmony node")
	// isGenesis indicates this node is a genesis node
	isGenesis = flag.Bool("is_genesis", true, "true means this node is a genesis node")
	// isArchival indicates this node is an archival node that will save and archive the current blockchain
	isArchival = flag.Bool("is_archival", true, "false makes the node faster by turning caching off")
	// delayCommit is the commit-delay timer, used by Harmony nodes
	delayCommit = flag.String("delay_commit", "0ms", "how long to delay sending commit messages in consensus, e.g.: 500ms, 1s")
	// nodeType indicates the type of the node: validator, explorer
	nodeType = flag.String("node_type", "validator", "node type: validator, explorer")
	// networkType indicates the type of the network
	networkType = flag.String("network_type", "mainnet", "type of the network: mainnet, testnet, devnet, localnet")
	// syncFreq indicates sync frequency
	syncFreq = flag.Int("sync_freq", 60, "unit in seconds")
	// beaconSyncFreq indicates beacon chain sync frequency
	beaconSyncFreq = flag.Int("beacon_sync_freq", 60, "unit in seconds")

	// blockPeriod indicates how long the leader waits to propose a new block.
	blockPeriod    = flag.Int("block_period", 8, "how long in seconds the leader waits to propose a new block.")
	leaderOverride = flag.Bool("leader_override", false, "true means override the default leader role and act as validator")
	// shardID indicates the shard ID of this node
	shardID            = flag.Int("shard_id", -1, "the shard ID of this node")
	enableMemProfiling = flag.Bool("enableMemProfiling", false, "Enable memsize logging.")
	enableGC           = flag.Bool("enableGC", true, "Enable calling the garbage collector manually.")
	blsKeyFile         = flag.String("blskey_file", "", "The encrypted file of the BLS serialized private key, protected by passphrase.")
	blsPass            = flag.String("blspass", "", "The file containing the passphrase to decrypt the encrypted BLS file.")
	blsPassphrase      string

	// Sharding configuration parameters for devnet
	devnetNumShards   = flag.Uint("dn_num_shards", 2, "number of shards for -network_type=devnet (default: 2)")
	devnetShardSize   = flag.Int("dn_shard_size", 10, "number of nodes per shard for -network_type=devnet (default 10)")
	devnetHarmonySize = flag.Int("dn_hmy_size", -1, "number of Harmony-operated nodes per shard for -network_type=devnet; negative (default) means equal to -dn_shard_size")

	// logConn logs incoming/outgoing connections
	logConn = flag.Bool("log_conn", false, "log incoming/outgoing connections")

	keystoreDir = flag.String("keystore", hmykey.DefaultKeyStoreDir, "The default keystore directory")

	initialAccount = &genesis.DeployAccount{}

	// logging verbosity
	verbosity = flag.Int("verbosity", 5, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: 5)")

	// dbDir is the database directory.
	dbDir = flag.String("db_dir", "", "blockchain database directory")

	// Disable view change.
	disableViewChange = flag.Bool("disable_view_change", false, "Do not propose view change (testing only)")

	// metrics flag: whether to collect metrics, plus the pushgateway ip and port for metrics
	metricsFlag     = flag.Bool("metrics", false, "Collect and upload node metrics")
	pushgatewayIP   = flag.String("pushgateway_ip", "grafana.harmony.one", "Metrics view ip")
	pushgatewayPort = flag.String("pushgateway_port", "9091", "Metrics view port")

	publicRPC = flag.Bool("public_rpc", false, "Enable Public RPC Access (default: false)")
)

func initSetup() {

	// Maybe request a passphrase for the BLS key.
	passphraseForBls()

	// Configure log parameters.
	utils.SetLogContext(*port, *ip)
	utils.SetLogVerbosity(log.Lvl(*verbosity))
	utils.AddLogFile(fmt.Sprintf("%v/validator-%v-%v.log", *logFolder, *ip, *port), *logMaxSize)

	if *onlyLogTps {
		matchFilterHandler := log.MatchFilterHandler("msg", "TPS Report", utils.GetLogInstance().GetHandler())
		utils.GetLogInstance().SetHandler(matchFilterHandler)
	}

	// Raise GOMAXPROCS to achieve max performance.
	runtime.GOMAXPROCS(runtime.NumCPU() * 4)

	// Set port and ip in the global config.
	nodeconfig.GetDefaultConfig().Port = *port
	nodeconfig.GetDefaultConfig().IP = *ip

	// Set up mem profiling.
	memprofiling.GetMemProfiling().Config()

	// Set the default keystore dir.
	hmykey.DefaultKeyStoreDir = *keystoreDir

	// Set up the randomization seed.
	rand.Seed(int64(time.Now().Nanosecond()))

	if len(utils.BootNodes) == 0 {
		bootNodeAddrs, err := utils.StringsToAddrs(utils.DefaultBootNodeAddrStrings)
		if err != nil {
			panic(err)
		}
		utils.BootNodes = bootNodeAddrs
	}
}

func passphraseForBls() {
	// An FN node should specify either blsPrivateKey or the file with the passphrase.
	// Explorer and other non-validator nodes need no BLS key.
	if *nodeType != "validator" {
		return
	}

	if *blsKeyFile == "" || *blsPass == "" {
		fmt.Println("Internal nodes need a passphrase to decrypt the BLS key")
		os.Exit(101)
	}
	passphrase, err := utils.GetPassphraseFromSource(*blsPass)
	if err != nil {
		fmt.Fprintf(os.Stderr, "ERROR when reading passphrase file: %v\n", err)
		os.Exit(100)
	}
	blsPassphrase = passphrase
}

func setupInitialAccount() (isLeader bool) {
	genesisShardingConfig := core.ShardingSchedule.InstanceForEpoch(big.NewInt(core.GenesisEpoch))
	pubKey := setupConsensusKey(nodeconfig.GetDefaultConfig())

	reshardingEpoch := genesisShardingConfig.ReshardingEpoch()
	if len(reshardingEpoch) > 0 {
		for _, epoch := range reshardingEpoch {
			config := core.ShardingSchedule.InstanceForEpoch(epoch)
			isLeader, initialAccount = config.FindAccount(pubKey.SerializeToHexStr())
			if initialAccount != nil {
				break
			}
		}
	} else {
		isLeader, initialAccount = genesisShardingConfig.FindAccount(pubKey.SerializeToHexStr())
	}

	if initialAccount == nil {
		fmt.Fprintf(os.Stderr, "ERROR cannot find your BLS key in the genesis/FN tables: %s\n", pubKey.SerializeToHexStr())
		os.Exit(100)
	}

	fmt.Printf("My Genesis Account: %v\n", *initialAccount)

	return isLeader
}

func setupConsensusKey(nodeConfig *nodeconfig.ConfigType) *bls.PublicKey {
	consensusPriKey, err := blsgen.LoadBlsKeyWithPassPhrase(*blsKeyFile, blsPassphrase)
	if err != nil {
		fmt.Fprintf(os.Stderr, "ERROR when loading bls key, err: %v\n", err)
		os.Exit(100)
	}
	pubKey := consensusPriKey.GetPublicKey()

	// Consensus keys are the BLS12-381 keys used to sign consensus messages.
	nodeConfig.ConsensusPriKey, nodeConfig.ConsensusPubKey = consensusPriKey, consensusPriKey.GetPublicKey()
	if nodeConfig.ConsensusPriKey == nil || nodeConfig.ConsensusPubKey == nil {
		fmt.Println("error getting consensus keys.")
		os.Exit(100)
	}
	return pubKey
}

func createGlobalConfig() *nodeconfig.ConfigType {
	var err error

	nodeConfig := nodeconfig.GetShardConfig(initialAccount.ShardID)
	if *nodeType == "validator" {
		// Set up consensus keys.
		setupConsensusKey(nodeConfig)
	} else {
		nodeConfig.ConsensusPriKey = &bls.SecretKey{} // set a dummy bls key for the consensus object
	}

	// Set network type.
	netType := nodeconfig.NetworkType(*networkType)
	switch netType {
	case nodeconfig.Mainnet, nodeconfig.Testnet, nodeconfig.Pangaea, nodeconfig.Localnet, nodeconfig.Devnet:
		nodeconfig.SetNetworkType(netType)
	default:
		panic(fmt.Sprintf("invalid network type: %s", *networkType))
	}

	nodeConfig.SetPushgatewayIP(*pushgatewayIP)
	nodeConfig.SetPushgatewayPort(*pushgatewayPort)
	nodeConfig.SetMetricsFlag(*metricsFlag)

	// The P2P private key is used for secure message transfer between p2p nodes.
	nodeConfig.P2pPriKey, _, err = utils.LoadKeyFromFile(*keyFile)
	if err != nil {
		panic(err)
	}

	selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey}

	myHost, err = p2pimpl.NewHost(&selfPeer, nodeConfig.P2pPriKey)
	if err != nil {
		panic("unable to new host in harmony")
	}
	if *logConn && nodeConfig.GetNetworkType() != nodeconfig.Mainnet {
		myHost.GetP2PHost().Network().Notify(utils.NewConnLogger(utils.GetLogger()))
	}

	nodeConfig.DBDir = *dbDir

	return nodeConfig
}

func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
	// Consensus object.
	// TODO: the consensus object shouldn't be started here
	// TODO(minhdoan): During refactoring, found out that the peers list is actually empty. Need to clean up the consensus logic later.
	decider := quorum.NewDecider(quorum.SuperMajorityVote)
	currentConsensus, err := consensus.New(
		myHost, nodeConfig.ShardID, p2p.Peer{}, nodeConfig.ConsensusPriKey, decider,
	)
	currentConsensus.SelfAddress = common.ParseAddr(initialAccount.Address)

	if err != nil {
		fmt.Fprintf(os.Stderr, "Error :%v \n", err)
		os.Exit(1)
	}
	commitDelay, err := time.ParseDuration(*delayCommit)
	if err != nil || commitDelay < 0 {
		_, _ = fmt.Fprintf(os.Stderr, "ERROR invalid commit delay %#v", *delayCommit)
		os.Exit(1)
	}
	currentConsensus.SetCommitDelay(commitDelay)
	currentConsensus.MinPeers = *minPeers

	if *disableViewChange {
		currentConsensus.DisableViewChangeForTestingOnly()
	}

	// Current node.
	chainDBFactory := &shardchain.LDBFactory{RootDir: nodeConfig.DBDir}
	currentNode := node.New(myHost, currentConsensus, chainDBFactory, *isArchival)

	switch {
	case *networkType == nodeconfig.Localnet:
		epochConfig := core.ShardingSchedule.InstanceForEpoch(ethCommon.Big0)
		selfPort, err := strconv.ParseUint(*port, 10, 16)
		if err != nil {
			utils.Logger().Fatal().
				Err(err).
				Str("self_port_string", *port).
				Msg("cannot convert self port string into port number")
		}
		currentNode.SyncingPeerProvider = node.NewLocalSyncingPeerProvider(
			6000, uint16(selfPort), epochConfig.NumShards(), uint32(epochConfig.NumNodesPerShard()))
	case *dnsZone != "":
		currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider(*dnsZone, syncing.GetSyncingPort(*port))
	case *dnsFlag:
		currentNode.SyncingPeerProvider = node.NewDNSSyncingPeerProvider("t.hmny.io", syncing.GetSyncingPort(*port))
	default:
		currentNode.SyncingPeerProvider = node.NewLegacySyncingPeerProvider(currentNode)
	}

	// TODO: refactor the creation of the blockchain out of node.New()
	currentConsensus.ChainReader = currentNode.Blockchain()

	// Set up the prometheus pushgateway for the metrics monitoring service.
	currentNode.NodeConfig.SetPushgatewayIP(nodeConfig.PushgatewayIP)
	currentNode.NodeConfig.SetPushgatewayPort(nodeConfig.PushgatewayPort)
	currentNode.NodeConfig.SetMetricsFlag(nodeConfig.MetricsFlag)

	currentNode.NodeConfig.SetBeaconGroupID(nodeconfig.NewGroupIDByShardID(0))

	switch *nodeType {
	case "explorer":
		currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode)
		currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(*shardID)))
		currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(*shardID)))
	case "validator":
		currentNode.NodeConfig.SetRole(nodeconfig.Validator)
		if nodeConfig.ShardID == 0 {
			currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(0))
			currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(0))
		} else {
			currentNode.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID)))
			currentNode.NodeConfig.SetClientGroupID(nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(nodeConfig.ShardID)))
		}
	}
	currentNode.NodeConfig.ConsensusPubKey = nodeConfig.ConsensusPubKey
	currentNode.NodeConfig.ConsensusPriKey = nodeConfig.ConsensusPriKey

	// Set up the block period for currentNode.
	currentNode.BlockPeriod = time.Duration(*blockPeriod) * time.Second

	// TODO: Disable drand. Currently drand isn't functioning, and we want to completely turn it off for full protection.
	// Enable it again after mainnet.
	// dRand := drand.New(nodeConfig.Host, nodeConfig.ShardID, []p2p.Peer{}, nodeConfig.Leader, currentNode.ConfirmedBlockChannel, nodeConfig.ConsensusPriKey)
	// currentNode.Consensus.RegisterPRndChannel(dRand.PRndChannel)
	// currentNode.Consensus.RegisterRndChannel(dRand.RndChannel)
	// currentNode.DRand = dRand

	// This needs to be executed after consensus and drand are set up.
	if err := currentNode.CalculateInitShardState(); err != nil {
		utils.Logger().Warn().
			Int("shardID", *shardID).
			Err(err).
			Msg("CalculateInitShardState failed")
	}

	// Set the consensus ID to be the current block number.
	viewID := currentNode.Blockchain().CurrentBlock().Header().ViewID().Uint64()
	currentConsensus.SetViewID(viewID)
	utils.Logger().Info().
		Uint64("viewID", viewID).
		Msg("Init Blockchain")

	// Assign closure functions to the consensus object.
	currentConsensus.BlockVerifier = currentNode.VerifyNewBlock
	currentConsensus.OnConsensusDone = currentNode.PostConsensusProcessing
	currentNode.State = node.NodeWaitToJoin

	// Update consensus information based on the blockchain.
	mode := currentConsensus.UpdateConsensusInformation()
	currentConsensus.SetMode(mode)

	// Watch currentNode and currentConsensus.
	memprofiling.GetMemProfiling().Add("currentNode", currentNode)
	memprofiling.GetMemProfiling().Add("currentConsensus", currentConsensus)
	return currentNode
}

func main() {
	flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddresses (delimited by ,)")
	flag.Parse()

	switch *nodeType {
	case "validator":
	case "explorer":
		break
	default:
		fmt.Fprintf(os.Stderr, "Unknown node type: %s\n", *nodeType)
		os.Exit(1)
	}

	nodeconfig.SetPublicRPC(*publicRPC)
	nodeconfig.SetVersion(fmt.Sprintf("Harmony (C) 2019. %v, version %v-%v (%v %v)", path.Base(os.Args[0]), version, commit, builtBy, builtAt))
	if *versionFlag {
		printVersion()
	}

	switch *networkType {
	case nodeconfig.Mainnet:
		core.ShardingSchedule = shardingconfig.MainnetSchedule
	case nodeconfig.Testnet:
		core.ShardingSchedule = shardingconfig.TestnetSchedule
	case nodeconfig.Pangaea:
		core.ShardingSchedule = shardingconfig.PangaeaSchedule
	case nodeconfig.Localnet:
		core.ShardingSchedule = shardingconfig.LocalnetSchedule
	case nodeconfig.Devnet:
		if *devnetHarmonySize < 0 {
			*devnetHarmonySize = *devnetShardSize
		}
		// TODO (leo): use a passed-in list of accounts here
		devnetConfig, err := shardingconfig.NewInstance(
			uint32(*devnetNumShards), *devnetShardSize, *devnetHarmonySize, genesis.HarmonyAccounts, genesis.FoundationalNodeAccounts, nil)
		if err != nil {
			_, _ = fmt.Fprintf(os.Stderr, "ERROR invalid devnet sharding config: %s",
				err)
			os.Exit(1)
		}
		core.ShardingSchedule = shardingconfig.NewFixedSchedule(devnetConfig)
	}

	initSetup()

	// Set up the manual call for garbage collection.
	if *enableGC {
		memprofiling.MaybeCallGCPeriodically()
	}

	if *nodeType == "validator" {
		setupInitialAccount()
	}

	if *shardID >= 0 {
		utils.Logger().Info().
			Uint32("original", initialAccount.ShardID).
			Int("override", *shardID).
			Msg("ShardID Override")
		initialAccount.ShardID = uint32(*shardID)
	}

	nodeConfig := createGlobalConfig()
	currentNode := setupConsensusAndNode(nodeConfig)

	// Set up the state-syncing and beacon-syncing frequencies.
	currentNode.SetSyncFreq(*syncFreq)
	currentNode.SetBeaconSyncFreq(*beaconSyncFreq)

	if nodeConfig.ShardID != 0 && currentNode.NodeConfig.Role() != nodeconfig.ExplorerNode {
		utils.Logger().Info().Uint32("shardID", currentNode.Blockchain().ShardID()).Uint32("shardID", nodeConfig.ShardID).Msg("SupportBeaconSyncing")
		go currentNode.SupportBeaconSyncing()
	}

	startMsg := "==== New Harmony Node ===="
	if *nodeType == "explorer" {
		startMsg = "==== New Explorer Node ===="
	}

	utils.Logger().Info().
		Str("BlsPubKey", hex.EncodeToString(nodeConfig.ConsensusPubKey.Serialize())).
		Uint32("ShardID", nodeConfig.ShardID).
		Str("ShardGroupID", nodeConfig.GetShardGroupID().String()).
		Str("BeaconGroupID", nodeConfig.GetBeaconGroupID().String()).
		Str("ClientGroupID", nodeConfig.GetClientGroupID().String()).
		Str("Role", currentNode.NodeConfig.Role().String()).
		Str("multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, myHost.GetID().Pretty())).
		Msg(startMsg)

	if *enableMemProfiling {
		memprofiling.GetMemProfiling().Start()
	}
	go currentNode.SupportSyncing()
	currentNode.ServiceManagerSetup()

	currentNode.RunServices()
	// RPC for the SDK is not supported on mainnet.
	if err := currentNode.StartRPC(*port); err != nil {
		utils.Logger().Warn().
			Err(err).
			Msg("StartRPC failed")
	}

	// Run additional node collectors:
	// collect node metrics if the metrics flag is set
	if currentNode.NodeConfig.GetMetricsFlag() {
		go currentNode.CollectMetrics()
	}
	// commit the committee if the node role is explorer
	if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode {
		go currentNode.CommitCommittee()
	}

	currentNode.StartServer()
}

@@ -0,0 +1,73 @@
package quorum

import (
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/harmony-one/bls/ffi/go/bls"
	"github.com/harmony-one/harmony/internal/utils"
	// "github.com/harmony-one/harmony/staking/effective"
)

type uniformVoteWeight struct {
	SignatureReader
	DependencyInjectionWriter
}

// Policy ..
func (v *uniformVoteWeight) Policy() Policy {
	return SuperMajorityVote
}

// IsQuorumAchieved ..
func (v *uniformVoteWeight) IsQuorumAchieved(p Phase) bool {
	r := v.SignersCount(p) >= v.QuorumThreshold().Int64()
	utils.Logger().Info().Str("phase", p.String()).
		Int64("signers-count", v.SignersCount(p)).
		Int64("threshold", v.QuorumThreshold().Int64()).
		Int64("participants", v.ParticipantsCount()).
		Msg("Quorum details")
	return r
}

// QuorumThreshold ..
func (v *uniformVoteWeight) QuorumThreshold() *big.Int {
	return big.NewInt(v.ParticipantsCount()*2/3 + 1)
}
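
// Threshold arithmetic, illustrated (not part of the commit): with integer
// division, n*2/3+1 is the smallest count strictly greater than 2/3 of n:
//
//	n = 100 -> threshold 67 (67 > 66.67)
//	n = 99  -> threshold 67 (67 > 66)
//	n = 4   -> threshold 3  (tolerates 1 faulty participant)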

// IsRewardThresholdAchieved ..
func (v *uniformVoteWeight) IsRewardThresholdAchieved() bool {
	return v.SignersCount(Commit) >= (v.ParticipantsCount() * 9 / 10)
}

// func (v *uniformVoteWeight) UpdateVotingPower(effective.StakeKeeper) {
//	 NO-OP: do not add anything here
// }

// ToggleActive for uniform vote is a no-op; it always reports the voter as active.
func (v *uniformVoteWeight) ToggleActive(*bls.PublicKey) bool {
	// NO-OP: do not add anything here
	return true
}
||||
|
||||
// Award ..
|
||||
func (v *uniformVoteWeight) Award( |
||||
// Here hook is the callback which gets the amount the earner is due in just reward
|
||||
// up to the hook to do side-effects like write the statedb
|
||||
Pie *big.Int, earners []common.Address, hook func(earner common.Address, due *big.Int), |
||||
) *big.Int { |
||||
payout := big.NewInt(0) |
||||
last := big.NewInt(0) |
||||
count := big.NewInt(int64(len(earners))) |
||||
|
||||
for i, account := range earners { |
||||
cur := big.NewInt(0) |
||||
cur.Mul(Pie, big.NewInt(int64(i+1))).Div(cur, count) |
||||
diff := big.NewInt(0).Sub(cur, last) |
||||
hook(common.Address(account), diff) |
||||
payout = big.NewInt(0).Add(payout, diff) |
||||
last = cur |
||||
} |
||||
|
||||
return payout |
||||
} |
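To make the even-split arithmetic in Award concrete, here is a small self-contained sketch; the awardEvenly helper and its printing hook are illustrative stand-ins, not part of this package:

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
)

// awardEvenly mirrors the cumulative-quotient split used by Award above:
// earner i receives floor(Pie*(i+1)/n) - floor(Pie*i/n), so the rounding
// remainders land on the later earners and the dues always sum to Pie.
func awardEvenly(pie *big.Int, earners []common.Address, hook func(common.Address, *big.Int)) *big.Int {
	payout, last := big.NewInt(0), big.NewInt(0)
	count := big.NewInt(int64(len(earners)))
	for i, account := range earners {
		cur := new(big.Int).Mul(pie, big.NewInt(int64(i+1)))
		cur.Div(cur, count)
		diff := new(big.Int).Sub(cur, last)
		hook(account, diff)
		payout.Add(payout, diff)
		last = cur
	}
	return payout
}

func main() {
	earners := []common.Address{{0x1}, {0x2}, {0x3}}
	total := awardEvenly(big.NewInt(10), earners, func(a common.Address, due *big.Int) {
		fmt.Printf("%s gets %s\n", a.Hex(), due) // dues: 3, 3, 4
	})
	fmt.Println("paid out:", total) // 10
}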
@ -0,0 +1,75 @@
package quorum

import (
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/harmony-one/bls/ffi/go/bls"
	"github.com/harmony-one/harmony/numeric"
	"github.com/harmony-one/harmony/shard"
)

var (
	twoThirds = numeric.NewDec(2).QuoInt64(3).Int
)

type stakedVoter struct {
	isActive, isHarmonyNode bool
	effective               numeric.Dec
}

type stakedVoteWeight struct {
	SignatureReader
	DependencyInjectionWriter
	// EPOS based staking
	validatorStakes            map[[shard.PublicKeySizeInBytes]byte]stakedVoter
	totalEffectiveStakedAmount *big.Int
}

// Policy ..
func (v *stakedVoteWeight) Policy() Policy {
	return SuperMajorityStake
}

// IsQuorumAchieved ..
// We must maintain a 2/3 quorum: the signed stake has to reach 2/3 of the
// total effective staked amount.
func (v *stakedVoteWeight) IsQuorumAchieved(p Phase) bool {
	// TODO Implement this logic
	return true
}

// QuorumThreshold ..
func (v *stakedVoteWeight) QuorumThreshold() *big.Int {
	return new(big.Int).Mul(v.totalEffectiveStakedAmount, twoThirds)
}

// IsRewardThresholdAchieved ..
func (v *stakedVoteWeight) IsRewardThresholdAchieved() bool {
	// TODO Implement
	return false
}

// HACK
var (
	hSentinel          = big.NewInt(0)
	hEffectiveSentinel = numeric.ZeroDec()
)

// Award ..
func (v *stakedVoteWeight) Award(
	Pie *big.Int, earners []common.Address, hook func(earner common.Address, due *big.Int),
) *big.Int {
	// TODO Implement
	return nil
}

// UpdateVotingPower is called only at epoch change; probably needs to move to CalculateShardState.
// func (v *stakedVoteWeight) UpdateVotingPower(keeper effective.StakeKeeper) {
//	TODO Implement
// }

func (v *stakedVoteWeight) ToggleActive(*bls.PublicKey) bool {
	// TODO Implement
	return true
}
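Since IsQuorumAchieved is still a TODO above, the following is only a hedged sketch of a stake-weighted check, using cross-multiplication to stay in integer arithmetic; the function and its inputs are hypothetical, not this package's API:

// quorumAchievedSketch reports whether the signed stake reaches 2/3 of the
// total effective stake: signedStake*3 >= totalStake*2 avoids any division.
func quorumAchievedSketch(signedStake, totalStake *big.Int) bool {
	lhs := new(big.Int).Mul(signedStake, big.NewInt(3))
	rhs := new(big.Int).Mul(totalStake, big.NewInt(2))
	return lhs.Cmp(rhs) >= 0
}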
@ -0,0 +1,16 @@
package reward

import (
	"math/big"

	"github.com/ethereum/go-ethereum/common"
)

// Distributor ..
type Distributor interface {
	Award(
		Pie *big.Int,
		earners []common.Address,
		hook func(earner common.Address, due *big.Int),
	) (payout *big.Int)
}
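One way to tie this interface to the vote-weight types earlier in this change is a compile-time assertion; a minimal sketch, assuming it lives in the quorum package where uniformVoteWeight is defined:

// Fails to compile if the Award signature ever drifts from reward.Distributor.
var _ reward.Distributor = (*uniformVoteWeight)(nil)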
@ -1,259 +0,0 @@
package core

import (
	"encoding/hex"
	"errors"
	"math/big"
	"math/rand"
	"sort"

	"github.com/ethereum/go-ethereum/common"
	"github.com/harmony-one/bls/ffi/go/bls"
	common2 "github.com/harmony-one/harmony/internal/common"
	shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
	"github.com/harmony-one/harmony/internal/ctxerror"
	"github.com/harmony-one/harmony/internal/utils"
	"github.com/harmony-one/harmony/shard"
)

const (
	// GenesisEpoch is the number of the genesis epoch.
	GenesisEpoch = 0
	// CuckooRate is the percentage of nodes getting reshuffled in the second step of cuckoo resharding.
	CuckooRate = 0.1
)

// ShardingState is the data structure holding the sharding state.
type ShardingState struct {
	epoch      uint64 // current epoch
	rnd        uint64 // random seed for resharding
	numShards  int    // TODO ek – equal to len(shardState); remove this
	shardState shard.State
}

// sortCommitteeBySize sorts shards by size.
// Suppose there are N shards: the first N/2 larger shards are called active committees,
// and the remaining N/2 smaller shards are called inactive committees.
// They are all just normal shards with the same function.
// TODO: sort the committees weighted by total staking instead of shard size.
func (ss *ShardingState) sortCommitteeBySize() {
	sort.Slice(ss.shardState, func(i, j int) bool {
		return len(ss.shardState[i].NodeList) > len(ss.shardState[j].NodeList)
	})
}

// assignNewNodes adds new nodes into the N/2 active committees evenly.
func (ss *ShardingState) assignNewNodes(newNodeList []shard.NodeID) {
	ss.sortCommitteeBySize()
	numActiveShards := ss.numShards / 2
	Shuffle(newNodeList)
	for i, nid := range newNodeList {
		id := 0
		if numActiveShards > 0 {
			id = i % numActiveShards
		}
		if id < len(ss.shardState) {
			ss.shardState[id].NodeList = append(ss.shardState[id].NodeList, nid)
		} else {
			utils.Logger().Error().Int("id", id).Int("shardState Count", len(ss.shardState)).Msg("assignNewNodes index out of range")
		}
	}
}

// cuckooResharding uses the cuckoo rule to reshard X% of the active committees (shards) into the inactive committees (shards).
func (ss *ShardingState) cuckooResharding(percent float64) {
	numActiveShards := ss.numShards / 2
	kickedNodes := []shard.NodeID{}
	for i := range ss.shardState {
		if i >= numActiveShards {
			break
		}
		numKicked := int(percent * float64(len(ss.shardState[i].NodeList)))
		if numKicked == 0 {
			numKicked++ // Kick out at least one node
		}
		length := len(ss.shardState[i].NodeList)
		if length-numKicked <= 0 {
			continue // Never empty a shard
		}
		tmp := ss.shardState[i].NodeList[length-numKicked:]
		kickedNodes = append(kickedNodes, tmp...)
		ss.shardState[i].NodeList = ss.shardState[i].NodeList[:length-numKicked]
	}

	Shuffle(kickedNodes)
	numInactiveShards := ss.numShards - numActiveShards
	for i, nid := range kickedNodes {
		id := numActiveShards
		if numInactiveShards > 0 {
			id += i % numInactiveShards
		}
		ss.shardState[id].NodeList = append(ss.shardState[id].NodeList, nid)
	}
}

// Reshard first adds the new nodes into shards, then applies the cuckoo rule to derive the new shard state.
func (ss *ShardingState) Reshard(newNodeList []shard.NodeID, percent float64) {
	rand.Seed(int64(ss.rnd))
	ss.sortCommitteeBySize()

	// Take out and preserve leaders
	leaders := []shard.NodeID{}
	for i := 0; i < ss.numShards; i++ {
		if len(ss.shardState[i].NodeList) > 0 {
			leaders = append(leaders, ss.shardState[i].NodeList[0])
			ss.shardState[i].NodeList = ss.shardState[i].NodeList[1:]
			// Also shuffle the rest of the nodes
			Shuffle(ss.shardState[i].NodeList)
		}
	}

	ss.assignNewNodes(newNodeList)
	ss.cuckooResharding(percent)

	// Put the leaders back
	if len(leaders) < ss.numShards {
		utils.Logger().Error().Msg("Not enough leaders to assign to shards")
		return // indexing leaders[i] below would panic
	}
	for i := 0; i < ss.numShards; i++ {
		ss.shardState[i].NodeList = append([]shard.NodeID{leaders[i]}, ss.shardState[i].NodeList...)
	}
}

// Shuffle shuffles the list, with the result uniquely determined by the seed, assuming there are no repeated items in the list.
func Shuffle(list []shard.NodeID) {
	// Sort first to make sure everyone generates the same result with the same rand seed.
	sort.Slice(list, func(i, j int) bool {
		return shard.CompareNodeIDByBLSKey(list[i], list[j]) == -1
	})
	rand.Shuffle(len(list), func(i, j int) {
		list[i], list[j] = list[j], list[i]
	})
}
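To illustrate the determinism that the sort-then-shuffle above relies on, here is a small self-contained sketch using strings in place of shard.NodeID; the helper name and the inline seeding are illustrative (the real code seeds rand once in Reshard):

package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// deterministicShuffle mirrors core.Shuffle: sort first so every node starts
// from the same order, then shuffle with a shared seed.
func deterministicShuffle(list []string, seed int64) {
	sort.Strings(list)
	rand.Seed(seed)
	rand.Shuffle(len(list), func(i, j int) { list[i], list[j] = list[j], list[i] })
}

func main() {
	a := []string{"c", "a", "b"}
	b := []string{"b", "c", "a"} // same set, different initial order
	deterministicShuffle(a, 42)
	deterministicShuffle(b, 42)
	fmt.Println(a, b) // both print the identical shuffled order
}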

// GetEpochFromBlockNumber calculates the epoch number the block belongs to.
func GetEpochFromBlockNumber(blockNumber uint64) uint64 {
	return ShardingSchedule.CalcEpochNumber(blockNumber).Uint64()
}

// GetShardingStateFromBlockChain retrieves the random seed and shard map from the beacon chain for a given epoch.
func GetShardingStateFromBlockChain(bc *BlockChain, epoch *big.Int) (*ShardingState, error) {
	if bc == nil {
		return nil, errors.New("no blockchain is supplied to get shard state")
	}
	shardState, err := bc.ReadShardState(epoch)
	if err != nil {
		return nil, err
	}
	shardState = shardState.DeepCopy()

	// TODO(RJ,HB): use real randomness for resharding
	//blockNumber := GetBlockNumberFromEpoch(epoch.Uint64())
	//rndSeedBytes := bc.GetVdfByNumber(blockNumber)
	rndSeed := uint64(0)

	return &ShardingState{epoch: epoch.Uint64(), rnd: rndSeed, shardState: shardState, numShards: len(shardState)}, nil
}

// CalculateNewShardState gets the sharding state from the previous epoch and calculates the sharding state for the new epoch.
func CalculateNewShardState(bc *BlockChain, epoch *big.Int) (shard.State, error) {
	if epoch.Cmp(big.NewInt(GenesisEpoch)) == 0 {
		return CalculateInitShardState(), nil
	}
	prevEpoch := new(big.Int).Sub(epoch, common.Big1)
	ss, err := GetShardingStateFromBlockChain(bc, prevEpoch)
	if err != nil {
		return nil, ctxerror.New("cannot retrieve previous sharding state").
			WithCause(err)
	}
	utils.Logger().Info().Float64("percentage", CuckooRate).Msg("Cuckoo Rate")
	return ss.shardState, nil
}

// TODO ek – shardingSchedule should really be part of a general-purpose network
// configuration. We are OK for the time being,
// until the day we should let one node process join multiple networks.

// ShardingSchedule is the sharding configuration schedule.
// It depends on the type of the network and defaults to the mainnet schedule.
var ShardingSchedule shardingconfig.Schedule = shardingconfig.MainnetSchedule

// CalculateInitShardState returns the initial shard state at genesis.
func CalculateInitShardState() shard.State {
	return CalculateShardState(big.NewInt(GenesisEpoch))
}

// CalculateShardState returns the shard state based on the epoch number.
// This API should be used to get the shard state regardless of the current chain dependency
// (e.g., getting the shard state from a block header received during a cross-shard transaction).
func CalculateShardState(epoch *big.Int) shard.State {
	utils.Logger().Info().Int64("epoch", epoch.Int64()).Msg("Get Shard State of Epoch.")
	shardingConfig := ShardingSchedule.InstanceForEpoch(epoch)
	shardNum := int(shardingConfig.NumShards())
	shardHarmonyNodes := shardingConfig.NumHarmonyOperatedNodesPerShard()
	shardSize := shardingConfig.NumNodesPerShard()
	hmyAccounts := shardingConfig.HmyAccounts()
	fnAccounts := shardingConfig.FnAccounts()

	shardState := shard.State{}
	for i := 0; i < shardNum; i++ {
		com := shard.Committee{ShardID: uint32(i)}
		for j := 0; j < shardHarmonyNodes; j++ {
			index := i + j*shardNum // The initial account to use for genesis nodes

			pub := &bls.PublicKey{}
			pub.DeserializeHexStr(hmyAccounts[index].BlsPublicKey)
			pubKey := shard.BlsPublicKey{}
			pubKey.FromLibBLSPublicKey(pub)
			// TODO: directly read address for bls too
			curNodeID := shard.NodeID{
				EcdsaAddress: common2.ParseAddr(hmyAccounts[index].Address),
				BlsPublicKey: pubKey,
			}
			com.NodeList = append(com.NodeList, curNodeID)
		}

		// add FN runner's key
		for j := shardHarmonyNodes; j < shardSize; j++ {
			index := i + (j-shardHarmonyNodes)*shardNum

			pub := &bls.PublicKey{}
			pub.DeserializeHexStr(fnAccounts[index].BlsPublicKey)

			pubKey := shard.BlsPublicKey{}
			pubKey.FromLibBLSPublicKey(pub)
			// TODO: directly read address for bls too
			curNodeID := shard.NodeID{
				EcdsaAddress: common2.ParseAddr(fnAccounts[index].Address),
				BlsPublicKey: pubKey,
			}
			com.NodeList = append(com.NodeList, curNodeID)
		}
		shardState = append(shardState, com)
	}
	return shardState
}

// CalculatePublicKeys returns the public keys for the given epoch and shardID.
func CalculatePublicKeys(epoch *big.Int, shardID uint32) []*bls.PublicKey {
	shardState := CalculateShardState(epoch)

	// Update validator public keys
	committee := shardState.FindCommitteeByID(shardID)
	if committee == nil {
		utils.Logger().Warn().Uint32("shardID", shardID).Uint64("epoch", epoch.Uint64()).Msg("Cannot find committee")
		return nil
	}
	pubKeys := []*bls.PublicKey{}
	for _, node := range committee.NodeList {
		pubKey := &bls.PublicKey{}
		pubKeyBytes := node.BlsPublicKey[:]
		err := pubKey.Deserialize(pubKeyBytes)
		if err != nil {
			utils.Logger().Warn().Str("pubKeyBytes", hex.EncodeToString(pubKeyBytes)).Msg("Cannot Deserialize pubKey")
			return nil
		}
		pubKeys = append(pubKeys, pubKey)
	}
	return pubKeys
}
@ -1,12 +0,0 @@
## Resharding

In the current design, each epoch has a fixed length, given by the constant parameter BlocksPerEpoch. In the future it will be dynamically adjustable according to a security parameter. During the epoch transition, suppose there are N shards; we sort the shards by the number of active nodes (those that staked for the next epoch). The first N/2 larger shards are called active committees, and the remaining N/2 smaller shards are called inactive committees. Don't be confused by the names: they are all normal shards with the same function.

All the information about sharding is stored in the BeaconChain. A sharding state is defined as a map from each NodeID to the ShardID the node belongs to. Every node has a unique NodeID and is mapped to exactly one ShardID. At the beginning of a new epoch, the BeaconChain leader proposes a new block containing the new sharding state; the new sharding state is uniquely determined by the randomness generated by the distributed randomness protocol. During the consensus process, all the validators perform the same calculation and verify that the proposed sharding state is valid. After consensus is reached, each node writes the new sharding state into the block. This block is called the epoch block. In the current code, it is the first block of each epoch in the BeaconChain.

The main resharding function is CalculateNewShardState. It takes three inputs: newNodeList, oldShardState, and randomSeed, and outputs newShardState. The newNodeList is retrieved from BeaconChain staking transactions during the previous epoch. The randomSeed and oldShardState are stored in the previous epoch block. Note that randomSeed generation is currently mocked; once the distributed randomness protocol (drand) is ready, the drand service will generate the random seed for resharding.

The resharding process is as follows (see the sketch after this section): we first take the newNodeList from the previous epoch's staking transactions and assign the new nodes evenly across the N/2 active committees. Then we kick out X% of the nodes from each active committee and spread the kicked-out nodes evenly across the inactive committees. The percentage X roughly equals the fraction of new nodes joining the active committees, in order to keep committee sizes balanced.
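A hedged, test-style sketch of that flow; it assumes the package, imports, and fake-state helpers of core/resharding_test.go later in this diff, and the epoch and seed values are illustrative:

func TestReshardFlowSketch(t *testing.T) {
	// 6 shards of 10 nodes each; rnd stands in for the (currently mocked) random seed.
	ss := &ShardingState{epoch: 2, rnd: 42, shardState: fakeGetInitShardState(6, 10), numShards: 6}
	newNodes := []shard.NodeID{
		{EcdsaAddress: common.Address{0x12}, BlsPublicKey: blsPubKey1},
	}
	// The two steps described above: assign new nodes, then cuckoo-reshuffle ~10%.
	ss.Reshard(newNodes, CuckooRate)
	if ss.numShards != 6 {
		t.Errorf("shard count should be unchanged, got %d", ss.numShards)
	}
}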
@ -1,149 +0,0 @@
package core

import (
	"fmt"
	"math/rand"
	"strconv"
	"testing"

	"github.com/ethereum/go-ethereum/common"

	"github.com/harmony-one/harmony/shard"

	"github.com/stretchr/testify/assert"
)

var (
	blsPubKey1  = [48]byte{}
	blsPubKey2  = [48]byte{}
	blsPubKey3  = [48]byte{}
	blsPubKey4  = [48]byte{}
	blsPubKey5  = [48]byte{}
	blsPubKey6  = [48]byte{}
	blsPubKey7  = [48]byte{}
	blsPubKey8  = [48]byte{}
	blsPubKey9  = [48]byte{}
	blsPubKey10 = [48]byte{}
)

func init() {
	copy(blsPubKey1[:], []byte("random key 1"))
	copy(blsPubKey2[:], []byte("random key 2"))
	copy(blsPubKey3[:], []byte("random key 3"))
	copy(blsPubKey4[:], []byte("random key 4"))
	copy(blsPubKey5[:], []byte("random key 5"))
	copy(blsPubKey6[:], []byte("random key 6"))
	copy(blsPubKey7[:], []byte("random key 7"))
	copy(blsPubKey8[:], []byte("random key 8"))
	copy(blsPubKey9[:], []byte("random key 9"))
	copy(blsPubKey10[:], []byte("random key 10"))
}

func fakeGetInitShardState(numberOfShards, numOfNodes int) shard.State {
	rand.Seed(42)
	shardState := shard.State{}
	for i := 0; i < numberOfShards; i++ {
		sid := uint32(i)
		com := shard.Committee{ShardID: sid}
		for j := 0; j < numOfNodes; j++ {
			nid := strconv.Itoa(int(rand.Int63()))
			blsPubKey := [48]byte{}
			copy(blsPubKey[:], []byte(nid)) // fill the per-node key, not the shared blsPubKey1
			com.NodeList = append(com.NodeList, shard.NodeID{
				EcdsaAddress: common.BytesToAddress([]byte(nid)),
				BlsPublicKey: blsPubKey,
			})
		}
		shardState = append(shardState, com)
	}
	return shardState
}

func fakeNewNodeList(seed int64) []shard.NodeID {
	rand.Seed(seed)
	numNewNodes := rand.Intn(10)
	nodeList := []shard.NodeID{}
	for i := 0; i < numNewNodes; i++ {
		nid := strconv.Itoa(int(rand.Int63()))
		blsPubKey := [48]byte{}
		copy(blsPubKey[:], []byte(nid)) // fill the per-node key, not the shared blsPubKey1
		nodeList = append(nodeList, shard.NodeID{
			EcdsaAddress: common.BytesToAddress([]byte(nid)),
			BlsPublicKey: blsPubKey,
		})
	}
	return nodeList
}

func TestFakeNewNodeList(t *testing.T) {
	nodeList := fakeNewNodeList(42)
	fmt.Println("newNodeList: ", nodeList)
}

func TestShuffle(t *testing.T) {
	nodeList := []shard.NodeID{
		{EcdsaAddress: common.Address{0x12}, BlsPublicKey: blsPubKey1},
		{EcdsaAddress: common.Address{0x22}, BlsPublicKey: blsPubKey2},
		{EcdsaAddress: common.Address{0x32}, BlsPublicKey: blsPubKey3},
		{EcdsaAddress: common.Address{0x42}, BlsPublicKey: blsPubKey4},
		{EcdsaAddress: common.Address{0x52}, BlsPublicKey: blsPubKey5},
		{EcdsaAddress: common.Address{0x62}, BlsPublicKey: blsPubKey6},
		{EcdsaAddress: common.Address{0x72}, BlsPublicKey: blsPubKey7},
		{EcdsaAddress: common.Address{0x82}, BlsPublicKey: blsPubKey8},
		{EcdsaAddress: common.Address{0x92}, BlsPublicKey: blsPubKey9},
		{EcdsaAddress: common.Address{0x02}, BlsPublicKey: blsPubKey10},
	}

	cpList := []shard.NodeID{}
	cpList = append(cpList, nodeList...)
	Shuffle(nodeList)
	cnt := 0
	for i := 0; i < 10; i++ {
		if cpList[i] == nodeList[i] {
			cnt++
		}
	}
	if cnt == 10 {
		t.Error("Shuffled list is the same as the original list")
	}
}

func TestSortCommitteeBySize(t *testing.T) {
	shardState := fakeGetInitShardState(6, 10)
	ss := &ShardingState{epoch: 1, rnd: 42, shardState: shardState, numShards: len(shardState)}
	ss.sortCommitteeBySize()
	for i := 0; i < ss.numShards-1; i++ {
		assert.Equal(t, true, len(ss.shardState[i].NodeList) >= len(ss.shardState[i+1].NodeList))
	}
}

func TestUpdateShardState(t *testing.T) {
	shardState := fakeGetInitShardState(6, 10)
	ss := &ShardingState{epoch: 1, rnd: 42, shardState: shardState, numShards: len(shardState)}
	newNodeList := []shard.NodeID{
		{EcdsaAddress: common.Address{0x12}, BlsPublicKey: blsPubKey1},
		{EcdsaAddress: common.Address{0x22}, BlsPublicKey: blsPubKey2},
		{EcdsaAddress: common.Address{0x32}, BlsPublicKey: blsPubKey3},
		{EcdsaAddress: common.Address{0x42}, BlsPublicKey: blsPubKey4},
		{EcdsaAddress: common.Address{0x52}, BlsPublicKey: blsPubKey5},
		{EcdsaAddress: common.Address{0x62}, BlsPublicKey: blsPubKey6},
	}

	ss.Reshard(newNodeList, 0.2)
	assert.Equal(t, 6, ss.numShards)
}

func TestAssignNewNodes(t *testing.T) {
	shardState := fakeGetInitShardState(2, 2)
	ss := &ShardingState{epoch: 1, rnd: 42, shardState: shardState, numShards: len(shardState)}
	newNodes := []shard.NodeID{
		{EcdsaAddress: common.Address{0x12}, BlsPublicKey: blsPubKey1},
		{EcdsaAddress: common.Address{0x22}, BlsPublicKey: blsPubKey2},
		{EcdsaAddress: common.Address{0x32}, BlsPublicKey: blsPubKey3},
	}

	ss.assignNewNodes(newNodes)
	assert.Equal(t, 2, ss.numShards)
	assert.Equal(t, 5, len(ss.shardState[0].NodeList))
}
@ -1,10 +0,0 @@
package values

const (
	// BeaconChainShardID is the ShardID of the BeaconChain
	BeaconChainShardID = 0
	// VotingPowerReduceBlockThreshold roughly corresponds to 3 hours
	VotingPowerReduceBlockThreshold = 1350
	// VotingPowerFullReduce roughly corresponds to 12 hours
	VotingPowerFullReduce = 4 * VotingPowerReduceBlockThreshold
)
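A rough sanity check of the comments above, assuming the ~8-second block time these numbers imply: 1350 blocks × 8 s = 10,800 s = 3 hours, and VotingPowerFullReduce = 4 × 1350 = 5400 blocks = 43,200 s = 12 hours.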
@ -1,684 +0,0 @@
package node

import (
	"crypto/ecdsa"
	"fmt"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/harmony-one/harmony/accounts"
	"github.com/harmony-one/harmony/api/client"
	clientService "github.com/harmony-one/harmony/api/client/service"
	msg_pb "github.com/harmony-one/harmony/api/proto/message"
	proto_node "github.com/harmony-one/harmony/api/proto/node"
	"github.com/harmony-one/harmony/api/service"
	"github.com/harmony-one/harmony/api/service/syncing"
	"github.com/harmony-one/harmony/api/service/syncing/downloader"
	"github.com/harmony-one/harmony/block"
	"github.com/harmony-one/harmony/consensus"
	"github.com/harmony-one/harmony/contracts"
	"github.com/harmony-one/harmony/core"
	"github.com/harmony-one/harmony/core/types"
	"github.com/harmony-one/harmony/core/values"
	"github.com/harmony-one/harmony/drand"
	"github.com/harmony-one/harmony/internal/chain"
	nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
	"github.com/harmony-one/harmony/internal/ctxerror"
	"github.com/harmony-one/harmony/internal/params"
	"github.com/harmony-one/harmony/internal/shardchain"
	"github.com/harmony-one/harmony/internal/utils"
	"github.com/harmony-one/harmony/msgq"
	"github.com/harmony-one/harmony/node/worker"
	"github.com/harmony-one/harmony/p2p"
	p2p_host "github.com/harmony-one/harmony/p2p/host"
	"github.com/harmony-one/harmony/shard"
	staking "github.com/harmony-one/harmony/staking/types"
)

// State is a state of a node.
type State byte

// All constants except NodeLeader below are for validators only.
const (
	NodeInit              State = iota // Node just started, before contacting BeaconChain
	NodeWaitToJoin                     // Node contacted BeaconChain, waiting to join a shard
	NodeNotInSync                      // Node out of sync; it might have just joined a shard or been offline for a while
	NodeOffline                        // Node is offline
	NodeReadyForConsensus              // Node is ready for doing consensus
	NodeDoingConsensus                 // Node is already doing consensus
	NodeLeader                         // Node is the leader of some shard
)

const (
	// TxPoolLimit is the limit of the transaction pool.
	TxPoolLimit = 20000
	// NumTryBroadCast is the number of times to try broadcasting.
	NumTryBroadCast = 3
	// ClientRxQueueSize is the number of client messages to queue before tail-dropping.
	ClientRxQueueSize = 16384
	// ShardRxQueueSize is the number of shard messages to queue before tail-dropping.
	ShardRxQueueSize = 16384
	// GlobalRxQueueSize is the number of global messages to queue before tail-dropping.
	GlobalRxQueueSize = 16384
	// ClientRxWorkers is the number of concurrent client message handlers.
	ClientRxWorkers = 8
	// ShardRxWorkers is the number of concurrent shard message handlers.
	ShardRxWorkers = 32
	// GlobalRxWorkers is the number of concurrent global message handlers.
	GlobalRxWorkers = 32
)

func (state State) String() string {
	switch state {
	case NodeInit:
		return "NodeInit"
	case NodeWaitToJoin:
		return "NodeWaitToJoin"
	case NodeNotInSync:
		return "NodeNotInSync"
	case NodeOffline:
		return "NodeOffline"
	case NodeReadyForConsensus:
		return "NodeReadyForConsensus"
	case NodeDoingConsensus:
		return "NodeDoingConsensus"
	case NodeLeader:
		return "NodeLeader"
	}
	return "Unknown"
}

const (
	maxBroadcastNodes       = 10              // broadcast to at most maxBroadcastNodes peers that need to sync
	broadcastTimeout  int64 = 60 * 1000000000 // 1 minute, in nanoseconds
	// SyncIDLength is the length in bytes of syncID.
	SyncIDLength = 20
)

// syncConfig is used to push new blocks to out-of-sync nodes.
type syncConfig struct {
	timestamp int64
	client    *downloader.Client
}

// Node represents a protocol-participating node in the network.
type Node struct {
	Consensus             *consensus.Consensus // Consensus object containing all consensus-related data (e.g. committee members, signatures, commits)
	BlockChannel          chan *types.Block    // The channel to send newly proposed blocks
	ConfirmedBlockChannel chan *types.Block    // The channel to send confirmed blocks
	BeaconBlockChannel    chan *types.Block    // The channel to send beacon blocks for non-beaconchain nodes
	DRand                 *drand.DRand         // The instance for the distributed randomness protocol

	pendingCrossLinks []*block.Header
	pendingClMutex    sync.Mutex

	pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for consensus
	pendingCXMutex    sync.Mutex

	// Shard databases
	shardChains shardchain.Collection

	Client   *client.Client // The presence of a client object means this node will also act as a client
	SelfPeer p2p.Peer       // TODO(minhdoan): this may duplicate Self below, which is Alok's work.
	BCPeers  []p2p.Peer     // list of Beacon Chain Peers. This is needed by all nodes.

	// TODO: Neighbors should store only neighbor nodes in the same shard
	Neighbors  sync.Map   // All the neighbor nodes; the key is the sha256 of Peer IP/Port, the value is the p2p.Peer
	numPeers   int        // Number of Peers
	State      State      // State of the Node
	stateMutex sync.Mutex // mutex for changing the node state

	// BeaconNeighbors store only neighbor nodes in the beacon chain shard
	BeaconNeighbors sync.Map // All the neighbor nodes; the key is the sha256 of Peer IP/Port, the value is the p2p.Peer

	TxPool *core.TxPool // TODO migrate to TxPool from the pendingTransactions list below

	CxPool *core.CxPool // pool to resend missing cross-shard receipts

	pendingTransactions map[common.Hash]*types.Transaction // All the transactions received but not yet processed for consensus
	pendingTxMutex      sync.Mutex
	recentTxsStats      types.RecentTxsStats

	pendingStakingTransactions map[common.Hash]*staking.StakingTransaction // All the staking transactions received but not yet processed for consensus
	pendingStakingTxMutex      sync.Mutex

	Worker       *worker.Worker
	BeaconWorker *worker.Worker // worker for the beacon chain

	// Client server (for wallet requests)
	clientServer *clientService.Server

	// Syncing component.
	syncID                 [SyncIDLength]byte // a unique ID for the node during the state-syncing process with peers
	downloaderServer       *downloader.Server
	stateSync              *syncing.StateSync
	beaconSync             *syncing.StateSync
	peerRegistrationRecord map[string]*syncConfig // records the registration time (unixtime) of peers that began syncing
	SyncingPeerProvider    SyncingPeerProvider

	// syncing frequency parameters
	syncFreq       int
	beaconSyncFreq int

	// The p2p host used to send/receive p2p messages
	host p2p.Host

	// Incoming messages to process.
	clientRxQueue *msgq.Queue
	shardRxQueue  *msgq.Queue
	globalRxQueue *msgq.Queue

	// Service manager.
	serviceManager *service.Manager

	// Demo account.
	DemoContractAddress      common.Address
	LotteryManagerPrivateKey *ecdsa.PrivateKey

	// Puzzle account.
	PuzzleContractAddress   common.Address
	PuzzleManagerPrivateKey *ecdsa.PrivateKey

	// For test only; TODO ek – remove this
	TestBankKeys []*ecdsa.PrivateKey

	ContractDeployerKey          *ecdsa.PrivateKey
	ContractDeployerCurrentNonce uint64 // The nonce of the deployer contract at the current block
	ContractAddresses            []common.Address

	// For puzzle contracts
	AddressNonce sync.Map

	// Shard group message receiver
	shardGroupReceiver p2p.GroupReceiver

	// Global group message receiver; communicates with the beacon chain and cross-shard TXs
	globalGroupReceiver p2p.GroupReceiver

	// Client message receiver to handle light client messages.
	// The beacon leader needs to use this receiver to talk to new nodes.
	clientReceiver p2p.GroupReceiver

	// Duplicated ping messages received
	duplicatedPing sync.Map

	// Channel to notify the consensus service to really start consensus
	startConsensus chan struct{}

	// node configuration, including group ID, shard ID, etc.
	NodeConfig *nodeconfig.ConfigType

	// Chain configuration.
	chainConfig params.ChainConfig

	// map of service type to its message channel.
	serviceMessageChan map[service.Type]chan *msg_pb.Message

	// Used to call smart contracts locally
	ContractCaller *contracts.ContractCaller

	accountManager *accounts.Manager

	// Next shard state
	nextShardState struct {
		// The received master shard state
		master *shard.EpochShardState

		// When a leader should propose the next shard state,
		// or when a validator should wait for a proposal before view change.
		// TODO ek – replace with retry-based logic instead of delay
		proposeTime time.Time
	}

	isFirstTime bool // the node was started with a fresh database

	// How long the leader needs to wait to propose a new block.
	BlockPeriod time.Duration

	// last time consensus was reached, for metrics
	lastConsensusTime int64
}

// Blockchain returns the blockchain for the node's current shard.
func (node *Node) Blockchain() *core.BlockChain {
	shardID := node.NodeConfig.ShardID
	bc, err := node.shardChains.ShardChain(shardID)
	if err != nil {
		utils.Logger().Error().
			Uint32("shardID", shardID).
			Err(err).
			Msg("cannot get shard chain")
	}
	return bc
}

// Beaconchain returns the beacon chain from the node.
func (node *Node) Beaconchain() *core.BlockChain {
	bc, err := node.shardChains.ShardChain(0)
	if err != nil {
		utils.Logger().Error().Err(err).Msg("cannot get beaconchain")
	}
	return bc
}

func (node *Node) tryBroadcast(tx *types.Transaction) {
	msg := proto_node.ConstructTransactionListMessageAccount(types.Transactions{tx})

	shardGroupID := nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(tx.ShardID()))
	utils.Logger().Info().Str("shardGroupID", string(shardGroupID)).Msg("tryBroadcast")

	for attempt := 0; attempt < NumTryBroadCast; attempt++ {
		if err := node.host.SendMessageToGroups([]nodeconfig.GroupID{shardGroupID}, p2p_host.ConstructP2pMessage(byte(0), msg)); err != nil {
			utils.Logger().Error().Int("attempt", attempt).Msg("Error when trying to broadcast tx")
		} else {
			break
		}
	}
}

// addPendingTransactions adds new transactions to the pending transaction list.
func (node *Node) addPendingTransactions(newTxs types.Transactions) {
	txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
	node.pendingTxMutex.Lock()
	for _, tx := range newTxs {
		if _, ok := node.pendingTransactions[tx.Hash()]; !ok {
			node.pendingTransactions[tx.Hash()] = tx
		}
		if len(node.pendingTransactions) > txPoolLimit {
			break
		}
	}
	node.pendingTxMutex.Unlock()
	utils.Logger().Info().Int("length of newTxs", len(newTxs)).Int("totalPending", len(node.pendingTransactions)).Msg("Got more transactions")
}
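For illustration, the bounded-insert pattern above in miniature; this self-contained sketch is not the node's API, and it also shows that the size check runs after insertion:

package main

import "fmt"

// boundedInsert adds items to a set-like map, deduplicating by key and
// stopping once the pool grows past the limit, mirroring the pattern
// in addPendingTransactions above.
func boundedInsert(pool map[string]int, items []int, limit int) {
	for _, it := range items {
		key := fmt.Sprint(it)
		if _, ok := pool[key]; !ok {
			pool[key] = it
		}
		if len(pool) > limit {
			break
		}
	}
}

func main() {
	pool := map[string]int{}
	boundedInsert(pool, []int{1, 2, 2, 3, 4, 5}, 3)
	fmt.Println(len(pool)) // 4: the check runs after insertion, so the limit can be exceeded by one
}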

// addPendingStakingTransactions adds new staking transactions to the pending staking transaction list.
func (node *Node) addPendingStakingTransactions(newStakingTxs staking.StakingTransactions) {
	txPoolLimit := core.ShardingSchedule.MaxTxPoolSizeLimit()
	node.pendingStakingTxMutex.Lock()
	for _, tx := range newStakingTxs {
		if _, ok := node.pendingStakingTransactions[tx.Hash()]; !ok {
			node.pendingStakingTransactions[tx.Hash()] = tx
		}
		if len(node.pendingStakingTransactions) > txPoolLimit {
			break
		}
	}
	node.pendingStakingTxMutex.Unlock()
	utils.Logger().Info().Int("length of newStakingTxs", len(newStakingTxs)).Int("totalPending", len(node.pendingStakingTransactions)).Msg("Got more staking transactions")
}

// AddPendingStakingTransaction adds one new staking transaction to the pending staking transaction list.
func (node *Node) AddPendingStakingTransaction(
	newStakingTx *staking.StakingTransaction) {
	node.addPendingStakingTransactions(staking.StakingTransactions{newStakingTx})
}

// AddPendingTransaction adds one new transaction to the pending transaction list.
// This is only called from the SDK.
func (node *Node) AddPendingTransaction(newTx *types.Transaction) {
	if node.Consensus.IsLeader() && newTx.ShardID() == node.NodeConfig.ShardID {
		node.addPendingTransactions(types.Transactions{newTx})
	} else {
		utils.Logger().Info().Str("Hash", newTx.Hash().Hex()).Msg("Broadcasting Tx")
		node.tryBroadcast(newTx)
	}
	utils.Logger().Debug().Int("totalPending", len(node.pendingTransactions)).Msg("Got ONE more transaction")
}

// AddPendingReceipts adds one receipt message to the pending list.
func (node *Node) AddPendingReceipts(receipts *types.CXReceiptsProof) {
	node.pendingCXMutex.Lock()
	defer node.pendingCXMutex.Unlock()

	if receipts.ContainsEmptyField() {
		utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("CXReceiptsProof contains empty field")
		return
	}

	blockNum := receipts.Header.Number().Uint64()
	shardID := receipts.Header.ShardID()
	key := utils.GetPendingCXKey(shardID, blockNum)

	if _, ok := node.pendingCXReceipts[key]; ok {
		utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Already got the same receipt message")
		return
	}
	node.pendingCXReceipts[key] = receipts
	utils.Logger().Info().Int("totalPendingReceipts", len(node.pendingCXReceipts)).Msg("Got ONE more receipt message")
}

// getTransactionsForNewBlock takes out a subset of valid transactions from the pending transaction list.
// Note that the pending transaction list will then contain the remaining txs.
func (node *Node) getTransactionsForNewBlock(coinbase common.Address) (types.Transactions, staking.StakingTransactions) {
	txsThrottleConfig := core.ShardingSchedule.TxsThrottleConfig()

	// the next block number to be added in the consensus protocol, which is always one more than the current chain header block
	newBlockNum := node.Blockchain().CurrentBlock().NumberU64() + 1
	// remove old (> txsThrottleConfigRecentTxDuration) blockNum keys from recentTxsStats and initialize for the new block
	for blockNum := range node.recentTxsStats {
		recentTxsBlockNumGap := uint64(txsThrottleConfig.RecentTxDuration / node.BlockPeriod)
		if recentTxsBlockNumGap < newBlockNum-blockNum {
			delete(node.recentTxsStats, blockNum)
		}
	}
	node.recentTxsStats[newBlockNum] = make(types.BlockTxsCounts)
	// Must update to the correct current state before processing potential txns
	if err := node.Worker.UpdateCurrent(coinbase); err != nil {
		utils.Logger().Error().
			Err(err).
			Msg("Failed updating worker's state before txn selection")
		return types.Transactions{}, staking.StakingTransactions{}
	}

	node.pendingTxMutex.Lock()
	defer node.pendingTxMutex.Unlock()
	node.pendingStakingTxMutex.Lock()
	defer node.pendingStakingTxMutex.Unlock()
	pendingTransactions := types.Transactions{}
	pendingStakingTransactions := staking.StakingTransactions{}
	for _, tx := range node.pendingTransactions {
		pendingTransactions = append(pendingTransactions, tx)
	}
	for _, tx := range node.pendingStakingTransactions {
		pendingStakingTransactions = append(pendingStakingTransactions, tx)
	}

	selected, unselected, invalid := node.Worker.SelectTransactionsForNewBlock(newBlockNum, pendingTransactions, node.recentTxsStats, txsThrottleConfig, coinbase)

	selectedStaking, unselectedStaking, invalidStaking :=
		node.Worker.SelectStakingTransactionsForNewBlock(newBlockNum, pendingStakingTransactions, coinbase)

	node.pendingTransactions = make(map[common.Hash]*types.Transaction)
	for _, unselectedTx := range unselected {
		node.pendingTransactions[unselectedTx.Hash()] = unselectedTx
	}
	utils.Logger().Info().
		Int("remainPending", len(node.pendingTransactions)).
		Int("selected", len(selected)).
		Int("invalidDiscarded", len(invalid)).
		Msg("Selecting Transactions")

	node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction)
	for _, unselectedStakingTx := range unselectedStaking {
		node.pendingStakingTransactions[unselectedStakingTx.Hash()] = unselectedStakingTx
	}
	utils.Logger().Info().
		Int("remainPending", len(node.pendingStakingTransactions)).
		Int("selected", len(selectedStaking)).
		Int("invalidDiscarded", len(invalidStaking)).
		Msg("Selecting Staking Transactions")

	return selected, selectedStaking
}
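In miniature, the drain-select-restore pattern that the function above applies to both pools; a self-contained sketch with illustrative names, not the node's API:

package main

import "fmt"

// selectAndRestore drains a pending pool, selects items matching pred, and
// puts the unselected remainder back, echoing getTransactionsForNewBlock.
func selectAndRestore(pending map[int]string, pred func(int) bool) []string {
	selected := []string{}
	unselected := map[int]string{}
	for k, v := range pending {
		if pred(k) {
			selected = append(selected, v)
		} else {
			unselected[k] = v
		}
		delete(pending, k)
	}
	for k, v := range unselected {
		pending[k] = v // leftovers stay pending for the next block
	}
	return selected
}

func main() {
	pending := map[int]string{1: "tx1", 2: "tx2", 3: "tx3"}
	out := selectAndRestore(pending, func(k int) bool { return k%2 == 1 })
	fmt.Println(len(out), len(pending)) // 2 selected, 1 remains pending
}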

func (node *Node) startRxPipeline(
	receiver p2p.GroupReceiver, queue *msgq.Queue, numWorkers int,
) {
	// consumers
	for i := 0; i < numWorkers; i++ {
		go queue.HandleMessages(node)
	}
	// provider
	go node.receiveGroupMessage(receiver, queue)
}

// StartServer starts a server and processes the requests by a handler.
func (node *Node) StartServer() {

	// client messages are sent by clients, like txgen and wallet
	node.startRxPipeline(node.clientReceiver, node.clientRxQueue, ClientRxWorkers)

	// start the goroutine to receive group messages
	node.startRxPipeline(node.shardGroupReceiver, node.shardRxQueue, ShardRxWorkers)

	// start the goroutine to receive global messages, used for cross-shard TXs
	// FIXME (leo): we use the beacon client topic as the global topic for now
	node.startRxPipeline(node.globalGroupReceiver, node.globalRxQueue, GlobalRxWorkers)

	select {}
}

// countNumTransactionsInBlockchain counts the total number of transactions in the blockchain.
// Currently used for stats-reporting purposes.
func (node *Node) countNumTransactionsInBlockchain() int {
	count := 0
	for block := node.Blockchain().CurrentBlock(); block != nil; block = node.Blockchain().GetBlockByHash(block.Header().ParentHash()) {
		count += len(block.Transactions())
	}
	return count
}

// GetSyncID returns the syncID of this node.
func (node *Node) GetSyncID() [SyncIDLength]byte {
	return node.syncID
}

// New creates a new node.
func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardchain.DBFactory, isArchival bool) *Node {
	node := Node{}

	node.syncFreq = SyncFrequency
	node.beaconSyncFreq = SyncFrequency

	// Get the node config that's created in the harmony.go program.
	if consensusObj != nil {
		node.NodeConfig = nodeconfig.GetShardConfig(consensusObj.ShardID)
	} else {
		node.NodeConfig = nodeconfig.GetDefaultConfig()
	}

	copy(node.syncID[:], GenerateRandomString(SyncIDLength))
	if host != nil {
		node.host = host
		node.SelfPeer = host.GetSelfPeer()
	}

	chainConfig := *params.TestnetChainConfig
	switch node.NodeConfig.GetNetworkType() {
	case nodeconfig.Mainnet:
		chainConfig = *params.MainnetChainConfig
	case nodeconfig.Pangaea:
		chainConfig = *params.PangaeaChainConfig
	}
	node.chainConfig = chainConfig

	collection := shardchain.NewCollection(
		chainDBFactory, &genesisInitializer{&node}, chain.Engine, &chainConfig)
	if isArchival {
		collection.DisableCache()
	}
	node.shardChains = collection

	if host != nil && consensusObj != nil {
		// Consensus and associated channel to communicate blocks
		node.Consensus = consensusObj

		// Load the chains.
		blockchain := node.Blockchain() // this also sets node.isFirstTime if the DB is fresh
		beaconChain := node.Beaconchain()

		node.BlockChannel = make(chan *types.Block)
		node.ConfirmedBlockChannel = make(chan *types.Block)
		node.BeaconBlockChannel = make(chan *types.Block)
		node.recentTxsStats = make(types.RecentTxsStats)
		node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, node.Blockchain().Config(), blockchain)
		node.CxPool = core.NewCxPool(core.CxPoolSize)
		node.Worker = worker.New(node.Blockchain().Config(), blockchain, chain.Engine)

		if node.Blockchain().ShardID() != values.BeaconChainShardID {
			node.BeaconWorker = worker.New(node.Beaconchain().Config(), beaconChain, chain.Engine)
		}

		node.pendingCXReceipts = make(map[string]*types.CXReceiptsProof)
		node.pendingTransactions = make(map[common.Hash]*types.Transaction)
		node.pendingStakingTransactions = make(map[common.Hash]*staking.StakingTransaction)
		node.Consensus.VerifiedNewBlock = make(chan *types.Block)
		// the sequence number is the next block number to be added in the consensus protocol, which is always one more than the current chain header block
		node.Consensus.SetBlockNum(blockchain.CurrentBlock().NumberU64() + 1)

		// Add the Faucet contract to all shards, so that on testnet we can demo the wallet in the explorer
		// TODO (leo): we need support for cross-shard txs later so that the token can be transferred from the beacon chain shard to the other tx shards.
		if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet {
			if node.isFirstTime {
				// Set up one-time smart contracts
				node.AddFaucetContractToPendingTransactions()
			} else {
				node.AddContractKeyAndAddress(scFaucet)
			}
			node.ContractCaller = contracts.NewContractCaller(node.Blockchain(), node.Blockchain().Config())
			// Create test keys. Genesis will later need this.
			var err error
			node.TestBankKeys, err = CreateTestBankKeys(TestAccountNumber)
			if err != nil {
				utils.Logger().Error().Err(err).Msg("Error while creating test keys")
			}
		}
	}

	utils.Logger().Info().
		Interface("genesis block header", node.Blockchain().GetHeaderByNumber(0)).
		Msg("Genesis block hash")

	node.clientRxQueue = msgq.New(ClientRxQueueSize)
	node.shardRxQueue = msgq.New(ShardRxQueueSize)
	node.globalRxQueue = msgq.New(GlobalRxQueueSize)

	// Set up the initial state of syncing.
	node.peerRegistrationRecord = make(map[string]*syncConfig)

	node.startConsensus = make(chan struct{})

	go node.bootstrapConsensus()

	return &node
}

// CalculateInitShardState initializes the shard state from the latest epoch and updates the committee pub keys for consensus and drand.
func (node *Node) CalculateInitShardState() (err error) {
	if node.Consensus == nil {
		utils.Logger().Error().Msg("[CalculateInitShardState] consensus is nil; cannot figure out shardID")
		return ctxerror.New("[CalculateInitShardState] consensus is nil; cannot figure out shardID")
	}
	shardID := node.Consensus.ShardID

	// Get the genesis epoch shard state from the chain
	blockNum := node.Blockchain().CurrentBlock().NumberU64()
	node.Consensus.SetMode(consensus.Listening)
	epoch := core.ShardingSchedule.CalcEpochNumber(blockNum)
	utils.Logger().Info().
		Uint64("blockNum", blockNum).
		Uint32("shardID", shardID).
		Uint64("epoch", epoch.Uint64()).
		Msg("[CalculateInitShardState] Try To Get PublicKeys from database")
	pubKeys := core.CalculatePublicKeys(epoch, shardID)
	if len(pubKeys) == 0 {
		utils.Logger().Error().
			Uint32("shardID", shardID).
			Uint64("blockNum", blockNum).
			Msg("[CalculateInitShardState] PublicKeys is Empty, Cannot update public keys")
		return ctxerror.New(
			"[CalculateInitShardState] PublicKeys is Empty, Cannot update public keys",
			"shardID", shardID,
			"blockNum", blockNum)
	}

	for _, key := range pubKeys {
		if key.IsEqual(node.Consensus.PubKey) {
			utils.Logger().Info().
				Uint64("blockNum", blockNum).
				Int("numPubKeys", len(pubKeys)).
				Msg("[CalculateInitShardState] Successfully updated public keys")
			node.Consensus.UpdatePublicKeys(pubKeys)
			node.Consensus.SetMode(consensus.Normal)
			return nil
		}
	}
	// TODO: Disable drand. Currently drand isn't functioning, but we want to completely turn it off for full protection.
	// node.DRand.UpdatePublicKeys(pubKeys)
	return nil
}

// AddPeers adds neighbor nodes.
func (node *Node) AddPeers(peers []*p2p.Peer) int {
	count := 0
	for _, p := range peers {
		key := fmt.Sprintf("%s:%s:%s", p.IP, p.Port, p.PeerID)
		_, ok := node.Neighbors.LoadOrStore(key, *p)
		if !ok {
			// !ok means the peer is new and was stored
			count++
			node.host.AddPeer(p)
			node.numPeers++
		}
	}

	return count
}

// AddBeaconPeer adds a beacon chain neighbor node.
// It returns false if a new neighbor peer was added,
// and true if the peer was already known (and therefore not added).
func (node *Node) AddBeaconPeer(p *p2p.Peer) bool {
	key := fmt.Sprintf("%s:%s:%s", p.IP, p.Port, p.PeerID)
	_, ok := node.BeaconNeighbors.LoadOrStore(key, *p)
	return ok
}

// initNodeConfiguration builds the service.NodeConfig.
// IsClient is true if the node is a light client (wallet).
func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) {
	chanPeer := make(chan p2p.Peer)

	nodeConfig := service.NodeConfig{
		PushgatewayIP:   node.NodeConfig.GetPushgatewayIP(),
		PushgatewayPort: node.NodeConfig.GetPushgatewayPort(),
		IsClient:        node.NodeConfig.IsClient(),
		Beacon:          nodeconfig.NewGroupIDByShardID(0),
		ShardGroupID:    node.NodeConfig.GetShardGroupID(),
		Actions:         make(map[nodeconfig.GroupID]nodeconfig.ActionType),
	}

	if nodeConfig.IsClient {
		nodeConfig.Actions[nodeconfig.NewClientGroupIDByShardID(0)] = nodeconfig.ActionStart
	} else {
		nodeConfig.Actions[node.NodeConfig.GetShardGroupID()] = nodeconfig.ActionStart
	}

	var err error
	node.shardGroupReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetShardGroupID())
	if err != nil {
		utils.Logger().Error().Err(err).Msg("Failed to create shard receiver")
	}

	node.globalGroupReceiver, err = node.host.GroupReceiver(nodeconfig.NewClientGroupIDByShardID(0))
	if err != nil {
		utils.Logger().Error().Err(err).Msg("Failed to create global receiver")
	}

	node.clientReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetClientGroupID())
	if err != nil {
		utils.Logger().Error().Err(err).Msg("Failed to create client receiver")
	}
	return nodeConfig, chanPeer
}

// AccountManager ...
func (node *Node) AccountManager() *accounts.Manager {
	return node.accountManager
}

// ServiceManager ...
func (node *Node) ServiceManager() *service.Manager {
	return node.serviceManager
}

// SetSyncFreq sets the state-syncing frequency in the loop.
func (node *Node) SetSyncFreq(syncFreq int) {
	node.syncFreq = syncFreq
}

// SetBeaconSyncFreq sets the beacon-syncing frequency in the loop.
func (node *Node) SetBeaconSyncFreq(syncFreq int) {
	node.beaconSyncFreq = syncFreq
}
@ -0,0 +1,261 @@
||||
package committee |
||||
|
||||
import ( |
||||
"math/big" |
||||
"sort" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
"github.com/harmony-one/bls/ffi/go/bls" |
||||
"github.com/harmony-one/harmony/block" |
||||
common2 "github.com/harmony-one/harmony/internal/common" |
||||
shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding" |
||||
"github.com/harmony-one/harmony/internal/ctxerror" |
||||
"github.com/harmony-one/harmony/internal/params" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/numeric" |
||||
"github.com/harmony-one/harmony/shard" |
||||
staking "github.com/harmony-one/harmony/staking/types" |
||||
) |
||||
|
||||
// StateID means reading off whole network when using calls that accept
|
||||
// a shardID parameter
|
||||
const StateID = -1 |
||||
|
||||
// ValidatorList ..
|
||||
type ValidatorList interface { |
||||
Compute( |
||||
epoch *big.Int, config params.ChainConfig, reader StakingCandidatesReader, |
||||
) (shard.State, error) |
||||
ReadFromDB(epoch *big.Int, reader ChainReader) (shard.State, error) |
||||
} |
||||
|
||||
// PublicKeys per epoch
|
||||
type PublicKeys interface { |
||||
// If call shardID with StateID then only superCommittee is non-nil,
|
||||
// otherwise get back the shardSpecific slice as well.
|
||||
ComputePublicKeys( |
||||
epoch *big.Int, reader ChainReader, shardID int, |
||||
) (superCommittee, shardSpecific []*bls.PublicKey) |
||||
|
||||
ReadPublicKeysFromDB( |
||||
hash common.Hash, reader ChainReader, |
||||
) ([]*bls.PublicKey, error) |
||||
} |
||||
|
||||
// Reader ..
|
||||
type Reader interface { |
||||
PublicKeys |
||||
ValidatorList |
||||
} |
||||
|
||||
// StakingCandidatesReader ..
|
||||
type StakingCandidatesReader interface { |
||||
ValidatorInformation(addr common.Address) (*staking.Validator, error) |
||||
ValidatorStakingWithDelegation(addr common.Address) numeric.Dec |
||||
ValidatorCandidates() []common.Address |
||||
} |
||||
|
||||
// ChainReader is a subset of Engine.ChainReader, just enough to do assignment
|
||||
type ChainReader interface { |
||||
// ReadShardState retrieves sharding state given the epoch number.
|
||||
// This api reads the shard state cached or saved on the chaindb.
|
||||
// Thus, only should be used to read the shard state of the current chain.
|
||||
ReadShardState(epoch *big.Int) (shard.State, error) |
||||
// GetHeader retrieves a block header from the database by hash and number.
|
||||
GetHeaderByHash(common.Hash) *block.Header |
||||
// Config retrieves the blockchain's chain configuration.
|
||||
Config() *params.ChainConfig |
||||
} |
||||
|
||||
type partialStakingEnabled struct{} |
||||
|
||||
var ( |
||||
// WithStakingEnabled ..
|
||||
WithStakingEnabled Reader = partialStakingEnabled{} |
||||
) |
||||
|
||||
func preStakingEnabledCommittee(s shardingconfig.Instance) shard.State { |
||||
shardNum := int(s.NumShards()) |
||||
shardHarmonyNodes := s.NumHarmonyOperatedNodesPerShard() |
||||
shardSize := s.NumNodesPerShard() |
||||
hmyAccounts := s.HmyAccounts() |
||||
fnAccounts := s.FnAccounts() |
||||
shardState := shard.State{} |
||||
for i := 0; i < shardNum; i++ { |
||||
com := shard.Committee{ShardID: uint32(i)} |
||||
for j := 0; j < shardHarmonyNodes; j++ { |
||||
index := i + j*shardNum // The initial account to use for genesis nodes
|
||||
pub := &bls.PublicKey{} |
||||
pub.DeserializeHexStr(hmyAccounts[index].BlsPublicKey) |
||||
pubKey := shard.BlsPublicKey{} |
||||
pubKey.FromLibBLSPublicKey(pub) |
||||
// TODO: directly read address for bls too
|
||||
curNodeID := shard.NodeID{ |
||||
common2.ParseAddr(hmyAccounts[index].Address), |
||||
pubKey, |
||||
nil, |
||||
} |
||||
com.NodeList = append(com.NodeList, curNodeID) |
||||
} |
||||
// add FN runner's key
|
||||
for j := shardHarmonyNodes; j < shardSize; j++ { |
||||
index := i + (j-shardHarmonyNodes)*shardNum |
||||
pub := &bls.PublicKey{} |
||||
pub.DeserializeHexStr(fnAccounts[index].BlsPublicKey) |
||||
pubKey := shard.BlsPublicKey{} |
||||
pubKey.FromLibBLSPublicKey(pub) |
||||
// TODO: directly read address for bls too
|
||||
curNodeID := shard.NodeID{ |
||||
common2.ParseAddr(fnAccounts[index].Address), |
||||
pubKey, |
||||
nil, |
||||
} |
||||
com.NodeList = append(com.NodeList, curNodeID) |
||||
} |
||||
shardState = append(shardState, com) |
||||
} |
||||
return shardState |
||||
} |

func with400Stakers(
	s shardingconfig.Instance, stakerReader StakingCandidatesReader,
) (shard.State, error) {
	// TODO Nervous about this because over time the list will become quite large
	candidates := stakerReader.ValidatorCandidates()
	stakers := make([]*staking.Validator, len(candidates))
	for i := range candidates {
		// TODO Should be using .ValidatorStakingWithDelegation, not implemented yet
		validator, err := stakerReader.ValidatorInformation(candidates[i])
		if err != nil {
			return nil, err
		}
		stakers[i] = validator
	}

	sort.SliceStable(
		stakers,
		func(i, j int) bool { return stakers[i].Stake.Cmp(stakers[j].Stake) >= 0 },
	)
	const sCount = 401 // NOTE: takes the top 401, one more than the function name suggests
	// Guard the slice bound: with fewer than sCount candidates, take them all.
	count := len(stakers)
	if count > sCount {
		count = sCount
	}
	top := stakers[:count]
	shardCount := int(s.NumShards())
	superComm := make(shard.State, shardCount)
	fillCount := make([]int, shardCount)
	// TODO Finish this logic, not correct, need to operate EPoS on slot level,
	// not validator level

	for i := 0; i < shardCount; i++ {
		superComm[i] = shard.Committee{}
		// Reserve capacity only; a non-zero length here would leave
		// zero-value NodeIDs in front of the appended entries.
		superComm[i].NodeList = make(shard.NodeIDList, 0, s.NumNodesPerShard())
	}

	scratchPad := &bls.PublicKey{}

	for i := range top {
		spot := int(top[i].Address.Big().Int64()) % shardCount
		if spot < 0 {
			spot += shardCount // Int64 of a 160-bit address may overflow negative
		}
		fillCount[spot]++
		// scratchPad.DeserializeHexStr()
		pubKey := shard.BlsPublicKey{}
		pubKey.FromLibBLSPublicKey(scratchPad)
		superComm[spot].NodeList = append(
			superComm[spot].NodeList,
			shard.NodeID{
				top[i].Address,
				pubKey,
				&shard.StakedMember{big.NewInt(0)},
			},
		)
	}

	// zerolog events fire only when terminated with Msg.
	utils.Logger().Info().
		Ints("distribution of Stakers in Shards", fillCount).
		Msg("computed staker distribution")
	return superComm, nil
}
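
// Aside (editor's sketch, not part of the original change): the comparator
// stakers[i].Stake.Cmp(stakers[j].Stake) >= 0 orders stakes descending, so
// top is the highest-staked prefix. A strict "less" would use > 0; with
// >= 0 the sort still yields a descending order, but the stability
// guarantee for equal stakes is technically forfeited. A standalone
// illustration:
//
//	stakes := []*big.Int{big.NewInt(10), big.NewInt(30), big.NewInt(20)}
//	sort.SliceStable(stakes, func(i, j int) bool {
//		return stakes[i].Cmp(stakes[j]) >= 0
//	})
//	// stakes is now [30 20 10]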

func (def partialStakingEnabled) ReadPublicKeysFromDB(
	h common.Hash, reader ChainReader,
) ([]*bls.PublicKey, error) {
	header := reader.GetHeaderByHash(h)
	shardID := header.ShardID()
	superCommittee, err := reader.ReadShardState(header.Epoch())
	if err != nil {
		return nil, err
	}
	subCommittee := superCommittee.FindCommitteeByID(shardID)
	if subCommittee == nil {
		return nil, ctxerror.New("cannot find shard in the shard state",
			"blockNumber", header.Number(),
			"shardID", header.ShardID(),
		)
	}
	committerKeys := []*bls.PublicKey{}

	for i := range subCommittee.NodeList {
		committerKey := new(bls.PublicKey)
		err := subCommittee.NodeList[i].BlsPublicKey.ToLibBLSPublicKey(committerKey)
		if err != nil {
			return nil, ctxerror.New("cannot convert BLS public key",
				"blsPublicKey", subCommittee.NodeList[i].BlsPublicKey).WithCause(err)
		}
		committerKeys = append(committerKeys, committerKey)
	}
	return committerKeys, nil
}

// ComputePublicKeys produces the public keys of the entire supercommittee
// for an epoch, and optionally a shard-specific subcommittee.
func (def partialStakingEnabled) ComputePublicKeys(
	epoch *big.Int, reader ChainReader, shardID int,
) ([]*bls.PublicKey, []*bls.PublicKey) {
	config := reader.Config()
	instance := shard.Schedule.InstanceForEpoch(epoch)
	if !config.IsStaking(epoch) {
		superComm := preStakingEnabledCommittee(instance)
		spot := 0
		allIdentities := make([]*bls.PublicKey, int(instance.NumShards())*instance.NumNodesPerShard())
		for i := range superComm {
			for j := range superComm[i].NodeList {
				identity := &bls.PublicKey{}
				superComm[i].NodeList[j].BlsPublicKey.ToLibBLSPublicKey(identity)
				allIdentities[spot] = identity
				spot++
			}
		}

		if shardID == StateID {
			return allIdentities, nil
		}

		subCommittee := superComm.FindCommitteeByID(uint32(shardID))
		subCommitteeIdentities := make([]*bls.PublicKey, len(subCommittee.NodeList))
		spot = 0
		for i := range subCommittee.NodeList {
			identity := &bls.PublicKey{}
			subCommittee.NodeList[i].BlsPublicKey.ToLibBLSPublicKey(identity)
			subCommitteeIdentities[spot] = identity
			spot++
		}

		return allIdentities, subCommitteeIdentities
	}
	// TODO Implement for the staked case
	return nil, nil
}
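
// Aside (editor's sketch, not part of the original change): a hypothetical
// caller asks for the whole supercommittee by passing StateID as shardID
// (the second return is then nil), or passes a concrete shard ID to also
// get that shard's keys. This assumes ComputePublicKeys is part of the
// Reader interface satisfied by WithStakingEnabled:
//
//	all, _ := WithStakingEnabled.ComputePublicKeys(epoch, reader, StateID)
//	_, mine := WithStakingEnabled.ComputePublicKeys(epoch, reader, int(myShardID))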

func (def partialStakingEnabled) ReadFromDB(
	epoch *big.Int, reader ChainReader,
) (newSuperComm shard.State, err error) {
	return reader.ReadShardState(epoch)
}

// Compute is the single entry point for computing the State of the network.
func (def partialStakingEnabled) Compute(
	epoch *big.Int, config params.ChainConfig, stakerReader StakingCandidatesReader,
) (newSuperComm shard.State, err error) {
	instance := shard.Schedule.InstanceForEpoch(epoch)
	if !config.IsStaking(epoch) {
		return preStakingEnabledCommittee(instance), nil
	}
	return with400Stakers(instance, stakerReader)
}
@ -0,0 +1,19 @@
package shard

import (
	shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
)

const (
	// BeaconChainShardID is the ShardID of the BeaconChain
	BeaconChainShardID = 0
)

// TODO ek – Schedule should really be part of a general-purpose network
// configuration. We are OK for the time being,
// until the day we let one node process join multiple networks.
var (
	// Schedule is the sharding configuration schedule.
	// Depends on the type of the network. Defaults to the mainnet schedule.
	Schedule shardingconfig.Schedule = shardingconfig.MainnetSchedule
)
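
// Aside (editor's sketch, not part of the original change): a hypothetical
// caller reads the sharding parameters in force at an epoch through the
// package-level Schedule, mirroring its use in committee assignment above:
//
//	instance := shard.Schedule.InstanceForEpoch(big.NewInt(0))
//	fmt.Println("shards at epoch 0:", instance.NumShards())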