diff --git a/beaconchain/libs/beaconchain.go b/beaconchain/libs/beaconchain.go index a5e16f5bb..172f0d8f9 100644 --- a/beaconchain/libs/beaconchain.go +++ b/beaconchain/libs/beaconchain.go @@ -63,7 +63,7 @@ func (bc *BeaconChain) AcceptConnections(b []byte) { response := bcconn.ResponseRandomNumber{NumberOfShards: bc.NumberOfShards, NumberOfNodesAdded: bc.NumberOfNodesAdded, Leaders: bc.Leaders} msg := bcconn.SerializeRandomInfo(response) msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Acknowledge, msg) - host.SendMessage(bc.host, Node.Self, msgToSend) + host.SendMessage(bc.host, Node.Self, msgToSend, nil) } //StartServer a server and process the request by a handler. diff --git a/benchmark.go b/benchmark.go index cbc75c4c4..99ddd745f 100644 --- a/benchmark.go +++ b/benchmark.go @@ -180,6 +180,8 @@ func main() { attack.GetInstance().SetLogger(consensus.Log) // Current node. currentNode := node.New(host, consensus, ldb) + currentNode.Consensus.OfflinePeers = currentNode.OfflinePeers + // If there is a client configured in the node list. if clientPeer != nil { currentNode.ClientPeer = clientPeer diff --git a/blockchain/block.go b/blockchain/block.go index 48f925086..b306bbd7f 100644 --- a/blockchain/block.go +++ b/blockchain/block.go @@ -12,6 +12,11 @@ import ( "github.com/harmony-one/harmony/utils" ) +const ( + // TimeStampForGenesisBlock is the constant timestamp for the genesis block. + TimeStampForGenesisBlock = 0 +) + // Block is a block in the blockchain that contains block headers, transactions and signature etc. type Block struct { // Header @@ -115,14 +120,18 @@ func (b *Block) CalculateBlockHash() []byte { } // NewBlock creates and returns a new block. -func NewBlock(transactions []*Transaction, prevBlockHash [32]byte, shardID uint32) *Block { +func NewBlock(transactions []*Transaction, prevBlockHash [32]byte, shardID uint32, isGenesisBlock bool) *Block { numTxs := int32(len(transactions)) var txIDs [][32]byte for _, tx := range transactions { txIDs = append(txIDs, tx.ID) } - block := &Block{Timestamp: time.Now().Unix(), PrevBlockHash: prevBlockHash, NumTransactions: numTxs, TransactionIds: txIDs, Transactions: transactions, ShardID: shardID, Hash: [32]byte{}} + timestamp := time.Now().Unix() + if isGenesisBlock { + timestamp = TimeStampForGenesisBlock + } + block := &Block{Timestamp: timestamp, PrevBlockHash: prevBlockHash, NumTransactions: numTxs, TransactionIds: txIDs, Transactions: transactions, ShardID: shardID, Hash: [32]byte{}} copy(block.Hash[:], block.CalculateBlockHash()[:]) return block @@ -130,7 +139,7 @@ func NewBlock(transactions []*Transaction, prevBlockHash [32]byte, shardID uint3 // NewGenesisBlock creates and returns genesis Block. func NewGenesisBlock(coinbase *Transaction, shardID uint32) *Block { - return NewBlock([]*Transaction{coinbase}, [32]byte{}, shardID) + return NewBlock([]*Transaction{coinbase}, [32]byte{}, shardID, true) } // NewStateBlock creates and returns a state Block based on utxo pool. @@ -157,7 +166,7 @@ func NewStateBlock(utxoPool *UTXOPool, numBlocks, numTxs int32) *Block { stateTransactions = append(stateTransactions, stateTransaction) } } - newBlock := NewBlock(stateTransactions, [32]byte{}, utxoPool.ShardID) + newBlock := NewBlock(stateTransactions, [32]byte{}, utxoPool.ShardID, false) newBlock.State = &State{NumBlocks: numBlocks, NumTransactions: numTxs} return newBlock } diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 92ec83341..e13366be3 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -195,7 +195,7 @@ func (bc *Blockchain) NewUTXOTransaction(priKey kyber.Scalar, from, to [20]byte, func (bc *Blockchain) AddNewUserTransfer(utxoPool *UTXOPool, priKey kyber.Scalar, from, to [20]byte, amount int, shardID uint32) bool { tx := bc.NewUTXOTransaction(priKey, from, to, amount, shardID) if tx != nil { - newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash, shardID) + newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash, shardID, false) if bc.VerifyNewBlockAndUpdate(utxoPool, newBlock) { return true } diff --git a/blockchain/blockchain_test.go b/blockchain/blockchain_test.go index b718da1eb..b003ca2d8 100644 --- a/blockchain/blockchain_test.go +++ b/blockchain/blockchain_test.go @@ -84,7 +84,7 @@ func TestVerifyNewBlock(t *testing.T) { if tx == nil { t.Error("failed to create a new transaction.") } - newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash, 0) + newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash, 0, false) if !bc.VerifyNewBlockAndUpdate(utxoPool, newBlock) { t.Error("failed to add a new valid block.") diff --git a/client/client.go b/client/client.go index b6088f89d..cde62a19b 100644 --- a/client/client.go +++ b/client/client.go @@ -124,7 +124,7 @@ func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.F func (client *Client) sendCrossShardTxUnlockMessage(txsToSend []*blockchain.Transaction) { for shardID, txs := range BuildOutputShardTransactionMap(txsToSend) { - host.SendMessage(client.host, (*client.Leaders)[shardID], node.ConstructUnlockToCommitOrAbortMessage(txs)) + host.SendMessage(client.host, (*client.Leaders)[shardID], node.ConstructUnlockToCommitOrAbortMessage(txs), nil) } } diff --git a/consensus/consensus.go b/consensus/consensus.go index dce293105..89ea3d9da 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -3,6 +3,7 @@ package consensus // consensus import ( "fmt" + "reflect" "strconv" "sync" @@ -21,6 +22,8 @@ import ( "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/utils" + + proto_node "github.com/harmony-one/harmony/proto/node" ) // Consensus data containing all info related to one round of consensus process @@ -99,6 +102,12 @@ type Consensus struct { // The p2p host used to send/receive p2p messages host host.Host + + // Signal channel for lost validators + OfflinePeers chan p2p.Peer + + // List of offline Peers + OfflinePeerList []p2p.Peer } // BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far @@ -185,6 +194,8 @@ func New(host host.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Con consensus.Log = log.New() consensus.uniqueIDInstance = utils.GetUniqueValidatorIDInstance() + consensus.OfflinePeerList = make([]p2p.Peer, 0) + // consensus.Log.Info("New Consensus", "IP", ip, "Port", port, "NodeID", consensus.nodeID, "priKey", consensus.priKey, "pubKey", consensus.pubKey) return &consensus } @@ -230,6 +241,9 @@ func (consensus *Consensus) ResetState() { consensus.aggregatedCommitment = nil consensus.aggregatedFinalCommitment = nil consensus.secret = map[uint32]kyber.Scalar{} + + // Clear the OfflinePeersList again + consensus.OfflinePeerList = make([]p2p.Peer, 0) } // Returns a string representation of this consensus @@ -256,18 +270,65 @@ func (consensus *Consensus) AddPeers(peers []*p2p.Peer) int { peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueID()) } consensus.validators.Store(utils.GetUniqueIDFromPeer(*peer), *peer) + consensus.pubKeyLock.Lock() consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey) + consensus.pubKeyLock.Unlock() } count++ } return count } -// RemovePeers will remove the peers from the validator list and PublicKeys +// RemovePeers will remove the peer from the validator list and PublicKeys // It will be called when leader/node lost connection to peers func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int { - // TODO (lc) we need to have a corresponding RemovePeers function - return 0 + // early return as most of the cases no peers to remove + if len(peers) == 0 { + return 0 + } + + count := 0 + count2 := 0 + newList := append(consensus.PublicKeys[:0:0], consensus.PublicKeys...) + + for _, peer := range peers { + consensus.validators.Range(func(k, v interface{}) bool { + if p, ok := v.(p2p.Peer); ok { + // We are using peer.IP and peer.Port to identify the unique peer + // FIXME (lc): use a generic way to identify a peer + if p.IP == peer.IP && p.Port == peer.Port { + consensus.validators.Delete(k) + count++ + } + return true + } + return false + }) + + for i, pp := range newList { + // Not Found the pubkey, if found pubkey, ignore it + if reflect.DeepEqual(peer.PubKey, pp) { + // consensus.Log.Debug("RemovePeers", "i", i, "pp", pp, "peer.PubKey", peer.PubKey) + newList = append(newList[:i], newList[i+1:]...) + count2++ + } + } + } + + if count2 > 0 { + consensus.UpdatePublicKeys(newList) + + // Send out Pong messages to everyone in the shard to keep the publickeys in sync + // Or the shard won't be able to reach consensus if public keys are mismatch + + validators := consensus.GetValidatorPeers() + pong := proto_node.NewPongMessage(validators, consensus.PublicKeys) + buffer := pong.ConstructPongMessage() + + host.BroadcastMessageFromLeader(consensus.host, validators, buffer, consensus.OfflinePeers) + } + + return count2 } // DebugPrintPublicKeys print all the PublicKeys in string format in Consensus @@ -420,5 +481,5 @@ func (consensus *Consensus) GetNodeID() uint16 { // SendMessage sends message thru p2p host to peer. func (consensus *Consensus) SendMessage(peer p2p.Peer, message []byte) { - host.SendMessage(consensus.host, peer, message) + host.SendMessage(consensus.host, peer, message, nil) } diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 10e1e9341..59179c31f 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -38,6 +38,11 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) for { // keep waiting for new blocks newBlock := <-blockChannel + c := consensus.RemovePeers(consensus.OfflinePeerList) + if c > 0 { + consensus.Log.Debug("WaitForNewBlock", "removed peers", c) + } + if !consensus.HasEnoughValidators() { consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys)) time.Sleep(waitForEnoughValidators * time.Millisecond) @@ -63,6 +68,11 @@ func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Bloc newBlock := <-blockChannel // TODO: think about potential race condition + c := consensus.RemovePeers(consensus.OfflinePeerList) + if c > 0 { + consensus.Log.Debug("WaitForNewBlock", "removed peers", c) + } + if !consensus.HasEnoughValidators() { consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys)) time.Sleep(waitForEnoughValidators * time.Millisecond) @@ -114,6 +124,7 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) { } // processStartConsensusMessage is the handler for message which triggers consensus process. +// TODO(minh): clean-up. this function is never called. func (consensus *Consensus) processStartConsensusMessage(payload []byte) { // TODO: remove these method after testnet tx := blockchain.NewCoinbaseTX([20]byte{0}, "y", 0) @@ -134,7 +145,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { consensus.Log.Debug("Stop encoding block") msgToSend := consensus.constructAnnounceMessage() - host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend) + host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) // Set state to AnnounceDone consensus.state = AnnounceDone consensus.commitByLeader(true) @@ -261,7 +272,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta consensus.responseByLeader(challengeScalar, targetState == ChallengeDone) // Broadcast challenge message - host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend) + host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) // Set state to targetState (ChallengeDone or FinalChallengeDone) consensus.state = targetState @@ -418,7 +429,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S // Start the second round of Cosi msgToSend := consensus.constructCollectiveSigMessage(collectiveSig, bitmap) - host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend) + host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) consensus.commitByLeader(false) } else { consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(*responses)) diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index b48454c7b..6c25d7427 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -5,6 +5,7 @@ import ( "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/p2pimpl" + "github.com/harmony-one/harmony/utils" ) func TestNew(test *testing.T) { @@ -28,3 +29,31 @@ func TestNew(test *testing.T) { test.Error("Consensus Leader is set to wrong Peer") } } + +func TestRemovePeers(t *testing.T) { + _, pk1 := utils.GenKey("1", "1") + _, pk2 := utils.GenKey("2", "2") + _, pk3 := utils.GenKey("3", "3") + _, pk4 := utils.GenKey("4", "4") + _, pk5 := utils.GenKey("5", "5") + + p1 := p2p.Peer{IP: "1", Port: "1", PubKey: pk1} + p2 := p2p.Peer{IP: "2", Port: "2", PubKey: pk2} + p3 := p2p.Peer{IP: "3", Port: "3", PubKey: pk3} + p4 := p2p.Peer{IP: "4", Port: "4", PubKey: pk4} + + peers := []p2p.Peer{p1, p2, p3, p4} + + peerRemove := []p2p.Peer{p1, p2} + + leader := p2p.Peer{IP: "127.0.0.1", Port: "9000", PubKey: pk5} + host := p2pimpl.NewHost(leader) + consensus := New(host, "0", peers, leader) + + // consensus.DebugPrintPublicKeys() + f := consensus.RemovePeers(peerRemove) + if f == 0 { + t.Errorf("consensus.RemovePeers return false") + consensus.DebugPrintPublicKeys() + } +} diff --git a/deploy.sh b/deploy.sh index dd8e924e4..03c4480dc 100755 --- a/deploy.sh +++ b/deploy.sh @@ -10,6 +10,17 @@ function cleanup() { done } +function killnode() { + local port=$1 + + if [ -n "port" ]; then + pid=$(/bin/ps -fu $USER | grep "benchmark" | grep "$port" | awk '{print $2}') + echo "killing node with port: $port" + kill -9 $pid 2> /dev/null + echo "node with port: $port is killed" + fi +} + trap cleanup SIGINT SIGTERM function usage { @@ -25,6 +36,7 @@ USAGE: $ME [OPTIONS] config_file_name -D duration txgen run duration (default: $DURATION) -m min_peers minimal number of peers to start consensus (default: $MIN) -s shards number of shards (default: $SHARDS) + -k nodeport kill the node with specified port number (default: $KILLPORT) This script will build all the binaries and start benchmark and txgen based on the configuration file. @@ -43,8 +55,9 @@ TXGEN=true DURATION=90 MIN=5 SHARDS=2 +KILLPORT=9004 -while getopts "hpdtD:m:s:" option; do +while getopts "hpdtD:m:s:k:" option; do case $option in h) usage ;; p) PEER='-peer_discovery' ;; @@ -53,6 +66,7 @@ while getopts "hpdtD:m:s:" option; do D) DURATION=$OPTARG ;; m) MIN=$OPTARG ;; s) SHARDS=$OPTARG ;; + k) KILLPORT=$OPTARG ;; esac done @@ -103,6 +117,9 @@ while IFS='' read -r line || [[ -n "$line" ]]; do fi done < $config +# Emulate node offline +(sleep 45; killnode $KILLPORT) & + echo "launching txgen ..." if [ "$TXGEN" == "true" ]; then if [ -z "$PEER" ]; then diff --git a/newnode/newnode.go b/newnode/newnode.go index ace735fd9..6aa086fa2 100644 --- a/newnode/newnode.go +++ b/newnode/newnode.go @@ -89,7 +89,7 @@ checkLoop: gotShardInfo = true break checkLoop } else { - host.SendMessage(node.host, BCPeer, msgToSend) + host.SendMessage(node.host, BCPeer, msgToSend, nil) } } } diff --git a/node/node.go b/node/node.go index 2b3a91e67..5840e768c 100644 --- a/node/node.go +++ b/node/node.go @@ -49,6 +49,26 @@ const ( NodeLeader // Node is the leader of some shard. ) +func (state State) String() string { + switch state { + case NodeInit: + return "NodeInit" + case NodeWaitToJoin: + return "NodeWaitToJoin" + case NodeJoinedShard: + return "NodeJoinedShard" + case NodeOffline: + return "NodeOffline" + case NodeReadyForConsensus: + return "NodeReadyForConsensus" + case NodeDoingConsensus: + return "NodeDoingConsensus" + case NodeLeader: + return "NodeLeader" + } + return "Unknown" +} + // Constants related to doing syncing. const ( NotDoingSyncing uint32 = iota @@ -112,6 +132,9 @@ type Node struct { // Channel to stop sending ping message StopPing chan struct{} + + // Signal channel for lost validators + OfflinePeers chan p2p.Peer } // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client @@ -229,6 +252,17 @@ func DeserializeNode(d []byte) *NetworkNode { return &wn } +//AddSmartContractsToPendingTransactions adds the faucet contract the genesis block. +func (node *Node) AddSmartContractsToPendingTransactions() { + // Add a contract deployment transactionv + priKey := node.ContractKeys[0] + contractData := "0x608060405234801561001057600080fd5b506040516020806104c08339810180604052602081101561003057600080fd5b505160008054600160a060020a0319163317808255600160a060020a031681526001602081905260409091205560ff811661006c600282610073565b50506100bd565b8154818355818111156100975760008381526020902061009791810190830161009c565b505050565b6100ba91905b808211156100b657600081556001016100a2565b5090565b90565b6103f4806100cc6000396000f3fe60806040526004361061005b577c010000000000000000000000000000000000000000000000000000000060003504635c19a95c8114610060578063609ff1bd146100955780639e7b8d61146100c0578063b3f98adc146100f3575b600080fd5b34801561006c57600080fd5b506100936004803603602081101561008357600080fd5b5035600160a060020a0316610120565b005b3480156100a157600080fd5b506100aa610280565b6040805160ff9092168252519081900360200190f35b3480156100cc57600080fd5b50610093600480360360208110156100e357600080fd5b5035600160a060020a03166102eb565b3480156100ff57600080fd5b506100936004803603602081101561011657600080fd5b503560ff16610348565b3360009081526001602081905260409091209081015460ff1615610144575061027d565b5b600160a060020a0382811660009081526001602081905260409091200154620100009004161580159061019c5750600160a060020a0382811660009081526001602081905260409091200154620100009004163314155b156101ce57600160a060020a039182166000908152600160208190526040909120015462010000900490911690610145565b600160a060020a0382163314156101e5575061027d565b6001818101805460ff1916821775ffffffffffffffffffffffffffffffffffffffff0000191662010000600160a060020a0386169081029190911790915560009081526020829052604090209081015460ff16156102725781546001820154600280549091610100900460ff1690811061025b57fe5b60009182526020909120018054909101905561027a565b815481540181555b50505b50565b600080805b60025460ff821610156102e6578160028260ff168154811015156102a557fe5b906000526020600020016000015411156102de576002805460ff83169081106102ca57fe5b906000526020600020016000015491508092505b600101610285565b505090565b600054600160a060020a0316331415806103215750600160a060020a0381166000908152600160208190526040909120015460ff165b1561032b5761027d565b600160a060020a0316600090815260016020819052604090912055565b3360009081526001602081905260409091209081015460ff1680610371575060025460ff831610155b1561037c575061027d565b6001818101805460ff191690911761ff00191661010060ff8516908102919091179091558154600280549192909181106103b257fe5b600091825260209091200180549091019055505056fea165627a7a72305820164189ef302b4648e01e22456b0a725191604cb63ee472f230ef6a2d17d702f900290000000000000000000000000000000000000000000000000000000000000002" + dataEnc := common.FromHex(contractData) + // Unsigned transaction to avoid the case of transaction address. + mycontracttx, _ := types.SignTx(types.NewContractCreation(uint64(0), 0, big.NewInt(1000000), params.TxGasContractCreation*10, nil, dataEnc), types.HomesteadSigner{}, priKey) + node.pendingTransactionsAccount = append(node.pendingTransactionsAccount, mycontracttx) +} + // New creates a new node. func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { node := Node{} @@ -270,11 +304,17 @@ func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { for i := 0; i < 100; i++ { testBankKey, _ := ecdsa.GenerateKey(crypto.S256(), reader) testBankAddress := crypto.PubkeyToAddress(testBankKey.PublicKey) - testBankFunds := big.NewInt(10000000000) + testBankFunds := big.NewInt(8000000000000000000) genesisAloc[testBankAddress] = core.GenesisAccount{Balance: testBankFunds} node.TestBankKeys = append(node.TestBankKeys, testBankKey) } + contractKey, _ := ecdsa.GenerateKey(crypto.S256(), reader) + contractAddress := crypto.PubkeyToAddress(contractKey.PublicKey) + contractFunds := big.NewInt(8000000000000000000) + genesisAloc[contractAddress] = core.GenesisAccount{Balance: contractFunds} + node.ContractKeys = append(node.ContractKeys, contractKey) + database := hdb.NewMemDatabase() chainConfig := params.TestChainConfig chainConfig.ChainID = big.NewInt(int64(node.Consensus.ShardID)) // Use ChainID as piggybacked ShardID @@ -286,11 +326,13 @@ func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { _ = gspec.MustCommit(database) chain, _ := core.NewBlockChain(database, nil, gspec.Config, node.Consensus, vm.Config{}, nil) - node.Chain = chain + //This one is not used --- RJ. node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain) node.BlockChannelAccount = make(chan *types.Block) node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) + //Initialize the pending transactions with smart contract transactions + node.AddSmartContractsToPendingTransactions() } // Logger @@ -305,6 +347,9 @@ func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { node.syncingState = NotDoingSyncing node.StopPing = make(chan struct{}) + node.OfflinePeers = make(chan p2p.Peer) + go node.RemovePeersHandler() + return &node } @@ -449,3 +494,14 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (* } return response, nil } + +// RemovePeersHandler is a goroutine to wait on the OfflinePeers channel +// and remove the peers from validator list +func (node *Node) RemovePeersHandler() { + for { + select { + case p := <-node.OfflinePeers: + node.Consensus.OfflinePeerList = append(node.Consensus.OfflinePeerList, p) + } + } +} diff --git a/node/node_handler.go b/node/node_handler.go index 5e06d17a3..19543df8f 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -310,7 +310,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) { node.transactionInConsensus = selectedTxs node.CrossTxsInConsensus = crossShardTxAndProofs - newBlock = blockchain.NewBlock(selectedTxs, node.blockchain.GetLatestBlock().Hash, node.Consensus.ShardID) + newBlock = blockchain.NewBlock(selectedTxs, node.blockchain.GetLatestBlock().Hash, node.Consensus.ShardID, false) break } } @@ -566,7 +566,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int { // Broadcast the message to all validators, as publicKeys is updated // FIXME: HAR-89 use a separate nodefind/neighbor message - host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer) + host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers) return len(peers) } @@ -578,6 +578,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { return -1 } + // node.log.Debug("pongMessageHandler", "pong", pong, "nodeID", node.Consensus.GetNodeID()) + peers := make([]*p2p.Peer, 0) for _, p := range pong.Peers { @@ -615,9 +617,13 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { publicKeys = append(publicKeys, key) } - node.State = NodeJoinedShard - // Notify JoinShard to stop sending Ping messages - node.StopPing <- struct{}{} + if node.State == NodeWaitToJoin { + node.State = NodeJoinedShard + // Notify JoinShard to stop sending Ping messages + if node.StopPing != nil { + node.StopPing <- struct{}{} + } + } return node.Consensus.UpdatePublicKeys(publicKeys) } diff --git a/node/p2p.go b/node/p2p.go index 836922c0b..5cdc1c080 100644 --- a/node/p2p.go +++ b/node/p2p.go @@ -7,12 +7,12 @@ import ( // SendMessage sends data to ip, port func (node *Node) SendMessage(p p2p.Peer, data []byte) { - host.SendMessage(node.host, p, data) + host.SendMessage(node.host, p, data, nil) } // BroadcastMessage broadcasts message to peers func (node *Node) BroadcastMessage(peers []p2p.Peer, data []byte) { - host.BroadcastMessage(node.host, peers, data) + host.BroadcastMessage(node.host, peers, data, nil) } // GetHost returns the p2p host diff --git a/p2p/host/hostv1/hostv1.go b/p2p/host/hostv1/hostv1.go index 1144a7672..14f01fff2 100644 --- a/p2p/host/hostv1/hostv1.go +++ b/p2p/host/hostv1/hostv1.go @@ -1,6 +1,7 @@ package hostv1 import ( + "fmt" "io" "net" "time" @@ -71,18 +72,17 @@ func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) { func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) { addr := net.JoinHostPort(peer.IP, peer.Port) conn, err := net.Dial("tcp", addr) - // log.Debug("Dial from local to remote", "localID", net.JoinHostPort(host.self.IP, host.self.Port), "local", conn.LocalAddr(), "remote", addr) if err != nil { - log.Warn("Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err) - return + log.Warn("HostV1 SendMessage Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err) + return fmt.Errorf("Dail Failed") } defer conn.Close() nw, err := conn.Write(message) if err != nil { log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err) - return + return fmt.Errorf("Write Failed") } if nw < len(message) { log.Warn("Write() returned short count", @@ -91,7 +91,7 @@ func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) { } // No ack (reply) message from the receiver for now. - return + return nil } // Close closes the host diff --git a/p2p/host/message.go b/p2p/host/message.go index f3e576cf0..3f96641c0 100644 --- a/p2p/host/message.go +++ b/p2p/host/message.go @@ -13,14 +13,14 @@ import ( // SendMessage is to connect a socket given a port and send the given message. // TODO(minhdoan, rj): need to check if a peer is reachable or not. -func SendMessage(host Host, p p2p.Peer, message []byte) { +func SendMessage(host Host, p p2p.Peer, message []byte, lostPeer chan p2p.Peer) { // Construct normal p2p message content := ConstructP2pMessage(byte(0), message) - go send(host, p, content) + go send(host, p, content, lostPeer) } // BroadcastMessage sends the message to a list of peers -func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) { +func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) { if len(peers) == 0 { return } @@ -32,7 +32,7 @@ func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) { start := time.Now() for _, peer := range peers { peerCopy := peer - go send(h, peerCopy, content) + go send(h, peerCopy, content, lostPeer) } log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds()) @@ -43,6 +43,13 @@ func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) { } } +// BroadcastMessageFromLeader sends the message to a list of peers from a leader. +func BroadcastMessageFromLeader(h Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) { + // TODO(minhdoan): Enable back for multicast. + peers = SelectMyPeers(peers, 1, MaxBroadCast) + BroadcastMessage(h, peers, msg, lostPeer) +} + // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content] func ConstructP2pMessage(msgType byte, content []byte) []byte { @@ -58,19 +65,10 @@ func ConstructP2pMessage(msgType byte, content []byte) []byte { return byteBuffer.Bytes() } -// BroadcastMessageFromLeader sends the message to a list of peers from a leader. -func BroadcastMessageFromLeader(h Host, peers []p2p.Peer, msg []byte) { - // TODO(minhdoan): Enable back for multicast. - peers = SelectMyPeers(peers, 1, MaxBroadCast) - BroadcastMessage(h, peers, msg) - log.Info("Done sending from leader") -} - // BroadcastMessageFromValidator sends the message to a list of peers from a validator. func BroadcastMessageFromValidator(h Host, selfPeer p2p.Peer, peers []p2p.Peer, msg []byte) { peers = SelectMyPeers(peers, selfPeer.ValidatorID*MaxBroadCast+1, (selfPeer.ValidatorID+1)*MaxBroadCast) - BroadcastMessage(h, peers, msg) - log.Info("Done sending from validator") + BroadcastMessage(h, peers, msg, nil) } // MaxBroadCast is the maximum number of neighbors to broadcast @@ -89,14 +87,14 @@ func SelectMyPeers(peers []p2p.Peer, min int, max int) []p2p.Peer { } // Send a message to another node with given port. -func send(h Host, peer p2p.Peer, message []byte) { +func send(h Host, peer p2p.Peer, message []byte, lostPeer chan p2p.Peer) { // Add attack code here. //attack.GetInstance().Run() - backoff := p2p.NewExpBackoff(250*time.Millisecond, 10*time.Second, 2) + backoff := p2p.NewExpBackoff(150*time.Millisecond, 5*time.Second, 2) for trial := 0; trial < 10; trial++ { var err error - h.SendMessage(peer, message) + err = h.SendMessage(peer, message) if err == nil { if trial > 0 { log.Warn("retry send", "rety", trial) @@ -108,6 +106,11 @@ func send(h Host, peer p2p.Peer, message []byte) { backoff.Sleep() } log.Error("gave up sending a message", "addr", net.JoinHostPort(peer.IP, peer.Port)) + + if lostPeer != nil { + // Notify lostPeer channel + lostPeer <- peer + } } // DialWithSocketClient joins host port and establishes connection