@ -64,12 +64,11 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ExecutionException ;
import java.util.concurrent.Semaphore ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeoutException ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.function.Supplier ;
import java.util.stream.Collectors ;
import java.util.stream.Stream ;
@ -664,7 +663,7 @@ public class TransactionPool implements BlockAddedObserver {
}
class SaveRestoreManager {
private final Lock diskAccessLock = new ReentrantLock ( ) ;
private final Semaphore diskAccessLock = new Semaphore ( 1 , true ) ;
private final AtomicReference < CompletableFuture < Void > > writeInProgress =
new AtomicReference < > ( CompletableFuture . completedFuture ( null ) ) ;
private final AtomicReference < CompletableFuture < Void > > readInProgress =
@ -685,25 +684,20 @@ public class TransactionPool implements BlockAddedObserver {
final AtomicReference < CompletableFuture < Void > > operationInProgress ) {
if ( configuration . getEnableSaveRestore ( ) ) {
try {
if ( diskAccessLock . tryLock ( 1 , TimeUnit . MINUTES ) ) {
try {
if ( ! operationInProgress . get ( ) . isDone ( ) ) {
isCancelled . set ( true ) ;
try {
operationInProgress . get ( ) . get ( ) ;
} catch ( ExecutionException ee ) {
// nothing to do
}
if ( diskAccessLock . tryAcquire ( 1 , TimeUnit . MINUTES ) ) {
if ( ! operationInProgress . get ( ) . isDone ( ) ) {
isCancelled . set ( true ) ;
try {
operationInProgress . get ( ) . get ( ) ;
} catch ( ExecutionException ee ) {
// nothing to do
}
isCancelled . set ( false ) ;
operationInProgress . set ( CompletableFuture . runAsync ( operation ) ) ;
return operationInProgress . get ( ) ;
} catch ( InterruptedException ie ) {
isCancelled . set ( false ) ;
} finally {
diskAccessLock . unlock ( ) ;
}
isCancelled . set ( false ) ;
operationInProgress . set (
CompletableFuture . runAsync ( operation ) . thenRun ( diskAccessLock : : release ) ) ;
return operationInProgress . get ( ) ;
} else {
CompletableFuture . failedFuture (
new TimeoutException ( "Timeout waiting for disk access lock" ) ) ;