Merge branch 'main' into requestmanager

pull/3560/head
Rongjian Lan 4 years ago committed by GitHub
commit 1c02ef1861
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      consensus/view_change.go
  2. 50
      consensus/view_change_construct.go
  3. 11
      hmy/blockchain.go
  4. 2
      internal/chain/engine.go
  5. 4
      node/node.go
  6. 8
      rpc/eth/types.go
  7. 20
      rpc/filters/api.go
  8. 16
      rpc/filters/filter.go
  9. 6
      rpc/filters/filter_system.go
  10. 1
      staking/availability/measure.go

@ -258,14 +258,16 @@ func (consensus *Consensus) startViewChange() {
defer consensus.consensusTimeout[timeoutViewChange].Start()
// update the dictionary key if the viewID is first time received
consensus.vc.AddViewIDKeyIfNotExist(nextViewID, consensus.Decider.Participants())
members := consensus.Decider.Participants()
consensus.vc.AddViewIDKeyIfNotExist(nextViewID, members)
// init my own payload
if err := consensus.vc.InitPayload(
consensus.FBFTLog,
nextViewID,
consensus.blockNum,
consensus.priKey); err != nil {
consensus.priKey,
members); err != nil {
consensus.getLogger().Error().Err(err).Msg("[startViewChange] Init Payload Error")
}
@ -387,7 +389,8 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) {
if err := consensus.vc.InitPayload(consensus.FBFTLog,
recvMsg.ViewID,
recvMsg.BlockNum,
consensus.priKey); err != nil {
consensus.priKey,
members); err != nil {
consensus.getLogger().Error().Err(err).Msg("[onViewChange] Init Payload Error")
return
}

@ -285,14 +285,30 @@ func (vc *viewChange) ProcessViewChangeMsg(
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M1 (prepared) type message")
if _, ok := vc.bhpSigs[recvMsg.ViewID]; !ok {
vc.bhpSigs[recvMsg.ViewID] = map[string]*bls_core.Sign{}
}
vc.bhpSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewchangeSig
if _, ok := vc.bhpBitmap[recvMsg.ViewID]; !ok {
bhpBitmap, _ := bls_cosi.NewMask(decider.Participants(), nil)
vc.bhpBitmap[recvMsg.ViewID] = bhpBitmap
}
vc.bhpBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true) // Set the bitmap indicating that this validator signed.
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M3 (ViewID) type message")
if _, ok := vc.viewIDSigs[recvMsg.ViewID]; !ok {
vc.viewIDSigs[recvMsg.ViewID] = map[string]*bls_core.Sign{}
}
vc.viewIDSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewidSig
if _, ok := vc.viewIDBitmap[recvMsg.ViewID]; !ok {
viewIDBitmap, _ := bls_cosi.NewMask(decider.Participants(), nil)
vc.viewIDBitmap[recvMsg.ViewID] = viewIDBitmap
}
// Set the bitmap indicating that this validator signed.
vc.viewIDBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true)
@ -328,15 +344,32 @@ func (vc *viewChange) ProcessViewChangeMsg(
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M2 (NIL) type message")
if _, ok := vc.nilSigs[recvMsg.ViewID]; !ok {
vc.nilSigs[recvMsg.ViewID] = map[string]*bls_core.Sign{}
}
vc.nilSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewchangeSig
if _, ok := vc.nilBitmap[recvMsg.ViewID]; !ok {
nilBitmap, _ := bls_cosi.NewMask(decider.Participants(), nil)
vc.nilBitmap[recvMsg.ViewID] = nilBitmap
}
vc.nilBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true) // Set the bitmap indicating that this validator signed.
vc.getLogger().Info().Uint64("viewID", recvMsg.ViewID).
Str("validatorPubKey", senderKeyStr).
Msg("[ProcessViewChangeMsg] Add M3 (ViewID) type message")
if _, ok := vc.viewIDSigs[recvMsg.ViewID]; !ok {
vc.viewIDSigs[recvMsg.ViewID] = map[string]*bls_core.Sign{}
}
vc.viewIDSigs[recvMsg.ViewID][senderKeyStr] = recvMsg.ViewidSig
// Set the bitmap indicating that this validator signed.
if _, ok := vc.viewIDBitmap[recvMsg.ViewID]; !ok {
viewIDBitmap, _ := bls_cosi.NewMask(decider.Participants(), nil)
vc.viewIDBitmap[recvMsg.ViewID] = viewIDBitmap
}
vc.viewIDBitmap[recvMsg.ViewID].SetKey(senderKey.Bytes, true)
return nil
@ -348,6 +381,7 @@ func (vc *viewChange) InitPayload(
viewID uint64,
blockNum uint64,
privKeys multibls.PrivateKeys,
members multibls.PublicKeys,
) error {
// m1 or m2 init once per viewID/key.
// m1 and m2 are mutually exclusive.
@ -376,10 +410,18 @@ func (vc *viewChange) InitPayload(
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M1 (prepared) type messaage")
msgToSign := append(preparedMsg.BlockHash[:], preparedMsg.Payload...)
for _, key := range privKeys {
// update the dictionary key if the viewID is first time received
if _, ok := vc.bhpBitmap[viewID]; !ok {
bhpBitmap, _ := bls_cosi.NewMask(members, nil)
vc.bhpBitmap[viewID] = bhpBitmap
}
if err := vc.bhpBitmap[viewID].SetKey(key.Pub.Bytes, true); err != nil {
vc.getLogger().Warn().Str("key", key.Pub.Bytes.Hex()).Msg("[InitPayload] bhpBitmap setkey failed")
continue
}
if _, ok := vc.bhpSigs[viewID]; !ok {
vc.bhpSigs[viewID] = map[string]*bls_core.Sign{}
}
vc.bhpSigs[viewID][key.Pub.Bytes.Hex()] = key.Pri.SignHash(msgToSign)
}
hasBlock = true
@ -393,6 +435,10 @@ func (vc *viewChange) InitPayload(
if !hasBlock {
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M2 (NIL) type messaage")
for _, key := range privKeys {
if _, ok := vc.nilBitmap[viewID]; !ok {
nilBitmap, _ := bls_cosi.NewMask(members, nil)
vc.nilBitmap[viewID] = nilBitmap
}
if err := vc.nilBitmap[viewID].SetKey(key.Pub.Bytes, true); err != nil {
vc.getLogger().Warn().Str("key", key.Pub.Bytes.Hex()).Msg("[InitPayload] nilBitmap setkey failed")
continue
@ -420,6 +466,10 @@ func (vc *viewChange) InitPayload(
binary.LittleEndian.PutUint64(viewIDBytes, viewID)
vc.getLogger().Info().Uint64("viewID", viewID).Uint64("blockNum", blockNum).Msg("[InitPayload] add my M3 (ViewID) type messaage")
for _, key := range privKeys {
if _, ok := vc.viewIDBitmap[viewID]; !ok {
viewIDBitmap, _ := bls_cosi.NewMask(members, nil)
vc.viewIDBitmap[viewID] = viewIDBitmap
}
if err := vc.viewIDBitmap[viewID].SetKey(key.Pub.Bytes, true); err != nil {
vc.getLogger().Warn().Str("key", key.Pub.Bytes.Hex()).Msg("[InitPayload] viewIDBitmap setkey failed")
continue

@ -308,11 +308,20 @@ func (hmy *Harmony) GetLeaderAddress(coinbaseAddr common.Address, epoch *big.Int
// Filter related APIs
// GetLogs ...
func (hmy *Harmony) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) {
func (hmy *Harmony) GetLogs(ctx context.Context, blockHash common.Hash, isEth bool) ([][]*types.Log, error) {
receipts := hmy.BlockChain.GetReceiptsByHash(blockHash)
if receipts == nil {
return nil, errors.New("Missing receipts")
}
if isEth {
for i, _ := range receipts {
for j, _ := range receipts[i].Logs {
// Override log txHash with receipt's
receipts[i].Logs[j].TxHash = receipts[i].TxHash
}
}
}
logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs

@ -312,7 +312,7 @@ func payoutUndelegations(
countTrack[validator] = len(wrapper.Delegations)
}
utils.Logger().Info().
utils.Logger().Debug().
Uint64("epoch", header.Epoch().Uint64()).
Uint64("block-number", header.Number().Uint64()).
Interface("count-track", countTrack).

@ -767,8 +767,8 @@ func (node *Node) StartPubSub() error {
case <-node.psCtx.Done():
return
case m := <-msgChanConsensus:
// should not take more than 10 seconds to process one message
ctx, cancel := context.WithTimeout(node.psCtx, 10*time.Second)
// should not take more than 30 seconds to process one message
ctx, cancel := context.WithTimeout(node.psCtx, 30*time.Second)
msg := m
go func() {
defer cancel()

@ -112,10 +112,16 @@ func NewReceipt(tx *types.EthTransaction, blockHash common.Hash, blockNumber, bl
return nil, err
}
ethTxHash := tx.Hash()
for i, _ := range receipt.Logs {
// Override log txHash with receipt's
receipt.Logs[i].TxHash = ethTxHash
}
fields := map[string]interface{}{
"blockHash": blockHash,
"blockNumber": hexutil.Uint64(blockNumber),
"transactionHash": tx.Hash(),
"transactionHash": ethTxHash,
"transactionIndex": hexutil.Uint64(blockIndex),
"from": senderAddr,
"to": tx.To(),

@ -37,14 +37,16 @@ type PublicFilterAPI struct {
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
namespace string
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool, namespace string) rpc.API {
api := &PublicFilterAPI{
backend: backend,
events: NewEventSystem(backend, lightMode),
filters: make(map[rpc.ID]*filter),
backend: backend,
events: NewEventSystem(backend, lightMode, namespace == "eth"),
filters: make(map[rpc.ID]*filter),
namespace: namespace,
}
go api.timeoutLoop()
@ -56,6 +58,10 @@ func NewPublicFilterAPI(backend Backend, lightMode bool, namespace string) rpc.A
}
}
func (api *PublicFilterAPI) isEth() bool {
return api.namespace == "eth"
}
// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
// Tt is started when the api is created.
func (api *PublicFilterAPI) timeoutLoop() {
@ -357,7 +363,7 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([
var filter *Filter
if crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics, api.isEth())
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
@ -369,7 +375,7 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([
end = crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics, api.isEth())
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
@ -412,7 +418,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
var filter *Filter
if f.crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics, api.isEth())
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
@ -424,7 +430,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty
end = f.crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics, api.isEth())
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)

@ -39,7 +39,7 @@ type Backend interface {
HeaderByNumber(ctx context.Context, blockNum rpc.BlockNumber) (*block.Header, error)
HeaderByHash(ctx context.Context, blockHash common.Hash) (*block.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
GetLogs(ctx context.Context, blockHash common.Hash, isEth bool) ([][]*types.Log, error)
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
@ -62,11 +62,12 @@ type Filter struct {
begin, end int64 // Range interval if filtering multiple blocks
matcher *bloombits.Matcher
isEth bool // Whether this is used for eth_ rpc.
}
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
// figure out whether a particular block is interesting or not.
func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash, isEth bool) *Filter {
// Flatten the address and topic filter clauses into a single bloombits filter
// system. Since the bloombits are not positional, nil topics are permitted,
// which get flattened into a nil byte slice.
@ -88,7 +89,7 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres
size, _ := backend.BloomStatus()
// Create a generic filter and convert it into a range filter
filter := newFilter(backend, addresses, topics)
filter := newFilter(backend, addresses, topics, isEth)
filter.matcher = bloombits.NewMatcher(size, filters)
filter.begin = begin
@ -99,20 +100,21 @@ func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Addres
// NewBlockFilter creates a new filter which directly inspects the contents of
// a block to figure out whether it is interesting or not.
func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash, isEth bool) *Filter {
// Create a generic filter and convert it into a block filter
filter := newFilter(backend, addresses, topics)
filter := newFilter(backend, addresses, topics, isEth)
filter.block = block
return filter
}
// newFilter creates a generic filter that can either filter based on a block hash,
// or based on range queries. The search criteria needs to be explicitly set.
func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter {
func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash, isEth bool) *Filter {
return &Filter{
backend: backend,
addresses: addresses,
topics: topics,
isEth: isEth,
}
}
@ -247,7 +249,7 @@ func (f *Filter) blockLogs(ctx context.Context, header *block.Header) (logs []*t
// match the filter criteria. This function is called when the bloom filter signals a potential match.
func (f *Filter) checkMatches(ctx context.Context, header *block.Header) (logs []*types.Log, err error) {
// Get the logs of the block
logsList, err := f.backend.GetLogs(ctx, header.Hash())
logsList, err := f.backend.GetLogs(ctx, header.Hash(), f.isEth)
if err != nil {
return nil, err
}

@ -104,6 +104,7 @@ type EventSystem struct {
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
isEth bool
}
// NewEventSystem creates a new manager that listens for event on the given mux,
@ -112,7 +113,7 @@ type EventSystem struct {
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
func NewEventSystem(backend Backend, lightMode bool, isEth bool) *EventSystem {
m := &EventSystem{
mux: backend.EventMux(),
backend: backend,
@ -123,6 +124,7 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
isEth: isEth,
}
// Subscribe events
@ -406,7 +408,7 @@ func (es *EventSystem) lightFilterLogs(header *block.Header, addresses []common.
// Get the logs of the block
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
logsList, err := es.backend.GetLogs(ctx, header.Hash())
logsList, err := es.backend.GetLogs(ctx, header.Hash(), es.isEth)
if err != nil {
return nil
}

@ -206,6 +206,7 @@ func ComputeAndMutateEPOSStatus(
utils.Logger().Info().
Str("threshold", measure.String()).
Interface("computed", computed).
Str("validator", snapshot.Validator.Address.String()).
Msg("validator failed availability threshold, set to inactive")
default:
// Default is no-op so validator who wants

Loading…
Cancel
Save