diff --git a/beaconchain/beaconchain.go b/beaconchain/beaconchain.go index b3eaddb43..77a2ebc68 100644 --- a/beaconchain/beaconchain.go +++ b/beaconchain/beaconchain.go @@ -29,7 +29,7 @@ type BeaconChain struct { NumberOfLeadersAdded int } -//Init +// New return BeaconChain. func New(filename string) *BeaconChain { idc := BeaconChain{} //idc.NumberOfShards = readConfigFile(filename) @@ -49,7 +49,7 @@ func generateIDCKeys() kyber.Point { return pubkey } -//AcceptConnections welcomes new connections +// AcceptConnections welcomes new connections func (IDC *BeaconChain) AcceptConnections(b []byte) { NewNode := node.DeserializeNode(b) fmt.Println(NewNode) @@ -62,10 +62,11 @@ func (IDC *BeaconChain) registerNode(Node *node.Node) { return } -func (IDC *BeaconChain) CommunicatePublicKeyToNode(Peer p2p.Peer) { +// CommunicatePublicKeyToNode communicates public key to node. +func (IDC *BeaconChain) CommunicatePublicKeyToNode(peer p2p.Peer) { pbkey := pki.GetBytesFromPublicKey(IDC.PubKey) msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Acknowledge, pbkey[:]) - p2p.SendMessage(Peer, msgToSend) + p2p.SendMessage(peer, msgToSend) } //StartServer a server and process the request by a handler. diff --git a/beaconchain/beaconchain_handler.go b/beaconchain/beaconchain_handler.go index 39f5f3ae2..a7a2f0127 100644 --- a/beaconchain/beaconchain_handler.go +++ b/beaconchain/beaconchain_handler.go @@ -10,16 +10,15 @@ import ( proto_identity "github.com/harmony-one/harmony/proto/identity" ) -//BeaconChainHandler handles registration of new Identities +// BeaconChainHandler handles registration of new Identities // This could have been its seperate package like consensus, but am avoiding creating a lot of packages. func (IDC *BeaconChain) BeaconChainHandler(conn net.Conn) { content, err := p2p.ReadMessageContent(conn) if err != nil { IDC.log.Error("Read p2p data failed") return - } else { - IDC.log.Info("received connection") } + IDC.log.Info("received connection") msgCategory, err := proto.GetMessageCategory(content) if err != nil { IDC.log.Error("Read message category failed", "err", err) @@ -48,7 +47,7 @@ func (IDC *BeaconChain) BeaconChainHandler(conn net.Conn) { } switch msgCategory { case proto.Identity: - actionType := proto_identity.IdentityMessageType(msgType) + actionType := proto_identity.IDMessageType(msgType) switch actionType { case proto_identity.Identity: idMsgType, err := proto_identity.GetIdentityMessageType(msgPayload) diff --git a/benchmark.go b/benchmark.go index 1451d5874..419d3d912 100644 --- a/benchmark.go +++ b/benchmark.go @@ -168,7 +168,7 @@ func main() { } // Consensus object. - consensus := consensus.NewConsensus(*ip, *port, shardID, peers, leader) + consensus := consensus.New(selfPeer, shardID, peers, leader) consensus.MinPeers = *minPeers // Start Profiler for leader if profile argument is on @@ -183,7 +183,7 @@ func main() { // Set logger to attack model. attack.GetInstance().SetLogger(consensus.Log) // Current node. - currentNode := node.New(consensus, ldb) + currentNode := node.New(consensus, ldb, selfPeer) // Add self peer. currentNode.SelfPeer = selfPeer // Add sync node configuration. @@ -224,9 +224,7 @@ func main() { } } else { if *peerDisvoery { - go func() { - currentNode.JoinShard(leader) - }() + go currentNode.JoinShard(leader) } } diff --git a/blockchain/merkle_tree_test.go b/blockchain/merkle_tree_test.go index 04a7adbee..70ce44ddc 100644 --- a/blockchain/merkle_tree_test.go +++ b/blockchain/merkle_tree_test.go @@ -13,7 +13,7 @@ func TestNewMerkleNode(t *testing.T) { []byte("node3"), } - fmt.Println("TEting") + fmt.Println("Testing") // Level 1 n1 := NewMerkleNode(nil, nil, data[0]) diff --git a/client/txgen/main.go b/client/txgen/main.go index a952ad1f7..425323bcf 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/client/txgen/txgen" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/p2pv2" "github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/client" @@ -79,18 +80,19 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} for shardID := range shardIDLeaderMap { - node := node.New(&consensus.Consensus{ShardID: shardID}, nil) + node := node.New(&consensus.Consensus{ShardID: shardID}, nil, p2p.Peer{}) // Assign many fake addresses so we have enough address to play with at first node.AddTestingAddresses(setting.NumOfAddress) nodes = append(nodes, node) } // Client/txgenerator server node setup - clientPort := config.GetClientPort() - consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{}) - clientNode := node.New(consensusObj, nil) + clientPeer := config.GetClientPeer() + consensusObj := consensus.New(*clientPeer, "0", nil, p2p.Peer{}) + clientNode := node.New(consensusObj, nil, *clientPeer) - if clientPort != "" { + if clientPeer != nil { + p2pv2.InitHost(clientPeer.IP, clientPeer.Port) // TODO: this should be moved into client node. clientNode.Client = client.NewClient(&shardIDLeaderMap) // This func is used to update the client's utxopool when new blocks are received from the leaders @@ -98,15 +100,15 @@ func main() { log.Debug("Received new block from leader", "len", len(blocks)) for _, block := range blocks { for _, node := range nodes { - shardId := block.ShardID + shardID := block.ShardID accountBlock := new(types.Block) err := rlp.DecodeBytes(block.AccountBlock, accountBlock) if err == nil { - shardId = accountBlock.ShardId() + shardID = accountBlock.ShardId() } - if node.Consensus.ShardID == shardId { - log.Debug("Adding block from leader", "shardID", shardId) + if node.Consensus.ShardID == shardID { + log.Debug("Adding block from leader", "shardID", shardID) // Add it to blockchain node.AddNewBlock(block) utxoPoolMutex.Lock() @@ -133,10 +135,9 @@ func main() { // Start the client server to listen to leader's message go func() { - clientNode.StartServer(clientPort) + clientNode.StartServer(clientPeer.Port) }() } - // Transaction generation process time.Sleep(10 * time.Second) // wait for nodes to be ready start := time.Now() @@ -159,7 +160,7 @@ func main() { utxoPoolMutex.Lock() log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) - for shardID, _ := range shardIDLeaderMap { // Generate simulated transactions + for shardID := range shardIDLeaderMap { // Generate simulated transactions go func(shardID uint32) { txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting) @@ -200,12 +201,12 @@ func main() { utxoPoolMutex.Lock() log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) - for shardID, _ := range shardIDLeaderMap { // Generate simulated transactions + for shardID := range shardIDLeaderMap { // Generate simulated transactions go func(shardID uint32) { txs, crossTxs := txgen.GenerateSimulatedTransactions(subsetCounter, *numSubset, int(shardID), nodes, setting) // Put cross shard tx into a pending list waiting for proofs from leaders - if clientPort != "" { + if clientPeer != nil { clientNode.Client.PendingCrossTxsMutex.Lock() for _, tx := range crossTxs { clientNode.Client.PendingCrossTxs[tx.ID] = tx @@ -217,7 +218,7 @@ func main() { // Put txs into corresponding shards shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...) for _, crossTx := range crossTxs { - for curShardID, _ := range client.GetInputShardIDsOfCrossShardTx(crossTx) { + for curShardID := range client.GetInputShardIDsOfCrossShardTx(crossTx) { shardIDTxsMap[curShardID] = append(shardIDTxsMap[curShardID], crossTx) } } @@ -248,12 +249,14 @@ func main() { time.Sleep(3000 * time.Millisecond) } +// SendTxsToLeader sends txs to leader. func SendTxsToLeader(leader p2p.Peer, txs []*blockchain.Transaction) { log.Debug("[Generator] Sending txs to...", "leader", leader, "numTxs", len(txs)) msg := proto_node.ConstructTransactionListMessage(txs) p2p.SendMessage(leader, msg) } +// SendTxsToLeaderAccount sends txs to leader account. func SendTxsToLeaderAccount(leader p2p.Peer, txs types.Transactions) { log.Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs)) msg := proto_node.ConstructTransactionListMessageAccount(txs) diff --git a/client/txgen/txgen/account_txs_generator.go b/client/txgen/txgen/account_txs_generator.go index 8b007a4c8..035c0a03c 100644 --- a/client/txgen/txgen/account_txs_generator.go +++ b/client/txgen/txgen/account_txs_generator.go @@ -9,6 +9,7 @@ import ( "github.com/harmony-one/harmony/node" ) +// TxGenSettings is the settings for TX generation. type TxGenSettings struct { NumOfAddress int CrossShard bool @@ -16,6 +17,7 @@ type TxGenSettings struct { CrossShardRatio int } +// GenerateSimulatedTransactionsAccount generates simulated transaction for account model. func GenerateSimulatedTransactionsAccount(shardID int, dataNodes []*node.Node, setting TxGenSettings) (types.Transactions, types.Transactions) { _ = setting // TODO: take use of settings node := dataNodes[shardID] diff --git a/client/txgen/txgen/utxo_txs_generator.go b/client/txgen/txgen/utxo_txs_generator.go index 852ca40f6..30bb4be54 100644 --- a/client/txgen/txgen/utxo_txs_generator.go +++ b/client/txgen/txgen/utxo_txs_generator.go @@ -12,6 +12,7 @@ import ( "github.com/harmony-one/harmony/node" ) +// TxInfo is the transaction info. type TxInfo struct { // Global Input shardID int @@ -27,7 +28,7 @@ type TxInfo struct { txCount int } -// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards. +// GenerateSimulatedTransactions generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards. // The transactions are generated by going through the existing utxos and // randomly select a subset of them as the input for each new transaction. The output // address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses. @@ -39,13 +40,13 @@ type TxInfo struct { // token (1000) to each address in [0 - N). See node.AddTestingAddresses() // // Params: -// subsetId - the which subset of the utxo to work on (used to select addresses) +// subsetID - the which subset of the utxo to work on (used to select addresses) // shardID - the shardID for current shard // dataNodes - nodes containing utxopools of all shards // Returns: // all single-shard txs // all cross-shard txs -func GenerateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node, setting TxGenSettings) ([]*blockchain.Transaction, []*blockchain.Transaction) { +func GenerateSimulatedTransactions(subsetID, numSubset int, shardID int, dataNodes []*node.Node, setting TxGenSettings) ([]*blockchain.Transaction, []*blockchain.Transaction) { /* UTXO map structure: address - [ @@ -68,7 +69,7 @@ func GenerateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNod UTXOLOOP: // Loop over all addresses for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap { - if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time + if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetID%numSubset { // Work on one subset of utxo at a time txInfo.address = address // Loop over all txIDs for the address for txIDStr, utxoMap := range txMap { @@ -82,7 +83,7 @@ UTXOLOOP: // Loop over all utxos for the txID utxoSize := len(utxoMap) batchSize := utxoSize / numSubset - i := subsetId % numSubset + i := subsetID % numSubset counter := 0 for index, value := range utxoMap { counter++ diff --git a/client/wallet/main.go b/client/wallet/main.go index 866dbe89c..fc6cfa79c 100644 --- a/client/wallet/main.go +++ b/client/wallet/main.go @@ -244,6 +244,7 @@ func getShardIDToLeaderMap() map[uint32]p2p.Peer { return shardIDLeaderMap } +// CreateWalletServerNode creates wallet server node. func CreateWalletServerNode() *node.Node { configr := client_config.NewConfig() var shardIDLeaderMap map[uint32]p2p.Peer @@ -256,13 +257,13 @@ func CreateWalletServerNode() *node.Node { shardIDLeaderMap = getShardIDToLeaderMap() clientPeer = &p2p.Peer{Port: "127.0.0.1", IP: "1234"} } - walletNode := node.New(nil, nil) + walletNode := node.New(nil, nil, *clientPeer) // TODO(ricl): shouldn't the selfPeer for client being clientPeer?? walletNode.Client = client.NewClient(&shardIDLeaderMap) walletNode.ClientPeer = clientPeer return walletNode } -// Issue the transaction to the Harmony network +// ExecuteTransaction issues the transaction to the Harmony network func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error { if tx.IsCrossShard() { walletNode.Client.PendingCrossTxsMutex.Lock() @@ -292,7 +293,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error } } -// Fetch utxos of specified address from the Harmony network +// FetchUtxos fetches utxos of specified address from the Harmony network func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockchain.UtxoMap, error) { fmt.Println("Fetching account balance...") walletNode.Client.ShardUtxoMap = make(map[uint32]blockchain.UtxoMap) @@ -316,6 +317,7 @@ func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockch } } +// PrintUtxoBalance prints utxo balance. func PrintUtxoBalance(shardUtxoMap map[uint32]blockchain.UtxoMap) { addressBalance := make(map[[20]byte]int) for _, utxoMap := range shardUtxoMap { @@ -338,7 +340,7 @@ func PrintUtxoBalance(shardUtxoMap map[uint32]blockchain.UtxoMap) { } } -// Read the addresses stored in local keystore +// ReadAddresses reads the addresses stored in local keystore func ReadAddresses() [][20]byte { priKeys := ReadPrivateKeys() addresses := [][20]byte{} @@ -348,7 +350,7 @@ func ReadAddresses() [][20]byte { return addresses } -// Store the specified private key in local keystore +// StorePrivateKey stores the specified private key in local keystore func StorePrivateKey(priKey []byte) { for _, address := range ReadAddresses() { if address == pki.GetAddressFromPrivateKey(crypto.Ed25519Curve.Scalar().SetBytes(priKey)) { @@ -369,12 +371,12 @@ func StorePrivateKey(priKey []byte) { f.Close() } -// Delete all data in the local keystore +// ClearKeystore deletes all data in the local keystore func ClearKeystore() { ioutil.WriteFile("keystore", []byte{}, 0644) } -// Read all the private key stored in local keystore +// ReadPrivateKeys reads all the private key stored in local keystore func ReadPrivateKeys() []kyber.Scalar { keys, err := ioutil.ReadFile("keystore") if err != nil { diff --git a/client/wallet_v2/main.go b/client/wallet_v2/main.go index 866dbe89c..d07912127 100644 --- a/client/wallet_v2/main.go +++ b/client/wallet_v2/main.go @@ -244,6 +244,7 @@ func getShardIDToLeaderMap() map[uint32]p2p.Peer { return shardIDLeaderMap } +// CreateWalletServerNode creates wallet server node. func CreateWalletServerNode() *node.Node { configr := client_config.NewConfig() var shardIDLeaderMap map[uint32]p2p.Peer @@ -256,13 +257,13 @@ func CreateWalletServerNode() *node.Node { shardIDLeaderMap = getShardIDToLeaderMap() clientPeer = &p2p.Peer{Port: "127.0.0.1", IP: "1234"} } - walletNode := node.New(nil, nil) + walletNode := node.New(nil, nil, *clientPeer) walletNode.Client = client.NewClient(&shardIDLeaderMap) walletNode.ClientPeer = clientPeer return walletNode } -// Issue the transaction to the Harmony network +// ExecuteTransaction issues the transaction to the Harmony network func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error { if tx.IsCrossShard() { walletNode.Client.PendingCrossTxsMutex.Lock() @@ -292,7 +293,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error } } -// Fetch utxos of specified address from the Harmony network +// FetchUtxos fetches utxos of specified address from the Harmony network func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockchain.UtxoMap, error) { fmt.Println("Fetching account balance...") walletNode.Client.ShardUtxoMap = make(map[uint32]blockchain.UtxoMap) @@ -316,6 +317,7 @@ func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockch } } +// PrintUtxoBalance prints UTXO balance. func PrintUtxoBalance(shardUtxoMap map[uint32]blockchain.UtxoMap) { addressBalance := make(map[[20]byte]int) for _, utxoMap := range shardUtxoMap { @@ -338,7 +340,7 @@ func PrintUtxoBalance(shardUtxoMap map[uint32]blockchain.UtxoMap) { } } -// Read the addresses stored in local keystore +// ReadAddresses reads the addresses stored in local keystore func ReadAddresses() [][20]byte { priKeys := ReadPrivateKeys() addresses := [][20]byte{} @@ -348,7 +350,7 @@ func ReadAddresses() [][20]byte { return addresses } -// Store the specified private key in local keystore +// StorePrivateKey stores the specified private key in local keystore func StorePrivateKey(priKey []byte) { for _, address := range ReadAddresses() { if address == pki.GetAddressFromPrivateKey(crypto.Ed25519Curve.Scalar().SetBytes(priKey)) { @@ -369,12 +371,12 @@ func StorePrivateKey(priKey []byte) { f.Close() } -// Delete all data in the local keystore +// ClearKeystore deletes all data in the local keystore func ClearKeystore() { ioutil.WriteFile("keystore", []byte{}, 0644) } -// Read all the private key stored in local keystore +// ReadPrivateKeys reads all the private key stored in local keystore func ReadPrivateKeys() []kyber.Scalar { keys, err := ioutil.ReadFile("keystore") if err != nil { diff --git a/consensus/bft.go b/consensus/bft.go new file mode 100644 index 000000000..9eb318f7d --- /dev/null +++ b/consensus/bft.go @@ -0,0 +1,121 @@ +package consensus + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto/sha3" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/core/state" + "github.com/harmony-one/harmony/core/types" +) + +// Bft is the struct for Bft protocol. +type Bft struct { +} + +// NewFaker returns Bft. +func NewFaker() *Bft { + return &Bft{} +} + +// Author implements Engine, returning the header's coinbase as the +// proof-of-work verified author of the block. +func (bft *Bft) Author(header *types.Header) (common.Address, error) { + return header.Coinbase, nil +} + +// VerifyHeader checks whether a header conforms to the consensus rules of the +// stock Ethereum bft engine. +func (bft *Bft) VerifyHeader(chain ChainReader, header *types.Header, seal bool) error { + return nil +} + +// VerifyHeaders is similar to VerifyHeader, but verifies a batch of headers +// concurrently. The method returns a quit channel to abort the operations and +// a results channel to retrieve the async verifications. +func (bft *Bft) VerifyHeaders(chain ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { + abort, results := make(chan struct{}), make(chan error, len(headers)) + for i := 0; i < len(headers); i++ { + results <- nil + } + return abort, results +} + +func (bft *Bft) verifyHeaderWorker(chain ChainReader, headers []*types.Header, seals []bool, index int) error { + var parent *types.Header + if index == 0 { + parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1) + } else if headers[index-1].Hash() == headers[index].ParentHash { + parent = headers[index-1] + } + if parent == nil { + return ErrUnknownAncestor + } + if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil { + return nil // known block + } + return bft.verifyHeader(chain, headers[index], parent, false, seals[index]) +} + +// verifyHeader checks whether a header conforms to the consensus rules of the +// stock Ethereum bft engine. +// See YP section 4.3.4. "Block Header Validity" +func (bft *Bft) verifyHeader(chain ChainReader, header, parent *types.Header, uncle bool, seal bool) error { + return nil +} + +// VerifySeal implements consensus.Engine, checking whether the given block satisfies +// the PoW difficulty requirements. +func (bft *Bft) VerifySeal(chain ChainReader, header *types.Header) error { + return nil +} + +// Prepare implements consensus.Engine, initializing the difficulty field of a +// header to conform to the ethash protocol. The changes are done inline. +func (bft *Bft) Prepare(chain ChainReader, header *types.Header) error { + return nil +} + +// Finalize implements consensus.Engine, accumulating the block and uncle rewards, +// setting the final state and assembling the block. +func (bft *Bft) Finalize(chain ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, receipts []*types.Receipt) (*types.Block, error) { + // Accumulate any block and uncle rewards and commit the final state root + // Header seems complete, assemble into a block and return + accumulateRewards(chain.Config(), state, header) + header.Root = state.IntermediateRoot(false) + return types.NewBlock(header, txs, receipts), nil +} + +// SealHash returns the hash of a block prior to it being sealed. +func (bft *Bft) SealHash(header *types.Header) (hash common.Hash) { + hasher := sha3.NewKeccak256() + + rlp.Encode(hasher, []interface{}{ + header.ParentHash, + header.Coinbase, + header.Root, + header.TxHash, + header.ReceiptHash, + header.Bloom, + header.Difficulty, + header.Number, + header.GasLimit, + header.GasUsed, + header.Time, + header.Extra, + }) + hasher.Sum(hash[:0]) + return hash +} + +// Seal ... +func (bft *Bft) Seal(chain ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { + return nil +} + +// AccumulateRewards credits the coinbase of the given block with the mining +// reward. The total reward consists of the static block reward and rewards for +// included uncles. The coinbase of each uncle block is also rewarded. +func accumulateRewards(config *params.ChainConfig, state *state.StateDB, header *types.Header) { + +} diff --git a/consensus/consensus.go b/consensus/consensus.go index cd5567a00..036ec281a 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -108,13 +108,11 @@ type BlockConsensusStatus struct { state State // the latest state of the consensus } -// NewConsensus creates a new Consensus object -// TODO(minhdoan): Maybe convert it into just New -// FYI, see https://golang.org/doc/effective_go.html?#package-names -func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Consensus { +// New creates a new Consensus object +func New(selfPeer p2p.Peer, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Consensus { consensus := Consensus{} - if leader.Port == port && leader.IP == ip { + if leader.Port == selfPeer.Port && leader.IP == selfPeer.IP { consensus.IsLeader = true } else { consensus.IsLeader = false @@ -127,7 +125,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * consensus.leader = leader for _, peer := range peers { - consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) + consensus.validators.Store(utils.GetUniqueIDFromPeer(peer), peer) } // Initialize cosign bitmap @@ -152,7 +150,7 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) * // For now use socket address as 16 byte Id // TODO: populate with correct Id - consensus.nodeID = utils.GetUniqueIdFromPeer(p2p.Peer{IP: ip, Port: port}) + consensus.nodeID = utils.GetUniqueIDFromPeer(selfPeer) // Set private key for myself so that I can sign messages. consensus.priKey = crypto.Ed25519Curve.Scalar().SetInt64(int64(consensus.nodeID)) @@ -245,12 +243,12 @@ func (consensus *Consensus) AddPeers(peers []p2p.Peer) int { count := 0 for _, peer := range peers { - _, ok := consensus.validators.Load(utils.GetUniqueIdFromPeer(peer)) + _, ok := consensus.validators.Load(utils.GetUniqueIDFromPeer(peer)) if !ok { if peer.ValidatorID == -1 { peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueID()) } - consensus.validators.Store(utils.GetUniqueIdFromPeer(peer), peer) + consensus.validators.Store(utils.GetUniqueIDFromPeer(peer), peer) consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey) } count++ diff --git a/consensus/consensus_leader_msg_test.go b/consensus/consensus_leader_msg_test.go index f085881d0..b04830172 100644 --- a/consensus/consensus_leader_msg_test.go +++ b/consensus/consensus_leader_msg_test.go @@ -12,7 +12,7 @@ import ( func TestConstructAnnounceMessage(test *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} header := consensus.blockHeader msg := consensus.constructAnnounceMessage() @@ -35,7 +35,7 @@ func TestConstructChallengeMessage(test *testing.T) { validatorPubKey := pki.GetPublicKeyFromScalar(leaderPriKey) validator := p2p.Peer{IP: "3", Port: "5", PubKey: validatorPubKey} - consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} (*consensus.commitments)[0] = leaderPubKey (*consensus.commitments)[1] = validatorPubKey diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 9b2e834f0..a84182815 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -6,10 +6,10 @@ import ( "github.com/harmony-one/harmony/p2p" ) -func TestNewConsensus(test *testing.T) { +func TestNew(test *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) if consensus.consensusID != 0 { test.Errorf("Consensus Id is initialized to the wrong value: %d", consensus.consensusID) } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index fa25af968..fa1bcfe65 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -72,7 +72,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) { // Verify block data // check leader Id - myLeaderID := utils.GetUniqueIdFromPeer(consensus.leader) + myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader) if leaderID != myLeaderID { consensus.Log.Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus) return @@ -175,7 +175,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState // Verify block data and the aggregated signatures // check leader Id - myLeaderID := utils.GetUniqueIdFromPeer(consensus.leader) + myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader) if leaderID != myLeaderID { consensus.Log.Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus) return @@ -325,7 +325,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) { // Verify block data // check leader Id - myLeaderID := utils.GetUniqueIdFromPeer(consensus.leader) + myLeaderID := utils.GetUniqueIDFromPeer(consensus.leader) if leaderID != myLeaderID { consensus.Log.Warn("Received message from wrong leader", "myLeaderID", myLeaderID, "receivedLeaderId", leaderID, "consensus", consensus) return diff --git a/consensus/consensus_validator_msg_test.go b/consensus/consensus_validator_msg_test.go index 34921b4b0..a96352462 100644 --- a/consensus/consensus_validator_msg_test.go +++ b/consensus/consensus_validator_msg_test.go @@ -11,7 +11,7 @@ import ( func TestConstructCommitMessage(test *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} _, msg := consensus.constructCommitMessage(consensus_proto.Commit) @@ -23,7 +23,7 @@ func TestConstructCommitMessage(test *testing.T) { func TestConstructResponseMessage(test *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := New(leader, "0", []p2p.Peer{leader, validator}, leader) consensus.blockHash = [32]byte{} msg := consensus.constructResponseMessage(consensus_proto.Response, crypto.Ed25519Curve.Scalar()) diff --git a/crypto/cosi.go b/crypto/cosi.go index 675616053..4c98c2c69 100644 --- a/crypto/cosi.go +++ b/crypto/cosi.go @@ -1,5 +1,5 @@ /* -Package cosi implements the collective signing (CoSi) algorithm as presented in +Package crypto implements the collective signing (CoSi) algorithm as presented in the paper "Keeping Authorities 'Honest or Bust' with Decentralized Witness Cosigning" by Ewa Syta et al. See https://arxiv.org/abs/1503.08768. This package only provides the functionality for the cryptographic operations of diff --git a/crypto/pki/utils.go b/crypto/pki/utils.go index 1ef2be212..c508c51e7 100644 --- a/crypto/pki/utils.go +++ b/crypto/pki/utils.go @@ -8,6 +8,7 @@ import ( "github.com/harmony-one/harmony/log" ) +// GetAddressFromPublicKey returns address given a public key. func GetAddressFromPublicKey(pubKey kyber.Point) [20]byte { bytes, err := pubKey.MarshalBinary() if err != nil { @@ -19,23 +20,27 @@ func GetAddressFromPublicKey(pubKey kyber.Point) [20]byte { return address } +// GetAddressFromPrivateKey returns address given a private key. func GetAddressFromPrivateKey(priKey kyber.Scalar) [20]byte { return GetAddressFromPublicKey(GetPublicKeyFromScalar(priKey)) } +// GetAddressFromPrivateKeyBytes returns address from private key in bytes. func GetAddressFromPrivateKeyBytes(priKey [32]byte) [20]byte { return GetAddressFromPublicKey(GetPublicKeyFromScalar(crypto.Ed25519Curve.Scalar().SetBytes(priKey[:]))) } -// Temporary helper function for benchmark use +// GetAddressFromInt is the temporary helper function for benchmark use func GetAddressFromInt(value int) [20]byte { return GetAddressFromPublicKey(GetPublicKeyFromScalar(GetPrivateKeyScalarFromInt(value))) } +// GetPrivateKeyScalarFromInt return private key scalar. func GetPrivateKeyScalarFromInt(value int) kyber.Scalar { return crypto.Ed25519Curve.Scalar().SetInt64(int64(value)) } +// GetPrivateKeyFromInt returns private key in bytes given an interger. func GetPrivateKeyFromInt(value int) [32]byte { priKey, err := crypto.Ed25519Curve.Scalar().SetInt64(int64(value)).MarshalBinary() priKeyBytes := [32]byte{} @@ -45,6 +50,7 @@ func GetPrivateKeyFromInt(value int) [32]byte { return priKeyBytes } +// GetPublicKeyFromPrivateKey return public key from private key. func GetPublicKeyFromPrivateKey(priKey [32]byte) kyber.Point { suite := crypto.Ed25519Curve scalar := suite.Scalar() @@ -52,12 +58,12 @@ func GetPublicKeyFromPrivateKey(priKey [32]byte) kyber.Point { return suite.Point().Mul(scalar, nil) } -// Same as GetPublicKeyFromPrivateKey, but it directly works on kyber.Scalar object. +// GetPublicKeyFromScalar is the same as GetPublicKeyFromPrivateKey, but it directly works on kyber.Scalar object. func GetPublicKeyFromScalar(priKey kyber.Scalar) kyber.Point { return crypto.Ed25519Curve.Point().Mul(priKey, nil) } -// Converts public key point to bytes +// GetBytesFromPublicKey converts public key point to bytes func GetBytesFromPublicKey(pubKey kyber.Point) [32]byte { bytes, err := pubKey.MarshalBinary() result := [32]byte{} diff --git a/discovery/discovery.go b/discovery/discovery.go index 7ed3f1023..1ad881454 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -7,6 +7,7 @@ import ( "github.com/harmony-one/harmony/p2p" ) +// ConfigEntry is the config entry. type ConfigEntry struct { IP string Port string @@ -24,6 +25,8 @@ func (config ConfigEntry) String() string { return fmt.Sprintf("idc: %v:%v", config.IP, config.Port) } +// New return new ConfigEntry. +// TODO: This should be change because this package is discovery and New here implies New Discovery. func New(priK kyber.Scalar, pubK kyber.Point) *ConfigEntry { var config ConfigEntry config.priK = priK @@ -34,6 +37,7 @@ func New(priK kyber.Scalar, pubK kyber.Point) *ConfigEntry { return &config } +// StartClientMode starts client mode. func (config *ConfigEntry) StartClientMode(idcIP, idcPort string) error { config.IP = "myip" config.Port = "myport" @@ -45,18 +49,22 @@ func (config *ConfigEntry) StartClientMode(idcIP, idcPort string) error { return nil } +// GetShardID ... func (config *ConfigEntry) GetShardID() string { return config.ShardID } +// GetPeers ... func (config *ConfigEntry) GetPeers() []p2p.Peer { return config.peers } +// GetLeader ... func (config *ConfigEntry) GetLeader() p2p.Peer { return config.leader } +// GetSelfPeer ... func (config *ConfigEntry) GetSelfPeer() p2p.Peer { return config.self } diff --git a/node/node.go b/node/node.go index 6cacfc31f..ab043748f 100644 --- a/node/node.go +++ b/node/node.go @@ -27,6 +27,7 @@ import ( "github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/p2pv2" proto_node "github.com/harmony-one/harmony/proto/node" "github.com/harmony-one/harmony/syncing/downloader" downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto" @@ -154,9 +155,16 @@ func (node *Node) StartServer(port string) { // Disable this temporarily. // node.blockchain = syncing.StartBlockSyncing(node.Consensus.GetValidatorPeers()) } - fmt.Println("going to start server on port:", port) - //node.log.Debug("Starting server", "node", node, "port", port) - node.listenOnPort(port) + if p2p.Version == 1 { + fmt.Println("going to start server on port:", port) + //node.log.Debug("Starting server", "node", node, "port", port) + node.listenOnPort(port) + } else { + p2pv2.InitHost(node.SelfPeer.IP, port) + p2pv2.BindHandler(node.NodeHandlerV1) + // Hang forever + <-make(chan struct{}) + } } // SetLog sets log for Node. @@ -165,6 +173,7 @@ func (node *Node) SetLog() *Node { return node } +// Version 0 p2p. Going to be deprecated. func (node *Node) listenOnPort(port string) { addr := net.JoinHostPort("", port) listen, err := net.Listen("tcp4", addr) @@ -244,7 +253,7 @@ func DeserializeNode(d []byte) *NetworkNode { } // New creates a new node. -func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { +func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node { node := Node{} if consensus != nil { @@ -301,6 +310,9 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { node.BlockChannelAccount = make(chan *types.Block) node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus) } + + node.SelfPeer = selfPeer + // Logger node.log = log.New() if consensus.IsLeader { diff --git a/node/node_handler.go b/node/node_handler.go index 556b06fcf..eff9d4fe6 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -1,7 +1,6 @@ package node import ( - "bufio" "bytes" "encoding/gob" "fmt" @@ -10,6 +9,8 @@ import ( "strconv" "time" + "github.com/harmony-one/harmony/p2pv2" + "github.com/dedis/kyber" "github.com/ethereum/go-ethereum/rlp" "github.com/harmony-one/harmony/blockchain" @@ -22,6 +23,7 @@ import ( "github.com/harmony-one/harmony/proto/consensus" proto_identity "github.com/harmony-one/harmony/proto/identity" proto_node "github.com/harmony-one/harmony/proto/node" + netp2p "github.com/libp2p/go-libp2p-net" ) const ( @@ -76,7 +78,7 @@ func (node *Node) NodeHandler(conn net.Conn) { switch msgCategory { case proto.Identity: - actionType := proto_identity.IdentityMessageType(msgType) + actionType := proto_identity.IDMessageType(msgType) switch actionType { case proto_identity.Identity: messageType := proto_identity.MessageType(msgPayload[0]) @@ -91,7 +93,7 @@ func (node *Node) NodeHandler(conn net.Conn) { } } case proto.Consensus: - actionType := consensus.ConsensusMessageType(msgType) + actionType := consensus.ConMessageType(msgType) switch actionType { case consensus.Consensus: if consensusObj.IsLeader { @@ -120,9 +122,6 @@ func (node *Node) NodeHandler(conn net.Conn) { node.Client.UpdateBlocks(*blocks) } } - case proto_node.BlockchainSync: - node.log.Info("NET: received message: Node/BlockchainSync") - node.handleBlockchainSync(msgPayload, conn) case proto_node.Client: node.log.Info("NET: received message: Node/Client") clientMsgType := proto_node.ClientMessageType(msgPayload[0]) @@ -193,7 +192,7 @@ func (node *Node) NodeHandler(conn net.Conn) { node.pongMessageHandler(msgPayload) } case proto.Client: - actionType := client.ClientMessageType(msgType) + actionType := client.MessageType(msgType) node.log.Info("NET: received message: Client/Transaction") switch actionType { case client.Transaction: @@ -206,55 +205,167 @@ func (node *Node) NodeHandler(conn net.Conn) { } } -// Refactor by moving this code into a sync package. -func (node *Node) handleBlockchainSync(payload []byte, conn net.Conn) { - // TODO(minhdoan): Looking to removing this. - w := bufio.NewWriter(conn) -FOR_LOOP: - for { - syncMsgType := proto_node.BlockchainSyncMessageType(payload[0]) - switch syncMsgType { - case proto_node.GetBlock: - block := node.blockchain.FindBlock(payload[1:33]) - w.Write(block.Serialize()) - w.Flush() - case proto_node.GetLastBlockHashes: - blockchainSyncMessage := proto_node.BlockchainSyncMessage{ - BlockHeight: len(node.blockchain.Blocks), - BlockHashes: node.blockchain.GetBlockHashes(), +// NodeHandler handles a new incoming connection. +func (node *Node) NodeHandlerV1(s netp2p.Stream) { + defer s.Close() + + // Read p2p message payload + content, err := p2pv2.ReadData(s) + + if err != nil { + node.log.Error("Read p2p data failed", "err", err, "node", node) + return + } + // TODO: this is tree broadcasting. this needs to be removed later. Actually the whole logic needs to be replaced by p2p. + node.MaybeBroadcastAsValidator(content) + + consensusObj := node.Consensus + + msgCategory, err := proto.GetMessageCategory(content) + if err != nil { + node.log.Error("Read node type failed", "err", err, "node", node) + return + } + + msgType, err := proto.GetMessageType(content) + if err != nil { + node.log.Error("Read action type failed", "err", err, "node", node) + return + } + + msgPayload, err := proto.GetMessagePayload(content) + if err != nil { + node.log.Error("Read message payload failed", "err", err, "node", node) + return + } + + switch msgCategory { + case proto.Identity: + actionType := proto_identity.IDMessageType(msgType) + switch actionType { + case proto_identity.Identity: + messageType := proto_identity.MessageType(msgPayload[0]) + switch messageType { + case proto_identity.Register: + fmt.Println("received a identity message") + // TODO(ak): fix it. + // node.processPOWMessage(msgPayload) + node.log.Info("NET: received message: IDENTITY/REGISTER") + default: + node.log.Error("Announce message should be sent to IdentityChain") } - w.Write(proto_node.SerializeBlockchainSyncMessage(&blockchainSyncMessage)) - w.Flush() - case proto_node.Done: - break FOR_LOOP } - content, err := p2p.ReadMessageContent(conn) - - if err != nil { - node.log.Error("Failed in reading message content from syncing node", err) - return + case proto.Consensus: + actionType := consensus.ConMessageType(msgType) + switch actionType { + case consensus.Consensus: + if consensusObj.IsLeader { + node.log.Info("NET: received message: Consensus/Leader") + consensusObj.ProcessMessageLeader(msgPayload) + } else { + node.log.Info("NET: received message: Consensus/Validator") + consensusObj.ProcessMessageValidator(msgPayload) + } } + case proto.Node: + actionType := proto_node.MessageType(msgType) + switch actionType { + case proto_node.Transaction: + node.log.Info("NET: received message: Node/Transaction") + node.transactionMessageHandler(msgPayload) + case proto_node.Block: + node.log.Info("NET: received message: Node/Block") + blockMsgType := proto_node.BlockMessageType(msgPayload[0]) + switch blockMsgType { + case proto_node.Sync: + decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Sync messge type + blocks := new([]*blockchain.Block) + decoder.Decode(blocks) + if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil { + node.Client.UpdateBlocks(*blocks) + } + } + case proto_node.Client: + node.log.Info("NET: received message: Node/Client") + clientMsgType := proto_node.ClientMessageType(msgPayload[0]) + switch clientMsgType { + case proto_node.LookupUtxo: + decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the LookupUtxo messge type - msgCategory, _ := proto.GetMessageCategory(content) - if err != nil || msgCategory != proto.Node { - node.log.Error("Failed in reading message category from syncing node", err) - return - } + fetchUtxoMessage := new(proto_node.FetchUtxoMessage) + decoder.Decode(fetchUtxoMessage) - msgType, err := proto.GetMessageType(content) - actionType := proto_node.MessageType(msgType) - if err != nil || actionType != proto_node.BlockchainSync { - node.log.Error("Failed in reading message type from syncing node", err) - return - } + utxoMap := node.UtxoPool.GetUtxoMapByAddresses(fetchUtxoMessage.Addresses) - payload, err = proto.GetMessagePayload(content) - if err != nil { - node.log.Error("Failed in reading payload from syncing node", err) - return + p2p.SendMessage(fetchUtxoMessage.Sender, client.ConstructFetchUtxoResponseMessage(&utxoMap, node.UtxoPool.ShardID)) + } + case proto_node.Control: + node.log.Info("NET: received message: Node/Control") + controlType := msgPayload[0] + if proto_node.ControlMessageType(controlType) == proto_node.STOP { + if node.Chain == nil { + node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) + + sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap() + node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) + + avgBlockSizeInBytes := 0 + txCount := 0 + blockCount := 0 + totalTxCount := 0 + totalBlockCount := 0 + avgTxSize := 0 + + for _, block := range node.blockchain.Blocks { + if block.IsStateBlock() { + totalTxCount += int(block.State.NumTransactions) + totalBlockCount += int(block.State.NumBlocks) + } else { + byteBuffer := bytes.NewBuffer([]byte{}) + encoder := gob.NewEncoder(byteBuffer) + encoder.Encode(block) + avgBlockSizeInBytes += len(byteBuffer.Bytes()) + + txCount += len(block.Transactions) + blockCount++ + totalTxCount += len(block.TransactionIds) + totalBlockCount++ + + byteBuffer = bytes.NewBuffer([]byte{}) + encoder = gob.NewEncoder(byteBuffer) + encoder.Encode(block.Transactions) + avgTxSize += len(byteBuffer.Bytes()) + } + } + if blockCount != 0 { + avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount + avgTxSize = avgTxSize / txCount + } + + node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) + } else { + node.log.Debug("Stopping Node (Account Model)", "node", node, "CurBlockNum", node.Chain.CurrentHeader().Number, "numTxsProcessed", node.countNumTransactionsInBlockchainAccount()) + } + + os.Exit(0) + } + case proto_node.PING: + node.pingMessageHandler(msgPayload) + case proto_node.PONG: + node.pongMessageHandler(msgPayload) } + case proto.Client: + actionType := client.MessageType(msgType) + node.log.Info("NET: received message: Client/Transaction") + switch actionType { + case client.Transaction: + if node.Client != nil { + node.Client.TransactionMessageHandler(msgPayload) + } + } + default: + node.log.Error("Unknown", "MsgCateory:", msgCategory) } - node.log.Info("HOORAY: Done sending info to syncing node.") } func (node *Node) transactionMessageHandler(msgPayload []byte) { diff --git a/node/node_test.go b/node/node_test.go index 8894df225..bb0458b4b 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -18,9 +18,9 @@ import ( func TestNewNewNode(test *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, nil) + node := New(consensus, nil, leader) if node.Consensus == nil { test.Error("Consensus is not initialized for the node") } @@ -45,9 +45,9 @@ func TestNewNewNode(test *testing.T) { func TestCountNumTransactionsInBlockchain(test *testing.T) { leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, nil) + node := New(consensus, nil, leader) node.AddTestingAddresses(1000) if node.countNumTransactionsInBlockchain() != 1001 { test.Error("Count of transactions in the blockchain is incorrect") @@ -79,9 +79,9 @@ func TestAddPeers(test *testing.T) { } leader := p2p.Peer{IP: "1", Port: "2"} validator := p2p.Peer{IP: "3", Port: "5"} - consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) + consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader) - node := New(consensus, nil) + node := New(consensus, nil, leader) r1 := node.AddPeers(peers1) e1 := 2 if r1 != e1 { @@ -147,16 +147,13 @@ func exitServer() { os.Exit(0) } -func TestPingPongHandler(test *testing.T) { - leader := p2p.Peer{IP: "127.0.0.1", Port: "8881"} - validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"} - consensus := consensus.NewConsensus("127.0.0.1", "8881", "0", []p2p.Peer{leader, validator}, leader) - - node := New(consensus, nil) - - // go sendPingMessage(leader) - go sendPongMessage(leader) - go exitServer() - - node.StartServer("8881") -} +// func TestPingPongHandler(test *testing.T) { +// leader := p2p.Peer{IP: "127.0.0.1", Port: "8881"} +// // validator := p2p.Peer{IP: "127.0.0.1", Port: "9991"} +// consensus := consensus.New("127.0.0.1", "8881", "0", []p2p.Peer{leader}, leader) +// node := New(consensus, nil) +// // go sendPingMessage(leader) +// go sendPongMessage(leader) +// go exitServer() +// node.StartServer("8881") +// } diff --git a/node/worker/worker.go b/node/worker/worker.go index a58ebcad3..a8b066b80 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -1,6 +1,9 @@ package worker import ( + "math/big" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/params" "github.com/harmony-one/harmony/consensus" @@ -8,8 +11,6 @@ import ( "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" - "math/big" - "time" ) // environment is the worker's current environment and holds all of the current state information. @@ -22,7 +23,7 @@ type environment struct { receipts []*types.Receipt } -// worker is the main object which takes care of submitting new work to consensus engine +// Worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type Worker struct { config *params.ChainConfig @@ -49,6 +50,7 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres return receipt.Logs, nil } +// CommitTransactions commits transactions. func (w *Worker) CommitTransactions(txs []*types.Transaction, coinbase common.Address) error { snap := w.current.state.Snapshot() @@ -66,6 +68,7 @@ func (w *Worker) CommitTransactions(txs []*types.Transaction, coinbase common.Ad return nil } +// UpdateCurrent updates ... func (w *Worker) UpdateCurrent() error { parent := w.chain.CurrentBlock() num := parent.Number() @@ -95,10 +98,12 @@ func (w *Worker) makeCurrent(parent *types.Block, header *types.Header) error { return nil } +// GetCurrentState ... func (w *Worker) GetCurrentState() *state.StateDB { return w.current.state } +// Commit ... func (w *Worker) Commit() (*types.Block, error) { s := w.current.state.Copy() block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, w.current.receipts) @@ -108,6 +113,7 @@ func (w *Worker) Commit() (*types.Block, error) { return block, nil } +// New ... func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine) *Worker { worker := &Worker{ config: config, diff --git a/p2p/backoff.go b/p2p/backoff.go index 2bac92e41..c57a636cb 100644 --- a/p2p/backoff.go +++ b/p2p/backoff.go @@ -16,6 +16,7 @@ type BackoffBase struct { Min, Cur, Max time.Duration } +// NewBackoffBase creates a new BackOffBase structure func NewBackoffBase(min, max time.Duration) *BackoffBase { return &BackoffBase{min, min, max} } @@ -40,21 +41,23 @@ func (b *BackoffBase) Sleep() { } } -// Adjust the duration. Subtypes shall implement this. +// Backoff adjusts the duration. Subtypes shall implement this. func (b *BackoffBase) Backoff() { // default implementation does not backoff } -// Exponential backoff. +// ExpBackoff is an exponential backoff data structure. type ExpBackoff struct { BackoffBase Factor float64 } +// NewExpBackoff creates a new ExpBackOff structure func NewExpBackoff(min, max time.Duration, factor float64) *ExpBackoff { return &ExpBackoff{*NewBackoffBase(min, max), factor} } +// Backoff implements the exponential backoff func (b *ExpBackoff) Backoff() { b.Cur = time.Duration(float64(b.Cur) * b.Factor) } diff --git a/p2p/helper.go b/p2p/helper.go index d748a0aae..2be326afd 100644 --- a/p2p/helper.go +++ b/p2p/helper.go @@ -24,18 +24,17 @@ content (n bytes) - actual message content */ -const BATCH_SIZE = 1 << 16 +// BatchSizeInByte defines the size of buffer (64MB) +const BatchSizeInByte = 1 << 16 -// Read the message type and content size, and return the actual content. +// ReadMessageContent reads the message type and content size, and return the actual content. func ReadMessageContent(conn net.Conn) ([]byte, error) { var ( contentBuf = bytes.NewBuffer([]byte{}) r = bufio.NewReader(conn) ) - timeoutDuration := 1 * time.Second conn.SetReadDeadline(time.Now().Add(timeoutDuration)) - //// Read 1 byte for message type _, err := r.ReadByte() switch err { @@ -49,7 +48,6 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) { return contentBuf.Bytes(), err } // TODO: check on msgType and take actions accordingly - //// Read 4 bytes for message size fourBytes := make([]byte, 4) n, err := r.Read(fourBytes) @@ -60,25 +58,22 @@ func ReadMessageContent(conn net.Conn) ([]byte, error) { log.Printf("Failed reading the p2p message size field: only read %d bytes", n) return contentBuf.Bytes(), err } - //log.Print(fourBytes) // Number of bytes for the message content bytesToRead := binary.BigEndian.Uint32(fourBytes) //log.Printf("The content size is %d bytes.", bytesToRead) - //// Read the content in chunk of 16 * 1024 bytes - tmpBuf := make([]byte, BATCH_SIZE) + tmpBuf := make([]byte, BatchSizeInByte) ILOOP: for { timeoutDuration := 10 * time.Second conn.SetReadDeadline(time.Now().Add(timeoutDuration)) - if bytesToRead < BATCH_SIZE { + if bytesToRead < BatchSizeInByte { // Read the last number of bytes less than 1024 tmpBuf = make([]byte, bytesToRead) } n, err := r.Read(tmpBuf) contentBuf.Write(tmpBuf[:n]) - switch err { case io.EOF: // TODO: should we return error here, or just ignore it? @@ -97,6 +92,7 @@ ILOOP: return contentBuf.Bytes(), nil } +// CreateMessage create a general message. FIXME: this is not used func CreateMessage(msgType byte, data []byte) []byte { buffer := bytes.NewBuffer([]byte{}) @@ -110,6 +106,7 @@ func CreateMessage(msgType byte, data []byte) []byte { return buffer.Bytes() } +// SendMessageContent send message over net connection. FIXME: this is not used func SendMessageContent(conn net.Conn, data []byte) { msgToSend := CreateMessage(byte(1), data) w := bufio.NewWriter(conn) diff --git a/p2p/peer.go b/p2p/peer.go index 4918c99ef..2226ad437 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -9,6 +9,7 @@ import ( "time" "github.com/harmony-one/harmony/log" + "github.com/harmony-one/harmony/p2pv2" "github.com/dedis/kyber" ) @@ -26,6 +27,11 @@ type Peer struct { // MaxBroadCast is the maximum number of neighbors to broadcast const MaxBroadCast = 20 +// Version The version number of p2p library +// 1 - Direct socket connection +// 2 - libp2p +const Version = 1 + // SendMessage sends the message to the peer func SendMessage(peer Peer, msg []byte) { // Construct normal p2p message @@ -136,7 +142,13 @@ func send(ip, port string, message []byte) { backoff := NewExpBackoff(250*time.Millisecond, 10*time.Second, 2) for trial := 0; trial < 10; trial++ { - err := sendWithSocketClient(ip, port, message) + var err error + if Version == 1 { + // TODO(ricl): remove sendWithSocketClient related code. + err = sendWithSocketClient(ip, port, message) + } else { + err = p2pv2.Send(ip, port, message) + } if err == nil { if trial > 0 { log.Warn("retry sendWithSocketClient", "rety", trial) diff --git a/p2pv2/host.go b/p2pv2/host.go new file mode 100644 index 000000000..4f7e5f650 --- /dev/null +++ b/p2pv2/host.go @@ -0,0 +1,146 @@ +package p2pv2 + +import ( + "bufio" + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "time" + + "github.com/harmony-one/harmony/log" + libp2p "github.com/libp2p/go-libp2p" + host "github.com/libp2p/go-libp2p-host" + net "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + multiaddr "github.com/multiformats/go-multiaddr" +) + +var ( + myHost host.Host // TODO(ricl): this should be a field in node. +) + +const ( + // BatchSizeInByte The batch size in byte (64MB) in which we return data + BatchSizeInByte = 1 << 16 + // ProtocolID The ID of protocol used in stream handling. + ProtocolID = "/harmony/0.0.1" +) + +// InitHost Initialize a host for p2p communication +func InitHost(ip, port string) { + addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) + sourceAddr, err := multiaddr.NewMultiaddr(addr) + catchError(err) + priv := addrToPrivKey(addr) + myHost, err = libp2p.New(context.Background(), + libp2p.ListenAddrs(sourceAddr), + libp2p.Identity(priv), + libp2p.NoSecurity, // The security (signature generation and verification) is, for now, taken care by ourselves. + // TODO(ricl): Other features to probe + // libp2p.EnableRelay; libp2p.Routing; + ) + catchError(err) + log.Debug("Host is up!", "port", port, "id", myHost.ID().Pretty(), "addrs", sourceAddr) +} + +// BindHandler bind a streamHandler to the harmony protocol. +func BindHandler(handler net.StreamHandler) { + myHost.SetStreamHandler(ProtocolID, handler) +} + +// Send a p2p message sending function with signature compatible to p2pv1. +func Send(ip, port string, message []byte) error { + addr := fmt.Sprintf("/ip4/%s/tcp/%s", ip, port) + targetAddr, err := multiaddr.NewMultiaddr(addr) + + priv := addrToPrivKey(addr) + peerID, _ := peer.IDFromPrivateKey(priv) + myHost.Peerstore().AddAddrs(peerID, []multiaddr.Multiaddr{targetAddr}, peerstore.PermanentAddrTTL) + s, err := myHost.NewStream(context.Background(), peerID, ProtocolID) + catchError(err) + + // Create a buffered stream so that read and writes are non blocking. + w := bufio.NewWriter(bufio.NewWriter(s)) + + // Create a thread to read and write data. + go writeData(w, message) + return nil +} + +// ReadData Call this function in streamHandler to get the binary data. +func ReadData(s net.Stream) ([]byte, error) { + timeoutDuration := 1 * time.Second + s.SetReadDeadline(time.Now().Add(timeoutDuration)) + + // Create a buffered stream so that read and writes are non blocking. + rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) + + contentBuf := bytes.NewBuffer([]byte{}) + // Read 1 byte for message type + _, err := rw.ReadByte() + switch err { + case nil: + //log.Printf("Received p2p message type: %x\n", msgType) + case io.EOF: + fallthrough + default: + log.Error("Error reading the p2p message type field", "err", err) + return contentBuf.Bytes(), err + } + // TODO: check on msgType and take actions accordingly + + // Read 4 bytes for message size + fourBytes := make([]byte, 4) + n, err := rw.Read(fourBytes) + if err != nil { + log.Error("Error reading the p2p message size field", "err", err) + return contentBuf.Bytes(), err + } else if n < len(fourBytes) { + log.Error("Invalid byte size", "bytes", n) + return contentBuf.Bytes(), err + } + + //log.Print(fourBytes) + // Number of bytes for the message content + bytesToRead := binary.BigEndian.Uint32(fourBytes) + //log.Printf("The content size is %d bytes.", bytesToRead) + + // Read the content in chunk of size `BatchSizeInByte` + tmpBuf := make([]byte, BatchSizeInByte) +ILOOP: + for { + // TODO(ricl): is this necessary? If yes, figure out how to make it work + // timeoutDuration := 10 * time.Second + // s.SetReadDeadline(time.Now().Add(timeoutDuration)) + if bytesToRead < BatchSizeInByte { + // Read the last number of bytes less than `BatchSizeInByte` + tmpBuf = make([]byte, bytesToRead) + } + n, err := rw.Read(tmpBuf) + contentBuf.Write(tmpBuf[:n]) + + switch err { + case io.EOF: + // TODO: should we return error here, or just ignore it? + log.Error("EOF reached while reading p2p message") + break ILOOP + case nil: + bytesToRead -= uint32(n) // TODO: think about avoid the casting in every loop + if bytesToRead <= 0 { + break ILOOP + } + default: + log.Error("Error reading p2p message") + return []byte{}, err + } + } + return contentBuf.Bytes(), nil +} + +// GetHost Get the p2p host +func GetHost() host.Host { + return myHost +} diff --git a/p2pv2/util.go b/p2pv2/util.go new file mode 100644 index 000000000..52486bd78 --- /dev/null +++ b/p2pv2/util.go @@ -0,0 +1,31 @@ +package p2pv2 + +import ( + "bufio" + "hash/fnv" + "math/rand" + + "github.com/harmony-one/harmony/log" + ic "github.com/libp2p/go-libp2p-crypto" +) + +func catchError(err error) { + if err != nil { + log.Error("catchError", "err", err) + panic(err) + } +} + +func addrToPrivKey(addr string) ic.PrivKey { + h := fnv.New32a() + _, err := h.Write([]byte(addr)) + catchError(err) + r := rand.New(rand.NewSource(int64(h.Sum32()))) // Hack: forcing the random see to be the hash of addr so that we can recover priv from ip + port. + priv, _, err := ic.GenerateKeyPairWithReader(ic.RSA, 512, r) + return priv +} + +func writeData(w *bufio.Writer, data []byte) { + w.Write(data) + w.Flush() +} diff --git a/profiler/profiler.go b/profiler/profiler.go index fee3c8ab3..4d34e909f 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -12,6 +12,7 @@ import ( "github.com/shirou/gopsutil/process" ) +// Profiler is the profiler data structure. type Profiler struct { // parameters logger log.Logger @@ -25,6 +26,8 @@ type Profiler struct { var singleton *Profiler var once sync.Once +// GetProfiler returns a pointer of Profiler. +// TODO: This should be a New method. func GetProfiler() *Profiler { once.Do(func() { singleton = &Profiler{} @@ -32,6 +35,7 @@ func GetProfiler() *Profiler { return singleton } +// Config configurates Profiler. func (profiler *Profiler) Config(logger log.Logger, shardID string, metricsReportURL string) { profiler.logger = logger profiler.pid = int32(os.Getpid()) @@ -39,6 +43,7 @@ func (profiler *Profiler) Config(logger log.Logger, shardID string, metricsRepor profiler.MetricsReportURL = metricsReportURL } +// LogMemory logs memory. func (profiler *Profiler) LogMemory() { for { // log mem usage @@ -50,6 +55,7 @@ func (profiler *Profiler) LogMemory() { } } +// LogCPU logs CPU metrics. func (profiler *Profiler) LogCPU() { for { // log cpu usage @@ -61,6 +67,7 @@ func (profiler *Profiler) LogCPU() { } } +// LogMetrics logs metrics. func (profiler *Profiler) LogMetrics(metrics map[string]interface{}) { jsonValue, _ := json.Marshal(metrics) rsp, err := http.Post(profiler.MetricsReportURL, "application/json", bytes.NewBuffer(jsonValue)) @@ -69,6 +76,7 @@ func (profiler *Profiler) LogMetrics(metrics map[string]interface{}) { } } +// Start starts profiling. func (profiler *Profiler) Start() { profiler.proc, _ = process.NewProcess(profiler.pid) go profiler.LogCPU() diff --git a/proto/client/client.go b/proto/client/client.go index 5bcc08177..be7644cb8 100644 --- a/proto/client/client.go +++ b/proto/client/client.go @@ -8,28 +8,31 @@ import ( "github.com/harmony-one/harmony/proto" ) -// The specific types of message under Client category -type ClientMessageType byte +// MessageType is the specific types of message under Client category +type MessageType byte +// Message type supported by client const ( - Transaction ClientMessageType = iota + Transaction MessageType = iota // TODO: add more types ) -// The types of messages used for Client/Transaction +// TransactionMessageType defines the types of messages used for Client/Transaction type TransactionMessageType int +// The proof of accept or reject returned by the leader to the client tnat issued cross shard transactions const ( - ProofOfLock TransactionMessageType = iota // The proof of accept or reject returned by the leader to the client tnat issued cross shard transactions. + ProofOfLock TransactionMessageType = iota UtxoResponse ) +// FetchUtxoResponseMessage is the data structure of UTXO map type FetchUtxoResponseMessage struct { UtxoMap blockchain.UtxoMap ShardID uint32 } -// [leader] Constructs the proof of accept or reject message that will be sent to client +// ConstructProofOfAcceptOrRejectMessage constructs the proof of accept or reject message that will be sent to client func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Client)}) byteBuffer.WriteByte(byte(Transaction)) @@ -40,7 +43,7 @@ func ConstructProofOfAcceptOrRejectMessage(proofs []blockchain.CrossShardTxProof return byteBuffer.Bytes() } -// Constructs the response message to fetch utxo message +// ConstructFetchUtxoResponseMessage constructs the response message to fetch utxo message func ConstructFetchUtxoResponseMessage(utxoMap *blockchain.UtxoMap, shardID uint32) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Client)}) byteBuffer.WriteByte(byte(Transaction)) diff --git a/proto/common.go b/proto/common.go index be8100e0f..a552e98b3 100644 --- a/proto/common.go +++ b/proto/common.go @@ -21,7 +21,7 @@ n - 2 bytes - actual message payload ---- content end ----- */ -// The message category enum +// MessageCategory defines the message category enum type MessageCategory byte //Consensus and other message categories @@ -39,26 +39,26 @@ const MessageCategoryBytes = 1 // MessageTypeBytes is the number of bytes message type takes const MessageTypeBytes = 1 -// Get the message category from the p2p message content +// GetMessageCategory gets the message category from the p2p message content func GetMessageCategory(message []byte) (MessageCategory, error) { if len(message) < MessageCategoryBytes { - return 0, errors.New("Failed to get message category: no data available.") + return 0, errors.New("failed to get message category: no data available") } return MessageCategory(message[MessageCategoryBytes-1]), nil } -// Get the message type from the p2p message content +// GetMessageType gets the message type from the p2p message content func GetMessageType(message []byte) (byte, error) { if len(message) < MessageCategoryBytes+MessageTypeBytes { - return 0, errors.New("Failed to get message type: no data available.") + return 0, errors.New("failed to get message type: no data available") } return byte(message[MessageCategoryBytes+MessageTypeBytes-1]), nil } -// Get the node message payload from the p2p message content +// GetMessagePayload gets the node message payload from the p2p message content func GetMessagePayload(message []byte) ([]byte, error) { if len(message) < MessageCategoryBytes+MessageTypeBytes { - return []byte{}, errors.New("Failed to get message payload: no data available.") + return []byte{}, errors.New("failed to get message payload: no data available") } return message[MessageCategoryBytes+MessageTypeBytes:], nil } diff --git a/proto/consensus/consensus.go b/proto/consensus/consensus.go index d389dd4fb..94c086292 100644 --- a/proto/consensus/consensus.go +++ b/proto/consensus/consensus.go @@ -71,12 +71,12 @@ Response: // MessageTypeBytes is the number of bytes consensus message type occupies const MessageTypeBytes = 1 -// ConsensusMessageType is the specific types of message under Consensus category -type ConsensusMessageType byte +// ConMessageType is the specific types of message under Consensus category +type ConMessageType byte // Consensus message type constants. const ( - Consensus ConsensusMessageType = iota + Consensus ConMessageType = iota // TODO: add more types ) @@ -117,23 +117,23 @@ func (msgType MessageType) String() string { return names[msgType] } -// Get the consensus message type from the consensus message +// GetConsensusMessageType gets the consensus message type from the consensus message func GetConsensusMessageType(message []byte) (MessageType, error) { if len(message) < 1 { - return 0, errors.New("Failed to get consensus message type: no data available.") + return 0, errors.New("failed to get consensus message type: no data available") } return MessageType(message[0]), nil } -// Get the consensus message payload from the consensus message +// GetConsensusMessagePayload gets the consensus message payload from the consensus message func GetConsensusMessagePayload(message []byte) ([]byte, error) { if len(message) < 2 { - return []byte{}, errors.New("Failed to get consensus message payload: no data available.") + return []byte{}, errors.New("failed to get consensus message payload: no data available") } return message[MessageTypeBytes:], nil } -// Concatenate msgType as one byte with payload, and return the whole byte array +// ConstructConsensusMessage concatenates msgType as one byte with payload, and return the whole byte array func ConstructConsensusMessage(consensusMsgType MessageType, payload []byte) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Consensus)}) byteBuffer.WriteByte(byte(Consensus)) diff --git a/proto/identity/identity.go b/proto/identity/identity.go index 716dfe165..cafba9cc3 100644 --- a/proto/identity/identity.go +++ b/proto/identity/identity.go @@ -10,12 +10,12 @@ import ( // IdentityMessageTypeBytes is the number of bytes consensus message type occupies const IdentityMessageTypeBytes = 1 -// IdentityMessageType is the identity message type. -type IdentityMessageType byte +// IDMessageType is the identity message type. +type IDMessageType byte // Constants of IdentityMessageType. const ( - Identity IdentityMessageType = iota + Identity IDMessageType = iota // TODO: add more types ) @@ -44,7 +44,7 @@ func (msgType MessageType) String() string { // GetIdentityMessageType Get the identity message type from the identity message func GetIdentityMessageType(message []byte) (MessageType, error) { if len(message) < 1 { - return 0, errors.New("Failed to get identity message type: no data available.") + return 0, errors.New("failed to get identity message type: no data available") } return MessageType(message[0]), nil } @@ -52,12 +52,12 @@ func GetIdentityMessageType(message []byte) (MessageType, error) { // GetIdentityMessagePayload message payload from the identity message func GetIdentityMessagePayload(message []byte) ([]byte, error) { if len(message) < 2 { - return []byte{}, errors.New("Failed to get identity message payload: no data available.") + return []byte{}, errors.New("failed to get identity message payload: no data available") } return message[IdentityMessageTypeBytes:], nil } -// Concatenate msgType as one byte with payload, and return the whole byte array +// ConstructIdentityMessage concatenates msgType as one byte with payload, and return the whole byte array func ConstructIdentityMessage(identityMessageType MessageType, payload []byte) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Identity)}) byteBuffer.WriteByte(byte(Identity)) diff --git a/proto/node/node.go b/proto/node/node.go index 195e93ba1..d440231cd 100644 --- a/proto/node/node.go +++ b/proto/node/node.go @@ -28,7 +28,6 @@ const ( Block Client Control - BlockchainSync PING // node send ip/pki to register with leader PONG // node broadcast pubK // TODO: add more types @@ -169,25 +168,6 @@ func ConstructTransactionListMessageAccount(transactions types.Transactions) []b return byteBuffer.Bytes() } -// ConstructBlockchainSyncMessage constructs Blockchain Sync Message. -func ConstructBlockchainSyncMessage(msgType BlockchainSyncMessageType, blockHash [32]byte) []byte { - byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) - byteBuffer.WriteByte(byte(BlockchainSync)) - byteBuffer.WriteByte(byte(msgType)) - if msgType != GetLastBlockHashes { - byteBuffer.Write(blockHash[:]) - } - return byteBuffer.Bytes() -} - -// GenerateBlockchainSyncMessage generates blockchain sync message. -func GenerateBlockchainSyncMessage(payload []byte) *BlockchainSyncMessage { - dec := gob.NewDecoder(bytes.NewBuffer(payload)) - var res BlockchainSyncMessage - dec.Decode(&res) - return &res -} - // ConstructRequestTransactionsMessage constructs serialized transactions func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) diff --git a/syncing/syncing.go b/syncing/syncing.go index 14df44604..0c7d3770d 100644 --- a/syncing/syncing.go +++ b/syncing/syncing.go @@ -1,7 +1,9 @@ package syncing import ( + "bytes" "reflect" + "sort" "sync" "time" @@ -11,6 +13,11 @@ import ( "github.com/harmony-one/harmony/syncing/downloader" ) +// Constants for syncing. +const ( + ConsensusRatio = float64(0.66) +) + // SyncPeerConfig is peer config to sync. type SyncPeerConfig struct { ip string @@ -27,7 +34,7 @@ type SyncBlockTask struct { // SyncConfig contains an array of SyncPeerConfig. type SyncConfig struct { - peers []SyncPeerConfig + peers []*SyncPeerConfig } // GetStateSync returns the implementation of StateSyncInterface interface. @@ -44,6 +51,30 @@ type StateSync struct { stateSyncTaskQueue *queue.Queue } +// CreateTestSyncPeerConfig used for testing. +func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *SyncPeerConfig { + return &SyncPeerConfig{ + client: client, + blockHashes: blockHashes, + } +} + +// CompareSyncPeerConfigByblockHashes compares two SyncPeerConfig by blockHashes. +func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int { + if len(a.blockHashes) != len(b.blockHashes) { + if len(a.blockHashes) < len(b.blockHashes) { + return -1 + } + return 1 + } + for id := range a.blockHashes { + if !reflect.DeepEqual(a.blockHashes[id], b.blockHashes[id]) { + return bytes.Compare(a.blockHashes[id], b.blockHashes[id]) + } + } + return 0 +} + // GetBlockHashes gets block hashes by calling grpc request to the corresponding peer. func (peerConfig *SyncPeerConfig) GetBlockHashes() error { if peerConfig.client == nil { @@ -82,18 +113,18 @@ func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain. func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) { ss.peerNumber = len(peers) ss.syncConfig = &SyncConfig{ - peers: make([]SyncPeerConfig, ss.peerNumber), + peers: make([]*SyncPeerConfig, ss.peerNumber), } for id := range ss.syncConfig.peers { - ss.syncConfig.peers[id] = SyncPeerConfig{ + ss.syncConfig.peers[id] = &SyncPeerConfig{ ip: peers[id].IP, port: peers[id].Port, } } } -// makeConnectionToPeers makes grpc connection to all peers. -func (ss *StateSync) makeConnectionToPeers() { +// MakeConnectionToPeers makes grpc connection to all peers. +func (ss *StateSync) MakeConnectionToPeers() { var wg sync.WaitGroup wg.Add(ss.peerNumber) @@ -101,9 +132,14 @@ func (ss *StateSync) makeConnectionToPeers() { go func(peerConfig *SyncPeerConfig) { defer wg.Done() peerConfig.client = downloader.ClientSetup(peerConfig.ip, peerConfig.port) - }(&ss.syncConfig.peers[id]) + }(ss.syncConfig.peers[id]) } wg.Wait() + ss.CleanUpNilPeers() +} + +// CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber. +func (ss *StateSync) CleanUpNilPeers() { ss.activePeerNumber = 0 for _, configPeer := range ss.syncConfig.peers { if configPeer.client != nil { @@ -112,24 +148,70 @@ func (ss *StateSync) makeConnectionToPeers() { } } -// areConsensusHashesEqual chesk if all consensus hashes are equal. -func (ss *StateSync) areConsensusHashesEqual() bool { - var firstPeer *SyncPeerConfig - for _, configPeer := range ss.syncConfig.peers { - if configPeer.client != nil { - if firstPeer == nil { - firstPeer = &configPeer - } - if !reflect.DeepEqual(configPeer.blockHashes, firstPeer.blockHashes) { - return false - } +// GetHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group. +// Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first. +func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) { + // As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively. + curCount := 0 + curFirstID := -1 + maxCount := 0 + maxFirstID := -1 + for i := range syncConfig.peers { + if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 { + curCount = 1 + curFirstID = i + } else { + curCount++ + } + if curCount > maxCount { + maxCount = curCount + maxFirstID = curFirstID } } - return true + return maxFirstID, maxCount } -// getConsensusHashes gets all hashes needed to download. -func (ss *StateSync) getConsensusHashes() { +// InitForTesting used for testing. +func (syncConfig *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) { + for i := range syncConfig.peers { + syncConfig.peers[i].blockHashes = blockHashes + syncConfig.peers[i].client = client + } +} + +// CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes. +func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) { + fixedPeer := syncConfig.peers[maxFirstID] + for i := 0; i < len(syncConfig.peers); i++ { + if CompareSyncPeerConfigByblockHashes(fixedPeer, syncConfig.peers[i]) != 0 { + // TODO: move it into a util delete func. + // See tip https://github.com/golang/go/wiki/SliceTricks + // Close the client and remove the peer out of the + syncConfig.peers[i].client.Close() + copy(syncConfig.peers[i:], syncConfig.peers[i+1:]) + syncConfig.peers[len(syncConfig.peers)-1] = nil + syncConfig.peers = syncConfig.peers[:len(syncConfig.peers)-1] + } + } +} + +// GetBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal. +func (ss *StateSync) GetBlockHashesConsensusAndCleanUp() bool { + // Sort all peers by the blockHashes. + sort.Slice(ss.syncConfig.peers, func(i, j int) bool { + return CompareSyncPeerConfigByblockHashes(ss.syncConfig.peers[i], ss.syncConfig.peers[j]) == -1 + }) + maxFirstID, maxCount := ss.syncConfig.GetHowManyMaxConsensus() + if float64(maxCount) >= ConsensusRatio*float64(ss.activePeerNumber) { + ss.syncConfig.CleanUpPeers(maxFirstID) + ss.CleanUpNilPeers() + return true + } + return false +} + +// GetConsensusHashes gets all hashes needed to download. +func (ss *StateSync) GetConsensusHashes() { for { var wg sync.WaitGroup wg.Add(ss.activePeerNumber) @@ -142,10 +224,10 @@ func (ss *StateSync) getConsensusHashes() { defer wg.Done() response := peerConfig.client.GetBlockHashes() peerConfig.blockHashes = response.Payload - }(&ss.syncConfig.peers[id]) + }(ss.syncConfig.peers[id]) } wg.Wait() - if ss.areConsensusHashesEqual() { + if ss.GetBlockHashesConsensusAndCleanUp() { break } } @@ -198,7 +280,7 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) { } } } - }(&ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc) + }(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc) } wg.Wait() } @@ -208,10 +290,10 @@ func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) // Creates sync config. ss.CreateSyncConfig(peers) // Makes connections to peers. - ss.makeConnectionToPeers() + ss.MakeConnectionToPeers() for { // Gets consensus hashes. - ss.getConsensusHashes() + ss.GetConsensusHashes() // Generates state-sync task queue. ss.generateStateSyncTaskQueue(bc) diff --git a/syncing/syncing_test.go b/syncing/syncing_test.go index 013ac05c0..a55326092 100644 --- a/syncing/syncing_test.go +++ b/syncing/syncing_test.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/syncing" "github.com/harmony-one/harmony/syncing/downloader" pb "github.com/harmony-one/harmony/syncing/downloader/proto" + "github.com/stretchr/testify/assert" "google.golang.org/grpc" ) @@ -69,6 +70,16 @@ func (node *FakeNode) Init(ip, port string) { node.server = downloader.NewServer(node) } +// SetBlockchain is used for testing +func (node *FakeNode) Init2(ip, port string) { + addresses := [][20]byte{TestAddressOne} + node.bc = bc.CreateBlockchainWithMoreBlocks(addresses, ShardID) + node.ip = ip + node.port = port + + node.server = downloader.NewServer(node) +} + // Start ... func (node *FakeNode) Start() error { var err error @@ -100,6 +111,16 @@ func (node *FakeNode) CalculateResponse(request *pb.DownloaderRequest) (*pb.Down return response, nil } +func TestCompareSyncPeerConfigByBlockHashes(t *testing.T) { + a := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 6}, {1, 2, 3, 4, 5, 6}}) + b := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 6}, {1, 2, 3, 4, 5, 6}}) + assert.Equal(t, syncing.CompareSyncPeerConfigByblockHashes(a, b), 0, "they should be equal") + c := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 7}, {1, 2, 3, 4, 5, 6}}) + assert.Equal(t, syncing.CompareSyncPeerConfigByblockHashes(a, c), -1, "a should be less than c") + d := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 4}, {1, 2, 3, 4, 5, 6}}) + assert.Equal(t, syncing.CompareSyncPeerConfigByblockHashes(a, d), 1, "a should be greater than c") +} + func TestSyncing(t *testing.T) { fakeNodes := []*FakeNode{&FakeNode{}, &FakeNode{}, &FakeNode{}} for i := range fakeNodes { @@ -108,6 +129,11 @@ func TestSyncing(t *testing.T) { t.Error(err) } } + defer func() { + for _, fakeNode := range fakeNodes { + fakeNode.grpcServer.Stop() + } + }() stateSync := &syncing.StateSync{} bc := &bc.Blockchain{} @@ -125,7 +151,41 @@ func TestSyncing(t *testing.T) { } } - for _, fakeNode := range fakeNodes { - fakeNode.grpcServer.Stop() +} + +func TestSyncingIncludingBadNode(t *testing.T) { + fakeNodes := []*FakeNode{&FakeNode{}, &FakeNode{}, &FakeNode{}} + for i := range fakeNodes { + if i == 2 { + // Bad node. + fakeNodes[i].Init2(serverIP, ServerPorts[i]) + } else { + // Good node. + fakeNodes[i].Init(serverIP, ServerPorts[i]) + } + if err := fakeNodes[i].Start(); err != nil { + t.Error(err) + } + } + defer func() { + for _, fakeNode := range fakeNodes { + fakeNode.grpcServer.Stop() + } + }() + + stateSync := &syncing.StateSync{} + bc := &bc.Blockchain{} + peers := make([]p2p.Peer, len(fakeNodes)) + for i := range peers { + peers[i].IP = fakeNodes[i].ip + peers[i].Port = fakeNodes[i].port + } + + stateSync.StartStateSync(peers, bc) + + for i := range bc.Blocks { + if !reflect.DeepEqual(bc.Blocks[i], fakeNodes[0].bc.Blocks[i]) { + t.Error("not equal") + } } } diff --git a/utils/bytes.go b/utils/bytes.go index a309ad43b..fecc34172 100644 --- a/utils/bytes.go +++ b/utils/bytes.go @@ -111,7 +111,7 @@ func LeftPadBytes(slice []byte, l int) []byte { return padded } -// Parse the string representation of hex into 32 byte array +// Get32BytesFromString parses the string representation of hex into 32 byte array func Get32BytesFromString(hashString string) ([32]byte, error) { bytes, err := hex.DecodeString(hashString) if err != nil { diff --git a/utils/distribution_config.go b/utils/distribution_config.go index ce90fd46b..b4419028b 100644 --- a/utils/distribution_config.go +++ b/utils/distribution_config.go @@ -12,6 +12,7 @@ import ( "github.com/harmony-one/harmony/p2p" ) +// ConfigEntry is the config entry. type ConfigEntry struct { IP string Port string @@ -20,17 +21,18 @@ type ConfigEntry struct { ValidatorID int // Validator ID in its shard. } +// DistributionConfig is the distribution config. type DistributionConfig struct { config []ConfigEntry } -// done +// NewDistributionConfig creates new DistributionConfig func NewDistributionConfig() *DistributionConfig { config := DistributionConfig{} return &config } -// Gets all the leader peers and corresponding shard Ids +// GetLeadersAndShardIDs gets all the leader peers and corresponding shard Ids func (config *DistributionConfig) GetLeadersAndShardIDs() ([]p2p.Peer, []uint32) { var peerList []p2p.Peer var shardIDs []uint32 @@ -48,6 +50,7 @@ func (config *DistributionConfig) GetLeadersAndShardIDs() ([]p2p.Peer, []uint32) return peerList, shardIDs } +// GetClientPeer returns client peer. func (config *DistributionConfig) GetClientPeer() *p2p.Peer { for _, entry := range config.config { if entry.Role != "client" { @@ -59,8 +62,7 @@ func (config *DistributionConfig) GetClientPeer() *p2p.Peer { return nil } -// done -// Gets the port of the client node in the config +// GetClientPort gets the port of the client node in the config func (config *DistributionConfig) GetClientPort() string { for _, entry := range config.config { if entry.Role == "client" { @@ -70,8 +72,7 @@ func (config *DistributionConfig) GetClientPort() string { return "" } -// done -// Parse the config file and return a 2d array containing the file data +// ReadConfigFile parses the config file and return a 2d array containing the file data func (config *DistributionConfig) ReadConfigFile(filename string) error { file, err := os.Open(filename) if err != nil { @@ -123,7 +124,7 @@ func (config *DistributionConfig) GetPeers(ip, port, shardID string) []p2p.Peer return peerList } -// GetPeers Gets the validator list +// GetSelfPeer Gets the validator list func (config *DistributionConfig) GetSelfPeer(ip, port, shardID string) p2p.Peer { for _, entry := range config.config { if entry.IP == ip && entry.Port == port && entry.ShardID == shardID { @@ -147,10 +148,12 @@ func (config *DistributionConfig) GetLeader(shardID string) p2p.Peer { return leaderPeer } +// GetConfigEntries returns a list of ConfigEntry. func (config *DistributionConfig) GetConfigEntries() []ConfigEntry { return config.config } +// GetMyConfigEntry ... func (config *DistributionConfig) GetMyConfigEntry(ip string, port string) *ConfigEntry { if config.config == nil { return nil @@ -165,6 +168,6 @@ func (config *DistributionConfig) GetMyConfigEntry(ip string, port string) *Conf func setKey(peer *p2p.Peer) { // Get public key deterministically based on ip and port - priKey := crypto.Ed25519Curve.Scalar().SetInt64(int64(GetUniqueIdFromPeer(*peer))) // TODO: figure out why using a random hash value doesn't work for private key (schnorr) + priKey := crypto.Ed25519Curve.Scalar().SetInt64(int64(GetUniqueIDFromPeer(*peer))) // TODO: figure out why using a random hash value doesn't work for private key (schnorr) peer.PubKey = pki.GetPublicKeyFromScalar(priKey) } diff --git a/utils/metrics.go b/utils/metrics.go index adc276f55..0d85470a9 100644 --- a/utils/metrics.go +++ b/utils/metrics.go @@ -1,5 +1,6 @@ package utils +// BToMb ... func BToMb(b uint64) uint64 { return b / 1024 / 1024 } diff --git a/utils/utils.go b/utils/utils.go index b91df3a05..5305b130f 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -25,27 +25,23 @@ func ConvertFixedDataIntoByteArray(data interface{}) []byte { } // TODO(minhdoan): this is probably a hack, probably needs some strong non-collision hash. -func GetUniqueIdFromPeer(peer p2p.Peer) uint16 { - reg, err := regexp.Compile("[^0-9]+") - if err != nil { - log.Panic("Regex Compilation Failed", "err", err) - } - socketId := reg.ReplaceAllString(peer.IP+peer.Port, "") // A integer Id formed by unique IP/PORT pair - value, _ := strconv.Atoi(socketId) - return uint16(value) +// GetUniqueIDFromPeer -- +func GetUniqueIDFromPeer(peer p2p.Peer) uint16 { + return GetUniqueIDFromIPPort(peer.IP, peer.Port) } -func GetUniqueIdFromIPPort(ip, port string) uint16 { +// GetUniqueIDFromIPPort -- +func GetUniqueIDFromIPPort(ip, port string) uint16 { reg, err := regexp.Compile("[^0-9]+") if err != nil { log.Panic("Regex Compilation Failed", "err", err) } - socketId := reg.ReplaceAllString(ip+port, "") // A integer Id formed by unique IP/PORT pair - value, _ := strconv.Atoi(socketId) + socketID := reg.ReplaceAllString(ip+port, "") // A integer Id formed by unique IP/PORT pair + value, _ := strconv.Atoi(socketID) return uint16(value) } -// RunCmd Runs command `name` with arguments `args` +// RunCmd runs command `name` with arguments `args` func RunCmd(name string, args ...string) error { cmd := exec.Command(name, args...) if err := cmd.Start(); err != nil { @@ -64,8 +60,9 @@ func RunCmd(name string, args ...string) error { return nil } +// GenKey generates a key given ip and port. func GenKey(ip, port string) (kyber.Scalar, kyber.Point) { - priKey := crypto.Ed25519Curve.Scalar().SetInt64(int64(GetUniqueIdFromIPPort(ip, port))) // TODO: figure out why using a random hash value doesn't work for private key (schnorr) + priKey := crypto.Ed25519Curve.Scalar().SetInt64(int64(GetUniqueIDFromIPPort(ip, port))) // TODO: figure out why using a random hash value doesn't work for private key (schnorr) pubKey := pki.GetPublicKeyFromScalar(priKey) return priKey, pubKey