mirror of https://github.com/hyperledger/besu
Fix ETH65 causes crashes issues (#1601)
This PR adds a waiting list for NewPooledTransactionHashesMessage in order to group several hashes into a single GetPooledTransactionsFromPeerTask Signed-off-by: Karim TAAM <karim.t2am@gmail.com> Co-authored-by: Ratan Rai Sur <ratan.r.sur@gmail.com>pull/1647/head
parent
55f7c502d3
commit
d4a330b81e
@ -0,0 +1,77 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.manager.task; |
||||
|
||||
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration.MAX_PENDING_TRANSACTIONS; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionsMessageProcessor; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.Queue; |
||||
|
||||
import com.google.common.collect.EvictingQueue; |
||||
import com.google.common.collect.Queues; |
||||
|
||||
@SuppressWarnings("UnstableApiUsage") |
||||
public class BufferedGetPooledTransactionsFromPeerFetcher { |
||||
|
||||
private static final int MAX_HASHES = 256; |
||||
|
||||
private final EthPeer peer; |
||||
private final PendingTransactionsMessageProcessor processor; |
||||
private final Queue<Hash> txAnnounces; |
||||
|
||||
public BufferedGetPooledTransactionsFromPeerFetcher( |
||||
final EthPeer peer, final PendingTransactionsMessageProcessor processor) { |
||||
this.peer = peer; |
||||
this.processor = processor; |
||||
this.txAnnounces = Queues.synchronizedQueue(EvictingQueue.create(MAX_PENDING_TRANSACTIONS)); |
||||
} |
||||
|
||||
public void requestTransactions() { |
||||
for (List<Hash> txAnnounces = getTxAnnounces(); |
||||
!txAnnounces.isEmpty(); |
||||
txAnnounces = getTxAnnounces()) { |
||||
final GetPooledTransactionsFromPeerTask task = |
||||
GetPooledTransactionsFromPeerTask.forHashes( |
||||
processor.getEthContext(), txAnnounces, processor.getMetricsSystem()); |
||||
task.assignPeer(peer); |
||||
processor |
||||
.getEthContext() |
||||
.getScheduler() |
||||
.scheduleSyncWorkerTask(task) |
||||
.thenAccept( |
||||
result -> processor.getTransactionPool().addRemoteTransactions(result.getResult())); |
||||
} |
||||
} |
||||
|
||||
public void addHash(final Hash hash) { |
||||
txAnnounces.add(hash); |
||||
} |
||||
|
||||
private List<Hash> getTxAnnounces() { |
||||
List<Hash> retrieved = new ArrayList<>(); |
||||
while (retrieved.size() < MAX_HASHES && !txAnnounces.isEmpty()) { |
||||
final Hash txAnnounce = txAnnounces.poll(); |
||||
if (processor.getTransactionPool().getTransactionByHash(txAnnounce).isEmpty()) { |
||||
retrieved.add(txAnnounce); |
||||
} |
||||
} |
||||
return retrieved; |
||||
} |
||||
} |
@ -0,0 +1,122 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.manager.task; |
||||
|
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.anyList; |
||||
import static org.mockito.Mockito.never; |
||||
import static org.mockito.Mockito.times; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.verifyNoInteractions; |
||||
import static org.mockito.Mockito.verifyNoMoreInteractions; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.BlockDataGenerator; |
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.core.Transaction; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; |
||||
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionsMessageProcessor; |
||||
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; |
||||
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; |
||||
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.InjectMocks; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
|
||||
@RunWith(MockitoJUnitRunner.class) |
||||
public class BufferedGetPooledTransactionsFromPeerFetcherTest { |
||||
|
||||
@Mock EthPeer ethPeer; |
||||
@Mock PendingTransactionsMessageProcessor processor; |
||||
@Mock TransactionPool transactionPool; |
||||
@Mock EthContext ethContext; |
||||
@Mock EthScheduler ethScheduler; |
||||
|
||||
@InjectMocks BufferedGetPooledTransactionsFromPeerFetcher fetcher; |
||||
|
||||
private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); |
||||
|
||||
private final BlockDataGenerator generator = new BlockDataGenerator(); |
||||
|
||||
@Before |
||||
public void setup() { |
||||
when(processor.getTransactionPool()).thenReturn(transactionPool); |
||||
when(processor.getMetricsSystem()).thenReturn(metricsSystem); |
||||
when(processor.getEthContext()).thenReturn(ethContext); |
||||
when(ethContext.getScheduler()).thenReturn(ethScheduler); |
||||
} |
||||
|
||||
@Test |
||||
public void requestTransactionShouldStartTaskWhenUnknownTransaction() { |
||||
|
||||
final Hash hash = generator.transaction().getHash(); |
||||
final List<Transaction> taskResult = Collections.singletonList(Transaction.builder().build()); |
||||
final AbstractPeerTask.PeerTaskResult<List<Transaction>> peerTaskResult = |
||||
new AbstractPeerTask.PeerTaskResult<>(ethPeer, taskResult); |
||||
when(ethScheduler.scheduleSyncWorkerTask(any(GetPooledTransactionsFromPeerTask.class))) |
||||
.thenReturn(CompletableFuture.completedFuture(peerTaskResult)); |
||||
|
||||
fetcher.addHash(hash); |
||||
fetcher.requestTransactions(); |
||||
|
||||
verify(ethScheduler).scheduleSyncWorkerTask(any(GetPooledTransactionsFromPeerTask.class)); |
||||
verifyNoMoreInteractions(ethScheduler); |
||||
|
||||
verify(transactionPool, times(1)).addRemoteTransactions(taskResult); |
||||
} |
||||
|
||||
@Test |
||||
public void requestTransactionShouldSplitRequestIntoSeveralTasks() { |
||||
for (int i = 0; i < 257; i++) { |
||||
fetcher.addHash(generator.transaction().getHash()); |
||||
} |
||||
final AbstractPeerTask.PeerTaskResult<List<Transaction>> peerTaskResult = |
||||
new AbstractPeerTask.PeerTaskResult<>(ethPeer, new ArrayList<>()); |
||||
when(ethScheduler.scheduleSyncWorkerTask(any(GetPooledTransactionsFromPeerTask.class))) |
||||
.thenReturn(CompletableFuture.completedFuture(peerTaskResult)); |
||||
|
||||
fetcher.requestTransactions(); |
||||
|
||||
verify(ethScheduler, times(2)) |
||||
.scheduleSyncWorkerTask(any(GetPooledTransactionsFromPeerTask.class)); |
||||
verifyNoMoreInteractions(ethScheduler); |
||||
} |
||||
|
||||
@Test |
||||
public void requestTransactionShouldNotStartTaskWhenTransactionAlreadyInPool() { |
||||
|
||||
final Hash hash = generator.transaction().getHash(); |
||||
when(transactionPool.getTransactionByHash(hash)) |
||||
.thenReturn(Optional.of(Transaction.builder().build())); |
||||
|
||||
fetcher.addHash(hash); |
||||
fetcher.requestTransactions(); |
||||
|
||||
verifyNoInteractions(ethScheduler); |
||||
verify(transactionPool, never()).addRemoteTransactions(anyList()); |
||||
} |
||||
} |
Loading…
Reference in new issue