|
|
|
@ -43,7 +43,7 @@ import com.fasterxml.jackson.annotation.JsonGetter; |
|
|
|
|
import org.apache.logging.log4j.LogManager; |
|
|
|
|
import org.apache.logging.log4j.Logger; |
|
|
|
|
|
|
|
|
|
public class TransactionLogsIndexer { |
|
|
|
|
public class TransactionLogBloomCacher { |
|
|
|
|
|
|
|
|
|
private static final Logger LOG = LogManager.getLogger(); |
|
|
|
|
|
|
|
|
@ -59,9 +59,9 @@ public class TransactionLogsIndexer { |
|
|
|
|
|
|
|
|
|
private final Path cacheDir; |
|
|
|
|
|
|
|
|
|
private final IndexingStatus indexingStatus = new IndexingStatus(); |
|
|
|
|
private final CachingStatus cachingStatus = new CachingStatus(); |
|
|
|
|
|
|
|
|
|
public TransactionLogsIndexer( |
|
|
|
|
public TransactionLogBloomCacher( |
|
|
|
|
final Blockchain blockchain, final Path cacheDir, final EthScheduler scheduler) { |
|
|
|
|
this.blockchain = blockchain; |
|
|
|
|
this.cacheDir = cacheDir; |
|
|
|
@ -69,7 +69,7 @@ public class TransactionLogsIndexer { |
|
|
|
|
this.cachedSegments = new TreeMap<>(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void indexAll() { |
|
|
|
|
public void cacheAll() { |
|
|
|
|
ensurePreviousSegmentsArePresent(blockchain.getChainHeadBlockNumber()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -81,24 +81,24 @@ public class TransactionLogsIndexer { |
|
|
|
|
return calculateCacheFileName(Long.toString(blockNumber / BLOCKS_PER_BLOOM_CACHE), cacheDir); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public IndexingStatus generateLogBloomCache(final long start, final long stop) { |
|
|
|
|
public CachingStatus generateLogBloomCache(final long start, final long stop) { |
|
|
|
|
checkArgument( |
|
|
|
|
start % BLOCKS_PER_BLOOM_CACHE == 0, "Start block must be at the beginning of a file"); |
|
|
|
|
try { |
|
|
|
|
indexingStatus.indexing = true; |
|
|
|
|
cachingStatus.caching = true; |
|
|
|
|
LOG.info( |
|
|
|
|
"Generating transaction log indexes from block {} to block {} in {}", |
|
|
|
|
"Generating transaction log bloom cache from block {} to block {} in {}", |
|
|
|
|
start, |
|
|
|
|
stop, |
|
|
|
|
cacheDir); |
|
|
|
|
if (!Files.isDirectory(cacheDir) && !cacheDir.toFile().mkdirs()) { |
|
|
|
|
LOG.error("Cache directory '{}' does not exist and could not be made.", cacheDir); |
|
|
|
|
return indexingStatus; |
|
|
|
|
return cachingStatus; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
final File pendingFile = calculateCacheFileName(PENDING, cacheDir); |
|
|
|
|
for (long blockNum = start; blockNum < stop; blockNum += BLOCKS_PER_BLOOM_CACHE) { |
|
|
|
|
LOG.info("Indexing segment at {}", blockNum); |
|
|
|
|
LOG.info("Caching segment at {}", blockNum); |
|
|
|
|
try (final FileOutputStream fos = new FileOutputStream(pendingFile)) { |
|
|
|
|
final long blockCount = fillCacheFile(blockNum, blockNum + BLOCKS_PER_BLOOM_CACHE, fos); |
|
|
|
|
if (blockCount == BLOCKS_PER_BLOOM_CACHE) { |
|
|
|
@ -114,12 +114,12 @@ public class TransactionLogsIndexer { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} catch (final Exception e) { |
|
|
|
|
LOG.error("Unhandled indexing exception", e); |
|
|
|
|
LOG.error("Unhandled caching exception", e); |
|
|
|
|
} finally { |
|
|
|
|
indexingStatus.indexing = false; |
|
|
|
|
LOG.info("Indexing request complete"); |
|
|
|
|
cachingStatus.caching = false; |
|
|
|
|
LOG.info("Caching request complete"); |
|
|
|
|
} |
|
|
|
|
return indexingStatus; |
|
|
|
|
return cachingStatus; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private long fillCacheFile( |
|
|
|
@ -131,7 +131,7 @@ public class TransactionLogsIndexer { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
fillCacheFileWithBlock(maybeHeader.get(), fos); |
|
|
|
|
indexingStatus.currentBlock = blockNum; |
|
|
|
|
cachingStatus.currentBlock = blockNum; |
|
|
|
|
blockNum++; |
|
|
|
|
} |
|
|
|
|
return blockNum - startBlock; |
|
|
|
@ -152,12 +152,12 @@ public class TransactionLogsIndexer { |
|
|
|
|
writer.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray())); |
|
|
|
|
} |
|
|
|
|
} catch (IOException e) { |
|
|
|
|
LOG.error("Unhandled indexing exception.", e); |
|
|
|
|
LOG.error("Unhandled caching exception.", e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void ensurePreviousSegmentsArePresent(final long blockNumber) { |
|
|
|
|
if (!indexingStatus.isIndexing()) { |
|
|
|
|
if (!cachingStatus.isCaching()) { |
|
|
|
|
scheduler.scheduleFutureTask( |
|
|
|
|
() -> { |
|
|
|
|
long currentSegment = (blockNumber / BLOCKS_PER_BLOOM_CACHE) - 1; |
|
|
|
@ -188,15 +188,15 @@ public class TransactionLogsIndexer { |
|
|
|
|
return logs; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public IndexingStatus requestIndexing(final long fromBlock, final long toBlock) { |
|
|
|
|
public CachingStatus requestCaching(final long fromBlock, final long toBlock) { |
|
|
|
|
boolean requestAccepted = false; |
|
|
|
|
try { |
|
|
|
|
if ((fromBlock < toBlock) && submissionLock.tryLock(100, TimeUnit.MILLISECONDS)) { |
|
|
|
|
try { |
|
|
|
|
if (!indexingStatus.indexing) { |
|
|
|
|
if (!cachingStatus.caching) { |
|
|
|
|
requestAccepted = true; |
|
|
|
|
indexingStatus.startBlock = fromBlock; |
|
|
|
|
indexingStatus.endBlock = toBlock; |
|
|
|
|
cachingStatus.startBlock = fromBlock; |
|
|
|
|
cachingStatus.endBlock = toBlock; |
|
|
|
|
scheduler.scheduleComputationTask( |
|
|
|
|
() -> |
|
|
|
|
generateLogBloomCache( |
|
|
|
@ -209,8 +209,8 @@ public class TransactionLogsIndexer { |
|
|
|
|
} catch (final InterruptedException e) { |
|
|
|
|
// ignore
|
|
|
|
|
} |
|
|
|
|
indexingStatus.requestAccepted = requestAccepted; |
|
|
|
|
return indexingStatus; |
|
|
|
|
cachingStatus.requestAccepted = requestAccepted; |
|
|
|
|
return cachingStatus; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public EthScheduler getScheduler() { |
|
|
|
@ -221,11 +221,11 @@ public class TransactionLogsIndexer { |
|
|
|
|
return cacheDir; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public static final class IndexingStatus { |
|
|
|
|
public static final class CachingStatus { |
|
|
|
|
long startBlock; |
|
|
|
|
long endBlock; |
|
|
|
|
volatile long currentBlock; |
|
|
|
|
volatile boolean indexing; |
|
|
|
|
volatile boolean caching; |
|
|
|
|
boolean requestAccepted; |
|
|
|
|
|
|
|
|
|
@JsonGetter |
|
|
|
@ -244,8 +244,8 @@ public class TransactionLogsIndexer { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@JsonGetter |
|
|
|
|
public boolean isIndexing() { |
|
|
|
|
return indexing; |
|
|
|
|
public boolean isCaching() { |
|
|
|
|
return caching; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@JsonGetter |