commit
3d71a16b74
@ -0,0 +1 @@ |
||||
protoc -I proto/ proto/client.proto --go_out=plugins=grpc:proto |
@ -0,0 +1,145 @@ |
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// source: drand.proto
|
||||
|
||||
package drand |
||||
|
||||
import ( |
||||
fmt "fmt" |
||||
proto "github.com/golang/protobuf/proto" |
||||
math "math" |
||||
) |
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal |
||||
var _ = fmt.Errorf |
||||
var _ = math.Inf |
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
|
||||
|
||||
type MessageType int32 |
||||
|
||||
const ( |
||||
MessageType_UNKNOWN MessageType = 0 |
||||
MessageType_INIT MessageType = 1 |
||||
MessageType_COMMIT MessageType = 2 |
||||
) |
||||
|
||||
var MessageType_name = map[int32]string{ |
||||
0: "UNKNOWN", |
||||
1: "INIT", |
||||
2: "COMMIT", |
||||
} |
||||
|
||||
var MessageType_value = map[string]int32{ |
||||
"UNKNOWN": 0, |
||||
"INIT": 1, |
||||
"COMMIT": 2, |
||||
} |
||||
|
||||
func (x MessageType) String() string { |
||||
return proto.EnumName(MessageType_name, int32(x)) |
||||
} |
||||
|
||||
func (MessageType) EnumDescriptor() ([]byte, []int) { |
||||
return fileDescriptor_1d855c36cf2c0c50, []int{0} |
||||
} |
||||
|
||||
type Message struct { |
||||
Type MessageType `protobuf:"varint,1,opt,name=type,proto3,enum=drand.MessageType" json:"type,omitempty"` |
||||
SenderId uint32 `protobuf:"varint,3,opt,name=sender_id,json=senderId,proto3" json:"sender_id,omitempty"` |
||||
BlockHash []byte `protobuf:"bytes,4,opt,name=block_hash,json=blockHash,proto3" json:"block_hash,omitempty"` |
||||
Payload []byte `protobuf:"bytes,5,opt,name=payload,proto3" json:"payload,omitempty"` |
||||
Signature []byte `protobuf:"bytes,6,opt,name=signature,proto3" json:"signature,omitempty"` |
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"` |
||||
XXX_unrecognized []byte `json:"-"` |
||||
XXX_sizecache int32 `json:"-"` |
||||
} |
||||
|
||||
func (m *Message) Reset() { *m = Message{} } |
||||
func (m *Message) String() string { return proto.CompactTextString(m) } |
||||
func (*Message) ProtoMessage() {} |
||||
func (*Message) Descriptor() ([]byte, []int) { |
||||
return fileDescriptor_1d855c36cf2c0c50, []int{0} |
||||
} |
||||
|
||||
func (m *Message) XXX_Unmarshal(b []byte) error { |
||||
return xxx_messageInfo_Message.Unmarshal(m, b) |
||||
} |
||||
func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { |
||||
return xxx_messageInfo_Message.Marshal(b, m, deterministic) |
||||
} |
||||
func (m *Message) XXX_Merge(src proto.Message) { |
||||
xxx_messageInfo_Message.Merge(m, src) |
||||
} |
||||
func (m *Message) XXX_Size() int { |
||||
return xxx_messageInfo_Message.Size(m) |
||||
} |
||||
func (m *Message) XXX_DiscardUnknown() { |
||||
xxx_messageInfo_Message.DiscardUnknown(m) |
||||
} |
||||
|
||||
var xxx_messageInfo_Message proto.InternalMessageInfo |
||||
|
||||
func (m *Message) GetType() MessageType { |
||||
if m != nil { |
||||
return m.Type |
||||
} |
||||
return MessageType_UNKNOWN |
||||
} |
||||
|
||||
func (m *Message) GetSenderId() uint32 { |
||||
if m != nil { |
||||
return m.SenderId |
||||
} |
||||
return 0 |
||||
} |
||||
|
||||
func (m *Message) GetBlockHash() []byte { |
||||
if m != nil { |
||||
return m.BlockHash |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (m *Message) GetPayload() []byte { |
||||
if m != nil { |
||||
return m.Payload |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (m *Message) GetSignature() []byte { |
||||
if m != nil { |
||||
return m.Signature |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func init() { |
||||
proto.RegisterEnum("drand.MessageType", MessageType_name, MessageType_value) |
||||
proto.RegisterType((*Message)(nil), "drand.Message") |
||||
} |
||||
|
||||
func init() { proto.RegisterFile("drand.proto", fileDescriptor_1d855c36cf2c0c50) } |
||||
|
||||
var fileDescriptor_1d855c36cf2c0c50 = []byte{ |
||||
// 214 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x8f, 0xcf, 0x4a, 0x87, 0x40, |
||||
0x10, 0x80, 0xdb, 0xf2, 0xe7, 0x9f, 0xb1, 0x42, 0xe6, 0xb4, 0x50, 0x81, 0x74, 0x08, 0xe9, 0x20, |
||||
0x51, 0x8f, 0xd0, 0xa5, 0x25, 0x5c, 0x41, 0x8c, 0x8e, 0xb2, 0xb6, 0x8b, 0x4a, 0xe2, 0x2e, 0xbb, |
||||
0x76, 0xf0, 0x81, 0x7a, 0xcf, 0x60, 0xad, 0xf8, 0x1d, 0xbf, 0xef, 0x9b, 0x81, 0x19, 0x48, 0xa5, |
||||
0x15, 0x8b, 0x2c, 0x8d, 0xd5, 0xab, 0xc6, 0x83, 0x87, 0xdb, 0x6f, 0x02, 0x51, 0xa5, 0x9c, 0x13, |
||||
0x83, 0xc2, 0x3b, 0x08, 0xd6, 0xcd, 0x28, 0x4a, 0x72, 0x52, 0x5c, 0x3e, 0x62, 0xb9, 0x8f, 0xff, |
||||
0xd6, 0x76, 0x33, 0xaa, 0xf1, 0x1d, 0xaf, 0x20, 0x71, 0x6a, 0x91, 0xca, 0x76, 0x93, 0xa4, 0x67, |
||||
0x39, 0x29, 0x2e, 0x9a, 0x78, 0x17, 0x4c, 0xe2, 0x0d, 0x40, 0x3f, 0xeb, 0x8f, 0xcf, 0x6e, 0x14, |
||||
0x6e, 0xa4, 0x41, 0x4e, 0x8a, 0xf3, 0x26, 0xf1, 0xe6, 0x45, 0xb8, 0x11, 0x29, 0x44, 0x46, 0x6c, |
||||
0xb3, 0x16, 0x92, 0x1e, 0x7c, 0xfb, 0x43, 0xbc, 0x86, 0xc4, 0x4d, 0xc3, 0x22, 0xd6, 0x2f, 0xab, |
||||
0x68, 0xb8, 0xef, 0xfd, 0x8b, 0xfb, 0x07, 0x48, 0x8f, 0x0e, 0xc1, 0x14, 0xa2, 0x37, 0xfe, 0xca, |
||||
0xeb, 0x77, 0x9e, 0x9d, 0x60, 0x0c, 0x01, 0xe3, 0xac, 0xcd, 0x08, 0x02, 0x84, 0xcf, 0x75, 0x55, |
||||
0xb1, 0x36, 0x3b, 0xed, 0x43, 0xff, 0xe7, 0xd3, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x79, 0xfa, |
||||
0xf8, 0x57, 0xf6, 0x00, 0x00, 0x00, |
||||
} |
@ -0,0 +1,17 @@ |
||||
syntax = "proto3"; |
||||
|
||||
package drand; |
||||
|
||||
enum MessageType { |
||||
UNKNOWN = 0; |
||||
INIT = 1; |
||||
COMMIT = 2; |
||||
} |
||||
|
||||
message Message { |
||||
MessageType type = 1; |
||||
uint32 sender_id = 3; // TODO: make it public key |
||||
bytes block_hash = 4; |
||||
bytes payload = 5; |
||||
bytes signature = 6; |
||||
} |
@ -0,0 +1,174 @@ |
||||
package core |
||||
|
||||
import ( |
||||
"math/rand" |
||||
"sort" |
||||
"strconv" |
||||
|
||||
"github.com/harmony-one/harmony/core/types" |
||||
) |
||||
|
||||
const ( |
||||
// InitialSeed is the initial random seed, a magic number to answer everything, remove later
|
||||
InitialSeed int64 = 42 |
||||
) |
||||
|
||||
// ShardingState is data structure hold the sharding state
|
||||
type ShardingState struct { |
||||
epoch uint64 // current epoch
|
||||
rnd int64 // random seed for resharding
|
||||
numShards int |
||||
shardState types.ShardState |
||||
} |
||||
|
||||
// sortedCommitteeBySize will sort shards by size
|
||||
// Suppose there are N shards, the first N/2 larger shards are called active committees
|
||||
// the rest N/2 smaller committees are called inactive committees
|
||||
// actually they are all just normal shards
|
||||
// TODO: sort the committee weighted by total staking instead of shard size
|
||||
func (ss *ShardingState) sortCommitteeBySize() { |
||||
sort.Slice(ss.shardState, func(i, j int) bool { |
||||
return len(ss.shardState[i].NodeList) > len(ss.shardState[j].NodeList) |
||||
}) |
||||
} |
||||
|
||||
// assignNewNodes add new nodes into the N/2 active committees evenly
|
||||
func (ss *ShardingState) assignNewNodes(newNodeList []types.NodeID) { |
||||
ss.sortCommitteeBySize() |
||||
numActiveShards := ss.numShards / 2 |
||||
Shuffle(newNodeList) |
||||
for i, nid := range newNodeList { |
||||
id := i % numActiveShards |
||||
ss.shardState[id].NodeList = append(ss.shardState[id].NodeList, nid) |
||||
} |
||||
} |
||||
|
||||
// cuckooResharding uses cuckoo rule to reshard X% of active committee(shards) into inactive committee(shards)
|
||||
func (ss *ShardingState) cuckooResharding(percent float64) { |
||||
ss.sortCommitteeBySize() |
||||
numActiveShards := ss.numShards / 2 |
||||
kickedNodes := []types.NodeID{} |
||||
for i := range ss.shardState { |
||||
if i >= numActiveShards { |
||||
break |
||||
} |
||||
Shuffle(ss.shardState[i].NodeList) |
||||
numKicked := int(percent * float64(len(ss.shardState[i].NodeList))) |
||||
tmp := ss.shardState[i].NodeList[:numKicked] |
||||
kickedNodes = append(kickedNodes, tmp...) |
||||
ss.shardState[i].NodeList = ss.shardState[i].NodeList[numKicked:] |
||||
} |
||||
|
||||
Shuffle(kickedNodes) |
||||
for i, nid := range kickedNodes { |
||||
id := numActiveShards + i%(ss.numShards-numActiveShards) |
||||
ss.shardState[id].NodeList = append(ss.shardState[id].NodeList, nid) |
||||
} |
||||
} |
||||
|
||||
// UpdateShardState will first add new nodes into shards, then use cuckoo rule to reshard to get new shard state
|
||||
func (ss *ShardingState) UpdateShardState(newNodeList []types.NodeID, percent float64) { |
||||
rand.Seed(ss.rnd) |
||||
ss.assignNewNodes(newNodeList) |
||||
ss.cuckooResharding(percent) |
||||
} |
||||
|
||||
// Shuffle will shuffle the list with result uniquely determined by seed, assuming there is no repeat items in the list
|
||||
func Shuffle(list []types.NodeID) { |
||||
sort.Slice(list, func(i, j int) bool { |
||||
return types.CompareNodeID(list[i], list[j]) == -1 |
||||
}) |
||||
rand.Shuffle(len(list), func(i, j int) { |
||||
list[i], list[j] = list[j], list[i] |
||||
}) |
||||
} |
||||
|
||||
// GetBlockNumberFromEpoch calculates the block number where epoch sharding information is stored
|
||||
func GetBlockNumberFromEpoch(epoch uint64) uint64 { |
||||
number := epoch * uint64(BlocksPerEpoch) // currently we use the first block in each epoch
|
||||
return number |
||||
} |
||||
|
||||
// GetEpochFromBlockNumber calculates the epoch number the block belongs to
|
||||
func GetEpochFromBlockNumber(blockNumber uint64) uint64 { |
||||
return blockNumber / uint64(BlocksPerEpoch) |
||||
} |
||||
|
||||
// CheckEpochBlock check whethere a given block number is the one to store epoch information
|
||||
func CheckEpochBlock(blockNumber uint64) bool { |
||||
return blockNumber%uint64(BlocksPerEpoch) == 0 |
||||
} |
||||
|
||||
// GetPreviousEpochBlockNumber gets the epoch block number of previous epoch
|
||||
func GetPreviousEpochBlockNumber(blockNumber uint64) uint64 { |
||||
epoch := GetEpochFromBlockNumber(blockNumber) |
||||
if epoch == 1 { |
||||
// no previous epoch
|
||||
return epoch |
||||
} |
||||
return GetBlockNumberFromEpoch(epoch - 1) |
||||
} |
||||
|
||||
// GetShardingStateFromBlockChain will retrieve random seed and shard map from beacon chain for given a epoch
|
||||
func GetShardingStateFromBlockChain(bc *BlockChain, epoch uint64) *ShardingState { |
||||
number := GetBlockNumberFromEpoch(epoch) |
||||
shardState := bc.GetShardStateByNumber(number) |
||||
rnd := bc.GetRandSeedByNumber(number) |
||||
|
||||
return &ShardingState{epoch: epoch, rnd: rnd, shardState: shardState, numShards: len(shardState)} |
||||
} |
||||
|
||||
// CalculateNewShardState get sharding state from previous epoch and calcualte sharding state for new epoch
|
||||
// TODO: currently, we just mock everything
|
||||
func CalculateNewShardState(bc *BlockChain, epoch uint64) types.ShardState { |
||||
if epoch == 1 { |
||||
return fakeGetInitShardState() |
||||
} |
||||
ss := GetShardingStateFromBlockChain(bc, epoch-1) |
||||
newNodeList := fakeNewNodeList(ss.rnd) |
||||
percent := ss.calculateKickoutRate(newNodeList) |
||||
ss.UpdateShardState(newNodeList, percent) |
||||
return ss.shardState |
||||
} |
||||
|
||||
// calculateKickoutRate calculates the cuckoo rule kick out rate in order to make committee balanced
|
||||
func (ss *ShardingState) calculateKickoutRate(newNodeList []types.NodeID) float64 { |
||||
numActiveCommittees := ss.numShards / 2 |
||||
newNodesPerShard := len(newNodeList) / numActiveCommittees |
||||
ss.sortCommitteeBySize() |
||||
return float64(newNodesPerShard) / float64(len(ss.shardState[numActiveCommittees].NodeList)) |
||||
} |
||||
|
||||
// FakeGenRandSeed generate random seed based on previous rnd seed; remove later after VRF implemented
|
||||
func FakeGenRandSeed(seed int64) int64 { |
||||
rand.Seed(seed) |
||||
return rand.Int63() |
||||
} |
||||
|
||||
// remove later after bootstrap codes ready
|
||||
func fakeGetInitShardState() types.ShardState { |
||||
rand.Seed(InitialSeed) |
||||
shardState := types.ShardState{} |
||||
for i := 0; i < 6; i++ { |
||||
sid := uint32(i) |
||||
com := types.Committee{ShardID: sid} |
||||
for j := 0; j < 10; j++ { |
||||
nid := strconv.Itoa(int(rand.Int63())) |
||||
com.NodeList = append(com.NodeList, types.NodeID(nid)) |
||||
} |
||||
shardState = append(shardState, com) |
||||
} |
||||
return shardState |
||||
} |
||||
|
||||
// remove later after new nodes list generation ready
|
||||
func fakeNewNodeList(seed int64) []types.NodeID { |
||||
rand.Seed(seed) |
||||
numNewNodes := rand.Intn(10) |
||||
nodeList := []types.NodeID{} |
||||
for i := 0; i < numNewNodes; i++ { |
||||
nid := strconv.Itoa(int(rand.Int63())) |
||||
nodeList = append(nodeList, types.NodeID(nid)) |
||||
} |
||||
return nodeList |
||||
} |
@ -0,0 +1,12 @@ |
||||
## Resharding |
||||
|
||||
In current design, the epoch is defined to be fixed length, the epoch length is a constant parameter BlocksPerEpoch. In future, it will be dynamically adjustable according to security parameter. During the epoch transition, suppose there are N shards, we sort the shards according to the size of active nodes (that had staking for next epoch). The first N/2 larger shards will be called active committees, and the last N/2 smaller shards will be called inactive committees. Don't be confused by |
||||
the name, they are all normal shards with same function. |
||||
|
||||
All the information about sharding will be stored in BeaconChain. A sharding state is defined as a map which maps each NodeID to the ShardID the node belongs to. Every node will have a unique NodeID and be mapped to one ShardID. At the beginning of a new epoch, the BeaconChain leader will propose a new block containing the new sharding state, the new sharding state is uniquely determined by the randomness generated by distributed randomness protocol. During the consensus process, all the validators will perform the same calculation and verify the proposed sharding state is valid. After consensus is reached, each node will write the new sharding state into the block. This block is called epoch block. In current code, it's the first block of each epoch in BeaconChain. |
||||
|
||||
The main function of resharding is CalculcateNewShardState. It will take 3 inputs: newNodeList, oldShardState, randomSeed and output newShardState. |
||||
The newNodeList will be retrieved from BeaconChain staking transaction during the previous epoch. The randomSeed and oldShardState is stored in previous epoch block. It should be noticed that the randomSeed generation currently is mocked. After the distributed randomness protocol(drand) is ready, the drand service will generate the random seed for resharding. |
||||
|
||||
The resharding process is as follows: we first get newNodeList from staking transactions from previous epoch and assign the new nodes evenly into the N/2 active committees. Then, we kick out X% of nodes from each active committees and put these kicked out nodes into inactive committees evenly. The percentage X roughly equals to the percentage of new nodes into active committee in order to balance the committee size. |
||||
|
@ -0,0 +1,18 @@ |
||||
package core |
||||
|
||||
import ( |
||||
"fmt" |
||||
"testing" |
||||
) |
||||
|
||||
func TestFakeGetInitShardState(t *testing.T) { |
||||
ss := fakeGetInitShardState() |
||||
for i := range ss { |
||||
fmt.Printf("ShardID: %v, NodeList: %v\n", ss[i].ShardID, ss[i].NodeList) |
||||
} |
||||
} |
||||
|
||||
func TestFakeNewNodeList(t *testing.T) { |
||||
nodeList := fakeNewNodeList(42) |
||||
fmt.Println("newNodeList: ", nodeList) |
||||
} |
@ -0,0 +1,68 @@ |
||||
package types |
||||
|
||||
import ( |
||||
"sort" |
||||
|
||||
"github.com/ethereum/go-ethereum/common" |
||||
"golang.org/x/crypto/sha3" |
||||
) |
||||
|
||||
// NodeID is a unique ID represent a node
|
||||
type NodeID string |
||||
|
||||
// ShardState is the collection of all committees
|
||||
type ShardState []Committee |
||||
|
||||
// Committee contains the active nodes in one shard
|
||||
type Committee struct { |
||||
ShardID uint32 |
||||
NodeList []NodeID // a list of NodeID where NodeID is represented by a string
|
||||
} |
||||
|
||||
// GetHashFromNodeList will sort the list, then use Keccak256 to hash the list
|
||||
// notice that the input nodeList will be modified (sorted)
|
||||
func GetHashFromNodeList(nodeList []NodeID) []byte { |
||||
// in general, nodeList should not be empty
|
||||
if nodeList == nil || len(nodeList) == 0 { |
||||
return []byte{} |
||||
} |
||||
|
||||
sort.Slice(nodeList, func(i, j int) bool { |
||||
return CompareNodeID(nodeList[i], nodeList[j]) == -1 |
||||
}) |
||||
d := sha3.NewLegacyKeccak256() |
||||
for i := range nodeList { |
||||
d.Write(nodeList[i].Serialize()) |
||||
} |
||||
return d.Sum(nil) |
||||
} |
||||
|
||||
// Hash is the root hash of ShardState
|
||||
func (ss ShardState) Hash() (h common.Hash) { |
||||
sort.Slice(ss, func(i, j int) bool { |
||||
return ss[i].ShardID < ss[j].ShardID |
||||
}) |
||||
d := sha3.NewLegacyKeccak256() |
||||
for i := range ss { |
||||
hash := GetHashFromNodeList(ss[i].NodeList) |
||||
d.Write(hash) |
||||
} |
||||
d.Sum(h[:0]) |
||||
return h |
||||
} |
||||
|
||||
// CompareNodeID compares two nodes by their ID; used to sort node list
|
||||
func CompareNodeID(n1 NodeID, n2 NodeID) int { |
||||
if n1 < n2 { |
||||
return -1 |
||||
} |
||||
if n1 > n2 { |
||||
return 1 |
||||
} |
||||
return 0 |
||||
} |
||||
|
||||
// Serialize serialize NodeID into bytes
|
||||
func (n NodeID) Serialize() []byte { |
||||
return []byte(n) |
||||
} |
@ -0,0 +1,33 @@ |
||||
package types |
||||
|
||||
import ( |
||||
"bytes" |
||||
"testing" |
||||
) |
||||
|
||||
func TestGetHashFromNodeList(t *testing.T) { |
||||
l1 := []NodeID{"node1", "node2", "node3"} |
||||
l2 := []NodeID{"node2", "node1", "node3"} |
||||
h1 := GetHashFromNodeList(l1) |
||||
h2 := GetHashFromNodeList(l2) |
||||
|
||||
if bytes.Compare(h1, h2) != 0 { |
||||
t.Error("node list l1 and l2 should have equal hash") |
||||
} |
||||
} |
||||
|
||||
func TestHash(t *testing.T) { |
||||
com1 := Committee{ShardID: 22, NodeList: []NodeID{"node11", "node22", "node1"}} |
||||
com2 := Committee{ShardID: 2, NodeList: []NodeID{"node4", "node5", "node6"}} |
||||
shardState1 := ShardState{com1, com2} |
||||
h1 := shardState1.Hash() |
||||
|
||||
com3 := Committee{ShardID: 2, NodeList: []NodeID{"node6", "node5", "node4"}} |
||||
com4 := Committee{ShardID: 22, NodeList: []NodeID{"node1", "node11", "node22"}} |
||||
shardState2 := ShardState{com3, com4} |
||||
h2 := shardState2.Hash() |
||||
|
||||
if bytes.Compare(h1[:], h2[:]) != 0 { |
||||
t.Error("shardState1 and shardState2 should have equal hash") |
||||
} |
||||
} |
@ -0,0 +1,194 @@ |
||||
package drand |
||||
|
||||
import ( |
||||
"crypto/sha256" |
||||
"encoding/binary" |
||||
"errors" |
||||
"strconv" |
||||
"sync" |
||||
|
||||
protobuf "github.com/golang/protobuf/proto" |
||||
"github.com/harmony-one/bls/ffi/go/bls" |
||||
drand_proto "github.com/harmony-one/harmony/api/drand" |
||||
bls_cosi "github.com/harmony-one/harmony/crypto/bls" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
) |
||||
|
||||
// DRand is the main struct which contains state for the distributed randomness protocol.
|
||||
type DRand struct { |
||||
vrfs *map[uint32][]byte |
||||
bitmap *bls_cosi.Mask |
||||
pRand *[32]byte |
||||
rand *[32]byte |
||||
|
||||
// map of nodeID to validator Peer object
|
||||
// FIXME: should use PubKey of p2p.Peer as the hashkey
|
||||
validators sync.Map // key is uint16, value is p2p.Peer
|
||||
|
||||
// Leader's address
|
||||
leader p2p.Peer |
||||
|
||||
// Public keys of the committee including leader and validators
|
||||
PublicKeys []*bls.PublicKey |
||||
|
||||
// private/public keys of current node
|
||||
priKey *bls.SecretKey |
||||
pubKey *bls.PublicKey |
||||
|
||||
// Whether I am leader. False means I am validator
|
||||
IsLeader bool |
||||
|
||||
// Leader or validator Id - 4 byte
|
||||
nodeID uint32 |
||||
|
||||
// The p2p host used to send/receive p2p messages
|
||||
host p2p.Host |
||||
|
||||
// Shard Id which this node belongs to
|
||||
ShardID uint32 |
||||
|
||||
// Blockhash - 32 byte
|
||||
blockHash [32]byte |
||||
} |
||||
|
||||
// New creates a new dRand object
|
||||
func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *DRand { |
||||
dRand := DRand{} |
||||
dRand.host = host |
||||
|
||||
selfPeer := host.GetSelfPeer() |
||||
if leader.Port == selfPeer.Port && leader.IP == selfPeer.IP { |
||||
dRand.IsLeader = true |
||||
} else { |
||||
dRand.IsLeader = false |
||||
} |
||||
|
||||
dRand.leader = leader |
||||
for _, peer := range peers { |
||||
dRand.validators.Store(utils.GetUniqueIDFromPeer(peer), peer) |
||||
} |
||||
|
||||
dRand.vrfs = &map[uint32][]byte{} |
||||
|
||||
// Initialize cosign bitmap
|
||||
allPublicKeys := make([]*bls.PublicKey, 0) |
||||
for _, validatorPeer := range peers { |
||||
allPublicKeys = append(allPublicKeys, validatorPeer.PubKey) |
||||
} |
||||
allPublicKeys = append(allPublicKeys, leader.PubKey) |
||||
|
||||
dRand.PublicKeys = allPublicKeys |
||||
|
||||
bitmap, _ := bls_cosi.NewMask(dRand.PublicKeys, dRand.leader.PubKey) |
||||
dRand.bitmap = bitmap |
||||
|
||||
dRand.pRand = nil |
||||
dRand.rand = nil |
||||
|
||||
// For now use socket address as ID
|
||||
// TODO: populate Id derived from address
|
||||
dRand.nodeID = utils.GetUniqueIDFromPeer(selfPeer) |
||||
|
||||
// Set private key for myself so that I can sign messages.
|
||||
nodeIDBytes := make([]byte, 32) |
||||
binary.LittleEndian.PutUint32(nodeIDBytes, dRand.nodeID) |
||||
privateKey := bls.SecretKey{} |
||||
err := privateKey.SetLittleEndian(nodeIDBytes) |
||||
dRand.priKey = &privateKey |
||||
dRand.pubKey = privateKey.GetPublicKey() |
||||
|
||||
myShardID, err := strconv.Atoi(ShardID) |
||||
if err != nil { |
||||
panic("Unparseable shard Id" + ShardID) |
||||
} |
||||
dRand.ShardID = uint32(myShardID) |
||||
|
||||
return &dRand |
||||
} |
||||
|
||||
// Sign on the drand message signature field.
|
||||
func (dRand *DRand) signDRandMessage(message *drand_proto.Message) error { |
||||
message.Signature = nil |
||||
// TODO: use custom serialization method rather than protobuf
|
||||
marshaledMessage, err := protobuf.Marshal(message) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
// 64 byte of signature on previous data
|
||||
hash := sha256.Sum256(marshaledMessage) |
||||
signature := dRand.priKey.SignHash(hash[:]) |
||||
|
||||
message.Signature = signature.Serialize() |
||||
return nil |
||||
} |
||||
|
||||
// Signs the drand message and returns the marshaled message.
|
||||
func (dRand *DRand) signAndMarshalDRandMessage(message *drand_proto.Message) ([]byte, error) { |
||||
err := dRand.signDRandMessage(message) |
||||
if err != nil { |
||||
return []byte{}, err |
||||
} |
||||
|
||||
marshaledMessage, err := protobuf.Marshal(message) |
||||
if err != nil { |
||||
return []byte{}, err |
||||
} |
||||
return marshaledMessage, nil |
||||
} |
||||
|
||||
func (dRand *DRand) vrf(blockHash [32]byte) (rand [32]byte, proof []byte) { |
||||
// TODO: implement vrf
|
||||
return [32]byte{}, []byte{} |
||||
} |
||||
|
||||
// GetValidatorPeers returns list of validator peers.
|
||||
func (dRand *DRand) GetValidatorPeers() []p2p.Peer { |
||||
validatorPeers := make([]p2p.Peer, 0) |
||||
|
||||
dRand.validators.Range(func(k, v interface{}) bool { |
||||
if peer, ok := v.(p2p.Peer); ok { |
||||
validatorPeers = append(validatorPeers, peer) |
||||
return true |
||||
} |
||||
return false |
||||
}) |
||||
|
||||
return validatorPeers |
||||
} |
||||
|
||||
// Verify the signature of the message are valid from the signer's public key.
|
||||
func verifyMessageSig(signerPubKey *bls.PublicKey, message drand_proto.Message) error { |
||||
signature := message.Signature |
||||
message.Signature = nil |
||||
messageBytes, err := protobuf.Marshal(&message) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
msgSig := bls.Sign{} |
||||
err = msgSig.Deserialize(signature) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
msgHash := sha256.Sum256(messageBytes) |
||||
if !msgSig.VerifyHash(signerPubKey, msgHash[:]) { |
||||
return errors.New("failed to verify the signature") |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// Gets the validator peer based on validator ID.
|
||||
func (dRand *DRand) getValidatorPeerByID(validatorID uint32) *p2p.Peer { |
||||
v, ok := dRand.validators.Load(validatorID) |
||||
if !ok { |
||||
utils.GetLogInstance().Warn("Unrecognized validator", "validatorID", validatorID, "dRand", dRand) |
||||
return nil |
||||
} |
||||
value, ok := v.(p2p.Peer) |
||||
if !ok { |
||||
utils.GetLogInstance().Warn("Invalid validator", "validatorID", validatorID, "dRand", dRand) |
||||
return nil |
||||
} |
||||
return &value |
||||
} |
@ -0,0 +1,93 @@ |
||||
package drand |
||||
|
||||
import ( |
||||
protobuf "github.com/golang/protobuf/proto" |
||||
drand_proto "github.com/harmony-one/harmony/api/drand" |
||||
"github.com/harmony-one/harmony/core/types" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/p2p/host" |
||||
) |
||||
|
||||
// WaitForEpochBlock waits for the first epoch block to run DRG on
|
||||
func (dRand *DRand) WaitForEpochBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}) { |
||||
go func() { |
||||
defer close(stoppedChan) |
||||
for { |
||||
select { |
||||
default: |
||||
// keep waiting for new blocks
|
||||
newBlock := <-blockChannel |
||||
// TODO: think about potential race condition
|
||||
|
||||
dRand.init(newBlock) |
||||
case <-stopChan: |
||||
return |
||||
} |
||||
} |
||||
}() |
||||
} |
||||
|
||||
func (dRand *DRand) init(epochBlock *types.Block) { |
||||
// Copy over block hash and block header data
|
||||
blockHash := epochBlock.Hash() |
||||
copy(dRand.blockHash[:], blockHash[:]) |
||||
|
||||
msgToSend := dRand.constructInitMessage() |
||||
|
||||
// Leader commit vrf itself
|
||||
rand, proof := dRand.vrf(dRand.blockHash) |
||||
|
||||
(*dRand.vrfs)[dRand.nodeID] = append(rand[:], proof...) |
||||
|
||||
host.BroadcastMessageFromLeader(dRand.host, dRand.GetValidatorPeers(), msgToSend, nil) |
||||
} |
||||
|
||||
// ProcessMessageLeader dispatches messages for the leader to corresponding processors.
|
||||
func (dRand *DRand) ProcessMessageLeader(payload []byte) { |
||||
message := drand_proto.Message{} |
||||
err := protobuf.Unmarshal(payload, &message) |
||||
|
||||
if err != nil { |
||||
utils.GetLogInstance().Error("Failed to unmarshal message payload.", "err", err, "dRand", dRand) |
||||
} |
||||
|
||||
switch message.Type { |
||||
case drand_proto.MessageType_COMMIT: |
||||
dRand.processCommitMessage(message) |
||||
default: |
||||
utils.GetLogInstance().Error("Unexpected message type", "msgType", message.Type, "dRand", dRand) |
||||
} |
||||
} |
||||
|
||||
// ProcessMessageValidator dispatches validator's consensus message.
|
||||
func (dRand *DRand) processCommitMessage(message drand_proto.Message) { |
||||
if message.Type != drand_proto.MessageType_COMMIT { |
||||
utils.GetLogInstance().Error("Wrong message type received", "expected", drand_proto.MessageType_COMMIT, "got", message.Type) |
||||
return |
||||
} |
||||
|
||||
// Verify message signature
|
||||
err := verifyMessageSig(dRand.leader.PubKey, message) |
||||
if err != nil { |
||||
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) |
||||
return |
||||
} |
||||
|
||||
rand := message.Payload[:32] |
||||
proof := message.Payload[32:] |
||||
_ = rand |
||||
_ = proof |
||||
// TODO: check the validity of the vrf commit
|
||||
|
||||
validatorID := message.SenderId |
||||
validatorPeer := dRand.getValidatorPeerByID(validatorID) |
||||
vrfs := dRand.vrfs |
||||
utils.GetLogInstance().Debug("Received new prepare signature", "numReceivedSoFar", len((*vrfs)), "validatorID", validatorID, "PublicKeys", len(dRand.PublicKeys)) |
||||
|
||||
(*vrfs)[validatorID] = message.Payload |
||||
dRand.bitmap.SetKey(validatorPeer.PubKey, true) // Set the bitmap indicating that this validator signed.
|
||||
|
||||
if len((*vrfs)) >= ((len(dRand.PublicKeys))/3 + 1) { |
||||
// Construct pRand and initiate consensus on it
|
||||
} |
||||
} |
@ -0,0 +1,21 @@ |
||||
package drand |
||||
|
||||
import ( |
||||
drand_proto "github.com/harmony-one/harmony/api/drand" |
||||
"github.com/harmony-one/harmony/api/proto" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
) |
||||
|
||||
// Constructs the init message
|
||||
func (drand *DRand) constructInitMessage() []byte { |
||||
message := drand_proto.Message{} |
||||
message.Type = drand_proto.MessageType_INIT |
||||
|
||||
message.BlockHash = drand.blockHash[:] |
||||
// Don't need the payload in init message
|
||||
marshaledMessage, err := drand.signAndMarshalDRandMessage(&message) |
||||
if err != nil { |
||||
utils.GetLogInstance().Error("Failed to sign and marshal the init message", "error", err) |
||||
} |
||||
return proto.ConstructDRandMessage(marshaledMessage) |
||||
} |
@ -0,0 +1,26 @@ |
||||
package drand |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/p2p/p2pimpl" |
||||
) |
||||
|
||||
func TestConstructInitMessage(test *testing.T) { |
||||
leader := p2p.Peer{IP: "127.0.0.1", Port: "19999"} |
||||
validator := p2p.Peer{IP: "127.0.0.1", Port: "55555"} |
||||
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") |
||||
host, err := p2pimpl.NewHost(&leader, priKey) |
||||
if err != nil { |
||||
test.Fatalf("newhost failure: %v", err) |
||||
} |
||||
dRand := New(host, "0", []p2p.Peer{leader, validator}, leader) |
||||
dRand.blockHash = [32]byte{} |
||||
msg := dRand.constructInitMessage() |
||||
|
||||
if len(msg) != 87 { |
||||
test.Errorf("Init message is not constructed in the correct size: %d", len(msg)) |
||||
} |
||||
} |
@ -0,0 +1,24 @@ |
||||
package drand |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/p2p/p2pimpl" |
||||
) |
||||
|
||||
func TestNew(test *testing.T) { |
||||
leader := p2p.Peer{IP: "127.0.0.1", Port: "9902"} |
||||
validator := p2p.Peer{IP: "127.0.0.1", Port: "9905"} |
||||
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") |
||||
host, err := p2pimpl.NewHost(&leader, priKey) |
||||
if err != nil { |
||||
test.Fatalf("newhost failure: %v", err) |
||||
} |
||||
dRand := New(host, "0", []p2p.Peer{leader, validator}, leader) |
||||
|
||||
if !dRand.IsLeader { |
||||
test.Error("dRand should belong to a leader") |
||||
} |
||||
} |
@ -0,0 +1,52 @@ |
||||
package drand |
||||
|
||||
import ( |
||||
protobuf "github.com/golang/protobuf/proto" |
||||
drand_proto "github.com/harmony-one/harmony/api/drand" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/p2p/host" |
||||
) |
||||
|
||||
// ProcessMessageValidator dispatches messages for the validator to corresponding processors.
|
||||
func (dRand *DRand) ProcessMessageValidator(payload []byte) { |
||||
message := drand_proto.Message{} |
||||
err := protobuf.Unmarshal(payload, &message) |
||||
|
||||
if err != nil { |
||||
utils.GetLogInstance().Error("Failed to unmarshal message payload.", "err", err, "dRand", dRand) |
||||
} |
||||
|
||||
switch message.Type { |
||||
case drand_proto.MessageType_COMMIT: |
||||
dRand.processInitMessage(message) |
||||
default: |
||||
utils.GetLogInstance().Error("Unexpected message type", "msgType", message.Type, "dRand", dRand) |
||||
} |
||||
} |
||||
|
||||
// ProcessMessageValidator dispatches validator's consensus message.
|
||||
func (dRand *DRand) processInitMessage(message drand_proto.Message) { |
||||
if message.Type != drand_proto.MessageType_INIT { |
||||
utils.GetLogInstance().Error("Wrong message type received", "expected", drand_proto.MessageType_INIT, "got", message.Type) |
||||
return |
||||
} |
||||
|
||||
blockHash := message.BlockHash |
||||
|
||||
// Verify message signature
|
||||
err := verifyMessageSig(dRand.leader.PubKey, message) |
||||
if err != nil { |
||||
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) |
||||
return |
||||
} |
||||
|
||||
// TODO: check the blockHash is the block hash of last block of last epoch.
|
||||
copy(dRand.blockHash[:], blockHash[:]) |
||||
|
||||
rand, proof := dRand.vrf(dRand.blockHash) |
||||
|
||||
msgToSend := dRand.constructCommitMessage(rand, proof) |
||||
|
||||
// Send the commit message back to leader
|
||||
host.SendMessage(dRand.host, dRand.leader, msgToSend, nil) |
||||
} |
@ -0,0 +1,22 @@ |
||||
package drand |
||||
|
||||
import ( |
||||
drand_proto "github.com/harmony-one/harmony/api/drand" |
||||
"github.com/harmony-one/harmony/api/proto" |
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
) |
||||
|
||||
// Constructs the init message
|
||||
func (drand *DRand) constructCommitMessage(vrf [32]byte, proof []byte) []byte { |
||||
message := drand_proto.Message{} |
||||
message.Type = drand_proto.MessageType_COMMIT |
||||
|
||||
message.BlockHash = drand.blockHash[:] |
||||
message.Payload = append(vrf[:], proof...) |
||||
|
||||
marshaledMessage, err := drand.signAndMarshalDRandMessage(&message) |
||||
if err != nil { |
||||
utils.GetLogInstance().Error("Failed to sign and marshal the commit message", "error", err) |
||||
} |
||||
return proto.ConstructDRandMessage(marshaledMessage) |
||||
} |
@ -0,0 +1,26 @@ |
||||
package drand |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/harmony-one/harmony/internal/utils" |
||||
"github.com/harmony-one/harmony/p2p" |
||||
"github.com/harmony-one/harmony/p2p/p2pimpl" |
||||
) |
||||
|
||||
func TestConstructCommitMessage(test *testing.T) { |
||||
leader := p2p.Peer{IP: "127.0.0.1", Port: "19999"} |
||||
validator := p2p.Peer{IP: "127.0.0.1", Port: "55555"} |
||||
priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") |
||||
host, err := p2pimpl.NewHost(&leader, priKey) |
||||
if err != nil { |
||||
test.Fatalf("newhost failure: %v", err) |
||||
} |
||||
dRand := New(host, "0", []p2p.Peer{leader, validator}, leader) |
||||
dRand.blockHash = [32]byte{} |
||||
msg := dRand.constructCommitMessage([32]byte{}, []byte{}) |
||||
|
||||
if len(msg) != 121 { |
||||
test.Errorf("Commit message is not constructed in the correct size: %d", len(msg)) |
||||
} |
||||
} |
@ -0,0 +1,57 @@ |
||||
package utils |
||||
|
||||
import ( |
||||
"github.com/ethereum/go-ethereum/log" |
||||
net "github.com/libp2p/go-libp2p-net" |
||||
ma "github.com/multiformats/go-multiaddr" |
||||
) |
||||
|
||||
type connLogger struct{} |
||||
|
||||
func (connLogger) Listen(net net.Network, ma ma.Multiaddr) { |
||||
log.Debug("[CONNECTIONS] Listener starting", "net", net, "addr", ma) |
||||
} |
||||
|
||||
func (connLogger) ListenClose(net net.Network, ma ma.Multiaddr) { |
||||
log.Debug("[CONNECTIONS] Listener closing", "net", net, "addr", ma) |
||||
} |
||||
|
||||
func (connLogger) Connected(net net.Network, conn net.Conn) { |
||||
log.Debug("[CONNECTIONS] Connected", "net", net, |
||||
"localPeer", conn.LocalPeer(), "localAddr", conn.LocalMultiaddr(), |
||||
"remotePeer", conn.RemotePeer(), "remoteAddr", conn.RemoteMultiaddr(), |
||||
) |
||||
} |
||||
|
||||
func (connLogger) Disconnected(net net.Network, conn net.Conn) { |
||||
log.Debug("[CONNECTIONS] Disconnected", "net", net, |
||||
"localPeer", conn.LocalPeer(), "localAddr", conn.LocalMultiaddr(), |
||||
"remotePeer", conn.RemotePeer(), "remoteAddr", conn.RemoteMultiaddr(), |
||||
) |
||||
} |
||||
|
||||
func (connLogger) OpenedStream(net net.Network, stream net.Stream) { |
||||
conn := stream.Conn() |
||||
log.Debug("[CONNECTIONS] Stream opened", "net", net, |
||||
"localPeer", conn.LocalPeer(), "localAddr", conn.LocalMultiaddr(), |
||||
"remotePeer", conn.RemotePeer(), "remoteAddr", conn.RemoteMultiaddr(), |
||||
"protocol", stream.Protocol(), |
||||
) |
||||
} |
||||
|
||||
func (connLogger) ClosedStream(net net.Network, stream net.Stream) { |
||||
conn := stream.Conn() |
||||
log.Debug("[CONNECTIONS] Stream closed", "net", net, |
||||
"localPeer", conn.LocalPeer(), "localAddr", conn.LocalMultiaddr(), |
||||
"remotePeer", conn.RemotePeer(), "remoteAddr", conn.RemoteMultiaddr(), |
||||
"protocol", stream.Protocol(), |
||||
) |
||||
} |
||||
|
||||
// ConnLogger is a LibP2P connection logger.
|
||||
// Add on a LibP2P host by calling:
|
||||
//
|
||||
// host.Network().Notify(utils.ConnLogger)
|
||||
//
|
||||
// It logs all listener/connection/stream open/close activities at debug level.
|
||||
var ConnLogger connLogger |
@ -0,0 +1,22 @@ |
||||
The MIT License (MIT) |
||||
|
||||
Copyright (c) 2018 Protocol Labs |
||||
Copyright (c) 2019 Harmony.One |
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy |
||||
of this software and associated documentation files (the "Software"), to deal |
||||
in the Software without restriction, including without limitation the rights |
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||||
copies of the Software, and to permit persons to whom the Software is |
||||
furnished to do so, subject to the following conditions: |
||||
|
||||
The above copyright notice and this permission notice shall be included in |
||||
all copies or substantial portions of the Software. |
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
||||
THE SOFTWARE. |
@ -0,0 +1,162 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"bufio" |
||||
"context" |
||||
"flag" |
||||
"fmt" |
||||
"os" |
||||
"sync" |
||||
|
||||
"github.com/ipfs/go-log" |
||||
"github.com/libp2p/go-libp2p" |
||||
discovery "github.com/libp2p/go-libp2p-discovery" |
||||
libp2pdht "github.com/libp2p/go-libp2p-kad-dht" |
||||
peer "github.com/libp2p/go-libp2p-peer" |
||||
peerstore "github.com/libp2p/go-libp2p-peerstore" |
||||
pubsub "github.com/libp2p/go-libp2p-pubsub" |
||||
multiaddr "github.com/multiformats/go-multiaddr" |
||||
logging "github.com/whyrusleeping/go-logging" |
||||
) |
||||
|
||||
var logger = log.Logger("rendezvous") |
||||
|
||||
// Harmony MIT License
|
||||
func writePubsub(ps *pubsub.PubSub) { |
||||
stdReader := bufio.NewReader(os.Stdin) |
||||
for { |
||||
fmt.Print("> ") |
||||
data, _ := stdReader.ReadString('\n') |
||||
ps.Publish("pubsubtestchannel", []byte(data)) |
||||
} |
||||
} |
||||
|
||||
// Harmony MIT License
|
||||
func readPubsub(sub *pubsub.Subscription) { |
||||
ctx := context.Background() |
||||
for { |
||||
m, err := sub.Next(ctx) |
||||
|
||||
if err == nil { |
||||
msg := m.Data |
||||
sender := peer.ID(m.From) |
||||
fmt.Printf("Received pubsub: '%v' from: %v\n", string(msg), sender) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func main() { |
||||
log.SetAllLoggers(logging.WARNING) |
||||
log.SetLogLevel("rendezvous", "info") |
||||
help := flag.Bool("h", false, "Display Help") |
||||
config, err := ParseFlags() |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
|
||||
if *help { |
||||
fmt.Println("This program demonstrates a simple p2p chat application using libp2p") |
||||
fmt.Println() |
||||
fmt.Println("Usage: Run './p2pchat in two different terminals. Let them connect to the bootstrap nodes, announce themselves and connect to the peers") |
||||
flag.PrintDefaults() |
||||
return |
||||
} |
||||
|
||||
ctx := context.Background() |
||||
|
||||
// libp2p.New constructs a new libp2p Host. Other options can be added
|
||||
// here.
|
||||
host, err := libp2p.New(ctx, |
||||
libp2p.ListenAddrs([]multiaddr.Multiaddr(config.ListenAddresses)...), |
||||
) |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
logger.Info("Host created. We are:", host.ID()) |
||||
logger.Info(host.Addrs()) |
||||
|
||||
// Start a DHT, for use in peer discovery. We can't just make a new DHT
|
||||
// client because we want each peer to maintain its own local copy of the
|
||||
// DHT, so that the bootstrapping node of the DHT can go down without
|
||||
// inhibiting future peer discovery.
|
||||
kademliaDHT, err := libp2pdht.New(ctx, host) |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
|
||||
// Bootstrap the DHT. In the default configuration, this spawns a Background
|
||||
// thread that will refresh the peer table every five minutes.
|
||||
logger.Debug("Bootstrapping the DHT") |
||||
if err = kademliaDHT.Bootstrap(ctx); err != nil { |
||||
panic(err) |
||||
} |
||||
|
||||
// Let's connect to the bootstrap nodes first. They will tell us about the
|
||||
// other nodes in the network.
|
||||
var wg sync.WaitGroup |
||||
for _, peerAddr := range config.BootstrapPeers { |
||||
peerinfo, _ := peerstore.InfoFromP2pAddr(peerAddr) |
||||
wg.Add(1) |
||||
go func() { |
||||
defer wg.Done() |
||||
if err := host.Connect(ctx, *peerinfo); err != nil { |
||||
logger.Warning(err) |
||||
} else { |
||||
logger.Info("Connection established with bootstrap node:", *peerinfo) |
||||
} |
||||
}() |
||||
} |
||||
wg.Wait() |
||||
|
||||
// We use a rendezvous point "meet me here" to announce our location.
|
||||
// This is like telling your friends to meet you at the Eiffel Tower.
|
||||
logger.Info("Announcing ourselves...") |
||||
routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT) |
||||
discovery.Advertise(ctx, routingDiscovery, config.RendezvousString) |
||||
logger.Debug("Successfully announced!") |
||||
|
||||
// Now, look for others who have announced
|
||||
// This is like your friend telling you the location to meet you.
|
||||
logger.Debug("Searching for other peers...") |
||||
peerChan, err := routingDiscovery.FindPeers(ctx, config.RendezvousString) |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
|
||||
var ps *pubsub.PubSub |
||||
|
||||
switch config.PubSubImpl { |
||||
case "gossip": |
||||
ps, err = pubsub.NewGossipSub(ctx, host) |
||||
case "flood": |
||||
ps, err = pubsub.NewFloodSub(ctx, host) |
||||
default: |
||||
logger.Error("Unsupported Pubsub implementation") |
||||
return |
||||
} |
||||
|
||||
if err != nil { |
||||
fmt.Printf("pubsub error: %v", err) |
||||
panic(err) |
||||
} |
||||
|
||||
sub, err := ps.Subscribe("pubsubtestchannel") |
||||
|
||||
go writePubsub(ps) |
||||
go readPubsub(sub) |
||||
|
||||
for peer := range peerChan { |
||||
if peer.ID == host.ID() { |
||||
continue |
||||
} |
||||
logger.Debug("Found peer:", peer) |
||||
|
||||
if err := host.Connect(ctx, peer); err != nil { |
||||
logger.Warning("can't connect to peer", "error", err, "peer", peer) |
||||
} else { |
||||
logger.Info("connected to peer host", "node", peer) |
||||
} |
||||
} |
||||
|
||||
select {} |
||||
} |
@ -0,0 +1,76 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"flag" |
||||
"strings" |
||||
|
||||
maddr "github.com/multiformats/go-multiaddr" |
||||
) |
||||
|
||||
// A new type we need for writing a custom flag parser
|
||||
type addrList []maddr.Multiaddr |
||||
|
||||
func (al *addrList) String() string { |
||||
strs := make([]string, len(*al)) |
||||
for i, addr := range *al { |
||||
strs[i] = addr.String() |
||||
} |
||||
return strings.Join(strs, ",") |
||||
} |
||||
|
||||
func (al *addrList) Set(value string) error { |
||||
addr, err := maddr.NewMultiaddr(value) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
*al = append(*al, addr) |
||||
return nil |
||||
} |
||||
|
||||
// Harmony test bootstrap nodes. Used to find other peers in the network.
|
||||
var defaultBootstrapAddrStrings = []string{ |
||||
"/ip4/127.0.0.1/tcp/9876/p2p/QmayB8NwxmfGE4Usb4H61M8uwbfc7LRbmXb3ChseJgbVuf", |
||||
} |
||||
|
||||
// StringsToAddrs ...
|
||||
func StringsToAddrs(addrStrings []string) (maddrs []maddr.Multiaddr, err error) { |
||||
for _, addrString := range addrStrings { |
||||
addr, err := maddr.NewMultiaddr(addrString) |
||||
if err != nil { |
||||
return maddrs, err |
||||
} |
||||
maddrs = append(maddrs, addr) |
||||
} |
||||
return |
||||
} |
||||
|
||||
// Config ...
|
||||
type Config struct { |
||||
RendezvousString string |
||||
BootstrapPeers addrList |
||||
ListenAddresses addrList |
||||
ProtocolID string |
||||
PubSubImpl string |
||||
} |
||||
|
||||
// ParseFlags ...
|
||||
func ParseFlags() (Config, error) { |
||||
config := Config{} |
||||
flag.StringVar(&config.RendezvousString, "rendezvous", "meet me here", |
||||
"Unique string to identify group of nodes. Share this with your friends to let them connect with you") |
||||
flag.Var(&config.BootstrapPeers, "peer", "Adds a peer multiaddress to the bootstrap list") |
||||
flag.Var(&config.ListenAddresses, "listen", "Adds a multiaddress to the listen list") |
||||
flag.StringVar(&config.ProtocolID, "pid", "/chat/1.1.0", "Sets a protocol id for stream headers") |
||||
flag.StringVar(&config.PubSubImpl, "pubsub", "gossip", "Set the pubsub implementation: gossip, flood") |
||||
flag.Parse() |
||||
|
||||
if len(config.BootstrapPeers) == 0 { |
||||
bootstrapPeerAddrs, err := StringsToAddrs(defaultBootstrapAddrStrings) |
||||
if err != nil { |
||||
return config, err |
||||
} |
||||
config.BootstrapPeers = bootstrapPeerAddrs |
||||
} |
||||
|
||||
return config, nil |
||||
} |
Loading…
Reference in new issue