Log errors, mostly at warn (or fatal for UT) level

Warn level was chosen for the current behavior: Alert about uncaught
failures but do not alter the code path (yet).  More proper error
handling will come later.
pull/986/head
Eugene Kim 6 years ago
parent c52cb51396
commit a7218a999a
  1. 4
      api/client/service/client.go
  2. 9
      api/client/service/server.go
  3. 4
      api/proto/message/client.go
  4. 4
      api/proto/message/client_test.go
  5. 7
      api/proto/message/server.go
  6. 4
      api/proto/message/server_test.go
  7. 8
      api/proto/node/node_test.go
  8. 64
      api/service/explorer/service.go
  9. 21
      api/service/explorer/storage.go
  10. 4
      api/service/explorer/storage_test.go
  11. 10
      api/service/staking/service.go
  12. 10
      api/service/syncing/downloader/server.go
  13. 22
      api/service/syncing/syncing.go
  14. 6
      cmd/client/txgen/main.go
  15. 52
      cmd/client/wallet/main.go
  16. 16
      cmd/harmony/main.go
  17. 10
      consensus/consensus_leader_msg_test.go
  18. 14
      consensus/consensus_service.go
  19. 48
      consensus/consensus_v2.go

@ -33,8 +33,8 @@ func NewClient(ip, port string) (*Client, error) {
}
// Close closes the Client.
func (client *Client) Close() {
client.conn.Close()
func (client *Client) Close() error {
return client.conn.Close()
}
// GetBalance gets account balance from the client service.

@ -8,6 +8,9 @@ import (
"github.com/ethereum/go-ethereum/common"
proto "github.com/harmony-one/harmony/api/client/service/proto"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"google.golang.org/grpc"
)
@ -64,7 +67,11 @@ func (s *Server) Start(ip, port string) (*grpc.Server, error) {
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
proto.RegisterClientServiceServer(grpcServer, s)
go grpcServer.Serve(lis)
go func() {
if err := grpcServer.Serve(lis); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "grpcServer.Serve() failed")
}
}()
return grpcServer, nil
}

@ -32,8 +32,8 @@ func NewClient(ip string) *Client {
}
// Close closes the Client.
func (client *Client) Close() {
client.conn.Close()
func (client *Client) Close() error {
return client.conn.Close()
}
// Process processes message.

@ -10,7 +10,9 @@ const (
func TestClient(t *testing.T) {
s := NewServer(nil, nil, nil)
s.Start()
if _, err := s.Start(); err != nil {
t.Fatalf("cannot start server: %s", err)
}
client := NewClient(testIP)
_, err := client.Process(&Message{})

@ -10,6 +10,7 @@ import (
"net"
"github.com/ethereum/go-ethereum/crypto"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"google.golang.org/grpc"
@ -88,7 +89,11 @@ func (s *Server) Start() (*grpc.Server, error) {
var opts []grpc.ServerOption
s.server = grpc.NewServer(opts...)
RegisterClientServiceServer(s.server, s)
go s.server.Serve(lis)
go func() {
if err := s.server.Serve(lis); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "server.Serve() failed")
}
}()
return s.server, nil
}

@ -7,7 +7,9 @@ import (
func TestServerStart(t *testing.T) {
s := NewServer(nil, nil, nil)
s.Start()
if _, err := s.Start(); err != nil {
t.Fatalf("cannot start server: %s", err)
}
time.Sleep(time.Second)
s.Stop()
}

@ -83,8 +83,12 @@ func TestConstructBlocksSyncMessage(t *testing.T) {
head.GasLimit = 10000000000
head.Difficulty = params.GenesisDifficulty
statedb.Commit(false)
statedb.Database().TrieDB().Commit(root, true)
if _, err := statedb.Commit(false); err != nil {
t.Fatalf("statedb.Commit() failed: %s", err)
}
if err := statedb.Database().TrieDB().Commit(root, true); err != nil {
t.Fatalf("statedb.Database().TrieDB().Commit() failed: %s", err)
}
block1 := types.NewBlock(head, nil, nil)

@ -16,6 +16,7 @@ import (
"github.com/gorilla/mux"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
libp2p_peer "github.com/libp2p/go-libp2p-peer"
@ -26,6 +27,12 @@ const (
explorerPortDifference = 4000
)
// HTTPError is an HTTP error.
type HTTPError struct {
Code int
Msg string
}
// Service is the struct for explorer service.
type Service struct {
router *mux.Router
@ -108,7 +115,11 @@ func (s *Service) Run() *http.Server {
// Do serving now.
utils.GetLogInstance().Info("Listening on ", "port: ", GetExplorerPort(s.Port))
server := &http.Server{Addr: addr, Handler: s.router}
go server.ListenAndServe()
go func() {
if err := server.ListenAndServe(); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "server.ListenAndServe()")
}
}()
return server
}
@ -145,14 +156,21 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
data := &Data{
Blocks: []*Block{},
}
defer func() {
if err := json.NewEncoder(w).Encode(data.Blocks); err != nil {
ctxerror.Warn(utils.WithCallerSkip(utils.GetLogInstance(), 1), err,
"cannot JSON-encode blocks")
}
}()
if from == "" {
json.NewEncoder(w).Encode(data.Blocks)
return
}
db := s.storage.GetDB()
fromInt, err := strconv.Atoi(from)
if err != nil {
json.NewEncoder(w).Encode(data.Blocks)
ctxerror.Warn(utils.GetLogger(), err, "invalid from parameter",
"from", from)
return
}
var toInt int
@ -168,7 +186,7 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
toInt, err = strconv.Atoi(to)
}
if err != nil {
json.NewEncoder(w).Encode(data.Blocks)
ctxerror.Warn(utils.GetLogger(), err, "invalid to parameter", "to", to)
return
}
@ -209,7 +227,7 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
}
data.Blocks = append(data.Blocks, block)
}
json.NewEncoder(w).Encode(data.Blocks)
return
}
// GetExplorerTransaction servers /tx end-point.
@ -218,23 +236,27 @@ func (s *Service) GetExplorerTransaction(w http.ResponseWriter, r *http.Request)
id := r.FormValue("id")
data := &Data{}
defer func() {
if err := json.NewEncoder(w).Encode(data.TX); err != nil {
ctxerror.Warn(utils.WithCallerSkip(utils.GetLogInstance(), 1), err,
"cannot JSON-encode TX")
}
}()
if id == "" {
json.NewEncoder(w).Encode(data.TX)
return
}
db := s.storage.GetDB()
bytes, err := db.Get([]byte(GetTXKey(id)))
if err != nil {
json.NewEncoder(w).Encode(data.TX)
ctxerror.Warn(utils.GetLogger(), err, "cannot read TX", "id", id)
return
}
tx := new(Transaction)
if rlp.DecodeBytes(bytes, tx) != nil {
json.NewEncoder(w).Encode(data.TX)
utils.GetLogger().Warn("cannot convert data from DB", "id", id)
return
}
data.TX = *tx
json.NewEncoder(w).Encode(data.TX)
}
// GetExplorerAddress serves /address end-point.
@ -243,20 +265,26 @@ func (s *Service) GetExplorerAddress(w http.ResponseWriter, r *http.Request) {
id := r.FormValue("id")
key := GetAddressKey(id)
var result interface{}
defer func() {
if err := json.NewEncoder(w).Encode(result); err != nil {
ctxerror.Warn(utils.WithCallerSkip(utils.GetLogInstance(), 1), err,
"cannot JSON-encode address")
}
}()
data := &Data{}
if id == "" {
json.NewEncoder(w).Encode(nil)
return
}
db := s.storage.GetDB()
bytes, err := db.Get([]byte(key))
if err != nil {
json.NewEncoder(w).Encode(nil)
ctxerror.Warn(utils.GetLogger(), err, "cannot read address", "id", id)
return
}
var address Address
if err = rlp.DecodeBytes(bytes, &address); err != nil {
json.NewEncoder(w).Encode(nil)
utils.GetLogger().Warn("cannot convert data from DB", "id", id)
return
}
data.Address = address
@ -270,13 +298,15 @@ func (s *Service) GetExplorerAddress(w http.ResponseWriter, r *http.Request) {
}
}
json.NewEncoder(w).Encode(data.Address)
result = data.Address
}
// GetExplorerNodeCount serves /nodes end-point.
func (s *Service) GetExplorerNodeCount(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(len(s.GetNodeIDs()))
if err := json.NewEncoder(w).Encode(len(s.GetNodeIDs())); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot JSON-encode node count")
}
}
// GetExplorerShard serves /shard end-point
@ -288,9 +318,9 @@ func (s *Service) GetExplorerShard(w http.ResponseWriter, r *http.Request) {
ID: libp2p_peer.IDB58Encode(nodeID),
})
}
json.NewEncoder(w).Encode(Shard{
Nodes: nodes,
})
if err := json.NewEncoder(w).Encode(Shard{Nodes: nodes}); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot JSON-encode shard info")
}
}
// NotifyService notify service

@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
)
@ -91,12 +92,16 @@ func (storage *Storage) Dump(block *types.Block, height uint32) {
batch := storage.db.NewBatch()
// Update block height.
batch.Put([]byte(BlockHeightKey), []byte(strconv.Itoa(int(height))))
if err := batch.Put([]byte(BlockHeightKey), []byte(strconv.Itoa(int(height)))); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot batch block height")
}
// Store block.
blockData, err := rlp.EncodeToBytes(block)
if err == nil {
batch.Put([]byte(GetBlockKey(int(height))), blockData)
if err := batch.Put([]byte(GetBlockKey(int(height))), blockData); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot batch block data")
}
} else {
utils.GetLogInstance().Debug("Failed to serialize block ", "error", err)
}
@ -111,14 +116,18 @@ func (storage *Storage) Dump(block *types.Block, height uint32) {
storage.UpdateTXStorage(batch, explorerTransaction, tx)
storage.UpdateAddress(batch, explorerTransaction, tx)
}
batch.Write()
if err := batch.Write(); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot write batch")
}
}
// UpdateTXStorage ...
func (storage *Storage) UpdateTXStorage(batch ethdb.Batch, explorerTransaction *Transaction, tx *types.Transaction) {
if data, err := rlp.EncodeToBytes(explorerTransaction); err == nil {
key := GetTXKey(tx.Hash().Hex())
batch.Put([]byte(key), data)
if err := batch.Put([]byte(key), data); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot batch TX")
}
} else {
utils.GetLogInstance().Error("EncodeRLP transaction error")
}
@ -148,7 +157,9 @@ func (storage *Storage) UpdateAddressStorage(batch ethdb.Batch, adr string, expl
address.ID = adr
address.TXs = append(address.TXs, explorerTransaction)
if encoded, err := rlp.EncodeToBytes(address); err == nil {
batch.Put([]byte(key), encoded)
if err := batch.Put([]byte(key), encoded); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot batch address")
}
} else {
utils.GetLogInstance().Error("Can not encode address account.")
}

@ -34,7 +34,9 @@ func TestGetTXKey(t *testing.T) {
func TestInit(t *testing.T) {
ins := GetStorageInstance("1.1.1.1", "3333", true)
ins.GetDB().Put([]byte{1}, []byte{2})
if err := ins.GetDB().Put([]byte{1}, []byte{2}); err != nil {
t.Fatal("(*LDBDatabase).Put failed:", err)
}
value, err := ins.GetDB().Get([]byte{1})
assert.Equal(t, bytes.Compare(value, []byte{2}), 0, "value should be []byte{2}")
assert.Nil(t, err, "error should be nil")

@ -23,6 +23,7 @@ import (
"github.com/harmony-one/harmony/contracts"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/ctxerror"
hmykey "github.com/harmony-one/harmony/internal/keystore"
"github.com/harmony-one/harmony/internal/utils"
contract_constants "github.com/harmony-one/harmony/internal/utils/contract"
@ -114,11 +115,12 @@ func (s *Service) DoService() {
// return
//}
if msg := s.createStakingMessage(); msg != nil {
s.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msg))
utils.GetLogInstance().Info("Sent staking transaction to the network.")
} else {
if msg := s.createStakingMessage(); msg == nil {
utils.GetLogInstance().Error("Can not create staking transaction")
} else if err := s.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msg)); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot send staking message")
} else {
utils.GetLogInstance().Info("Sent staking transaction to the network.")
}
}

@ -6,6 +6,9 @@ import (
"net"
pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"google.golang.org/grpc"
)
@ -39,7 +42,12 @@ func (s *Server) Start(ip, port string) (*grpc.Server, error) {
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
pb.RegisterDownloaderServer(grpcServer, s)
go grpcServer.Serve(lis)
go func() {
if err := grpcServer.Serve(lis); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "(*grpc.Server).Serve failed")
}
}()
s.GrpcServer = grpcServer
return grpcServer, nil

@ -2,6 +2,7 @@ package syncing
import (
"bytes"
"encoding/hex"
"fmt"
"reflect"
"sort"
@ -324,7 +325,10 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
ss.stateSyncTaskQueue = queue.New(0)
ss.syncConfig.ForEachPeer(func(configPeer *SyncPeerConfig) (brk bool) {
for id, blockHash := range configPeer.blockHashes {
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
if err := ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash}); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot add task",
"taskIndex", id, "taskBlock", hex.EncodeToString(blockHash))
}
}
brk = true
return
@ -356,7 +360,11 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
if count > TimesToFail {
break
}
ss.stateSyncTaskQueue.Put(syncTask)
if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot add task",
"taskIndex", syncTask.index,
"taskBlock", hex.EncodeToString(syncTask.blockHash))
}
continue
}
@ -370,7 +378,11 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
if count > TimesToFail {
break
}
ss.stateSyncTaskQueue.Put(syncTask)
if err := ss.stateSyncTaskQueue.Put(syncTask); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot add task",
"taskIndex", syncTask.index,
"taskBlock", hex.EncodeToString(syncTask.blockHash))
}
continue
}
ss.syncMux.Lock()
@ -467,7 +479,9 @@ func (ss *StateSync) updateBlockAndStatus(block *types.Block, bc *core.BlockChai
return false
}
ss.syncMux.Lock()
worker.UpdateCurrent()
if err := worker.UpdateCurrent(); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "(*Worker).UpdateCurrent failed")
}
ss.syncMux.Unlock()
utils.GetLogInstance().Info("[SYNC] new block added to blockchain", "blockHeight", bc.CurrentBlock().NumberU64(), "blockHex", bc.CurrentBlock().Hash().Hex())
return true

@ -13,6 +13,7 @@ import (
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/ethereum/go-ethereum/crypto"
@ -198,7 +199,10 @@ syncLoop:
if block.NumberU64()-txGen.Blockchain().CurrentBlock().NumberU64() == 1 {
txGen.AddNewBlock(block)
stateMutex.Lock()
txGen.Worker.UpdateCurrent()
if err := txGen.Worker.UpdateCurrent(); err != nil {
ctxerror.Warn(utils.GetLogger(), err,
"(*Worker).UpdateCurrent failed")
}
stateMutex.Unlock()
readySignal <- shardID
}

@ -22,6 +22,7 @@ import (
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node"
@ -252,7 +253,10 @@ func createWalletNode() *node.Node {
}
func processNewCommnad() {
newCommand.Parse(os.Args[2:])
if err := newCommand.Parse(os.Args[2:]); err != nil {
fmt.Println(ctxerror.New("failed to parse flags").WithCause(err))
return
}
noPass := *newCommandNoPassPtr
password := ""
@ -290,7 +294,10 @@ func _exportAccount(account accounts.Account) {
if err != nil {
panic("Failed to write to keystore")
}
f.Close()
if err := f.Close(); err != nil {
fmt.Println(ctxerror.New("cannot close key file",
"filename", filename).WithCause(err))
}
fmt.Printf("Exported keyfile to: %v\n", filename)
return
}
@ -303,7 +310,10 @@ func _exportAccount(account accounts.Account) {
}
func processListCommand() {
listCommand.Parse(os.Args[2:])
if err := listCommand.Parse(os.Args[2:]); err != nil {
fmt.Println(ctxerror.New("failed to parse flags").WithCause(err))
return
}
allAccounts := ks.Accounts()
for _, account := range allAccounts {
@ -313,7 +323,10 @@ func processListCommand() {
}
func processExportCommand() {
exportCommand.Parse(os.Args[2:])
if err := exportCommand.Parse(os.Args[2:]); err != nil {
fmt.Println(ctxerror.New("failed to parse flags").WithCause(err))
return
}
acc := *exportCommandAccountPtr
allAccounts := ks.Accounts()
@ -325,7 +338,10 @@ func processExportCommand() {
}
func processImportCommnad() {
accountImportCommand.Parse(os.Args[2:])
if err := accountImportCommand.Parse(os.Args[2:]); err != nil {
fmt.Println(ctxerror.New("failed to parse flags").WithCause(err))
return
}
priKey := *accountImportPtr
if priKey == "" {
fmt.Println("Error: --privateKey is required")
@ -349,7 +365,10 @@ func processImportCommnad() {
}
func processBalancesCommand() {
balanceCommand.Parse(os.Args[2:])
if err := balanceCommand.Parse(os.Args[2:]); err != nil {
fmt.Println(ctxerror.New("failed to parse flags").WithCause(err))
return
}
if *balanceAddressPtr == "" {
allAccounts := ks.Accounts()
@ -370,7 +389,10 @@ func processBalancesCommand() {
}
func processGetFreeToken() {
freeTokenCommand.Parse(os.Args[2:])
if err := freeTokenCommand.Parse(os.Args[2:]); err != nil {
fmt.Println(ctxerror.New("Failed to parse flags").WithCause(err))
return
}
if *freeTokenAddressPtr == "" {
fmt.Println("Error: --address is required")
@ -381,9 +403,8 @@ func processGetFreeToken() {
}
func processTransferCommand() {
transferCommand.Parse(os.Args[2:])
if !transferCommand.Parsed() {
fmt.Println("Failed to parse flags")
if err := transferCommand.Parse(os.Args[2:]); err != nil {
fmt.Println(ctxerror.New("Failed to parse flags").WithCause(err))
return
}
sender := *transferSenderPtr
@ -470,7 +491,10 @@ func processTransferCommand() {
return
}
submitTransaction(tx, walletNode, uint32(shardID))
if err := submitTransaction(tx, walletNode, uint32(shardID)); err != nil {
fmt.Println(ctxerror.New("submitTransaction failed",
"tx", tx, "shardID", shardID).WithCause(err))
}
}
func convertBalanceIntoReadableFormat(balance *big.Int) string {
@ -581,7 +605,11 @@ func clearKeystore() {
panic("Failed to read keystore directory")
}
for _, d := range dir {
os.RemoveAll(path.Join([]string{keystoreDir, d.Name()}...))
subdir := path.Join([]string{keystoreDir, d.Name()}...)
if err := os.RemoveAll(subdir); err != nil {
fmt.Println(ctxerror.New("cannot remove directory",
"path", subdir).WithCause(err))
}
}
fmt.Println("All existing accounts deleted...")
}

@ -19,6 +19,7 @@ import (
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/drand"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
hmykey "github.com/harmony-one/harmony/internal/keystore"
memprofiling "github.com/harmony-one/harmony/internal/memprofiling"
"github.com/harmony-one/harmony/internal/profiler"
@ -262,7 +263,10 @@ func createGlobalConfig() *nodeconfig.ConfigType {
panic("unable to new host in harmony")
}
nodeConfig.Host.AddPeer(&nodeConfig.Leader)
if err := nodeConfig.Host.AddPeer(&nodeConfig.Leader); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "(*p2p.Host).AddPeer failed",
"peer", &nodeConfig.Leader)
}
nodeConfig.DBDir = *dbDir
@ -362,7 +366,11 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node {
// This needs to be executed after consensus and drand are setup
if !*isNewNode || *shardID > -1 { // initial staking new node doesn't need to initialize shard state
currentNode.InitShardState(*shardID == -1 && !*isNewNode) // TODO: Have a better why to distinguish non-genesis node
// TODO: Have a better why to distinguish non-genesis node
if err := currentNode.InitShardState(*shardID == -1 && !*isNewNode); err != nil {
ctxerror.Crit(utils.GetLogger(), err, "InitShardState failed",
"shardID", *shardID, "isNewNode", *isNewNode)
}
}
// Set the consensus ID to be the current block number
@ -423,7 +431,9 @@ func main() {
}
go currentNode.SupportSyncing()
currentNode.ServiceManagerSetup()
currentNode.StartRPC(*port)
if err := currentNode.StartRPC(*port); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "StartRPC failed")
}
currentNode.RunServices()
currentNode.StartServer()
}

@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/ctxerror"
protobuf "github.com/golang/protobuf/proto"
"github.com/harmony-one/harmony/api/proto"
@ -62,8 +63,13 @@ func TestConstructPreparedMessage(test *testing.T) {
message := "test string"
consensus.prepareSigs[common.Address{}] = leaderPriKey.Sign(message)
consensus.prepareSigs[common.Address{}] = validatorPriKey.Sign(message)
consensus.prepareBitmap.SetKey(leaderPubKey, true)
consensus.prepareBitmap.SetKey(validatorPubKey, true)
// According to RJ these failures are benign.
if err := consensus.prepareBitmap.SetKey(leaderPubKey, true); err != nil {
test.Log(ctxerror.New("prepareBitmap.SetKey").WithCause(err))
}
if err := consensus.prepareBitmap.SetKey(validatorPubKey, true); err != nil {
test.Log(ctxerror.New("prepareBitmap.SetKey").WithCause(err))
}
msgBytes, _ := consensus.constructPreparedMessage()
msgPayload, _ := proto.GetConsensusMessagePayload(msgBytes)

@ -61,7 +61,7 @@ func (consensus *Consensus) GetNextRnd() ([32]byte, [32]byte, error) {
func (consensus *Consensus) SealHash(header *types.Header) (hash common.Hash) {
hasher := sha3.NewLegacyKeccak256()
rlp.Encode(hasher, []interface{}{
if err := rlp.Encode(hasher, []interface{}{
header.ParentHash,
header.Coinbase,
header.Root,
@ -74,7 +74,9 @@ func (consensus *Consensus) SealHash(header *types.Header) (hash common.Hash) {
header.GasUsed,
header.Time,
header.Extra,
})
}); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "rlp.Encode failed")
}
hasher.Sum(hash[:0])
return hash
}
@ -442,7 +444,9 @@ func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys, consensus.leader.ConsensusPubKey, consensus.ShardID)
buffer := pong.ConstructPongMessage()
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), buffer))
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), buffer)); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot send pong message")
}
}
return count2
@ -596,7 +600,9 @@ func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offse
utils.GetLogInstance().Warn("onNewView unable to setup mask for prepared message", "err", err)
return nil, nil, errors.New("unable to setup mask from payload")
}
mask.SetMask(bitmap)
if err := mask.SetMask(bitmap); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "mask.SetMask failed")
}
return &aggSig, mask, nil
}

@ -99,8 +99,12 @@ func (consensus *Consensus) tryAnnounce(block *types.Block) {
consensus.prepareSigs[consensus.SelfAddress] = consensus.priKey.SignHash(consensus.blockHash[:])
// Construct broadcast p2p message
utils.GetLogger().Warn("sent announce message", "viewID", consensus.viewID, "block", consensus.blockNum, "groupID", p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
logger := utils.GetLogger().New("viewID", consensus.viewID, "block", consensus.blockNum, "groupID", p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
ctxerror.Warn(logger, err, "cannot send announce message")
} else {
logger.Debug("sent announce message")
}
}
func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
@ -214,9 +218,13 @@ func (consensus *Consensus) tryPrepare(blockHash common.Hash) {
// Construct and send prepare message
msgToSend := consensus.constructPrepareMessage()
utils.GetLogger().Info("sent prepare message", "viewID", consensus.viewID, "block", consensus.blockNum)
logger := utils.GetLogger().New("viewID", consensus.viewID, "block", consensus.blockNum)
// TODO: this will not return immediatey, may block
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
ctxerror.Warn(logger, err, "cannot send prepare message")
} else {
logger.Info("sent prepare message")
}
}
// TODO: move to consensus_leader.go later
@ -288,7 +296,10 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
utils.GetLogger().Debug("Received new prepare signature", "numReceivedSoFar", len(prepareSigs), "validatorAddress", validatorAddress, "PublicKeys", len(consensus.PublicKeys))
prepareSigs[validatorAddress] = &sign
prepareBitmap.SetKey(validatorPubKey, true) // Set the bitmap indicating that this validator signed.
// Set the bitmap indicating that this validator signed.
if err := prepareBitmap.SetKey(validatorPubKey, true); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "prepareBitmap.SetKey failed")
}
if len(prepareSigs) >= consensus.Quorum() {
consensus.switchPhase(Commit, false)
@ -307,8 +318,12 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) {
}
consensus.pbftLog.AddMessage(pbftMsg)
utils.GetLogger().Warn("sent prepared message", "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
logger := utils.GetLogger().New("viewID", consensus.viewID, "block", consensus.blockNum)
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
ctxerror.Warn(logger, err, "cannot send prepared message")
} else {
logger.Debug("sent prepared message")
}
// Leader add commit phase signature
blockNumHash := make([]byte, 8)
@ -410,8 +425,12 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) {
binary.LittleEndian.PutUint64(blockNumHash, consensus.blockNum)
commitPayload := append(blockNumHash, consensus.blockHash[:]...)
msgToSend := consensus.constructCommitMessage(commitPayload)
utils.GetLogger().Debug("sent commit message", "viewID", consensus.viewID, "block", consensus.blockNum)
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
logger := utils.GetLogger().New("viewID", consensus.viewID, "block", consensus.blockNum)
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
ctxerror.Warn(logger, err, "cannot send commit message")
} else {
logger.Debug("sent commit message")
}
consensus.switchPhase(Commit, false)
@ -502,7 +521,9 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) {
utils.GetLogger().Debug("Received new commit message", "numReceivedSoFar", len(commitSigs), "viewID", recvMsg.ViewID, "block", recvMsg.BlockNum, "validatorAddress", validatorAddress)
commitSigs[validatorAddress] = &sign
// Set the bitmap indicating that this validator signed.
commitBitmap.SetKey(validatorPubKey, true)
if err := commitBitmap.SetKey(validatorPubKey, true); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "commitBitmap.SetKey failed")
}
if len(commitSigs) >= consensus.Quorum() {
utils.GetLogger().Info("Enough commits received!", "num", len(commitSigs), "phase", consensus.phase)
@ -518,8 +539,11 @@ func (consensus *Consensus) finalizeCommits() {
msgToSend, aggSig := consensus.constructCommittedMessage()
consensus.aggregatedCommitSig = aggSig
utils.GetLogger().Warn("[Consensus]", "sent committed message", len(msgToSend))
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend))
if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil {
ctxerror.Warn(utils.GetLogger(), err, "cannot send committed message")
} else {
utils.GetLogger().Debug("sent committed message", "len", len(msgToSend))
}
var blockObj types.Block
err := rlp.DecodeBytes(consensus.block, &blockObj)

Loading…
Cancel
Save