commit
897b6cd119
@@ -1,2 +1 @@
-protoc -I ./ message.proto --go_out=plugins=grpc:./
-# protoc -I ./ message.proto --go_out=./
+protoc -I ./ message.proto --go_out=. --go-grpc_out=.
@@ -1,3 +1,3 @@
 package message
 
-//go:generate protoc message.proto --go_out=plugins=grpc:.
+//go:generate protoc message.proto --go_out=. --go-grpc_out=.
@@ -0,0 +1,101 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.

package message

import (
    context "context"
    grpc "google.golang.org/grpc"
    codes "google.golang.org/grpc/codes"
    status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

// ClientServiceClient is the client API for ClientService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ClientServiceClient interface {
    Process(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Response, error)
}

type clientServiceClient struct {
    cc grpc.ClientConnInterface
}

func NewClientServiceClient(cc grpc.ClientConnInterface) ClientServiceClient {
    return &clientServiceClient{cc}
}

func (c *clientServiceClient) Process(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Response, error) {
    out := new(Response)
    err := c.cc.Invoke(ctx, "/message.ClientService/Process", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

// ClientServiceServer is the server API for ClientService service.
// All implementations must embed UnimplementedClientServiceServer
// for forward compatibility
type ClientServiceServer interface {
    Process(context.Context, *Message) (*Response, error)
    mustEmbedUnimplementedClientServiceServer()
}

// UnimplementedClientServiceServer must be embedded to have forward compatible implementations.
type UnimplementedClientServiceServer struct {
}

func (UnimplementedClientServiceServer) Process(context.Context, *Message) (*Response, error) {
    return nil, status.Errorf(codes.Unimplemented, "method Process not implemented")
}
func (UnimplementedClientServiceServer) mustEmbedUnimplementedClientServiceServer() {}

// UnsafeClientServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ClientServiceServer will
// result in compilation errors.
type UnsafeClientServiceServer interface {
    mustEmbedUnimplementedClientServiceServer()
}

func RegisterClientServiceServer(s grpc.ServiceRegistrar, srv ClientServiceServer) {
    s.RegisterService(&ClientService_ServiceDesc, srv)
}

func _ClientService_Process_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(Message)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(ClientServiceServer).Process(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/message.ClientService/Process",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(ClientServiceServer).Process(ctx, req.(*Message))
    }
    return interceptor(ctx, in, info, handler)
}

// ClientService_ServiceDesc is the grpc.ServiceDesc for ClientService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var ClientService_ServiceDesc = grpc.ServiceDesc{
    ServiceName: "message.ClientService",
    HandlerType: (*ClientServiceServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "Process",
            Handler:    _ClientService_Process_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "message.proto",
}
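Editor's note: a minimal sketch of how the generated ClientService API above is typically implemented on the server side. It is not part of this commit; the import path and listen address are illustrative assumptions, and Message/Response fields are omitted because they come from message.proto.

package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"

    // hypothetical import path for the generated "message" package above
    "example.com/project/message"
)

// clientServiceImpl embeds UnimplementedClientServiceServer, as the generated
// code requires, so adding RPCs to the service later does not break the build.
type clientServiceImpl struct {
    message.UnimplementedClientServiceServer
}

func (s *clientServiceImpl) Process(ctx context.Context, in *message.Message) (*message.Response, error) {
    // Handle the request; field access depends on message.proto.
    return &message.Response{}, nil
}

func main() {
    lis, err := net.Listen("tcp", ":50051") // illustrative address
    if err != nil {
        log.Fatal(err)
    }
    s := grpc.NewServer()
    message.RegisterClientServiceServer(s, &clientServiceImpl{})
    log.Fatal(s.Serve(lis))
}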
@@ -1,2 +1,5 @@
+# used versions
+#go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26
+#go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1
 SRC_DIR=$(dirname $0)
-protoc -I ${SRC_DIR}/proto/ ${SRC_DIR}/proto/downloader.proto --go_out=plugins=grpc:${SRC_DIR}/proto
+protoc -I ${SRC_DIR}/proto/ ${SRC_DIR}/proto/downloader.proto --go_out=${SRC_DIR}/proto --go-grpc_out=${SRC_DIR}/proto
@@ -1,3 +1,4 @@
 package downloader
 
-//go:generate protoc downloader.proto --go_out=plugins=grpc:.
+///go:generate protoc downloader.proto --go_out=plugins=grpc:.
+//go:generate protoc downloader.proto --go_out=. --go-grpc_out=.
@@ -0,0 +1,101 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.

package downloader

import (
    context "context"
    grpc "google.golang.org/grpc"
    codes "google.golang.org/grpc/codes"
    status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

// DownloaderClient is the client API for Downloader service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type DownloaderClient interface {
    Query(ctx context.Context, in *DownloaderRequest, opts ...grpc.CallOption) (*DownloaderResponse, error)
}

type downloaderClient struct {
    cc grpc.ClientConnInterface
}

func NewDownloaderClient(cc grpc.ClientConnInterface) DownloaderClient {
    return &downloaderClient{cc}
}

func (c *downloaderClient) Query(ctx context.Context, in *DownloaderRequest, opts ...grpc.CallOption) (*DownloaderResponse, error) {
    out := new(DownloaderResponse)
    err := c.cc.Invoke(ctx, "/downloader.Downloader/Query", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

// DownloaderServer is the server API for Downloader service.
// All implementations must embed UnimplementedDownloaderServer
// for forward compatibility
type DownloaderServer interface {
    Query(context.Context, *DownloaderRequest) (*DownloaderResponse, error)
    mustEmbedUnimplementedDownloaderServer()
}

// UnimplementedDownloaderServer must be embedded to have forward compatible implementations.
type UnimplementedDownloaderServer struct {
}

func (UnimplementedDownloaderServer) Query(context.Context, *DownloaderRequest) (*DownloaderResponse, error) {
    return nil, status.Errorf(codes.Unimplemented, "method Query not implemented")
}
func (UnimplementedDownloaderServer) mustEmbedUnimplementedDownloaderServer() {}

// UnsafeDownloaderServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to DownloaderServer will
// result in compilation errors.
type UnsafeDownloaderServer interface {
    mustEmbedUnimplementedDownloaderServer()
}

func RegisterDownloaderServer(s grpc.ServiceRegistrar, srv DownloaderServer) {
    s.RegisterService(&Downloader_ServiceDesc, srv)
}

func _Downloader_Query_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(DownloaderRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(DownloaderServer).Query(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/downloader.Downloader/Query",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(DownloaderServer).Query(ctx, req.(*DownloaderRequest))
    }
    return interceptor(ctx, in, info, handler)
}

// Downloader_ServiceDesc is the grpc.ServiceDesc for Downloader service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Downloader_ServiceDesc = grpc.ServiceDesc{
    ServiceName: "downloader.Downloader",
    HandlerType: (*DownloaderServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "Query",
            Handler:    _Downloader_Query_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "downloader.proto",
}
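Editor's note: the client side of the regenerated Downloader service would be used roughly as follows. This sketch is not part of the commit; the target address, timeout, and DownloaderRequest contents are illustrative, and the credential choice (WithInsecure here) depends on deployment.

package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"

    // hypothetical import path for the generated "downloader" package above
    "example.com/project/downloader"
)

func main() {
    // Illustrative target; real code would dial the node's sync/downloader endpoint.
    conn, err := grpc.Dial("127.0.0.1:6000", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := downloader.NewDownloaderClient(conn)
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // DownloaderRequest fields come from downloader.proto and are omitted here.
    resp, err := client.Query(ctx, &downloader.DownloaderRequest{})
    if err != nil {
        log.Fatal(err)
    }
    _ = resp
}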
@@ -0,0 +1,418 @@
package main

import (
    "fmt"
    "math/big"
    "os"
    "time"

    lru "github.com/hashicorp/golang-lru"

    "github.com/spf13/cobra"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/common/hexutil"
    ethRawDB "github.com/ethereum/go-ethereum/core/rawdb"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/params"
    "github.com/harmony-one/harmony/block"
    "github.com/harmony-one/harmony/core/rawdb"
    "github.com/harmony-one/harmony/core/state"
    "github.com/harmony-one/harmony/core/types"
    "github.com/harmony-one/harmony/hmy"
    "github.com/harmony-one/harmony/internal/cli"

    nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
    shardingconfig "github.com/harmony-one/harmony/internal/configs/sharding"
)

var batchFlag = cli.IntFlag{
    Name:      "batch",
    Shorthand: "b",
    Usage:     "batch size limit in MB",
    DefValue:  512,
}

var dumpDBCmd = &cobra.Command{
    Use:     "dumpdb srcdb destdb [startKey [endKey [firstStateStartKey [firstStateEndKey]",
    Short:   "dump a snapshot db.",
    Long:    "dump a snapshot db.",
    Example: "harmony dumpdb /srcDir/harmony_db_0 /destDir/harmony_db_0",
    Args:    cobra.RangeArgs(2, 6),
    Run: func(cmd *cobra.Command, args []string) {
        srcDBDir, destDBDir := args[0], args[1]
        var batchLimitMB int
        var startKey []byte
        var endKey []byte
        var firstStateStartKey []byte
        var firstStateEndKey []byte
        if len(args) > 2 {
            _startKey, err := hexutil.Decode(args[2])
            if err != nil {
                fmt.Println("invalid startKey:", err)
                os.Exit(-1)
            }
            startKey = _startKey
        }
        if len(args) > 3 {
            _endKey, err := hexutil.Decode(args[3])
            if err != nil {
                fmt.Println("invalid endKey:", err)
                os.Exit(-1)
            }
            endKey = _endKey
        }
        if len(args) > 4 {
            _startKey, err := hexutil.Decode(args[4])
            if err != nil {
                fmt.Println("invalid stateStartKey:", err)
                os.Exit(-1)
            }
            firstStateStartKey = _startKey
        }
        if len(args) > 5 {
            _endKey, err := hexutil.Decode(args[5])
            if err != nil {
                fmt.Println("invalid stateEndKey:", err)
                os.Exit(-1)
            }
            firstStateEndKey = _endKey
        }
        batchLimitMB = cli.GetIntFlagValue(cmd, batchFlag)
        networkType := getNetworkType(cmd)
        shardSchedule = getShardSchedule(networkType)
        if shardSchedule == nil {
            fmt.Println("unsupported network type")
            os.Exit(-1)
        }
        fmt.Println(srcDBDir, destDBDir, batchLimitMB, hexutil.Encode(startKey), hexutil.Encode(endKey), hexutil.Encode(firstStateStartKey), hexutil.Encode(firstStateEndKey))
        dumpMain(srcDBDir, destDBDir, batchLimitMB*MB, startKey, endKey, firstStateStartKey, firstStateEndKey)
        os.Exit(0)
    },
}

func getShardSchedule(networkType nodeconfig.NetworkType) shardingconfig.Schedule {
    switch networkType {
    case nodeconfig.Mainnet:
        return shardingconfig.MainnetSchedule
    case nodeconfig.Testnet:
        return shardingconfig.TestnetSchedule
    case nodeconfig.Pangaea:
        return shardingconfig.PangaeaSchedule
    case nodeconfig.Localnet:
        return shardingconfig.LocalnetSchedule
    case nodeconfig.Partner:
        return shardingconfig.PartnerSchedule
    case nodeconfig.Stressnet:
        return shardingconfig.StressNetSchedule
    }
    return nil
}

func registerDumpDBFlags() error {
    return cli.RegisterFlags(dumpDBCmd, []cli.Flag{batchFlag, networkTypeFlag})
}

type KakashiDB struct {
    ethdb.Database
    toDB       ethdb.Database
    toDBBatch  ethdb.Batch
    batchLimit int
    cache      *lru.Cache
}

const (
    MB                 = 1024 * 1024
    BLOCKS_DUMP        = 512 // must >= 256
    EPOCHS_DUMP        = 10
    STATEDB_CACHE_SIZE = 64 // size in MB
    LEVELDB_CACHE_SIZE = 256
    LEVELDB_HANDLES    = 1024
    LRU_CACHE_SIZE     = 64 * 1024 * 1024
)

const (
    NONE = iota
    ON_ACCOUNT_START
    ON_ACCOUNT_STATE
    ON_ACCOUNT_END
)

var (
    totalSize    = 0 // current dump size
    printSize    = 0 // last print dump size
    flushSize    = 0 // size flushed into db
    accountCount = 0 // number of accounts
    lastAccount  = state.DumpAccount{
        Address: &common.Address{},
    }
    savedStateKey hexutil.Bytes
    accountState  = NONE
    emptyHash     = common.Hash{}
    shardSchedule shardingconfig.Schedule // init by cli flag
)

func dumpPrint(prefix string, showAccount bool) {
    if totalSize-printSize > MB || showAccount {
        now := time.Now().Unix()
        fmt.Println(now, prefix, accountCount, totalSize, printSize/MB, flushSize/MB)
        if showAccount {
            fmt.Println("account:", lastAccount.Address.Hex(), lastAccount.Balance, len(lastAccount.Code), accountState, lastAccount.SecureKey.String(), savedStateKey.String())
        }
        printSize = totalSize
    }
}

func (db *KakashiDB) Get(key []byte) ([]byte, error) {
    value, err := db.Database.Get(key)
    if exist, _ := db.cache.ContainsOrAdd(string(key), nil); !exist {
        db.copyKV(key, value)
    }
    return value, err
}
func (db *KakashiDB) Put(key []byte, value []byte) error {
    return nil
}

// Delete removes the key from the key-value data store.
func (db *KakashiDB) Delete(key []byte) error {
    return nil
}

// copy key,value to toDB
func (db *KakashiDB) copyKV(key, value []byte) {
    db.toDBBatch.Put(key, value)
    totalSize += len(key) + len(value)
    dumpPrint("copyKV", false)
}

func (db *KakashiDB) flush() {
    dumpPrint("KakashiDB batch writhing", true)
    db.toDBBatch.Write()
    db.toDBBatch.Reset()
    flushSize = totalSize
    dumpPrint("KakashiDB flushed", false)
}

func (db *KakashiDB) Close() error {
    db.toDBBatch.Reset() // drop dirty cache
    fmt.Println("KakashiDB Close")
    db.toDB.Close()
    return db.Database.Close()
}

func (db *KakashiDB) OnRoot(common.Hash) {}

// OnAccount implements DumpCollector interface
func (db *KakashiDB) OnAccountStart(addr common.Address, acc state.DumpAccount) {
    accountState = ON_ACCOUNT_START
    lastAccount = acc
    lastAccount.Address = &addr
}

// OnAccount implements DumpCollector interface
func (db *KakashiDB) OnAccountState(addr common.Address, StateSecureKey hexutil.Bytes, key, value []byte) {
    accountState = ON_ACCOUNT_STATE
    if totalSize-flushSize > int(db.batchLimit) {
        savedStateKey = StateSecureKey
        db.flush()
    }
}

// OnAccount implements DumpCollector interface
func (db *KakashiDB) OnAccountEnd(addr common.Address, acc state.DumpAccount) {
    accountCount++
    accountState = ON_ACCOUNT_END
    if totalSize-flushSize > int(db.batchLimit) {
        db.flush()
    }
}

func (db *KakashiDB) getHashByNumber(number uint64) common.Hash {
    hash := rawdb.ReadCanonicalHash(db, number)
    return hash
}
func (db *KakashiDB) GetHeaderByNumber(number uint64) *block.Header {
    hash := db.getHashByNumber(number)
    if hash == (common.Hash{}) {
        return nil
    }
    return db.GetHeader(hash, number)
}

func (db *KakashiDB) GetHeader(hash common.Hash, number uint64) *block.Header {
    header := rawdb.ReadHeader(db, hash, number)
    return header
}

func (db *KakashiDB) GetHeaderByHash(hash common.Hash) *block.Header {
    number := rawdb.ReadHeaderNumber(db, hash)
    return rawdb.ReadHeader(db, hash, *number)
}

// GetBlock retrieves a block from the database by hash and number
func (db *KakashiDB) GetBlock(hash common.Hash, number uint64) *types.Block {
    block := rawdb.ReadBlock(db, hash, number)
    return block
}

// GetBlockNumber retrieves the block number belonging to the given hash
// from the database
func (db *KakashiDB) GetBlockNumber(hash common.Hash) *uint64 {
    return rawdb.ReadHeaderNumber(db, hash)
}

// GetBlockByHash retrieves a block from the database by hash
func (db *KakashiDB) GetBlockByHash(hash common.Hash) *types.Block {
    number := db.GetBlockNumber(hash)
    return db.GetBlock(hash, *number)
}

// GetBlockByNumber retrieves a block from the database by number
func (db *KakashiDB) GetBlockByNumber(number uint64) *types.Block {
    hash := rawdb.ReadCanonicalHash(db, number)
    return db.GetBlock(hash, number)
}

func (db *KakashiDB) indexerDataDump(block *types.Block) {
    fmt.Println("indexerDataDump:")
    bloomIndexer := hmy.NewBloomIndexer(db, params.BloomBitsBlocks, params.BloomConfirms)
    bloomIndexer.Close() // just stop event loop
    section, blkno, blkhash := bloomIndexer.Sections()
    bloomIndexer.AddCheckpoint(section-1, blkhash)
    for i := blkno; i <= block.NumberU64(); i++ {
        db.GetHeaderByNumber(i)
    }
    db.flush()
}

func (db *KakashiDB) offchainDataDump(block *types.Block) {
    fmt.Println("offchainDataDump:")
    rawdb.WriteHeadBlockHash(db.toDBBatch, block.Hash())
    rawdb.WriteHeadHeaderHash(db.toDBBatch, block.Hash())
    db.GetHeaderByNumber(0)
    db.GetBlockByNumber(0)
    db.GetHeaderByHash(block.Hash())
    // EVM may access the last 256 block hash
    for i := 0; i <= BLOCKS_DUMP; i++ {
        if block.NumberU64() < uint64(i) {
            break
        }
        latestNumber := block.NumberU64() - uint64(i)
        latestBlock := db.GetBlockByNumber(latestNumber)
        db.GetBlockByHash(latestBlock.Hash())
        db.GetHeaderByHash(latestBlock.Hash())
        db.GetBlockByHash(latestBlock.Hash())
        rawdb.ReadBlockRewardAccumulator(db, latestNumber)
        rawdb.ReadBlockCommitSig(db, latestNumber)
        epoch := block.Epoch()
        epochInstance := shardSchedule.InstanceForEpoch(epoch)
        for shard := 0; shard < int(epochInstance.NumShards()); shard++ {
            rawdb.ReadCrossLinkShardBlock(db, uint32(shard), latestNumber)
        }
    }
    headEpoch := block.Epoch()
    epochInstance := shardSchedule.InstanceForEpoch(headEpoch)
    for shard := 0; shard < int(epochInstance.NumShards()); shard++ {
        rawdb.ReadShardLastCrossLink(db, uint32(shard))
    }

    rawdb.IteratorValidatorStats(db, func(it ethdb.Iterator, addr common.Address) bool {
        db.copyKV(it.Key(), it.Value())
        return true
    })
    rawdb.ReadPendingCrossLinks(db)

    rawdb.IteratorDelegatorDelegations(db, func(it ethdb.Iterator, delegator common.Address) bool {
        db.copyKV(it.Key(), it.Value())
        return true
    })
    for i := 0; i < EPOCHS_DUMP; i++ {
        epoch := new(big.Int).Sub(headEpoch, big.NewInt(int64(i)))
        if epoch.Sign() < 0 {
            break
        }
        rawdb.ReadShardState(db, epoch)
        rawdb.ReadEpochBlockNumber(db, epoch)
        rawdb.ReadEpochVrfBlockNums(db, epoch)
        rawdb.ReadEpochVdfBlockNum(db, epoch)
        var validators []common.Address
        rawdb.IteratorValidatorSnapshot(db, func(addr common.Address, _epoch *big.Int) bool {
            if _epoch.Cmp(epoch) == 0 {
                validator, err := rawdb.ReadValidatorSnapshot(db, addr, epoch)
                if err != nil {
                    panic(err)
                }
                validators = append(validators, validator.Validator.Address)
            }
            return true
        })
        if i == 0 {
            rawdb.ReadValidatorList(db)
        }
    }

    rawdb.IteratorCXReceiptsProofSpent(db, func(it ethdb.Iterator, shardID uint32, number uint64) bool {
        db.copyKV(it.Key(), it.Value())
        return true
    })
    db.flush()
}

func (db *KakashiDB) stateDataDump(block *types.Block, startKey, endKey, firstStateStartKey, firstStateEndKey []byte) {
    fmt.Println("stateDataDump:")
    stateDB0 := state.NewDatabaseWithCache(db, STATEDB_CACHE_SIZE)
    rootHash := block.Root()
    stateDB, err := state.New(rootHash, stateDB0)
    if err != nil {
        panic(err)
    }
    config := new(state.DumpConfig)
    config.Start = startKey
    config.End = endKey
    config.StateStart = firstStateStartKey
    config.StateEnd = firstStateEndKey
    stateDB.DumpToCollector(db, config)
    db.flush()
}

func dumpMain(srcDBDir, destDBDir string, batchLimit int, startKey, endKey, firstStateStartKey, firstStateEndKey []byte) {
    fmt.Println("===dumpMain===")
    srcDB, err := ethRawDB.NewLevelDBDatabase(srcDBDir, LEVELDB_CACHE_SIZE, LEVELDB_HANDLES, "")
    if err != nil {
        fmt.Println("open src db error:", err)
        os.Exit(-1)
    }
    destDB, err := ethRawDB.NewLevelDBDatabase(destDBDir, LEVELDB_CACHE_SIZE, LEVELDB_HANDLES, "")
    if err != nil {
        fmt.Println("open dest db error:", err)
        os.Exit(-1)
    }

    headHash := rawdb.ReadHeadBlockHash(srcDB)
    headNumber := *rawdb.ReadHeaderNumber(srcDB, headHash)
    fmt.Println("head-block:", headNumber, headHash.Hex())

    if headHash == emptyHash {
        fmt.Println("empty head block hash")
        os.Exit(-1)
    }
    block := rawdb.ReadBlock(srcDB, headHash, headNumber)
    if block == nil {
        fmt.Println("ReadBlock error:")
        os.Exit(-1)
    }
    fmt.Println("start copying...")
    cache, _ := lru.New(LRU_CACHE_SIZE)
    copier := &KakashiDB{
        Database:   srcDB,
        toDB:       destDB,
        toDBBatch:  destDB.NewBatch(),
        batchLimit: batchLimit,
        cache:      cache,
    }
    defer copier.Close()
    copier.offchainDataDump(block)
    copier.indexerDataDump(block)
    copier.stateDataDump(block, startKey, endKey, firstStateStartKey, firstStateEndKey)
}
@@ -0,0 +1,37 @@
package leveldb_shard

import (
    "hash/crc32"
    "sync"
    "sync/atomic"
)

func mapDBIndex(key []byte, dbCount uint32) uint32 {
    return crc32.ChecksumIEEE(key) % dbCount
}

func parallelRunAndReturnErr(parallelNum int, cb func(index int) error) error {
    wg := sync.WaitGroup{}
    errAtomic := atomic.Value{}

    for i := 0; i < parallelNum; i++ {
        wg.Add(1)

        go func(i int) {
            defer wg.Done()

            err := cb(i)
            if err != nil {
                errAtomic.Store(err)
            }
        }(i)
    }

    wg.Wait()

    if err := errAtomic.Load(); err != nil {
        return errAtomic.Load().(error)
    } else {
        return nil
    }
}
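Editor's note: a quick standalone illustration of the sharding function above (the key and shard count are made up). Each key is hashed with CRC-32 (IEEE) and reduced modulo the number of shards, so a given key always maps to the same underlying LevelDB instance.

package main

import (
    "fmt"
    "hash/crc32"
)

func main() {
    key := []byte("block-0x1234") // illustrative key
    dbCount := uint32(16)         // e.g. 2 disks x 8 shards
    idx := crc32.ChecksumIEEE(key) % dbCount
    fmt.Printf("key %q -> shard %d of %d\n", key, idx, dbCount)
}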
@@ -0,0 +1,200 @@
package leveldb_shard

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "path/filepath"
    "strings"
    "sync"

    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/syndtr/goleveldb/leveldb"
    "github.com/syndtr/goleveldb/leveldb/comparer"
    "github.com/syndtr/goleveldb/leveldb/filter"
    "github.com/syndtr/goleveldb/leveldb/iterator"
    "github.com/syndtr/goleveldb/leveldb/opt"
    "github.com/syndtr/goleveldb/leveldb/util"
)

type LeveldbShard struct {
    dbs     []*leveldb.DB
    dbCount uint32
}

var shardIdxKey = []byte("__DB_SHARED_INDEX__")

func NewLeveldbShard(savePath string, diskCount int, diskShards int) (shard *LeveldbShard, err error) {
    shard = &LeveldbShard{
        dbs:     make([]*leveldb.DB, diskCount*diskShards),
        dbCount: uint32(diskCount * diskShards),
    }

    // clean when error
    defer func() {
        if err != nil {
            for _, db := range shard.dbs {
                if db != nil {
                    _ = db.Close()
                }
            }

            shard = nil
        }
    }()

    levelDBOptions := &opt.Options{
        OpenFilesCacheCapacity: 128,
        WriteBuffer:            8 << 20, //8MB, max memory occupyv = 8*2*diskCount*diskShards
        BlockCacheCapacity:     16 << 20, //16MB
        Filter:                 filter.NewBloomFilter(8),
        DisableSeeksCompaction: true,
    }

    // async open
    wg := sync.WaitGroup{}
    for i := 0; i < diskCount; i++ {
        for j := 0; j < diskShards; j++ {
            shardPath := filepath.Join(savePath, fmt.Sprintf("disk%02d", i), fmt.Sprintf("block%02d", j))
            dbIndex := i*diskShards + j
            wg.Add(1)
            go func() {
                defer wg.Done()

                ldb, openErr := leveldb.OpenFile(shardPath, levelDBOptions)
                if openErr != nil {
                    err = openErr
                    return
                }

                indexByte := make([]byte, 8)
                binary.BigEndian.PutUint64(indexByte, uint64(dbIndex))
                inDBIndex, getErr := ldb.Get(shardIdxKey, nil)
                if getErr != nil {
                    if getErr == leveldb.ErrNotFound {
                        putErr := ldb.Put(shardIdxKey, indexByte, nil)
                        if putErr != nil {
                            err = putErr
                            return
                        }
                    } else {
                        err = getErr
                        return
                    }
                } else if bytes.Compare(indexByte, inDBIndex) != 0 {
                    err = fmt.Errorf("db shard index error, need %v, got %v", indexByte, inDBIndex)
                    return
                }

                shard.dbs[dbIndex] = ldb
            }()
        }
    }

    wg.Wait()

    return shard, err
}

func (l *LeveldbShard) mapDB(key []byte) *leveldb.DB {
    return l.dbs[mapDBIndex(key, l.dbCount)]
}

// Has retrieves if a key is present in the key-value data store.
func (l *LeveldbShard) Has(key []byte) (bool, error) {
    return l.mapDB(key).Has(key, nil)
}

// Get retrieves the given key if it's present in the key-value data store.
func (l *LeveldbShard) Get(key []byte) ([]byte, error) {
    return l.mapDB(key).Get(key, nil)
}

// Put inserts the given value into the key-value data store.
func (l *LeveldbShard) Put(key []byte, value []byte) error {
    return l.mapDB(key).Put(key, value, nil)
}

// Delete removes the key from the key-value data store.
func (l *LeveldbShard) Delete(key []byte) error {
    return l.mapDB(key).Delete(key, nil)
}

// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called.
func (l *LeveldbShard) NewBatch() ethdb.Batch {
    return NewLeveldbShardBatch(l)
}

// NewIterator creates a binary-alphabetical iterator over the entire keyspace
// contained within the key-value database.
func (l *LeveldbShard) NewIterator() ethdb.Iterator {
    return l.iterator(nil)
}

// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of
// database content starting at a particular initial key (or after, if it does
// not exist).
func (l *LeveldbShard) NewIteratorWithStart(start []byte) ethdb.Iterator {
    return l.iterator(&util.Range{Start: start})
}

// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix.
func (l *LeveldbShard) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
    return l.iterator(util.BytesPrefix(prefix))
}

func (l *LeveldbShard) iterator(slice *util.Range) ethdb.Iterator {
    iters := make([]iterator.Iterator, l.dbCount)

    for i, db := range l.dbs {
        iter := db.NewIterator(slice, nil)
        iters[i] = iter
    }

    return iterator.NewMergedIterator(iters, comparer.DefaultComparer, true)
}

// Stat returns a particular internal stat of the database.
func (l *LeveldbShard) Stat(property string) (string, error) {
    sb := strings.Builder{}

    for i, db := range l.dbs {
        getProperty, err := db.GetProperty(property)
        if err != nil {
            return "", err
        }

        sb.WriteString(fmt.Sprintf("=== shard %02d ===\n", i))
        sb.WriteString(getProperty)
        sb.WriteString("\n")
    }

    return sb.String(), nil
}

// Compact flattens the underlying data store for the given key range. In essence,
// deleted and overwritten versions are discarded, and the data is rearranged to
// reduce the cost of operations needed to access them.
//
// A nil start is treated as a key before all keys in the data store; a nil limit
// is treated as a key after all keys in the data store. If both is nil then it
// will compact entire data store.
func (l *LeveldbShard) Compact(start []byte, limit []byte) (err error) {
    return parallelRunAndReturnErr(int(l.dbCount), func(i int) error {
        return l.dbs[i].CompactRange(util.Range{Start: start, Limit: limit})
    })
}

// Close all the DB
func (l *LeveldbShard) Close() error {
    for _, db := range l.dbs {
        err := db.Close()
        if err != nil {
            return err
        }
    }

    return nil
}
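Editor's note: a minimal sketch of opening and using the sharded store above. It is not part of the commit; the import path, data directory, and disk/shard counts are illustrative assumptions.

package main

import (
    "fmt"
    "log"

    // hypothetical import path for the leveldb_shard package above
    "github.com/harmony-one/harmony/internal/shardchain/leveldb_shard"
)

func main() {
    // 2 disks x 4 shards = 8 LevelDB instances under ./data/disk00../block00..
    shard, err := leveldb_shard.NewLeveldbShard("./data", 2, 4)
    if err != nil {
        log.Fatal(err)
    }
    defer shard.Close()

    // Reads and writes are routed to one shard by CRC-32 of the key.
    if err := shard.Put([]byte("hello"), []byte("world")); err != nil {
        log.Fatal(err)
    }
    val, err := shard.Get([]byte("hello"))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(val))
}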
@@ -0,0 +1,119 @@
package leveldb_shard

import (
    "runtime"
    "sync"

    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/syndtr/goleveldb/leveldb"
)

var batchesPool = sync.Pool{
    New: func() interface{} {
        return &leveldb.Batch{}
    },
}

type LeveldbShardBatch struct {
    shard        *LeveldbShard
    batches      []*leveldb.Batch
    batchesCount uint32
}

func NewLeveldbShardBatch(shard *LeveldbShard) *LeveldbShardBatch {
    shardBatch := &LeveldbShardBatch{
        batches:      make([]*leveldb.Batch, shard.dbCount),
        batchesCount: shard.dbCount,
        shard:        shard,
    }

    for i := uint32(0); i < shard.dbCount; i++ {
        shardBatch.batches[i] = batchesPool.Get().(*leveldb.Batch)
    }

    runtime.SetFinalizer(shardBatch, func(o *LeveldbShardBatch) {
        for _, batch := range o.batches {
            batch.Reset()
            batchesPool.Put(batch)
        }

        o.batches = nil
    })

    return shardBatch
}

func (l *LeveldbShardBatch) mapBatch(key []byte) *leveldb.Batch {
    return l.batches[mapDBIndex(key, l.batchesCount)]
}

// Put inserts the given value into the key-value data store.
func (l *LeveldbShardBatch) Put(key []byte, value []byte) error {
    l.mapBatch(key).Put(key, value)
    return nil
}

// Delete removes the key from the key-value data store.
func (l *LeveldbShardBatch) Delete(key []byte) error {
    l.mapBatch(key).Delete(key)
    return nil
}

// ValueSize retrieves the amount of data queued up for writing.
func (l *LeveldbShardBatch) ValueSize() int {
    size := 0
    for _, batch := range l.batches {
        size += batch.Len()
    }
    return size
}

// Write flushes any accumulated data to disk.
func (l *LeveldbShardBatch) Write() (err error) {
    return parallelRunAndReturnErr(int(l.batchesCount), func(i int) error {
        return l.shard.dbs[i].Write(l.batches[i], nil)
    })
}

// Reset resets the batch for reuse.
func (l *LeveldbShardBatch) Reset() {
    for _, batch := range l.batches {
        batch.Reset()
    }
}

// Replay replays the batch contents.
func (l *LeveldbShardBatch) Replay(w ethdb.KeyValueWriter) error {
    for _, batch := range l.batches {
        err := batch.Replay(&replayer{writer: w})
        if err != nil {
            return err
        }
    }

    return nil
}

// replayer is a small wrapper to implement the correct replay methods.
type replayer struct {
    writer  ethdb.KeyValueWriter
    failure error
}

// Put inserts the given value into the key-value data store.
func (r *replayer) Put(key, value []byte) {
    // If the replay already failed, stop executing ops
    if r.failure != nil {
        return
    }
    r.failure = r.writer.Put(key, value)
}

// Delete removes the key from the key-value data store.
func (r *replayer) Delete(key []byte) {
    // If the replay already failed, stop executing ops
    if r.failure != nil {
        return
    }
    r.failure = r.writer.Delete(key)
}
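Editor's note: a short sketch of the batch path, using the same hypothetical setup as the previous sketch. Writes and deletes are buffered into one leveldb.Batch per shard (routed by the key's CRC-32) and flushed to all shards in parallel on Write.

package main

import (
    "fmt"
    "log"

    // hypothetical import path, as in the previous sketch
    "github.com/harmony-one/harmony/internal/shardchain/leveldb_shard"
)

func main() {
    shard, err := leveldb_shard.NewLeveldbShard("./data", 2, 4) // illustrative layout
    if err != nil {
        log.Fatal(err)
    }
    defer shard.Close()

    batch := shard.NewBatch() // one leveldb.Batch per underlying shard
    _ = batch.Put([]byte("k1"), []byte("v1"))
    _ = batch.Put([]byte("k2"), []byte("v2"))
    _ = batch.Delete([]byte("stale-key"))
    fmt.Println("queued ops:", batch.ValueSize()) // sums Len() of the per-shard batches

    if err := batch.Write(); err != nil { // per-shard batches are written in parallel
        log.Fatal(err)
    }
    batch.Reset()
}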
@@ -0,0 +1,22 @@
package local_cache

import (
    "reflect"
    "unsafe"
)

func String(b []byte) (s string) {
    if len(b) == 0 {
        return ""
    }
    return *(*string)(unsafe.Pointer(&b))
}

func StringBytes(s string) []byte {
    var b []byte
    hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
    hdr.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
    hdr.Cap = len(s)
    hdr.Len = len(s)
    return b
}
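Editor's note: these helpers do zero-copy []byte/string conversions via unsafe, which avoids an allocation per cache key but means the result aliases the source and must be treated as read-only. A tiny standalone illustration of that aliasing (not part of the commit):

package main

import (
    "fmt"
    "unsafe"
)

// byteString returns a zero-copy string view of b; b must not be modified
// while the returned string is in use.
func byteString(b []byte) string {
    if len(b) == 0 {
        return ""
    }
    return *(*string)(unsafe.Pointer(&b))
}

func main() {
    key := []byte("block-hash-key")
    s := byteString(key) // no copy: s shares key's backing array
    fmt.Println(s)
    key[0] = 'B' // the mutation is visible through s, which is why callers must not write to the source
    fmt.Println(s)
}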
@@ -0,0 +1,83 @@
package local_cache

import (
    "sync"

    "github.com/ethereum/go-ethereum/ethdb"
)

type LocalCacheBatch struct {
    db   *LocalCacheDatabase
    lock sync.Mutex

    size            int
    batchWriteKey   [][]byte
    batchWriteValue [][]byte
    batchDeleteKey  [][]byte
}

func newLocalCacheBatch(db *LocalCacheDatabase) *LocalCacheBatch {
    return &LocalCacheBatch{db: db}
}

func (b *LocalCacheBatch) Put(key []byte, value []byte) error {
    b.lock.Lock()
    defer b.lock.Unlock()

    b.batchWriteKey = append(b.batchWriteKey, key)
    b.batchWriteValue = append(b.batchWriteValue, value)
    b.size += len(key) + len(value)
    return nil
}

func (b *LocalCacheBatch) Delete(key []byte) error {
    b.lock.Lock()
    defer b.lock.Unlock()

    b.batchDeleteKey = append(b.batchDeleteKey, key)
    b.size += len(key)
    return nil
}

func (b *LocalCacheBatch) ValueSize() int {
    return b.size
}

func (b *LocalCacheBatch) Write() error {
    b.lock.Lock()
    defer b.lock.Unlock()

    return b.db.batchWrite(b)
}

func (b *LocalCacheBatch) Reset() {
    b.lock.Lock()
    defer b.lock.Unlock()

    b.batchWriteKey = b.batchWriteKey[:0]
    b.batchWriteValue = b.batchWriteValue[:0]
    b.batchDeleteKey = b.batchDeleteKey[:0]
    b.size = 0
}

func (b *LocalCacheBatch) Replay(w ethdb.KeyValueWriter) error {
    if len(b.batchWriteKey) > 0 {
        for i, key := range b.batchWriteKey {
            err := w.Put(key, b.batchWriteValue[i])
            if err != nil {
                return err
            }
        }
    }

    if len(b.batchDeleteKey) > 0 {
        for _, key := range b.batchDeleteKey {
            err := w.Delete(key)
            if err != nil {
                return err
            }
        }
    }

    return nil
}
@@ -0,0 +1,121 @@
package local_cache

import (
    "bytes"
    "time"

    "github.com/allegro/bigcache"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/harmony-one/harmony/internal/utils"
)

type cacheWrapper struct {
    *bigcache.BigCache
}

type CacheConfig struct {
    CacheTime time.Duration
    CacheSize int
}

func (c *cacheWrapper) Put(key []byte, value []byte) error {
    return c.BigCache.Set(String(key), value)
}

func (c *cacheWrapper) Delete(key []byte) error {
    return c.BigCache.Delete(String(key))
}

type LocalCacheDatabase struct {
    ethdb.KeyValueStore

    enableReadCache bool

    deleteMap map[string]bool
    readCache *cacheWrapper
}

func NewLocalCacheDatabase(remoteDB ethdb.KeyValueStore, cacheConfig CacheConfig) *LocalCacheDatabase {
    config := bigcache.DefaultConfig(cacheConfig.CacheTime)
    config.HardMaxCacheSize = cacheConfig.CacheSize
    config.MaxEntriesInWindow = cacheConfig.CacheSize * 4 * int(cacheConfig.CacheTime.Seconds())
    cache, _ := bigcache.NewBigCache(config)

    db := &LocalCacheDatabase{
        KeyValueStore: remoteDB,

        enableReadCache: true,
        deleteMap:       make(map[string]bool),
        readCache:       &cacheWrapper{cache},
    }

    go func() {
        for range time.Tick(time.Minute) {
            utils.Logger().Info().
                Interface("stats", cache.Stats()).
                Int("count", cache.Len()).
                Int("size", cache.Capacity()).
                Msg("local-cache stats")
        }
    }()

    return db
}

func (c *LocalCacheDatabase) Has(key []byte) (bool, error) {
    return c.KeyValueStore.Has(key)
}

func (c *LocalCacheDatabase) Get(key []byte) (ret []byte, err error) {
    if c.enableReadCache {
        if bytes.Compare(key, []byte("LastBlock")) != 0 {
            strKey := String(key)
            ret, err = c.readCache.Get(strKey)
            if err == nil {
                return ret, nil
            }

            defer func() {
                if err == nil {
                    _ = c.readCache.Set(strKey, ret)
                }
            }()
        }
    }

    return c.KeyValueStore.Get(key)
}

func (c *LocalCacheDatabase) Put(key []byte, value []byte) error {
    if c.enableReadCache {
        _ = c.readCache.Put(key, value)
    }

    return c.KeyValueStore.Put(key, value)
}

func (c *LocalCacheDatabase) Delete(key []byte) error {
    if c.enableReadCache {
        _ = c.readCache.Delete(key)
    }

    return c.KeyValueStore.Delete(key)
}

func (c *LocalCacheDatabase) NewBatch() ethdb.Batch {
    return newLocalCacheBatch(c)
}

func (c *LocalCacheDatabase) batchWrite(b *LocalCacheBatch) error {
    if c.enableReadCache {
        _ = b.Replay(c.readCache)
    }

    batch := c.KeyValueStore.NewBatch()
    err := b.Replay(batch)
    if err != nil {
        return err
    }

    return batch.Write()
}
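Editor's note: a sketch of how this write-through read cache could wrap any ethdb.KeyValueStore. It is not part of the commit; the import path, backing store, and cache sizes are illustrative assumptions (the LevelDB constructor signature matches the one used in dumpdb.go above).

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/ethereum/go-ethereum/core/rawdb"

    // hypothetical import path for the local_cache package above
    "github.com/harmony-one/harmony/internal/shardchain/local_cache"
)

func main() {
    // Any ethdb.KeyValueStore works as the backing store; LevelDB is used here for illustration.
    base, err := rawdb.NewLevelDBDatabase("./chaindata", 256, 1024, "")
    if err != nil {
        log.Fatal(err)
    }
    defer base.Close()

    db := local_cache.NewLocalCacheDatabase(base, local_cache.CacheConfig{
        CacheTime: 10 * time.Minute, // entry lifetime inside bigcache
        CacheSize: 512,              // hard cap in MB
    })

    _ = db.Put([]byte("k"), []byte("v")) // write-through: cache and backing store
    v, _ := db.Get([]byte("k"))          // served from bigcache when present
    fmt.Println(string(v))
}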
@@ -0,0 +1,38 @@
//package p2p
package p2p

import (
    eth_metrics "github.com/ethereum/go-ethereum/metrics"
    "github.com/libp2p/go-libp2p-core/metrics"
)

const (
    // ingressMeterName is the prefix of the per-packet inbound metrics.
    ingressMeterName = "p2p/ingress"

    // egressMeterName is the prefix of the per-packet outbound metrics.
    egressMeterName = "p2p/egress"
)

var (
    ingressTrafficMeter = eth_metrics.NewRegisteredMeter(ingressMeterName, nil)
    egressTrafficMeter  = eth_metrics.NewRegisteredMeter(egressMeterName, nil)
)

// Counter is a wrapper around a metrics.BandwidthCounter that meters both the
// inbound and outbound network traffic.
type Counter struct {
    *metrics.BandwidthCounter
}

func newCounter() *Counter {
    return &Counter{metrics.NewBandwidthCounter()}
}

func (c *Counter) LogRecvMessage(size int64) {
    ingressTrafficMeter.Mark(size)
}

func (c *Counter) LogSentMessage(size int64) {
    egressTrafficMeter.Mark(size)
}
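Editor's note: a bandwidth counter like the Counter above is normally handed to libp2p as the host's bandwidth reporter, so LogSentMessage/LogRecvMessage run for every message and feed the go-ethereum meters. A rough sketch inside the same p2p package follows; it is not part of the commit, the helper name is hypothetical, and the exact libp2p.New signature (with or without a context argument) depends on the go-libp2p version in use.

package p2p

import (
    "context"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p-core/host"
)

// newMeteredHost is an illustrative helper: it registers the Counter above as
// the libp2p bandwidth reporter when constructing a host.
func newMeteredHost(ctx context.Context) (host.Host, error) {
    return libp2p.New(ctx, libp2p.BandwidthReporter(newCounter()))
}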
@@ -0,0 +1,40 @@
package streammanager

import (
    "container/list"
    "time"

    "github.com/libp2p/go-libp2p-core/peer"

    "github.com/whyrusleeping/timecache"
)

const (
    coolDownPeriod = 1 * time.Minute
)

type coolDownCache struct {
    timeCache *timecache.TimeCache
}

func newCoolDownCache() *coolDownCache {
    tl := timecache.NewTimeCache(coolDownPeriod)
    return &coolDownCache{
        timeCache: tl,
    }
}

// Has check and add the peer ID to the cache
func (cache *coolDownCache) Has(id peer.ID) bool {
    has := cache.timeCache.Has(string(id))
    if !has {
        cache.timeCache.Add(string(id))
    }
    return has
}

// Reset the cool down cache
func (cache *coolDownCache) Reset() {
    cache.timeCache.Q = list.New()
    cache.timeCache.M = make(map[string]time.Time)
}
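Editor's note: a small sketch of how such a cool-down cache is typically consulted before retrying a peer; the surrounding helper is hypothetical and not part of the commit.

package streammanager

import "github.com/libp2p/go-libp2p-core/peer"

// shouldSetupStream is illustrative only: the first call for a peer returns true
// and starts the one-minute cool-down; repeat calls within that window return
// false, because Has() both checks for and records the peer ID.
func shouldSetupStream(cache *coolDownCache, id peer.ID) bool {
    return !cache.Has(id)
}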
@@ -0,0 +1,29 @@
version: "2"

services:
  online-node:
    build:
      context: .
      dockerfile: Dockerfile
    privileged: true
    ports:
      - "8080:9700"
    environment:
      - "MODE=online"
      - "NETWORK=testnet"
    volumes:
      - "./data:/root/data/"
    labels:
      service_group: rosetta
  offline-node:
    build:
      context: .
      dockerfile: Dockerfile
    privileged: true
    ports:
      - "8081:9700"
    environment:
      - "MODE=offline"
      - "NETWORK=testnet"
    labels:
      service_group: rosetta
@@ -0,0 +1,29 @@
version: "2"

services:
  online-node:
    build:
      context: .
      dockerfile: Dockerfile
    privileged: true
    ports:
      - "8080:9700"
    environment:
      - "MODE=online"
      - "NETWORK=mainnet-22816573"
    volumes:
      - "./data:/root/data/"
    labels:
      service_group: rosetta
  offline-node:
    build:
      context: .
      dockerfile: Dockerfile
    privileged: true
    ports:
      - "8081:9700"
    environment:
      - "MODE=offline"
      - "NETWORK=mainnet"
    labels:
      service_group: rosetta
@@ -0,0 +1,8 @@
[release]
type = s3
provider = AWS
env_auth = false
region = us-west-1
acl = public-read
server_side_encryption = AES256
storage_class = REDUCED_REDUNDANCY
@@ -1,12 +1,38 @@
 #!/usr/bin/env bash
 set -e
 
-DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
 DATA="$DIR/data"
 LOGS="$DATA/logs"
-BASE_ARGS=(--http.ip "0.0.0.0" --ws.ip "0.0.0.0" --http.rosetta --node_type "explorer" --datadir "$DATA" --log.dir "$LOGS")
+DATA_NAME="${DATA_NAME:=harmony_sharddb_0}"
+
+MAINNET_22816573_SNAPSHOT="release:pub.harmony.one/mainnet.min.22816573/harmony_sharddb_0"
+
+case "$NETWORK" in
+mainnet)
+  CONFIG_PATH="-c /root/harmony-mainnet.conf"
+  ;;
+mainnet-22816573)
+  CONFIG_PATH="-c /root/harmony-mainnet.conf"
+  rclone -P -L sync $MAINNET_22816573_SNAPSHOT $DATA/$DATA_NAME --transfers=64
+  ;;
+testnet)
+  CONFIG_PATH="-c /root/harmony-pstn.conf"
+  ;;
+*)
+  echo "unknown network"
+  exit 1
+  ;;
+esac
+
+if [ "$MODE" = "offline" ]; then
+  BASE_ARGS=(--datadir "$DATA" --log.dir "$LOGS" --run.offline)
+else
+  BASE_ARGS=(--datadir "$DATA" --log.dir "$LOGS")
+fi
 
 mkdir -p "$LOGS"
-echo -e NODE ARGS: \" "$@" "${BASE_ARGS[@]}" \"
-echo "NODE VERSION: $(./harmony --version)"
-"$DIR/harmony" "$@" "${BASE_ARGS[@]}"
+echo -e NODE ARGS: \" $CONFIG_PATH "$@" "${BASE_ARGS[@]}" \"
+echo "NODE VERSION: $($DIR/harmony --version)"
+
+"$DIR/harmony" $CONFIG_PATH "$@" "${BASE_ARGS[@]}"
Some files were not shown because too many files have changed in this diff.