Merge pull request #89 from harmony-one/lc4pr

HAR-5: fix nonstop ping message and concurrent map access issue
pull/92/head
Leo Chen 6 years ago committed by GitHub
commit 39ed7a9f6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      deploy.sh
  2. 3
      go_executable_build.sh
  3. 17
      node/node.go
  4. 16
      node/node_handler.go

@ -7,26 +7,44 @@ function usage {
local ME=$(basename $0) local ME=$(basename $0)
cat<<EOU cat<<EOU
USAGE: $ME config_file_name USAGE: $ME [OPTIONS] config_file_name
-h print this help message
-p use peer discovery (default: $PEER)
-d enable db support (default: $DB)
-t toggle txgen (default: $TXGEN)
This script will build all the binaries and start benchmark and txgen based on the configuration file. This script will build all the binaries and start benchmark and txgen based on the configuration file.
EXAMPLES: EXAMPLES:
$ME local_config.txt $ME local_config.txt
$ME -p local_config.txt
EOU EOU
exit 0 exit 0
} }
PEER=
DB=
TXGEN=true
while getopts "hpdt" option; do
case $option in
h) usage ;;
p) PEER='-peer_discovery' ;;
d) DB='-db_supported' ;;
t) TXGEN=false ;;
esac
done
shift $((OPTIND-1))
config=$1 config=$1
if [ -z "$config" ]; then if [ -z "$config" ]; then
usage usage
fi fi
db_supported=$2
# Kill nodes if any # Kill nodes if any
./kill_node.sh ./kill_node.sh
@ -49,16 +67,10 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
IFS=' ' read ip port mode shardID <<< $line IFS=' ' read ip port mode shardID <<< $line
#echo $ip $port $mode #echo $ip $port $mode
if [ "$mode" != "client" ]; then if [ "$mode" != "client" ]; then
if [ -z "$db_supported" ]; then ./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder $DB $PEER &
./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder&
else
./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder -db_supported&
fi
fi fi
done < $config done < $config
txgen_disabled=$3 if [ "$TXGEN" == "true" ]; then
# Generate transactions
if [ -z "$txgen_disabled" ]; then
./bin/txgen -config_file $config -log_folder $log_folder ./bin/txgen -config_file $config -log_folder $log_folder
fi fi

@ -61,6 +61,9 @@ function build_only
if [ "$(uname -s)" == "Linux" ]; then if [ "$(uname -s)" == "Linux" ]; then
$BINDIR/$bin -version $BINDIR/$bin -version
fi fi
if [ "$(uname -s)" == "Darwin" -a "$GOOS" == "darwin" ]; then
$BINDIR/$bin -version
fi
done done
$MD5 $BINDIR/* > $BINDIR/md5sum.txt 2> /dev/null $MD5 $BINDIR/* > $BINDIR/md5sum.txt 2> /dev/null

@ -64,10 +64,10 @@ type Node struct {
SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
IDCPeer p2p.Peer IDCPeer p2p.Peer
SyncNode bool // TODO(minhdoan): Remove it later. SyncNode bool // TODO(minhdoan): Remove it later.
chain *core.BlockChain // Account Model chain *core.BlockChain // Account Model
Neighbors map[string]*p2p.Peer // All the neighbor nodes, key is the sha256 of Peer IP/Port Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
State NodeState // State of the Node State NodeState // State of the Node
// Account Model // Account Model
Chain *core.BlockChain Chain *core.BlockChain
@ -245,7 +245,6 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
} }
// Logger // Logger
node.log = log.New() node.log = log.New()
node.Neighbors = make(map[string]*p2p.Peer)
node.State = INIT node.State = INIT
return &node return &node
@ -256,15 +255,14 @@ func (node *Node) AddPeers(peers []p2p.Peer) int {
count := 0 count := 0
for _, p := range peers { for _, p := range peers {
key := fmt.Sprintf("%v", p.PubKey) key := fmt.Sprintf("%v", p.PubKey)
_, ok := node.Neighbors[key] _, ok := node.Neighbors.Load(key)
if !ok { if !ok {
np := new(p2p.Peer) np := new(p2p.Peer)
copier.Copy(np, &p) copier.Copy(np, &p)
node.Neighbors[key] = np node.Neighbors.Store(key, *np)
count++ count++
} }
} }
node.log.Info("Added", "# of peers", count)
if count > 0 { if count > 0 {
c := node.Consensus.AddPeers(peers) c := node.Consensus.AddPeers(peers)
@ -275,7 +273,7 @@ func (node *Node) AddPeers(peers []p2p.Peer) int {
func (node *Node) JoinShard(leader p2p.Peer) { func (node *Node) JoinShard(leader p2p.Peer) {
// try to join the shard, with 10 minutes time-out // try to join the shard, with 10 minutes time-out
backoff := p2p.NewExpBackoff(500*time.Millisecond, 10*time.Minute, 2) backoff := p2p.NewExpBackoff(1*time.Second, 10*time.Minute, 2)
for node.State == WAIT { for node.State == WAIT {
backoff.Sleep() backoff.Sleep()
@ -285,5 +283,4 @@ func (node *Node) JoinShard(leader p2p.Peer) {
p2p.SendMessage(leader, buffer) p2p.SendMessage(leader, buffer)
node.log.Debug("Sent ping message") node.log.Debug("Sent ping message")
} }
} }

@ -528,9 +528,16 @@ func (node *Node) pingMessageHandler(msgPayload []byte) {
// Send a Pong message back // Send a Pong message back
peers := make([]p2p.Peer, 0) peers := make([]p2p.Peer, 0)
for _, v := range node.Neighbors { count := 0
peers = append(peers, *v) node.Neighbors.Range(func(k, v interface{}) bool {
} if p, ok := v.(p2p.Peer); ok {
peers = append(peers, p)
count++
return true
} else {
return false
}
})
pong := proto_node.NewPongMessage(peers) pong := proto_node.NewPongMessage(peers)
buffer := pong.ConstructPongMessage() buffer := pong.ConstructPongMessage()
@ -547,7 +554,8 @@ func (node *Node) pongMessageHandler(msgPayload []byte) {
node.log.Error("Can't get Pong Message") node.log.Error("Can't get Pong Message")
return return
} }
node.log.Info("Pong", "Msg", pong) // node.log.Info("Pong", "Msg", pong)
node.State = JOIN
peers := make([]p2p.Peer, 0) peers := make([]p2p.Peer, 0)

Loading…
Cancel
Save