[PIE-7] Ignore transactions from the network while behind chain head (#1228)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
S. Matthew English 6 years ago committed by GitHub
parent d3a19718a2
commit 817a7b2429
  1. 7
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java
  2. 10
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java
  3. 7
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java
  4. 4
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTest.java
  5. 11
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TestNode.java
  6. 77
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java
  7. 6
      ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java
  8. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java
  9. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java
  10. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java
  11. 3
      pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java

@ -81,9 +81,12 @@ public class SyncState {
}
public boolean isInSync() {
return isInSync(SYNC_TOLERANCE);
}
public boolean isInSync(final long syncTolerance) {
return syncTarget
.map(
t -> t.estimatedTargetHeight() - blockchain.getChainHeadBlockNumber() <= SYNC_TOLERANCE)
.map(t -> t.estimatedTargetHeight() - blockchain.getChainHeadBlockNumber() <= syncTolerance)
.orElse(true);
}

@ -25,6 +25,7 @@ import tech.pegasys.pantheon.ethereum.core.Account;
import tech.pegasys.pantheon.ethereum.core.AccountFilter;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator;
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator.TransactionInvalidReason;
@ -49,21 +50,25 @@ import org.apache.logging.log4j.Logger;
*/
public class TransactionPool implements BlockAddedObserver {
private static final Logger LOG = getLogger();
private static final long SYNC_TOLERANCE = 100L;
private final PendingTransactions pendingTransactions;
private final ProtocolSchedule<?> protocolSchedule;
private final ProtocolContext<?> protocolContext;
private final TransactionBatchAddedListener transactionBatchAddedListener;
private final SyncState syncState;
private Optional<AccountFilter> accountFilter = Optional.empty();
public TransactionPool(
final PendingTransactions pendingTransactions,
final ProtocolSchedule<?> protocolSchedule,
final ProtocolContext<?> protocolContext,
final TransactionBatchAddedListener transactionBatchAddedListener) {
final TransactionBatchAddedListener transactionBatchAddedListener,
final SyncState syncState) {
this.pendingTransactions = pendingTransactions;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.transactionBatchAddedListener = transactionBatchAddedListener;
this.syncState = syncState;
}
public List<Transaction> getLocalTransactions() {
@ -88,6 +93,9 @@ public class TransactionPool implements BlockAddedObserver {
public void addRemoteTransactions(final Collection<Transaction> transactions) {
final Set<Transaction> addedTransactions = new HashSet<>();
for (final Transaction transaction : sortByNonce(transactions)) {
if (!syncState.isInSync(SYNC_TOLERANCE)) {
return;
}
final ValidationResult<TransactionInvalidReason> validationResult =
validateTransaction(transaction);
if (validationResult.isValid()) {

@ -15,6 +15,7 @@ package tech.pegasys.pantheon.ethereum.eth.transactions;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.MetricsSystem;
@ -28,7 +29,8 @@ public class TransactionPoolFactory {
final EthContext ethContext,
final Clock clock,
final int maxPendingTransactions,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final SyncState syncState) {
final PendingTransactions pendingTransactions =
new PendingTransactions(maxPendingTransactions, clock, metricsSystem);
@ -41,7 +43,8 @@ public class TransactionPoolFactory {
pendingTransactions,
protocolSchedule,
protocolContext,
new TransactionSender(transactionTracker, transactionsMessageSender, ethContext));
new TransactionSender(transactionTracker, transactionsMessageSender, ethContext),
syncState);
final TransactionsMessageHandler transactionsMessageHandler =
new TransactionsMessageHandler(

@ -49,6 +49,7 @@ import tech.pegasys.pantheon.ethereum.eth.messages.NodeDataMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.StatusMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPoolFactory;
import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule;
@ -1054,7 +1055,8 @@ public final class EthProtocolManagerTest {
ethManager.ethContext(),
TestClock.fixed(),
PendingTransactions.MAX_PENDING_TRANSACTIONS,
metricsSystem);
metricsSystem,
mock(SyncState.class));
// Send just a transaction message.
final PeerConnection peer = setupPeer(ethManager, (cap, msg, connection) -> {});

@ -14,6 +14,9 @@ package tech.pegasys.pantheon.ethereum.eth.transactions;
import static java.util.Collections.singletonList;
import static org.assertj.core.util.Preconditions.checkNotNull;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive;
@ -29,6 +32,7 @@ import tech.pegasys.pantheon.ethereum.eth.EthProtocol;
import tech.pegasys.pantheon.ethereum.eth.EthereumWireProtocolConfiguration;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction;
import tech.pegasys.pantheon.ethereum.p2p.NetworkRunner;
@ -139,6 +143,10 @@ public class TestNode implements Closeable {
(connection, reason, initiatedByPeer) -> disconnections.put(connection, reason));
final EthContext ethContext = ethProtocolManager.ethContext();
SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);
transactionPool =
TransactionPoolFactory.createTransactionPool(
protocolSchedule,
@ -146,7 +154,8 @@ public class TestNode implements Closeable {
ethContext,
TestClock.fixed(),
PendingTransactions.MAX_PENDING_TRANSACTIONS,
metricsSystem);
metricsSystem,
syncState);
networkRunner.start();
selfPeer = new DefaultPeer(id(), endpoint());

@ -48,6 +48,7 @@ import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.core.TransactionTestFixture;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSpec;
@ -85,21 +86,24 @@ public class TransactionPoolTest {
new PendingTransactions(MAX_TRANSACTIONS, TestClock.fixed(), metricsSystem);
private final Transaction transaction1 = createTransaction(1);
private final Transaction transaction2 = createTransaction(2);
private final ExecutionContextTestFixture executionContext = ExecutionContextTestFixture.create();
private final ProtocolContext<Void> protocolContext = executionContext.getProtocolContext();
private TransactionPool transactionPool;
private long genesisBlockGasLimit;
private final AccountFilter accountFilter = mock(AccountFilter.class);
@Before
public void setUp() {
final ExecutionContextTestFixture executionContext = ExecutionContextTestFixture.create();
blockchain = executionContext.getBlockchain();
final ProtocolContext<Void> protocolContext = executionContext.getProtocolContext();
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec);
when(protocolSpec.getTransactionValidator()).thenReturn(transactionValidator);
genesisBlockGasLimit = executionContext.getGenesis().getHeader().getGasLimit();
SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);
transactionPool =
new TransactionPool(transactions, protocolSchedule, protocolContext, batchAddedListener);
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);
blockchain.observeBlockAdded(transactionPool);
}
@ -179,7 +183,7 @@ public class TransactionPoolTest {
}
@Test
public void shouldReaddTransactionsFromThePreviousCanonicalHeadWhenAReorgOccurs() {
public void shouldReadTransactionsFromThePreviousCanonicalHeadWhenAReorgOccurs() {
givenTransactionIsValid(transaction1);
givenTransactionIsValid(transaction2);
transactions.addRemoteTransaction(transaction1);
@ -205,7 +209,7 @@ public class TransactionPoolTest {
}
@Test
public void shouldNotReaddTransactionsThatAreInBothForksWhenReorgHappens() {
public void shouldNotReadTransactionsThatAreInBothForksWhenReorgHappens() {
givenTransactionIsValid(transaction1);
givenTransactionIsValid(transaction2);
transactions.addRemoteTransaction(transaction1);
@ -424,6 +428,69 @@ public class TransactionPoolTest {
assertTransactionPending(transaction1);
}
@Test
public void shouldRejectRemoteTransactionsWhenNotInSync() {
SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(false);
TransactionPool transactionPool =
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);
final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1);
final Transaction transaction2 = builder.nonce(2).createTransaction(KEY_PAIR1);
final Transaction transaction3 = builder.nonce(3).createTransaction(KEY_PAIR1);
when(transactionValidator.validate(any(Transaction.class))).thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction1), nullable(Account.class), eq(true)))
.thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction2), nullable(Account.class), eq(true)))
.thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction3), nullable(Account.class), eq(true)))
.thenReturn(valid());
transactionPool.addRemoteTransactions(asList(transaction3, transaction1, transaction2));
assertTransactionNotPending(transaction1);
assertTransactionNotPending(transaction2);
assertTransactionNotPending(transaction3);
verifyZeroInteractions(batchAddedListener);
}
@Test
public void shouldAllowRemoteTransactionsWhenInSync() {
SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);
TransactionPool transactionPool =
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);
final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1);
final Transaction transaction2 = builder.nonce(2).createTransaction(KEY_PAIR1);
final Transaction transaction3 = builder.nonce(3).createTransaction(KEY_PAIR1);
when(transactionValidator.validate(any(Transaction.class))).thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction1), nullable(Account.class), eq(true)))
.thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction2), nullable(Account.class), eq(true)))
.thenReturn(valid());
when(transactionValidator.validateForSender(
eq(transaction3), nullable(Account.class), eq(true)))
.thenReturn(valid());
transactionPool.addRemoteTransactions(asList(transaction3, transaction1, transaction2));
assertTransactionPending(transaction1);
assertTransactionPending(transaction2);
assertTransactionPending(transaction3);
}
private void assertTransactionPending(final Transaction t) {
assertThat(transactions.getTransactionByHash(t.hash())).contains(t);
}

@ -16,6 +16,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
@ -29,6 +30,7 @@ import tech.pegasys.pantheon.ethereum.core.ExecutionContextTestFixture;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
@ -76,6 +78,7 @@ public class EthGetFilterChangesIntegrationTest {
private final JsonRpcParameter parameters = new JsonRpcParameter();
private FilterManager filterManager;
private EthGetFilterChanges method;
private final SyncState syncState = mock(SyncState.class);
@Before
public void setUp() {
@ -87,7 +90,8 @@ public class EthGetFilterChangesIntegrationTest {
transactions,
executionContext.getProtocolSchedule(),
protocolContext,
batchAddedListener);
batchAddedListener,
syncState);
final BlockchainQueries blockchainQueries =
new BlockchainQueries(blockchain, protocolContext.getWorldStateArchive());
filterManager =

@ -181,7 +181,8 @@ public class CliquePantheonController implements PantheonController<CliqueContex
ethProtocolManager.ethContext(),
clock,
maxPendingTransactions,
metricsSystem);
metricsSystem,
syncState);
final ExecutorService minerThreadPool = Executors.newCachedThreadPool();
final CliqueMinerExecutor miningExecutor =

@ -185,7 +185,8 @@ public class IbftLegacyPantheonController implements PantheonController<IbftCont
istanbul64ProtocolManager.ethContext(),
clock,
maxPendingTransactions,
metricsSystem);
metricsSystem,
syncState);
return new IbftLegacyPantheonController(
protocolSchedule,

@ -210,7 +210,8 @@ public class IbftPantheonController implements PantheonController<IbftContext> {
ethContext,
clock,
maxPendingTransactions,
metricsSystem);
metricsSystem,
syncState);
final IbftEventQueue ibftEventQueue = new IbftEventQueue(ibftConfig.getMessageQueueLimit());

@ -158,7 +158,8 @@ public class MainnetPantheonController implements PantheonController<Void> {
ethProtocolManager.ethContext(),
clock,
maxPendingTransactions,
metricsSystem);
metricsSystem,
syncState);
final ExecutorService minerThreadPool = Executors.newCachedThreadPool();
final EthHashMinerExecutor executor =

Loading…
Cancel
Save