Merge branch 'master' into attack_model

pull/27/head
Minh Doan 6 years ago
commit 57b6fcc863
  1. 8
      aws-code/create_client_instances.sh
  2. 8
      aws-code/create_instances.sh
  3. 84
      aws-code/create_ondemand_instances.py
  4. 13
      aws-code/get_availability_zones.sh
  5. 0
      aws-experiment-launch/configuration.txt
  6. 200
      aws-experiment-launch/create_and_deploy.py
  7. 0
      aws-experiment-launch/create_node_instances.sh
  8. 0
      aws-experiment-launch/dial/main.go
  9. 15
      aws-experiment-launch/experiment/commander/main.go
  10. 87
      aws-experiment-launch/experiment/soldier/main.go
  11. 0
      aws-experiment-launch/listen/main.go
  12. 0
      aws-experiment-launch/loghost/main.go
  13. 72
      aws-experiment-launch/spot-instance/create_launch_specs.py
  14. 6
      aws-experiment-launch/spot-instance/request-spot.sh
  15. 5
      aws-experiment-launch/spot-instance/request-spot2.sh
  16. 8
      aws-experiment-launch/spot-instance/userdata.sh
  17. 0
      aws-experiment-launch/user-data.sh
  18. 7
      aws-scripts/spot-instance/userdata.sh
  19. 4
      blockchain/block.go
  20. 13
      blockchain/transaction.go
  21. 4
      blockchain/utxopool.go
  22. 93
      client/client.go
  23. 67
      consensus/consensus.go
  24. 31
      consensus/consensus_leader.go
  25. 31
      consensus/consensus_state.go
  26. 4
      consensus/consensus_test.go
  27. 4
      consensus/consensus_validator.go
  28. 7
      consensus/message.go
  29. 5
      utils/metrics.go

@ -1,8 +0,0 @@
# security_group_name=$1
# security_group_description=$2
# instanceName=$3
aws ec2 create-security-group --group-name mcDG --description "mcdg"
aws ec2 authorize-security-group-ingress --group-name MySecurityGroup --protocol tcp --port all --cidr 0.0.0.0/0
aws ec2 run-instances --image-id ami-a9d09ed1 --count 5 --instance-type t2.micro --key-name main --security-group-ids mcDG \
--user-data user-data.sh --iam-instance-profile Name=CodeDeployDemo-EC2-Instance-Profile --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=Client}]'

@ -1,8 +0,0 @@
# security_group_name=$1
# security_group_description=$2
# instanceName=$3
aws ec2 create-security-group --group-name mcDG --description "mcdg"
aws ec2 authorize-security-group-ingress --group-name MySecurityGroup --protocol tcp --port all --cidr 0.0.0.0/0
aws ec2 run-instances --image-id ami-a9d09ed1 --count 5 --instance-type t2.micro --key-name main --security-group-ids mcDG \
--user-data user-data.sh --iam-instance-profile Name=CodeDeployDemo-EC2-Instance-Profile --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=Client}]'

@ -1,84 +0,0 @@
import os
import argparse
import json
import time
import datetime
REGION_NAME = 'region_name'
REGION_KEY = 'region_key'
REGION_SECURITY_GROUP = 'region_security_group'
REGION_HUMAN_NAME = 'region_human_name'
INSTANCE_TYPE = 't2.micro'
REGION_AMI = 'region_ami'
USER_DATA = 'user-data.sh'
IAM_INSTANCE_PROFILE = 'BenchMarkCodeDeployInstanceProfile'
def read_configuration_file(filename):
config = {}
with open(filename,'r') as f:
for myline in f:
mylist = myline.strip().split(',')
region_num = mylist[0]
config[region_num] = {}
config[region_num][REGION_NAME] = mylist[1]
config[region_num][REGION_KEY] = mylist[2]
config[region_num][REGION_SECURITY_GROUP] = mylist[3]
config[region_num][REGION_HUMAN_NAME] = mylist[4]
config[region_num][REGION_AMI] = mylist[5]
return config
def region_variant(region_name):
return region_name + "a"
def create_custom_json(config,num_instances,region_num,current_session):
print(num_instances)
input_cli = {}
input_cli['MinCount'] = num_instances
input_cli['MaxCount'] = num_instances
input_cli['ImageId'] = config[region_num][REGION_AMI]
input_cli['Placement'] = {}
input_cli['Placement']['AvailabilityZone'] = region_variant(config[region_num][REGION_NAME])
input_cli['SecurityGroups'] = []
input_cli['SecurityGroups'].append(config[region_num][REGION_SECURITY_GROUP])
input_cli['IamInstanceProfile'] = {}
input_cli['IamInstanceProfile']['Name'] = IAM_INSTANCE_PROFILE
input_cli['KeyName'] = config[region_num][REGION_KEY]
#input_cli['KeyName'] = "main"
input_cli['UserData'] = USER_DATA
input_cli['InstanceType'] = INSTANCE_TYPE
input_cli['TagSpecifications'] = []
input_cli['TagSpecifications'].append({"ResourceType": "instance","Tags":[{"Key":"Name","Value":"Node"}]})
my_dir = "input_jsons/" + "session-"+ current_session
if not os.path.exists(my_dir):
os.makedirs(my_dir)
#cli_input_file = os.path.join(my_dir,config[region_num][REGION_HUMAN_NAME]+".json")
cli_input_file = "local.json"
with open(cli_input_file,'w') as g:
json.dump(input_cli,g)
print("INPUT CLI JSON FILE: %s" % cli_input_file)
return cli_input_file
def create_instances(config,region_list,instances_list,current_session):
for i in range(len(region_list)):
region_num = region_list[i]
num_instances = int(instances_list[i])
cli_input_file = create_custom_json(config,num_instances,region_num,current_session)
cmd_str = "aws ec2 --region " + config[region_num][REGION_NAME] + " run-instances " + " --cli-input-json " + str(cli_input_file)
print(cmd_str)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='This script helps you start instances across multiple regions')
parser.add_argument('--regions',type=str,dest='regions',default='3',help="Supply a csv list of all regions")
parser.add_argument('--instances', type=str,dest='numInstances',default=1,help='number of instances')
parser.add_argument('--configuration',type=str,dest='config',default='configuration.txt')
args = parser.parse_args()
config = read_configuration_file(args.config)
region_list = args.regions.split(',')
instances_list = args.numInstances.split(',')
assert len(region_list) == len(instances_list),"number of regions: %d != number of instances per region: %d" % (len(region_list),len(intances_list))
time_stamp = time.time()
current_session = datetime.datetime.fromtimestamp(time_stamp).strftime('%H-%M-%S-%Y-%m-%d')
print("current session is %s" % current_session)
create_instances(config,region_list,instances_list,current_session)

@ -1,13 +0,0 @@
echo "Getting list of Availability Zones"
all_regions=$(aws ec2 describe-regions --output text --query 'Regions[*].[RegionName]' | sort)
all_az=()
echo $all_regions
while read -r region; do
az_per_region=$(aws ec2 describe-availability-zones --region $region --query 'AvailabilityZones[*].[ZoneName]' --output text | sort)
echo $region $az_per_region
while read -r az; do
all_az+=($az)
done <<< "$az_per_region"
done <<< "$all_regions"
echo $all_az

@ -0,0 +1,200 @@
import boto3
import argparse
import sys
import json
import time
import datetime
REGION_NAME = 'region_name'
REGION_KEY = 'region_key'
REGION_SECURITY_GROUP = 'region_security_group'
REGION_HUMAN_NAME = 'region_human_name'
INSTANCE_TYPE = 't2.micro'
REGION_AMI = 'region_ami'
USER_DATA = 'user-data.sh'
IAM_INSTANCE_PROFILE = 'BenchMarkCodeDeployInstanceProfile'
REPO = "simple-rules/harmony-benchmark"
APPLICATION_NAME = 'benchmark-experiments'
def run_one_region(config,region_number,number_of_instances,commitId):
region_name = config[region_number][REGION_NAME]
session = boto3.Session(region_name=region_name)
# ec2_client = session.client('ec2')
# response = create_instances(config,ec2_client,region_number,int(number_of_instances))
codedeploy = session.client('codedeploy')
#commitId = get_commitId(commitId)
application_name = APPLICATION_NAME
deployment_group = APPLICATION_NAME + str(commitId)
repo = REPO
response = get_application(codedeploy,application_name)
response = get_deployment_group(codedeploy,application_name,deployment_group)
print(response)
# deploy(codedeploy, application_name, deployment_group, repo, commitId, wait=True)
# return response
def get_availability_zones(ec2_client):
response = ec2_client.describe_availability_zones()
all_zones = []
if response.get('AvailabilityZones',None):
region_info = response.get('AvailabilityZones')
for info in region_info:
if info['State'] == 'available':
all_zones.append(info['ZoneName'])
return all_zones
def get_one_availability_zone(ec2_client):
all_zones = get_availability_zones(ec2_client)
if len(all_zones) > 0:
return all_zones[0]
else:
print("No availability zone for this region")
sys.exit()
def create_instances(config,ec2_client,region_number,number_of_instances):
response = ec2_client.run_instances(
MinCount = number_of_instances,
MaxCount = number_of_instances,
ImageId = config[region_number][REGION_AMI],
Placement = {
'AvailabilityZone': get_one_availability_zone(ec2_client)
},
SecurityGroups = [config[region_number][REGION_SECURITY_GROUP]],
IamInstanceProfile = {
'Name' : IAM_INSTANCE_PROFILE
},
KeyName = config[region_number][REGION_KEY],
UserData = USER_DATA,
InstanceType = INSTANCE_TYPE,
TagSpecifications = [
{
'ResourceType' : 'instance',
'Tags': [
{
'Key': 'Name',
'Value': 'Node'
},
]
},
]
)
return response
def get_deployment_group(codedeploy,application_name,deployment_group):
response = codedeploy.create_deployment_group(
applicationName = application_name,
deploymentGroupName = deployment_group,
deploymentConfigName='CodeDeployDefault.AllAtAOnce',
serviceRoleArn = 'arn:aws:iam::656503231766:role/BenchMarkCodeDeployServiceRole',
deploymentStyle={
'deploymentType': 'IN_PLACE'
},
ec2TagSet={
'ec2TagSetList': [
[
{
'Key': 'Name',
'Value': 'Node',
'Type': 'KEY_AND_VALUE'
},
],
]
}
)
return response
def get_commitId(commitId):
if commitId is None:
commitId = run("git rev-list --max-count=1 HEAD",
hide=True).stdout.strip()
print("Got newest commitId as " + commitId)
return commitId
def get_application(codedeploy,application_name):
response = codedeploy.list_applications()
if application_name in response['applications']:
return response
else:
response = codedeploy.create_application(
applicationName= application_name,
computePlatform='Server'
)
return response
def deploy(codedeploy, application_name,deployment_group,repo, commitId, wait=True):
"""Deploy new code at specified revision to instance.
arguments:
- repo: GitHub repository path from which to get the code
- commitId: commit ID to be deployed
- wait: wait until the CodeDeploy finishes
"""
print("Launching CodeDeploy with commit " + commitId)
res = codedeploy.create_deployment(
applicationName = application_name,
deploymentGroupName = deployment_group,
deploymentConfigName = 'CodeDeployDefault.AllAtAOnce',
description = 'benchmark experiments',
revision = {
'revisionType': 'GitHub',
'gitHubLocation': {
'repository': repo,
'commitId': commitId,
}
}
)
depId = res["deploymentId"]
print("Deployment ID: " + depId)
# The deployment is launched at this point, so exit unless asked to wait
# until it finishes
if not wait:
return
# This should use a boto3 waiter instead, but that hasn't been
# implemented yet: https://github.com/boto/boto3/issues/708
# So instead we check the status every few seconds manually
info = {'status': 'Created'}
start = time.time()
while info['status'] not in ('Succeeded', 'Failed', 'Stopped',) and (time.time() - start < 300.0):
info = codedeploy.get_deployment(deploymentId=depId)['deploymentInfo']
print(info)
print(info['status'])
time.sleep(15)
if info['status'] == 'Succeeded':
print("\nDeploy Succeeded")
else:
print("\nDeploy Failed")
print(info)
def read_configuration_file(filename):
config = {}
with open(filename,'r') as f:
for myline in f:
mylist = myline.strip().split(',')
region_num = mylist[0]
config[region_num] = {}
config[region_num][REGION_NAME] = mylist[1]
config[region_num][REGION_KEY] = mylist[2]
config[region_num][REGION_SECURITY_GROUP] = mylist[3]
config[region_num][REGION_HUMAN_NAME] = mylist[4]
config[region_num][REGION_AMI] = mylist[5]
return config
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='This script helps you start instances across multiple regions')
parser.add_argument('--regions',type=str,dest='regions',default='3',help="Supply a csv list of all regions")
parser.add_argument('--instances', type=str,dest='numInstances',default='1',help='number of instances')
parser.add_argument('--configuration',type=str,dest='config',default='configuration.txt')
parser.add_argument('--commitId',type=str,dest='commitId',default='1f7e6e7ca7cf1c1190cedec10e791c01a29971cf')
args = parser.parse_args()
config = read_configuration_file(args.config)
region_list = args.regions.split(',')
instances_list = args.numInstances.split(',')
assert len(region_list) == len(instances_list),"number of regions: %d != number of instances per region: %d" % (len(region_list),len(intances_list))
time_stamp = time.time()
current_session = datetime.datetime.fromtimestamp(time_stamp).strftime('%H-%M-%S-%Y-%m-%d')
print("current session is %s" % current_session)
region_number = '1'
number_of_instances = '2'
run_one_region(config,region_number,number_of_instances,args.commitId)

@ -8,7 +8,7 @@ import (
)
const (
message = "Ping"
message = "init http://localhost:8080/configuration.txt"
StopCharacter = "\r\n\r\n"
)
@ -23,22 +23,23 @@ func SocketClient(ip string, port int) {
}
conn.Write([]byte(message))
conn.Write([]byte(StopCharacter))
// conn.Write([]byte(StopCharacter))
log.Printf("Send: %s", message)
buff := make([]byte, 1024)
n, _ := conn.Read(buff)
log.Printf("Receive: %s", buff[:n])
log.Printf("Receive from %v: %s", port, buff[:n])
}
func main() {
var (
ip = "127.0.0.1"
port = 3333
ip = "127.0.0.1"
portList = []int{3333, 4444}
)
SocketClient(ip, port)
for _, port := range portList {
SocketClient(ip, port)
}
}

@ -2,7 +2,9 @@ package main
import (
"bufio"
"flag"
"io"
"io/ioutil"
"log"
"net"
"net/http"
@ -12,12 +14,14 @@ import (
)
const (
Message = "Pong"
StopCharacter = "\r\n\r\n"
)
func SocketServer(port int) {
var (
port *int
)
func SocketServer(port int) {
listen, err := net.Listen("tcp4", ":"+strconv.Itoa(port))
defer listen.Close()
if err != nil {
@ -34,11 +38,9 @@ func SocketServer(port int) {
}
go handler(conn)
}
}
func handler(conn net.Conn) {
defer conn.Close()
var (
@ -57,52 +59,72 @@ ILOOP:
break ILOOP
case nil:
log.Println("Receive:", data)
go handleCommand(data)
if isTransportOver(data) {
log.Println("Tranport Over!")
break ILOOP
}
go handleCommand(data, w)
default:
log.Fatalf("Receive data failed:%s", err)
return
}
}
w.Write([]byte(Message))
w.Flush()
log.Printf("Send: %s", Message)
}
func handleCommand(command string) {
// assume this is init command
handleInitCommand(command)
func handleCommand(command string, w *bufio.Writer) {
args := strings.Split(command, " ")
if len(args) <= 0 {
return
}
switch command := args[0]; command {
case "init":
{
handleInitCommand(args[1:], w)
}
case "close":
{
log.Println("close command")
}
}
}
func handleInitCommand(command string) {
log.Println("Init command")
out, err := os.Create("config_copy.txt")
func handleInitCommand(args []string, w *bufio.Writer) {
log.Println("Init command", args)
// create local config file
localConfig := "node_config_" + strconv.Itoa(*port) + ".txt"
out, err := os.Create(localConfig)
if err != nil {
panic("Failed to create local file")
log.Fatal("Failed to create local file")
}
log.Println("Created local file")
defer out.Close()
resp, err := http.Get("http://localhost/config.txt")
// get remote config file
configURL := args[0]
resp, err := http.Get(configURL)
if err != nil {
log.Println("Failed to read file content")
panic("Failed to read file content")
log.Fatal("Failed to read file content")
}
log.Println("Read file content")
log.Println(resp)
log.Println(resp.Body)
n, err := io.Copy(out, resp.Body)
defer resp.Body.Close()
// copy remote to local
_, err = io.Copy(out, resp.Body)
if err != nil {
panic("Failed to copy file")
log.Fatal("Failed to copy file")
}
log.Println("copy done")
log.Println(resp.Body)
defer resp.Body.Close()
log.Println(n)
content, err := ioutil.ReadFile(localConfig)
if err != nil {
log.Fatal(err)
}
log.Println("Successfully init-ed with config", content)
w.Write([]byte("Successfully init-ed"))
w.Flush()
}
func isTransportOver(data string) (over bool) {
@ -111,9 +133,8 @@ func isTransportOver(data string) (over bool) {
}
func main() {
port = flag.Int("port", 3333, "port of the node.")
flag.Parse()
port := 3333
SocketServer(port)
SocketServer(*port)
}

@ -0,0 +1,72 @@
import os
import argparse
import json
import time
import datetime
import base64
REGION_NAME = 'region_name'
REGION_KEY = 'region_key'
REGION_SECURITY_GROUP = 'region_security_group'
REGION_HUMAN_NAME = 'region_human_name'
INSTANCE_TYPE = 'm3.medium' # 't2.micro'
AMI = 'ami-f2d3638a' # 'ami-a9d09ed1'
# UserData must be base64 encoded.
with open("userdata.sh", "rb") as userdata_file:
USER_DATA = base64.b64encode(userdata_file.read())
IAM_INSTANCE_PROFILE = 'BenchMarkCodeDeployInstanceProfile'
def read_configuration_file(filename):
config = {}
with open(filename,'r') as f:
for line in f:
vals = line.strip().split(',')
region_num = vals[0]
config[region_num] = {}
config[region_num][REGION_NAME] = vals[1]
config[region_num][REGION_KEY] = vals[2]
config[region_num][REGION_SECURITY_GROUP] = vals[3]
config[region_num][REGION_HUMAN_NAME] = vals[4]
return config
def create_launch_specification(region_num):
input_cli = {}
input_cli['ImageId'] = AMI
# input_cli['Placement'] = {
# "AvailabilityZone": config[region_num][REGION_NAME] +"a"
# }
input_cli['SecurityGroups'] = [ "richard-spot-instance SSH" ] # [ config[region_num][REGION_SECURITY_GROUP] ]
input_cli['IamInstanceProfile'] = {
"Name": IAM_INSTANCE_PROFILE
}
input_cli['KeyName'] = "richard-spot-instance" # config[region_num][REGION_KEY]
input_cli['UserData'] = USER_DATA
input_cli['InstanceType'] = INSTANCE_TYPE
# folder = "launch_specs/" + "session-"+ current_session
folder = "launch_specs/latest"
if not os.path.exists(folder):
os.makedirs(folder)
launch_spec_file = os.path.join(folder,config[region_num][REGION_HUMAN_NAME]+".json")
with open(launch_spec_file,'w') as g:
json.dump(input_cli,g)
print("Launch spec: %s" % launch_spec_file)
return launch_spec_file
def create_instances(region_list):
for i in range(len(region_list)):
region_num = region_list[i]
launch_spec_file = create_launch_specification(region_num)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='This script helps you start instances across multiple regions')
parser.add_argument('--regions',type=str,dest='regions',default='3',help="a comma-separated-value list of all regions")
# configuration file contains the number/name/security-group etc. information of each region.
parser.add_argument('--config',type=str,dest='config',default='configuration.txt')
args = parser.parse_args()
config = read_configuration_file(args.config)
region_list = args.regions.split(',')
time_stamp = time.time()
current_session = datetime.datetime.fromtimestamp(time_stamp).strftime('%Y-%m-%d-%H-%M-%S')
print("current session is %s" % current_session)
create_instances(region_list)

@ -9,8 +9,10 @@ aws ec2 request-spot-instances \
], \
\"KeyName\": \"richard-spot-instance\", \
\"IamInstanceProfile\": { \
\"Name\": \"RichardCodeDeployInstanceRole\" \
\"Name\": \"BenchMarkCodeDeployInstanceProfile\" \
}, \
\"UserData\": \"`base64 -w 0 userdata.sh`\" \
\"UserData\": \"`base64 userdata.sh`\" \
}" \
--dry-run # uncomment this line to send a real request.
# Note: on windows, UserData should be "`base64 -w 0 userdata.sh`"

@ -0,0 +1,5 @@
aws ec2 request-spot-instances \
--instance-count 1 \
--block-duration-minutes 60 \
--launch-specification file://launch_specs/latest/ohio.json
# --dry-run # uncomment this line to send a real request.

@ -0,0 +1,8 @@
#!/bin/bash
REGION=$(curl 169.254.169.254/latest/meta-data/placement/availability-zone/ | sed 's/[a-z]$//')
yum -y update
yum install -y ruby
cd /home/ec2-user
curl -O https://aws-codedeploy-$REGION.s3.amazonaws.com/latest/install
chmod +x ./install
./install auto

@ -1,7 +0,0 @@
#!/bin/bash
yum -y update
yum install -y ruby
cd /home/ec2-user
curl -O https://aws-codedeploy-us-west-2.s3.amazonaws.com/latest/install
chmod +x ./install
./install auto

@ -10,7 +10,7 @@ import (
"time"
)
// Block keeps block headers, transactions and signature.
// A block in the blockchain that contains block headers, transactions and signature etc.
type Block struct {
// Header
Timestamp int64
@ -87,7 +87,7 @@ func (b *Block) CalculateBlockHash() []byte {
return blockHash[:]
}
// NewBlock creates and returns a neew block.
// NewBlock creates and returns a new block.
func NewBlock(transactions []*Transaction, prevBlockHash [32]byte, shardId uint32) *Block {
numTxs := int32(len(transactions))
var txIds [][32]byte

@ -33,11 +33,14 @@ type TXInput struct {
ShardId uint32 // The Id of the shard where this UTXO belongs
}
// The proof of accept or reject in the cross shard transaction locking phase.
// This is created by the shard leader, filled with proof signatures after consensus, and returned back to the client.
// One proof structure is only tied to one shard. Therefore, the utxos in the proof are all with the same shard.
type CrossShardTxProof struct {
RejectOrAccept bool // false means rejection, true means acceptance
TxID [32]byte // Id of transaction whose utxo is related to this proof
TxInput []TXInput // The list of Utxo that this proof is referring to. They should be in the same shard.
BlockHash [32]byte // The hash of the block where the proof is registered
Accept bool // false means proof-of-reject, true means proof-of-accept
TxID [32]byte // Id of the transaction which this proof is on
TxInput []TXInput // The list of Utxo that this proof is on. They should be in the same shard.
BlockHash [32]byte // The hash of the block where the proof is registered
// Signatures
}
@ -95,7 +98,7 @@ func (txOutput *TXOutput) String() string {
// Used for debuging.
func (proof *CrossShardTxProof) String() string {
res := fmt.Sprintf("RejectOrAccept: %v, ", proof.RejectOrAccept)
res := fmt.Sprintf("Accept: %v, ", proof.Accept)
return res
}

@ -140,7 +140,7 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
unlockToCommit := true
if isUnlockTx {
for _, proof := range tx.Proofs {
if !proof.RejectOrAccept {
if !proof.Accept {
unlockToCommit = false // if any proof is a rejection, they it's a unlock-to-abort tx. Otherwise, it's unlock-to-commit
}
}
@ -303,7 +303,7 @@ func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transact
if valid || crossShard {
selected = append(selected, tx)
if crossShard {
proof := CrossShardTxProof{RejectOrAccept: valid, TxID: tx.ID, TxInput: getShardTxInput(tx, utxoPool.ShardId)}
proof := CrossShardTxProof{Accept: valid, TxID: tx.ID, TxInput: getShardTxInput(tx, utxoPool.ShardId)}
txAndProof := CrossShardTxAndProof{tx, &proof}
crossShardTxs = append(crossShardTxs, &txAndProof)
}

@ -9,7 +9,7 @@ import (
"sync"
)
// A client represent a node (e.g. wallet) which sends transactions and receive responses from the harmony network
// A client represents a node (e.g. a wallet) which sends transactions and receives responses from the harmony network
type Client struct {
PendingCrossTxs map[[32]byte]*blockchain.Transaction // Map of TxId to pending cross shard txs. Pending means the proof-of-accept/rejects are not complete
PendingCrossTxsMutex sync.Mutex // Mutex for the pending txs list
@ -26,59 +26,66 @@ func (client *Client) TransactionMessageHandler(msgPayload []byte) {
case PROOF_OF_LOCK:
// Decode the list of blockchain.CrossShardTxProof
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the PROOF_OF_LOCK messge type
proofList := new([]blockchain.CrossShardTxProof)
err := txDecoder.Decode(&proofList)
proofs := new([]blockchain.CrossShardTxProof)
err := txDecoder.Decode(proofs)
if err != nil {
client.log.Error("Failed deserializing cross transaction proof list")
}
client.handleProofOfLockMessage(proofs)
}
}
txsToSend := []blockchain.Transaction{}
// Loop through the newly received list of proofs
client.PendingCrossTxsMutex.Lock()
for _, proof := range *proofList {
// Find the corresponding pending cross tx
txAndProofs, ok := client.PendingCrossTxs[proof.TxID]
readyToUnlock := true // A flag used to mark whether whether this pending cross tx have all the proofs for its utxo input
if ok {
// Add the new proof to the cross tx's proof list
txAndProofs.Proofs = append(txAndProofs.Proofs, proof)
// Check whether this pending cross tx have all the proofs for its utxo input
txInputs := make(map[blockchain.TXInput]bool)
for _, curProof := range txAndProofs.Proofs {
for _, txInput := range curProof.TxInput {
txInputs[txInput] = true
}
}
for _, txInput := range txAndProofs.TxInput {
val, ok := txInputs[txInput]
if !ok || !val {
readyToUnlock = false
}
// Client once receives a list of proofs from a leader, for each proof:
// 1) retreive the pending cross shard transaction
// 2) add the proof to the transaction
// 3) checks whether all input utxos of the transaction have a corresponding proof.
// 4) for all transactions with full proofs, broadcast them back to the leaders
func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTxProof) {
txsToSend := []blockchain.Transaction{}
// Loop through the newly received list of proofs
client.PendingCrossTxsMutex.Lock()
for _, proof := range *proofs {
// Find the corresponding pending cross tx
txAndProofs, ok := client.PendingCrossTxs[proof.TxID]
readyToUnlock := true // A flag used to mark whether whether this pending cross tx have all the proofs for its utxo input
if ok {
// Add the new proof to the cross tx's proof list
txAndProofs.Proofs = append(txAndProofs.Proofs, proof)
// Check whether this pending cross tx have all the proofs for its utxo inputs
txInputs := make(map[blockchain.TXInput]bool)
for _, curProof := range txAndProofs.Proofs {
for _, txInput := range curProof.TxInput {
txInputs[txInput] = true
}
} else {
readyToUnlock = false
}
if readyToUnlock {
txsToSend = append(txsToSend, *txAndProofs)
for _, txInput := range txAndProofs.TxInput {
val, ok := txInputs[txInput]
if !ok || !val {
readyToUnlock = false
}
}
} else {
readyToUnlock = false
}
// Delete all the transactions with full proofs from the pending cross txs
for _, txToSend := range txsToSend {
delete(client.PendingCrossTxs, txToSend.ID)
if readyToUnlock {
txsToSend = append(txsToSend, *txAndProofs)
}
client.PendingCrossTxsMutex.Unlock()
}
// Broadcast the cross txs with full proofs for unlock-to-commit/abort
if len(txsToSend) != 0 {
client.broadcastCrossShardTxUnlockMessage(&txsToSend)
}
// Delete all the transactions with full proofs from the pending cross txs
for _, txToSend := range txsToSend {
delete(client.PendingCrossTxs, txToSend.ID)
}
client.PendingCrossTxsMutex.Unlock()
// Broadcast the cross txs with full proofs for unlock-to-commit/abort
if len(txsToSend) != 0 {
client.broadcastCrossShardTxUnlockMessage(&txsToSend)
}
}
@ -86,7 +93,7 @@ func (client *Client) broadcastCrossShardTxUnlockMessage(txsToSend *[]blockchain
p2p.BroadcastMessage(*client.leaders, ConstructUnlockToCommitOrAbortMessage(*txsToSend))
}
// Create a new Cient
// Create a new Client
func NewClient(leaders *[]p2p.Peer) *Client {
client := Client{}
client.PendingCrossTxs = make(map[[32]byte]*blockchain.Transaction)

@ -12,7 +12,7 @@ import (
"sync"
)
// Consensus data containing all info related to one consensus process
// Consensus data containing all info related to one round of consensus process
type Consensus struct {
state ConsensusState
// Signatures collected from validators
@ -55,7 +55,7 @@ type Consensus struct {
//// Network related fields
msgCategory byte
actionType byte
msgType byte
Log log.Logger
}
@ -67,79 +67,50 @@ type Consensus struct {
// should be stored in this temporary structure. In case the round N-1 finishes, it can catch
// up to the latest state of round N by using this structure.
type BlockConsensusStatus struct {
// BlockHeader to run consensus on
blockHeader []byte
state ConsensusState
}
// Consensus state enum for both leader and validator
// States for leader:
// FINISHED, ANNOUNCE_DONE, CHALLENGE_DONE
// States for validator:
// FINISHED, COMMIT_DONE, RESPONSE_DONE
type ConsensusState int
const (
FINISHED ConsensusState = iota // initial state or state after previous consensus is done.
ANNOUNCE_DONE
COMMIT_DONE
CHALLENGE_DONE
RESPONSE_DONE
)
// Returns string name for the ConsensusState enum
func (state ConsensusState) String() string {
names := [...]string{
"FINISHED",
"ANNOUNCE_DONE",
"COMMIT_DONE",
"CHALLENGE_DONE",
"RESPONSE_DONE"}
if state < FINISHED || state > RESPONSE_DONE {
return "Unknown"
}
return names[state]
blockHeader []byte // the block header of the block which the consensus is running on
state ConsensusState // the latest state of the consensus
}
// NewConsensus creates a new Consensus object
// TODO(minhdoan): Maybe convert it into just New
// FYI, see https://golang.org/doc/effective_go.html?#package-names
func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) Consensus {
// The first Ip, port passed will be leader.
consensus := Consensus{}
peer := p2p.Peer{Port: port, Ip: ip}
Peers := peers
leaderPeer := leader
if leaderPeer == peer {
selfPeer := p2p.Peer{Port: port, Ip: ip}
if leaderPeer == selfPeer {
consensus.IsLeader = true
} else {
consensus.IsLeader = false
}
consensus.commits = make(map[string]string)
consensus.responses = make(map[string]string)
consensus.leader = leaderPeer
consensus.validators = Peers
consensus.priKey = ip + ":" + port // use ip:port as unique key for now
reg, err := regexp.Compile("[^0-9]+")
if err != nil {
consensus.Log.Crit("Regex Compilation Failed", "err", err, "consensus", consensus)
}
consensus.consensusId = 0
consensus.consensusId = 0 // or view Id in the original pbft paper
myShardID, err := strconv.Atoi(ShardID)
if err != nil {
panic("Unparseable shard Id" + ShardID)
}
consensus.ShardID = uint32(myShardID)
// For validators
// For validators to keep track of all blocks received but not yet committed, so as to catch up to latest consensus if lagged behind.
consensus.blocksReceived = make(map[uint32]*BlockConsensusStatus)
// For now use socket address as 16 byte Id
// TODO: populate with correct Id
reg, err := regexp.Compile("[^0-9]+")
if err != nil {
consensus.Log.Crit("Regex Compilation Failed", "err", err, "consensus", consensus)
}
socketId := reg.ReplaceAllString(consensus.priKey, "")
value, err := strconv.Atoi(socketId)
consensus.nodeId = uint16(value)
@ -147,14 +118,16 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) C
if consensus.IsLeader {
consensus.ReadySignal = make(chan int)
// send a signal to indicate it's ready to run consensus
// this signal is consumed by node object to create a new block and in turn trigger a new consensus on it
// this is a goroutine because go channel without buffer will block
go func() {
consensus.ReadySignal <- 1
}()
}
// The message category and type used for any messages sent for consensus
consensus.msgCategory = byte(common.COMMITTEE)
consensus.actionType = byte(CONSENSUS)
consensus.msgType = byte(CONSENSUS)
consensus.Log = log.New()
return consensus
@ -167,7 +140,7 @@ func (consensus *Consensus) ResetState() {
consensus.responses = make(map[string]string)
}
// Returns ID of this consensus
// Returns a string representation of this consensus
func (consensus *Consensus) String() string {
var duty string
if consensus.IsLeader {

@ -7,6 +7,7 @@ import (
"encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/p2p"
"harmony-benchmark/utils"
"runtime"
"strings"
"time"
@ -16,13 +17,14 @@ var (
startTime time.Time
)
// WaitForNewBlock waits for a new block.
// Waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) {
consensus.Log.Debug("Waiting for block", "consensus", consensus)
for { // keep waiting for new blocks
newBlock := <-blockChannel
// TODO: think about potential race condition
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus)
startTime = time.Now()
consensus.Log.Info("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime)
for consensus.state == FINISHED {
time.Sleep(500 * time.Millisecond)
consensus.startConsensus(&newBlock)
@ -31,7 +33,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
}
}
// ProcessMessageLeader is the leader's consensus message dispatcher
// Consensus message dispatcher for the leader
func (consensus *Consensus) ProcessMessageLeader(message []byte) {
msgType, err := GetConsensusMessageType(message)
if err != nil {
@ -61,11 +63,11 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
// Handler for message which triggers consensus process
func (consensus *Consensus) processStartConsensusMessage(payload []byte) {
startTime = time.Now()
tx := blockchain.NewCoinbaseTX("x", "y", 0)
consensus.startConsensus(blockchain.NewGenesisBlock(tx, 0))
}
// Starts a new consensus for a block by broadcast a announce message to the validators
func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
// Copy over block hash and block header data
copy(consensus.blockHash[:], newBlock.Hash[:])
@ -82,7 +84,7 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
consensus.state = ANNOUNCE_DONE
}
// Construct the announce message to send to validators
// Constructs the announce message
func (consensus *Consensus) constructAnnounceMessage() []byte {
buffer := bytes.NewBuffer([]byte{})
@ -120,6 +122,7 @@ func signMessage(message []byte) []byte {
return append(mockSignature[:], mockSignature[:]...)
}
// Processes the commit message sent from validators
func (consensus *Consensus) processCommitMessage(payload []byte) {
//#### Read payload data
offset := 0
@ -185,7 +188,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
}
}
// Construct the challenge message to send to validators
// Construct the challenge message
func (consensus *Consensus) constructChallengeMessage() []byte {
buffer := bytes.NewBuffer([]byte{})
@ -245,6 +248,7 @@ func getChallenge() []byte {
return make([]byte, 32)
}
// Processes the response message sent from validators
func (consensus *Consensus) processResponseMessage(payload []byte) {
//#### Read payload data
offset := 0
@ -325,12 +329,17 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
endTime := time.Now()
timeElapsed := endTime.Sub(startTime)
numOfTxs := blockHeaderObj.NumTransactions
consensus.Log.Info("TPS Report", "numOfTXs", numOfTxs, "timeElapsed", timeElapsed, "TPS", float64(numOfTxs)/timeElapsed.Seconds())
consensus.Log.Info("TPS Report",
"numOfTXs", numOfTxs,
"startTime", startTime,
"endTime", endTime,
"timeElapsed", timeElapsed,
"TPS", float64(numOfTxs)/timeElapsed.Seconds())
var m runtime.MemStats
runtime.ReadMemStats(&m)
consensus.Log.Info("Mem Report", "Alloc", bToMb(m.Alloc), "TotalAlloc", bToMb(m.TotalAlloc),
"Sys", bToMb(m.Sys), "NumGC", m.NumGC)
consensus.Log.Info("Mem Report", "Alloc", utils.BToMb(m.Alloc), "TotalAlloc", utils.BToMb(m.TotalAlloc),
"Sys", utils.BToMb(m.Sys), "NumGC", m.NumGC)
// Send signal to Node so the new block can be added and new round of consensus can be triggered
consensus.ReadySignal <- 1
@ -338,7 +347,3 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
consensus.mutex.Unlock()
}
}
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}

@ -0,0 +1,31 @@
package consensus
// Consensus state enum for both leader and validator
// States for leader:
// FINISHED, ANNOUNCE_DONE, CHALLENGE_DONE
// States for validator:
// FINISHED, COMMIT_DONE, RESPONSE_DONE
type ConsensusState int
const (
FINISHED ConsensusState = iota // initial state or state after previous consensus is done.
ANNOUNCE_DONE
COMMIT_DONE
CHALLENGE_DONE
RESPONSE_DONE
)
// Returns string name for the ConsensusState enum
func (state ConsensusState) String() string {
names := [...]string{
"FINISHED",
"ANNOUNCE_DONE",
"COMMIT_DONE",
"CHALLENGE_DONE",
"RESPONSE_DONE"}
if state < FINISHED || state > RESPONSE_DONE {
return "Unknown"
}
return names[state]
}

@ -22,8 +22,8 @@ func TestNewConsensus(test *testing.T) {
test.Error("Consensus ReadySignal should be initialized")
}
if consensus.actionType != byte(CONSENSUS) {
test.Error("Consensus actionType should be CONSENSUS")
if consensus.msgType != byte(CONSENSUS) {
test.Error("Consensus msgType should be CONSENSUS")
}
if consensus.msgCategory != byte(common.COMMITTEE) {

@ -36,6 +36,7 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
}
}
// Processes the announce message sent from the leader
func (consensus *Consensus) processAnnounceMessage(payload []byte) {
//#### Read payload data
offset := 0
@ -160,6 +161,7 @@ func getCommitMessage() []byte {
return make([]byte, 33)
}
// Processes the challenge message sent from the leader
func (consensus *Consensus) processChallengeMessage(payload []byte) {
//#### Read payload data
offset := 0
@ -307,7 +309,7 @@ func (consensus *Consensus) constructResponseMessage() []byte {
return consensus.ConstructConsensusMessage(RESPONSE, buffer.Bytes())
}
// TODO: fill in this function
func getResponseMessage() []byte {
// TODO: construct real response
return make([]byte, 32)
}

@ -66,7 +66,8 @@ RESPONSE:
---- message end -----
*/
const MESSAGE_TYPE_BYTES = 1
// the number of bytes consensus action type occupies
const ACTION_TYPE_BYTES = 1
// The specific types of message under COMMITTEE category
type CommitteeMessageType byte
@ -117,13 +118,13 @@ func GetConsensusMessagePayload(message []byte) ([]byte, error) {
if len(message) < 2 {
return []byte{}, errors.New("Failed to get consensus message payload: no data available.")
}
return message[MESSAGE_TYPE_BYTES:], nil
return message[ACTION_TYPE_BYTES:], nil
}
// Concatenate msgType as one byte with payload, and return the whole byte array
func (consensus Consensus) ConstructConsensusMessage(consensusMsgType MessageType, payload []byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{consensus.msgCategory})
byteBuffer.WriteByte(consensus.actionType)
byteBuffer.WriteByte(consensus.msgType)
byteBuffer.WriteByte(byte(consensusMsgType))
byteBuffer.Write(payload)
return byteBuffer.Bytes()

@ -0,0 +1,5 @@
package utils
func BToMb(b uint64) uint64 {
return b / 1024 / 1024
}
Loading…
Cancel
Save