[PAN-1339] Send local transactions to new peers (#1253)

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
S. Matthew English 6 years ago committed by GitHub
parent d2d1d82df7
commit 4b88458e84
  1. 2
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/PeerTransactionTracker.java
  2. 17
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPool.java
  3. 4
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolFactory.java
  4. 73
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionPoolTest.java
  5. 14
      ethereum/jsonrpc/src/integration-test/java/tech/pegasys/pantheon/ethereum/jsonrpc/methods/EthGetFilterChangesIntegrationTest.java

@ -26,7 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
class PeerTransactionTracker implements DisconnectCallback {
public class PeerTransactionTracker implements DisconnectCallback {
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 10_000;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionsToSend = new ConcurrentHashMap<>();

@ -25,6 +25,8 @@ import tech.pegasys.pantheon.ethereum.core.Account;
import tech.pegasys.pantheon.ethereum.core.AccountFilter;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator;
@ -57,18 +59,31 @@ public class TransactionPool implements BlockAddedObserver {
private final TransactionBatchAddedListener transactionBatchAddedListener;
private final SyncState syncState;
private Optional<AccountFilter> accountFilter = Optional.empty();
private final PeerTransactionTracker peerTransactionTracker;
public TransactionPool(
final PendingTransactions pendingTransactions,
final ProtocolSchedule<?> protocolSchedule,
final ProtocolContext<?> protocolContext,
final TransactionBatchAddedListener transactionBatchAddedListener,
final SyncState syncState) {
final SyncState syncState,
final EthContext ethContext,
final PeerTransactionTracker peerTransactionTracker) {
this.pendingTransactions = pendingTransactions;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.transactionBatchAddedListener = transactionBatchAddedListener;
this.syncState = syncState;
this.peerTransactionTracker = peerTransactionTracker;
ethContext.getEthPeers().subscribeConnect(this::handleConnect);
}
private void handleConnect(final EthPeer peer) {
List<Transaction> localTransactions = getLocalTransactions();
for (Transaction transaction : localTransactions) {
peerTransactionTracker.addToPeerSendQueue(peer, transaction);
}
}
public List<Transaction> getLocalTransactions() {

@ -44,7 +44,9 @@ public class TransactionPoolFactory {
protocolSchedule,
protocolContext,
new TransactionSender(transactionTracker, transactionsMessageSender, ethContext),
syncState);
syncState,
ethContext,
transactionTracker);
final TransactionsMessageHandler transactionsMessageHandler =
new TransactionsMessageHandler(

@ -48,6 +48,11 @@ import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.core.TransactionTestFixture;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
@ -59,7 +64,9 @@ import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.uint.UInt256;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
@ -91,6 +98,9 @@ public class TransactionPoolTest {
private TransactionPool transactionPool;
private long genesisBlockGasLimit;
private final AccountFilter accountFilter = mock(AccountFilter.class);
private SyncState syncState;
private EthContext ethContext;
private PeerTransactionTracker peerTransactionTracker;
@Before
public void setUp() {
@ -98,12 +108,21 @@ public class TransactionPoolTest {
when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(protocolSpec);
when(protocolSpec.getTransactionValidator()).thenReturn(transactionValidator);
genesisBlockGasLimit = executionContext.getGenesis().getHeader().getGasLimit();
SyncState syncState = mock(SyncState.class);
syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);
ethContext = mock(EthContext.class);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);
peerTransactionTracker = mock(PeerTransactionTracker.class);
transactionPool =
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker);
blockchain.observeBlockAdded(transactionPool);
}
@ -434,7 +453,13 @@ public class TransactionPoolTest {
when(syncState.isInSync(anyLong())).thenReturn(false);
TransactionPool transactionPool =
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker);
final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1);
@ -462,12 +487,6 @@ public class TransactionPoolTest {
@Test
public void shouldAllowRemoteTransactionsWhenInSync() {
SyncState syncState = mock(SyncState.class);
when(syncState.isInSync(anyLong())).thenReturn(true);
TransactionPool transactionPool =
new TransactionPool(
transactions, protocolSchedule, protocolContext, batchAddedListener, syncState);
final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1);
final Transaction transaction2 = builder.nonce(2).createTransaction(KEY_PAIR1);
@ -491,6 +510,40 @@ public class TransactionPoolTest {
assertTransactionPending(transaction3);
}
@Test
public void shouldSendOnlyLocalTransactionToNewlyConnectedPeer() {
EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();
EthContext ethContext = ethProtocolManager.ethContext();
PeerTransactionTracker peerTransactionTracker = new PeerTransactionTracker();
TransactionPool transactionPool =
new TransactionPool(
transactions,
protocolSchedule,
protocolContext,
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker);
final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transactionLocal = builder.nonce(1).createTransaction(KEY_PAIR1);
final Transaction transactionRemote = builder.nonce(2).createTransaction(KEY_PAIR1);
when(transactionValidator.validate(any(Transaction.class))).thenReturn(valid());
when(transactionValidator.validateForSender(
any(Transaction.class), nullable(Account.class), eq(true)))
.thenReturn(valid());
transactionPool.addLocalTransaction(transactionLocal);
transactionPool.addRemoteTransactions(Collections.singletonList(transactionRemote));
RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager);
Set<Transaction> transactionsToSendToPeer =
peerTransactionTracker.claimTransactionsToSendToPeer(peer.getEthPeer());
assertThat(transactionsToSendToPeer).containsExactly(transactionLocal);
}
private void assertTransactionPending(final Transaction t) {
assertThat(transactions.getTransactionByHash(t.hash())).contains(t);
}

@ -17,6 +17,7 @@ import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
@ -30,7 +31,10 @@ import tech.pegasys.pantheon.ethereum.core.ExecutionContextTestFixture;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.core.Wei;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.transactions.PeerTransactionTracker;
import tech.pegasys.pantheon.ethereum.eth.transactions.PendingTransactions;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool;
import tech.pegasys.pantheon.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
@ -85,13 +89,21 @@ public class EthGetFilterChangesIntegrationTest {
final ExecutionContextTestFixture executionContext = ExecutionContextTestFixture.create();
blockchain = executionContext.getBlockchain();
final ProtocolContext<Void> protocolContext = executionContext.getProtocolContext();
PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class);
EthContext ethContext = mock(EthContext.class);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);
transactionPool =
new TransactionPool(
transactions,
executionContext.getProtocolSchedule(),
protocolContext,
batchAddedListener,
syncState);
syncState,
ethContext,
peerTransactionTracker);
final BlockchainQueries blockchainQueries =
new BlockchainQueries(blockchain, protocolContext.getWorldStateArchive());
filterManager =

Loading…
Cancel
Save