Merge branch 'master' of github.com:harmony-one/harmony into invoke_sc

pull/147/head
ak 6 years ago
commit 85b7f28a5a
  1. 2
      beaconchain/libs/beaconchain.go
  2. 2
      benchmark.go
  3. 17
      blockchain/block.go
  4. 2
      blockchain/blockchain.go
  5. 2
      blockchain/blockchain_test.go
  6. 2
      client/client.go
  7. 67
      consensus/consensus.go
  8. 17
      consensus/consensus_leader.go
  9. 29
      consensus/consensus_test.go
  10. 19
      deploy.sh
  11. 2
      newnode/newnode.go
  12. 60
      node/node.go
  13. 10
      node/node_handler.go
  14. 4
      node/p2p.go
  15. 10
      p2p/host/hostv1/hostv1.go
  16. 37
      p2p/host/message.go

@ -63,7 +63,7 @@ func (bc *BeaconChain) AcceptConnections(b []byte) {
response := bcconn.ResponseRandomNumber{NumberOfShards: bc.NumberOfShards, NumberOfNodesAdded: bc.NumberOfNodesAdded, Leaders: bc.Leaders}
msg := bcconn.SerializeRandomInfo(response)
msgToSend := proto_identity.ConstructIdentityMessage(proto_identity.Acknowledge, msg)
host.SendMessage(bc.host, Node.Self, msgToSend)
host.SendMessage(bc.host, Node.Self, msgToSend, nil)
}
//StartServer a server and process the request by a handler.

@ -180,6 +180,8 @@ func main() {
attack.GetInstance().SetLogger(consensus.Log)
// Current node.
currentNode := node.New(host, consensus, ldb)
currentNode.Consensus.OfflinePeers = currentNode.OfflinePeers
// If there is a client configured in the node list.
if clientPeer != nil {
currentNode.ClientPeer = clientPeer

@ -12,6 +12,11 @@ import (
"github.com/harmony-one/harmony/utils"
)
const (
// TimeStampForGenesisBlock is the constant timestamp for the genesis block.
TimeStampForGenesisBlock = 0
)
// Block is a block in the blockchain that contains block headers, transactions and signature etc.
type Block struct {
// Header
@ -115,14 +120,18 @@ func (b *Block) CalculateBlockHash() []byte {
}
// NewBlock creates and returns a new block.
func NewBlock(transactions []*Transaction, prevBlockHash [32]byte, shardID uint32) *Block {
func NewBlock(transactions []*Transaction, prevBlockHash [32]byte, shardID uint32, isGenesisBlock bool) *Block {
numTxs := int32(len(transactions))
var txIDs [][32]byte
for _, tx := range transactions {
txIDs = append(txIDs, tx.ID)
}
block := &Block{Timestamp: time.Now().Unix(), PrevBlockHash: prevBlockHash, NumTransactions: numTxs, TransactionIds: txIDs, Transactions: transactions, ShardID: shardID, Hash: [32]byte{}}
timestamp := time.Now().Unix()
if isGenesisBlock {
timestamp = TimeStampForGenesisBlock
}
block := &Block{Timestamp: timestamp, PrevBlockHash: prevBlockHash, NumTransactions: numTxs, TransactionIds: txIDs, Transactions: transactions, ShardID: shardID, Hash: [32]byte{}}
copy(block.Hash[:], block.CalculateBlockHash()[:])
return block
@ -130,7 +139,7 @@ func NewBlock(transactions []*Transaction, prevBlockHash [32]byte, shardID uint3
// NewGenesisBlock creates and returns genesis Block.
func NewGenesisBlock(coinbase *Transaction, shardID uint32) *Block {
return NewBlock([]*Transaction{coinbase}, [32]byte{}, shardID)
return NewBlock([]*Transaction{coinbase}, [32]byte{}, shardID, true)
}
// NewStateBlock creates and returns a state Block based on utxo pool.
@ -157,7 +166,7 @@ func NewStateBlock(utxoPool *UTXOPool, numBlocks, numTxs int32) *Block {
stateTransactions = append(stateTransactions, stateTransaction)
}
}
newBlock := NewBlock(stateTransactions, [32]byte{}, utxoPool.ShardID)
newBlock := NewBlock(stateTransactions, [32]byte{}, utxoPool.ShardID, false)
newBlock.State = &State{NumBlocks: numBlocks, NumTransactions: numTxs}
return newBlock
}

@ -195,7 +195,7 @@ func (bc *Blockchain) NewUTXOTransaction(priKey kyber.Scalar, from, to [20]byte,
func (bc *Blockchain) AddNewUserTransfer(utxoPool *UTXOPool, priKey kyber.Scalar, from, to [20]byte, amount int, shardID uint32) bool {
tx := bc.NewUTXOTransaction(priKey, from, to, amount, shardID)
if tx != nil {
newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash, shardID)
newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash, shardID, false)
if bc.VerifyNewBlockAndUpdate(utxoPool, newBlock) {
return true
}

@ -84,7 +84,7 @@ func TestVerifyNewBlock(t *testing.T) {
if tx == nil {
t.Error("failed to create a new transaction.")
}
newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash, 0)
newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash, 0, false)
if !bc.VerifyNewBlockAndUpdate(utxoPool, newBlock) {
t.Error("failed to add a new valid block.")

@ -124,7 +124,7 @@ func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.F
func (client *Client) sendCrossShardTxUnlockMessage(txsToSend []*blockchain.Transaction) {
for shardID, txs := range BuildOutputShardTransactionMap(txsToSend) {
host.SendMessage(client.host, (*client.Leaders)[shardID], node.ConstructUnlockToCommitOrAbortMessage(txs))
host.SendMessage(client.host, (*client.Leaders)[shardID], node.ConstructUnlockToCommitOrAbortMessage(txs), nil)
}
}

@ -3,6 +3,7 @@ package consensus // consensus
import (
"fmt"
"reflect"
"strconv"
"sync"
@ -21,6 +22,8 @@ import (
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/utils"
proto_node "github.com/harmony-one/harmony/proto/node"
)
// Consensus data containing all info related to one round of consensus process
@ -99,6 +102,12 @@ type Consensus struct {
// The p2p host used to send/receive p2p messages
host host.Host
// Signal channel for lost validators
OfflinePeers chan p2p.Peer
// List of offline Peers
OfflinePeerList []p2p.Peer
}
// BlockConsensusStatus used to keep track of the consensus status of multiple blocks received so far
@ -185,6 +194,8 @@ func New(host host.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Con
consensus.Log = log.New()
consensus.uniqueIDInstance = utils.GetUniqueValidatorIDInstance()
consensus.OfflinePeerList = make([]p2p.Peer, 0)
// consensus.Log.Info("New Consensus", "IP", ip, "Port", port, "NodeID", consensus.nodeID, "priKey", consensus.priKey, "pubKey", consensus.pubKey)
return &consensus
}
@ -230,6 +241,9 @@ func (consensus *Consensus) ResetState() {
consensus.aggregatedCommitment = nil
consensus.aggregatedFinalCommitment = nil
consensus.secret = map[uint32]kyber.Scalar{}
// Clear the OfflinePeersList again
consensus.OfflinePeerList = make([]p2p.Peer, 0)
}
// Returns a string representation of this consensus
@ -256,18 +270,65 @@ func (consensus *Consensus) AddPeers(peers []*p2p.Peer) int {
peer.ValidatorID = int(consensus.uniqueIDInstance.GetUniqueID())
}
consensus.validators.Store(utils.GetUniqueIDFromPeer(*peer), *peer)
consensus.pubKeyLock.Lock()
consensus.PublicKeys = append(consensus.PublicKeys, peer.PubKey)
consensus.pubKeyLock.Unlock()
}
count++
}
return count
}
// RemovePeers will remove the peers from the validator list and PublicKeys
// RemovePeers will remove the peer from the validator list and PublicKeys
// It will be called when leader/node lost connection to peers
func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
// TODO (lc) we need to have a corresponding RemovePeers function
// early return as most of the cases no peers to remove
if len(peers) == 0 {
return 0
}
count := 0
count2 := 0
newList := append(consensus.PublicKeys[:0:0], consensus.PublicKeys...)
for _, peer := range peers {
consensus.validators.Range(func(k, v interface{}) bool {
if p, ok := v.(p2p.Peer); ok {
// We are using peer.IP and peer.Port to identify the unique peer
// FIXME (lc): use a generic way to identify a peer
if p.IP == peer.IP && p.Port == peer.Port {
consensus.validators.Delete(k)
count++
}
return true
}
return false
})
for i, pp := range newList {
// Not Found the pubkey, if found pubkey, ignore it
if reflect.DeepEqual(peer.PubKey, pp) {
// consensus.Log.Debug("RemovePeers", "i", i, "pp", pp, "peer.PubKey", peer.PubKey)
newList = append(newList[:i], newList[i+1:]...)
count2++
}
}
}
if count2 > 0 {
consensus.UpdatePublicKeys(newList)
// Send out Pong messages to everyone in the shard to keep the publickeys in sync
// Or the shard won't be able to reach consensus if public keys are mismatch
validators := consensus.GetValidatorPeers()
pong := proto_node.NewPongMessage(validators, consensus.PublicKeys)
buffer := pong.ConstructPongMessage()
host.BroadcastMessageFromLeader(consensus.host, validators, buffer, consensus.OfflinePeers)
}
return count2
}
// DebugPrintPublicKeys print all the PublicKeys in string format in Consensus
@ -420,5 +481,5 @@ func (consensus *Consensus) GetNodeID() uint16 {
// SendMessage sends message thru p2p host to peer.
func (consensus *Consensus) SendMessage(peer p2p.Peer, message []byte) {
host.SendMessage(consensus.host, peer, message)
host.SendMessage(consensus.host, peer, message, nil)
}

@ -38,6 +38,11 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
for { // keep waiting for new blocks
newBlock := <-blockChannel
c := consensus.RemovePeers(consensus.OfflinePeerList)
if c > 0 {
consensus.Log.Debug("WaitForNewBlock", "removed peers", c)
}
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
@ -63,6 +68,11 @@ func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Bloc
newBlock := <-blockChannel
// TODO: think about potential race condition
c := consensus.RemovePeers(consensus.OfflinePeerList)
if c > 0 {
consensus.Log.Debug("WaitForNewBlock", "removed peers", c)
}
if !consensus.HasEnoughValidators() {
consensus.Log.Debug("Not enough validators", "# Validators", len(consensus.PublicKeys))
time.Sleep(waitForEnoughValidators * time.Millisecond)
@ -114,6 +124,7 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
}
// processStartConsensusMessage is the handler for message which triggers consensus process.
// TODO(minh): clean-up. this function is never called.
func (consensus *Consensus) processStartConsensusMessage(payload []byte) {
// TODO: remove these method after testnet
tx := blockchain.NewCoinbaseTX([20]byte{0}, "y", 0)
@ -134,7 +145,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
consensus.Log.Debug("Stop encoding block")
msgToSend := consensus.constructAnnounceMessage()
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
// Set state to AnnounceDone
consensus.state = AnnounceDone
consensus.commitByLeader(true)
@ -261,7 +272,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta
consensus.responseByLeader(challengeScalar, targetState == ChallengeDone)
// Broadcast challenge message
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
// Set state to targetState (ChallengeDone or FinalChallengeDone)
consensus.state = targetState
@ -418,7 +429,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState S
// Start the second round of Cosi
msgToSend := consensus.constructCollectiveSigMessage(collectiveSig, bitmap)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
consensus.commitByLeader(false)
} else {
consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(*responses))

@ -5,6 +5,7 @@ import (
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
"github.com/harmony-one/harmony/utils"
)
func TestNew(test *testing.T) {
@ -28,3 +29,31 @@ func TestNew(test *testing.T) {
test.Error("Consensus Leader is set to wrong Peer")
}
}
func TestRemovePeers(t *testing.T) {
_, pk1 := utils.GenKey("1", "1")
_, pk2 := utils.GenKey("2", "2")
_, pk3 := utils.GenKey("3", "3")
_, pk4 := utils.GenKey("4", "4")
_, pk5 := utils.GenKey("5", "5")
p1 := p2p.Peer{IP: "1", Port: "1", PubKey: pk1}
p2 := p2p.Peer{IP: "2", Port: "2", PubKey: pk2}
p3 := p2p.Peer{IP: "3", Port: "3", PubKey: pk3}
p4 := p2p.Peer{IP: "4", Port: "4", PubKey: pk4}
peers := []p2p.Peer{p1, p2, p3, p4}
peerRemove := []p2p.Peer{p1, p2}
leader := p2p.Peer{IP: "127.0.0.1", Port: "9000", PubKey: pk5}
host := p2pimpl.NewHost(leader)
consensus := New(host, "0", peers, leader)
// consensus.DebugPrintPublicKeys()
f := consensus.RemovePeers(peerRemove)
if f == 0 {
t.Errorf("consensus.RemovePeers return false")
consensus.DebugPrintPublicKeys()
}
}

@ -10,6 +10,17 @@ function cleanup() {
done
}
function killnode() {
local port=$1
if [ -n "port" ]; then
pid=$(/bin/ps -fu $USER | grep "benchmark" | grep "$port" | awk '{print $2}')
echo "killing node with port: $port"
kill -9 $pid 2> /dev/null
echo "node with port: $port is killed"
fi
}
trap cleanup SIGINT SIGTERM
function usage {
@ -25,6 +36,7 @@ USAGE: $ME [OPTIONS] config_file_name
-D duration txgen run duration (default: $DURATION)
-m min_peers minimal number of peers to start consensus (default: $MIN)
-s shards number of shards (default: $SHARDS)
-k nodeport kill the node with specified port number (default: $KILLPORT)
This script will build all the binaries and start benchmark and txgen based on the configuration file.
@ -43,8 +55,9 @@ TXGEN=true
DURATION=90
MIN=5
SHARDS=2
KILLPORT=9004
while getopts "hpdtD:m:s:" option; do
while getopts "hpdtD:m:s:k:" option; do
case $option in
h) usage ;;
p) PEER='-peer_discovery' ;;
@ -53,6 +66,7 @@ while getopts "hpdtD:m:s:" option; do
D) DURATION=$OPTARG ;;
m) MIN=$OPTARG ;;
s) SHARDS=$OPTARG ;;
k) KILLPORT=$OPTARG ;;
esac
done
@ -103,6 +117,9 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
fi
done < $config
# Emulate node offline
(sleep 45; killnode $KILLPORT) &
echo "launching txgen ..."
if [ "$TXGEN" == "true" ]; then
if [ -z "$PEER" ]; then

@ -89,7 +89,7 @@ checkLoop:
gotShardInfo = true
break checkLoop
} else {
host.SendMessage(node.host, BCPeer, msgToSend)
host.SendMessage(node.host, BCPeer, msgToSend, nil)
}
}
}

@ -49,6 +49,26 @@ const (
NodeLeader // Node is the leader of some shard.
)
func (state State) String() string {
switch state {
case NodeInit:
return "NodeInit"
case NodeWaitToJoin:
return "NodeWaitToJoin"
case NodeJoinedShard:
return "NodeJoinedShard"
case NodeOffline:
return "NodeOffline"
case NodeReadyForConsensus:
return "NodeReadyForConsensus"
case NodeDoingConsensus:
return "NodeDoingConsensus"
case NodeLeader:
return "NodeLeader"
}
return "Unknown"
}
// Constants related to doing syncing.
const (
NotDoingSyncing uint32 = iota
@ -112,6 +132,9 @@ type Node struct {
// Channel to stop sending ping message
StopPing chan struct{}
// Signal channel for lost validators
OfflinePeers chan p2p.Peer
}
// Add new crossTx and proofs to the list of crossTx that needs to be sent back to client
@ -229,6 +252,17 @@ func DeserializeNode(d []byte) *NetworkNode {
return &wn
}
//AddSmartContractsToPendingTransactions adds the faucet contract the genesis block.
func (node *Node) AddSmartContractsToPendingTransactions() {
// Add a contract deployment transactionv
priKey := node.ContractKeys[0]
contractData := "0x608060405234801561001057600080fd5b506040516020806104c08339810180604052602081101561003057600080fd5b505160008054600160a060020a0319163317808255600160a060020a031681526001602081905260409091205560ff811661006c600282610073565b50506100bd565b8154818355818111156100975760008381526020902061009791810190830161009c565b505050565b6100ba91905b808211156100b657600081556001016100a2565b5090565b90565b6103f4806100cc6000396000f3fe60806040526004361061005b577c010000000000000000000000000000000000000000000000000000000060003504635c19a95c8114610060578063609ff1bd146100955780639e7b8d61146100c0578063b3f98adc146100f3575b600080fd5b34801561006c57600080fd5b506100936004803603602081101561008357600080fd5b5035600160a060020a0316610120565b005b3480156100a157600080fd5b506100aa610280565b6040805160ff9092168252519081900360200190f35b3480156100cc57600080fd5b50610093600480360360208110156100e357600080fd5b5035600160a060020a03166102eb565b3480156100ff57600080fd5b506100936004803603602081101561011657600080fd5b503560ff16610348565b3360009081526001602081905260409091209081015460ff1615610144575061027d565b5b600160a060020a0382811660009081526001602081905260409091200154620100009004161580159061019c5750600160a060020a0382811660009081526001602081905260409091200154620100009004163314155b156101ce57600160a060020a039182166000908152600160208190526040909120015462010000900490911690610145565b600160a060020a0382163314156101e5575061027d565b6001818101805460ff1916821775ffffffffffffffffffffffffffffffffffffffff0000191662010000600160a060020a0386169081029190911790915560009081526020829052604090209081015460ff16156102725781546001820154600280549091610100900460ff1690811061025b57fe5b60009182526020909120018054909101905561027a565b815481540181555b50505b50565b600080805b60025460ff821610156102e6578160028260ff168154811015156102a557fe5b906000526020600020016000015411156102de576002805460ff83169081106102ca57fe5b906000526020600020016000015491508092505b600101610285565b505090565b600054600160a060020a0316331415806103215750600160a060020a0381166000908152600160208190526040909120015460ff165b1561032b5761027d565b600160a060020a0316600090815260016020819052604090912055565b3360009081526001602081905260409091209081015460ff1680610371575060025460ff831610155b1561037c575061027d565b6001818101805460ff191690911761ff00191661010060ff8516908102919091179091558154600280549192909181106103b257fe5b600091825260209091200180549091019055505056fea165627a7a72305820164189ef302b4648e01e22456b0a725191604cb63ee472f230ef6a2d17d702f900290000000000000000000000000000000000000000000000000000000000000002"
dataEnc := common.FromHex(contractData)
// Unsigned transaction to avoid the case of transaction address.
mycontracttx, _ := types.SignTx(types.NewContractCreation(uint64(0), 0, big.NewInt(1000000), params.TxGasContractCreation*10, nil, dataEnc), types.HomesteadSigner{}, priKey)
node.pendingTransactionsAccount = append(node.pendingTransactionsAccount, mycontracttx)
}
// New creates a new node.
func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
node := Node{}
@ -270,11 +304,17 @@ func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
for i := 0; i < 100; i++ {
testBankKey, _ := ecdsa.GenerateKey(crypto.S256(), reader)
testBankAddress := crypto.PubkeyToAddress(testBankKey.PublicKey)
testBankFunds := big.NewInt(10000000000)
testBankFunds := big.NewInt(8000000000000000000)
genesisAloc[testBankAddress] = core.GenesisAccount{Balance: testBankFunds}
node.TestBankKeys = append(node.TestBankKeys, testBankKey)
}
contractKey, _ := ecdsa.GenerateKey(crypto.S256(), reader)
contractAddress := crypto.PubkeyToAddress(contractKey.PublicKey)
contractFunds := big.NewInt(8000000000000000000)
genesisAloc[contractAddress] = core.GenesisAccount{Balance: contractFunds}
node.ContractKeys = append(node.ContractKeys, contractKey)
database := hdb.NewMemDatabase()
chainConfig := params.TestChainConfig
chainConfig.ChainID = big.NewInt(int64(node.Consensus.ShardID)) // Use ChainID as piggybacked ShardID
@ -286,11 +326,13 @@ func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
_ = gspec.MustCommit(database)
chain, _ := core.NewBlockChain(database, nil, gspec.Config, node.Consensus, vm.Config{}, nil)
node.Chain = chain
//This one is not used --- RJ.
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain)
node.BlockChannelAccount = make(chan *types.Block)
node.Worker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey))
//Initialize the pending transactions with smart contract transactions
node.AddSmartContractsToPendingTransactions()
}
// Logger
@ -305,6 +347,9 @@ func New(host host.Host, consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
node.syncingState = NotDoingSyncing
node.StopPing = make(chan struct{})
node.OfflinePeers = make(chan p2p.Peer)
go node.RemovePeersHandler()
return &node
}
@ -449,3 +494,14 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*
}
return response, nil
}
// RemovePeersHandler is a goroutine to wait on the OfflinePeers channel
// and remove the peers from validator list
func (node *Node) RemovePeersHandler() {
for {
select {
case p := <-node.OfflinePeers:
node.Consensus.OfflinePeerList = append(node.Consensus.OfflinePeerList, p)
}
}
}

@ -310,7 +310,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}) {
node.transactionInConsensus = selectedTxs
node.CrossTxsInConsensus = crossShardTxAndProofs
newBlock = blockchain.NewBlock(selectedTxs, node.blockchain.GetLatestBlock().Hash, node.Consensus.ShardID)
newBlock = blockchain.NewBlock(selectedTxs, node.blockchain.GetLatestBlock().Hash, node.Consensus.ShardID, false)
break
}
}
@ -566,7 +566,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte) int {
// Broadcast the message to all validators, as publicKeys is updated
// FIXME: HAR-89 use a separate nodefind/neighbor message
host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer)
host.BroadcastMessageFromLeader(node.GetHost(), peers, buffer, node.Consensus.OfflinePeers)
return len(peers)
}
@ -578,6 +578,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
return -1
}
// node.log.Debug("pongMessageHandler", "pong", pong, "nodeID", node.Consensus.GetNodeID())
peers := make([]*p2p.Peer, 0)
for _, p := range pong.Peers {
@ -615,9 +617,13 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
publicKeys = append(publicKeys, key)
}
if node.State == NodeWaitToJoin {
node.State = NodeJoinedShard
// Notify JoinShard to stop sending Ping messages
if node.StopPing != nil {
node.StopPing <- struct{}{}
}
}
return node.Consensus.UpdatePublicKeys(publicKeys)
}

@ -7,12 +7,12 @@ import (
// SendMessage sends data to ip, port
func (node *Node) SendMessage(p p2p.Peer, data []byte) {
host.SendMessage(node.host, p, data)
host.SendMessage(node.host, p, data, nil)
}
// BroadcastMessage broadcasts message to peers
func (node *Node) BroadcastMessage(peers []p2p.Peer, data []byte) {
host.BroadcastMessage(node.host, peers, data)
host.BroadcastMessage(node.host, peers, data, nil)
}
// GetHost returns the p2p host

@ -1,6 +1,7 @@
package hostv1
import (
"fmt"
"io"
"net"
"time"
@ -71,18 +72,17 @@ func (host *HostV1) BindHandlerAndServe(handler p2p.StreamHandler) {
func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) {
addr := net.JoinHostPort(peer.IP, peer.Port)
conn, err := net.Dial("tcp", addr)
// log.Debug("Dial from local to remote", "localID", net.JoinHostPort(host.self.IP, host.self.Port), "local", conn.LocalAddr(), "remote", addr)
if err != nil {
log.Warn("Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err)
return
log.Warn("HostV1 SendMessage Dial() failed", "from", net.JoinHostPort(host.self.IP, host.self.Port), "to", addr, "error", err)
return fmt.Errorf("Dail Failed")
}
defer conn.Close()
nw, err := conn.Write(message)
if err != nil {
log.Warn("Write() failed", "addr", conn.RemoteAddr(), "error", err)
return
return fmt.Errorf("Write Failed")
}
if nw < len(message) {
log.Warn("Write() returned short count",
@ -91,7 +91,7 @@ func (host *HostV1) SendMessage(peer p2p.Peer, message []byte) (err error) {
}
// No ack (reply) message from the receiver for now.
return
return nil
}
// Close closes the host

@ -13,14 +13,14 @@ import (
// SendMessage is to connect a socket given a port and send the given message.
// TODO(minhdoan, rj): need to check if a peer is reachable or not.
func SendMessage(host Host, p p2p.Peer, message []byte) {
func SendMessage(host Host, p p2p.Peer, message []byte, lostPeer chan p2p.Peer) {
// Construct normal p2p message
content := ConstructP2pMessage(byte(0), message)
go send(host, p, content)
go send(host, p, content, lostPeer)
}
// BroadcastMessage sends the message to a list of peers
func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) {
func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) {
if len(peers) == 0 {
return
}
@ -32,7 +32,7 @@ func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) {
start := time.Now()
for _, peer := range peers {
peerCopy := peer
go send(h, peerCopy, content)
go send(h, peerCopy, content, lostPeer)
}
log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds())
@ -43,6 +43,13 @@ func BroadcastMessage(h Host, peers []p2p.Peer, msg []byte) {
}
}
// BroadcastMessageFromLeader sends the message to a list of peers from a leader.
func BroadcastMessageFromLeader(h Host, peers []p2p.Peer, msg []byte, lostPeer chan p2p.Peer) {
// TODO(minhdoan): Enable back for multicast.
peers = SelectMyPeers(peers, 1, MaxBroadCast)
BroadcastMessage(h, peers, msg, lostPeer)
}
// ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]
func ConstructP2pMessage(msgType byte, content []byte) []byte {
@ -58,19 +65,10 @@ func ConstructP2pMessage(msgType byte, content []byte) []byte {
return byteBuffer.Bytes()
}
// BroadcastMessageFromLeader sends the message to a list of peers from a leader.
func BroadcastMessageFromLeader(h Host, peers []p2p.Peer, msg []byte) {
// TODO(minhdoan): Enable back for multicast.
peers = SelectMyPeers(peers, 1, MaxBroadCast)
BroadcastMessage(h, peers, msg)
log.Info("Done sending from leader")
}
// BroadcastMessageFromValidator sends the message to a list of peers from a validator.
func BroadcastMessageFromValidator(h Host, selfPeer p2p.Peer, peers []p2p.Peer, msg []byte) {
peers = SelectMyPeers(peers, selfPeer.ValidatorID*MaxBroadCast+1, (selfPeer.ValidatorID+1)*MaxBroadCast)
BroadcastMessage(h, peers, msg)
log.Info("Done sending from validator")
BroadcastMessage(h, peers, msg, nil)
}
// MaxBroadCast is the maximum number of neighbors to broadcast
@ -89,14 +87,14 @@ func SelectMyPeers(peers []p2p.Peer, min int, max int) []p2p.Peer {
}
// Send a message to another node with given port.
func send(h Host, peer p2p.Peer, message []byte) {
func send(h Host, peer p2p.Peer, message []byte, lostPeer chan p2p.Peer) {
// Add attack code here.
//attack.GetInstance().Run()
backoff := p2p.NewExpBackoff(250*time.Millisecond, 10*time.Second, 2)
backoff := p2p.NewExpBackoff(150*time.Millisecond, 5*time.Second, 2)
for trial := 0; trial < 10; trial++ {
var err error
h.SendMessage(peer, message)
err = h.SendMessage(peer, message)
if err == nil {
if trial > 0 {
log.Warn("retry send", "rety", trial)
@ -108,6 +106,11 @@ func send(h Host, peer p2p.Peer, message []byte) {
backoff.Sleep()
}
log.Error("gave up sending a message", "addr", net.JoinHostPort(peer.IP, peer.Port))
if lostPeer != nil {
// Notify lostPeer channel
lostPeer <- peer
}
}
// DialWithSocketClient joins host port and establishes connection

Loading…
Cancel
Save