mirror of https://github.com/hyperledger/besu
Add eth65 support (#608)
* Add eth65 support Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Fix integration tests Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Fix acceptance tests Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * add acceptance test that checks that transactions are gossiped between peers Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Update ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/LimitedNewPooledTransactionHashesMessages.java Co-Authored-By: Danno Ferrin <danno.ferrin@shemnon.com> Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * code review comments Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Code review changes Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * Reviewing diffs Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> * smaller synchronized blocks Signed-off-by: Antoine Toulme <antoine@lunar-ocean.com> Co-authored-by: Danno Ferrin <danno.ferrin@shemnon.com>pull/634/head
parent
a93d06f182
commit
b9c6c4b3cc
@ -0,0 +1,51 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.tests.acceptance.dsl.condition.txpool; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.tests.acceptance.dsl.WaitUtils; |
||||
import org.hyperledger.besu.tests.acceptance.dsl.condition.Condition; |
||||
import org.hyperledger.besu.tests.acceptance.dsl.transaction.txpool.TxPoolTransactions; |
||||
|
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
public class TxPoolConditions { |
||||
|
||||
private final TxPoolTransactions txPoolTransactions; |
||||
|
||||
public TxPoolConditions(final TxPoolTransactions txPoolTransactions) { |
||||
this.txPoolTransactions = txPoolTransactions; |
||||
} |
||||
|
||||
public Condition inTransactionPool(final Hash txHash) { |
||||
return node -> |
||||
WaitUtils.waitFor( |
||||
() -> { |
||||
List<Map<String, String>> poolContents = |
||||
node.execute(txPoolTransactions.getTxPoolContents()); |
||||
boolean found = false; |
||||
for (Map<String, String> txInfo : poolContents) { |
||||
if (Hash.fromHexString(txInfo.get("hash")).equals(txHash)) { |
||||
found = true; |
||||
break; |
||||
} |
||||
} |
||||
assertThat(found).isTrue(); |
||||
}); |
||||
} |
||||
} |
@ -0,0 +1,34 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.tests.acceptance.dsl.transaction.txpool; |
||||
|
||||
import org.hyperledger.besu.tests.acceptance.dsl.transaction.NodeRequests; |
||||
import org.hyperledger.besu.tests.acceptance.dsl.transaction.Transaction; |
||||
|
||||
import java.io.IOException; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
public class TxPoolBesuTransaction implements Transaction<List<Map<String, String>>> { |
||||
|
||||
@Override |
||||
public List<Map<String, String>> execute(final NodeRequests node) { |
||||
try { |
||||
return node.txPool().besuTransactions().send().getResult(); |
||||
} catch (IOException e) { |
||||
throw new RuntimeException(e); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,38 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.tests.acceptance.dsl.transaction.txpool; |
||||
|
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
import org.web3j.protocol.Web3jService; |
||||
import org.web3j.protocol.core.Request; |
||||
import org.web3j.protocol.core.Response; |
||||
|
||||
public class TxPoolRequestFactory { |
||||
|
||||
private final Web3jService web3jService; |
||||
|
||||
public TxPoolRequestFactory(final Web3jService web3jService) { |
||||
this.web3jService = web3jService; |
||||
} |
||||
|
||||
Request<?, TransactionInfoResponse> besuTransactions() { |
||||
return new Request<>( |
||||
"txpool_besuTransactions", null, web3jService, TransactionInfoResponse.class); |
||||
} |
||||
|
||||
static class TransactionInfoResponse extends Response<List<Map<String, String>>> {} |
||||
} |
@ -0,0 +1,22 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.tests.acceptance.dsl.transaction.txpool; |
||||
|
||||
public class TxPoolTransactions { |
||||
|
||||
public TxPoolBesuTransaction getTxPoolContents() { |
||||
return new TxPoolBesuTransaction(); |
||||
} |
||||
} |
@ -0,0 +1,48 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.tests.acceptance; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.tests.acceptance.dsl.AcceptanceTestBase; |
||||
import org.hyperledger.besu.tests.acceptance.dsl.account.Account; |
||||
import org.hyperledger.besu.tests.acceptance.dsl.blockchain.Amount; |
||||
import org.hyperledger.besu.tests.acceptance.dsl.node.Node; |
||||
import org.hyperledger.besu.tests.acceptance.dsl.transaction.account.TransferTransaction; |
||||
|
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
|
||||
public class GossipTransactionAcceptanceTest extends AcceptanceTestBase { |
||||
|
||||
private Node archiveNode1; |
||||
|
||||
@Before |
||||
public void setUp() throws Exception { |
||||
archiveNode1 = besu.createArchiveNode("archiveNode1"); |
||||
cluster.start(archiveNode1, besu.createArchiveNode("archiveNode2")); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldGossipATransaction() { |
||||
final Account account = accounts.createAccount("account-one"); |
||||
final Amount balance = Amount.ether(20); |
||||
|
||||
TransferTransaction tx = accountTransactions.createTransfer(account, balance); |
||||
|
||||
Hash txHash = archiveNode1.execute(tx); |
||||
|
||||
cluster.verify(txPoolConditions.inTransactionPool(txHash)); |
||||
} |
||||
} |
@ -0,0 +1,92 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.manager.task; |
||||
|
||||
import static java.util.Collections.emptyList; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.core.Transaction; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
import org.hyperledger.besu.ethereum.eth.manager.PendingPeerRequest; |
||||
import org.hyperledger.besu.ethereum.eth.messages.EthPV65; |
||||
import org.hyperledger.besu.ethereum.eth.messages.PooledTransactionsMessage; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; |
||||
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
|
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
public class GetPooledTransactionsFromPeerTask extends AbstractPeerRequestTask<List<Transaction>> { |
||||
|
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
|
||||
private final List<Hash> hashes; |
||||
|
||||
private GetPooledTransactionsFromPeerTask( |
||||
final EthContext ethContext, final List<Hash> hashes, final MetricsSystem metricsSystem) { |
||||
super(ethContext, EthPV65.GET_POOLED_TRANSACTIONS, metricsSystem); |
||||
this.hashes = new ArrayList<>(hashes); |
||||
} |
||||
|
||||
public static GetPooledTransactionsFromPeerTask forHashes( |
||||
final EthContext ethContext, final List<Hash> hashes, final MetricsSystem metricsSystem) { |
||||
return new GetPooledTransactionsFromPeerTask(ethContext, hashes, metricsSystem); |
||||
} |
||||
|
||||
@Override |
||||
protected PendingPeerRequest sendRequest() { |
||||
return sendRequestToPeer( |
||||
peer -> { |
||||
LOG.debug("Requesting {} transaction pool entries from peer {}.", hashes.size(), peer); |
||||
return peer.getPooledTransactions(hashes); |
||||
}, |
||||
0); |
||||
} |
||||
|
||||
@Override |
||||
protected Optional<List<Transaction>> processResponse( |
||||
final boolean streamClosed, final MessageData message, final EthPeer peer) { |
||||
if (streamClosed) { |
||||
// We don't record this as a useless response because it's impossible to know if a peer has
|
||||
// the data we're requesting.
|
||||
return Optional.of(emptyList()); |
||||
} |
||||
final PooledTransactionsMessage pooledTransactionsMessage = |
||||
PooledTransactionsMessage.readFrom(message); |
||||
final List<Transaction> tx = pooledTransactionsMessage.transactions(); |
||||
if (tx.size() > hashes.size()) { |
||||
// Can't be the response to our request
|
||||
return Optional.empty(); |
||||
} |
||||
return mapNodeDataByHash(tx); |
||||
} |
||||
|
||||
private Optional<List<Transaction>> mapNodeDataByHash(final List<Transaction> transactions) { |
||||
final List<Transaction> result = new ArrayList<>(); |
||||
for (final Transaction tx : transactions) { |
||||
final Hash hash = tx.getHash(); |
||||
if (!hashes.contains(hash)) { |
||||
return Optional.empty(); |
||||
} |
||||
result.add(tx); |
||||
} |
||||
return Optional.of(result); |
||||
} |
||||
} |
@ -0,0 +1,28 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.messages; |
||||
|
||||
public final class EthPV65 { |
||||
|
||||
public static final int NEW_POOLED_TRANSACTION_HASHES = 0x08; |
||||
|
||||
public static final int GET_POOLED_TRANSACTIONS = 0x09; |
||||
|
||||
public static final int POOLED_TRANSACTIONS = 0x0A; |
||||
|
||||
private EthPV65() { |
||||
// Holder for constants only
|
||||
} |
||||
} |
@ -0,0 +1,69 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.messages; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractMessageData; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; |
||||
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput; |
||||
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput; |
||||
|
||||
import java.util.List; |
||||
|
||||
import org.apache.tuweni.bytes.Bytes; |
||||
|
||||
public final class GetPooledTransactionsMessage extends AbstractMessageData { |
||||
|
||||
private static final int MESSAGE_CODE = EthPV65.GET_POOLED_TRANSACTIONS; |
||||
private List<Hash> pooledTransactions; |
||||
|
||||
private GetPooledTransactionsMessage(final Bytes rlp) { |
||||
super(rlp); |
||||
} |
||||
|
||||
@Override |
||||
public int getCode() { |
||||
return MESSAGE_CODE; |
||||
} |
||||
|
||||
public static GetPooledTransactionsMessage create(final List<Hash> pooledTransactions) { |
||||
List<Hash> tx = pooledTransactions; |
||||
final BytesValueRLPOutput out = new BytesValueRLPOutput(); |
||||
out.writeList(tx, (h, w) -> w.writeBytes(h)); |
||||
return new GetPooledTransactionsMessage(out.encoded()); |
||||
} |
||||
|
||||
public static GetPooledTransactionsMessage readFrom(final MessageData message) { |
||||
if (message instanceof GetPooledTransactionsMessage) { |
||||
return (GetPooledTransactionsMessage) message; |
||||
} |
||||
final int code = message.getCode(); |
||||
if (code != MESSAGE_CODE) { |
||||
throw new IllegalArgumentException( |
||||
String.format( |
||||
"Message has code %d and thus is not a GetPooledTransactionsMessage.", code)); |
||||
} |
||||
|
||||
return new GetPooledTransactionsMessage(message.getData()); |
||||
} |
||||
|
||||
public List<Hash> pooledTransactions() { |
||||
if (pooledTransactions == null) { |
||||
final BytesValueRLPInput in = new BytesValueRLPInput(getData(), false); |
||||
pooledTransactions = in.readList(rlp -> Hash.wrap(rlp.readBytes32())); |
||||
} |
||||
return pooledTransactions; |
||||
} |
||||
} |
@ -0,0 +1,70 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.messages; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
import org.apache.tuweni.bytes.Bytes; |
||||
|
||||
public final class LimitedNewPooledTransactionHashesMessages { |
||||
|
||||
static final int MAX_COUNT = 4096; |
||||
|
||||
private final NewPooledTransactionHashesMessage transactionsMessage; |
||||
private final List<Hash> includedTransactions; |
||||
|
||||
public LimitedNewPooledTransactionHashesMessages( |
||||
final NewPooledTransactionHashesMessage transactionsMessage, |
||||
final List<Hash> includedTransactions) { |
||||
this.transactionsMessage = transactionsMessage; |
||||
this.includedTransactions = includedTransactions; |
||||
} |
||||
|
||||
public static LimitedNewPooledTransactionHashesMessages createLimited( |
||||
final Iterable<Hash> hashes) { |
||||
final List<Hash> includedTransactions = new ArrayList<>(); |
||||
final BytesValueRLPOutput message = new BytesValueRLPOutput(); |
||||
int count = 0; |
||||
message.startList(); |
||||
for (final Hash txHash : hashes) { |
||||
final BytesValueRLPOutput encodedHashes = new BytesValueRLPOutput(); |
||||
encodedHashes.writeBytes(txHash); |
||||
Bytes encodedBytes = encodedHashes.encoded(); |
||||
|
||||
message.writeRLPUnsafe(encodedBytes); |
||||
includedTransactions.add(txHash); |
||||
// Check if last transaction to add to the message
|
||||
count++; |
||||
if (count >= MAX_COUNT) { |
||||
break; |
||||
} |
||||
} |
||||
message.endList(); |
||||
return new LimitedNewPooledTransactionHashesMessages( |
||||
new NewPooledTransactionHashesMessage(message.encoded()), includedTransactions); |
||||
} |
||||
|
||||
public final NewPooledTransactionHashesMessage getTransactionsMessage() { |
||||
return transactionsMessage; |
||||
} |
||||
|
||||
public final List<Hash> getIncludedTransactions() { |
||||
return includedTransactions; |
||||
} |
||||
} |
@ -0,0 +1,68 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.messages; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractMessageData; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; |
||||
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput; |
||||
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput; |
||||
|
||||
import java.util.List; |
||||
|
||||
import org.apache.tuweni.bytes.Bytes; |
||||
|
||||
public final class NewPooledTransactionHashesMessage extends AbstractMessageData { |
||||
|
||||
private static final int MESSAGE_CODE = EthPV65.NEW_POOLED_TRANSACTION_HASHES; |
||||
private List<Hash> pendingTransactions; |
||||
|
||||
NewPooledTransactionHashesMessage(final Bytes rlp) { |
||||
super(rlp); |
||||
} |
||||
|
||||
@Override |
||||
public int getCode() { |
||||
return MESSAGE_CODE; |
||||
} |
||||
|
||||
public static NewPooledTransactionHashesMessage create(final List<Hash> pendingTransactions) { |
||||
final BytesValueRLPOutput out = new BytesValueRLPOutput(); |
||||
out.writeList(pendingTransactions, (h, w) -> w.writeBytes(h)); |
||||
return new NewPooledTransactionHashesMessage(out.encoded()); |
||||
} |
||||
|
||||
public static NewPooledTransactionHashesMessage readFrom(final MessageData message) { |
||||
if (message instanceof NewPooledTransactionHashesMessage) { |
||||
return (NewPooledTransactionHashesMessage) message; |
||||
} |
||||
final int code = message.getCode(); |
||||
if (code != MESSAGE_CODE) { |
||||
throw new IllegalArgumentException( |
||||
String.format( |
||||
"Message has code %d and thus is not a NewPooledTransactionHashesMessage.", code)); |
||||
} |
||||
|
||||
return new NewPooledTransactionHashesMessage(message.getData()); |
||||
} |
||||
|
||||
public List<Hash> pendingTransactions() { |
||||
if (pendingTransactions == null) { |
||||
final BytesValueRLPInput in = new BytesValueRLPInput(getData(), false); |
||||
pendingTransactions = in.readList(rlp -> Hash.wrap(rlp.readBytes32())); |
||||
} |
||||
return pendingTransactions; |
||||
} |
||||
} |
@ -0,0 +1,68 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.messages; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Transaction; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.AbstractMessageData; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; |
||||
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPInput; |
||||
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput; |
||||
|
||||
import java.util.List; |
||||
|
||||
import org.apache.tuweni.bytes.Bytes; |
||||
|
||||
public final class PooledTransactionsMessage extends AbstractMessageData { |
||||
|
||||
private static final int MESSAGE_CODE = EthPV65.POOLED_TRANSACTIONS; |
||||
private List<Transaction> pooledTransactions; |
||||
|
||||
private PooledTransactionsMessage(final Bytes rlp) { |
||||
super(rlp); |
||||
} |
||||
|
||||
@Override |
||||
public int getCode() { |
||||
return MESSAGE_CODE; |
||||
} |
||||
|
||||
public static PooledTransactionsMessage create(final List<Transaction> transactions) { |
||||
List<Transaction> tx = transactions; |
||||
final BytesValueRLPOutput out = new BytesValueRLPOutput(); |
||||
out.writeList(tx, Transaction::writeTo); |
||||
return new PooledTransactionsMessage(out.encoded()); |
||||
} |
||||
|
||||
public static PooledTransactionsMessage readFrom(final MessageData message) { |
||||
if (message instanceof PooledTransactionsMessage) { |
||||
return (PooledTransactionsMessage) message; |
||||
} |
||||
final int code = message.getCode(); |
||||
if (code != MESSAGE_CODE) { |
||||
throw new IllegalArgumentException( |
||||
String.format("Message has code %d and thus is not a PooledTransactionsMessage.", code)); |
||||
} |
||||
|
||||
return new PooledTransactionsMessage(message.getData()); |
||||
} |
||||
|
||||
public List<Transaction> transactions() { |
||||
if (pooledTransactions == null) { |
||||
final BytesValueRLPInput in = new BytesValueRLPInput(getData(), false); |
||||
pooledTransactions = in.readList(Transaction::readFrom); |
||||
} |
||||
return pooledTransactions; |
||||
} |
||||
} |
@ -0,0 +1,94 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.transactions; |
||||
|
||||
import static java.util.Collections.emptySet; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
|
||||
import java.util.Collection; |
||||
import java.util.Collections; |
||||
import java.util.LinkedHashMap; |
||||
import java.util.Map; |
||||
import java.util.Set; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.stream.Collectors; |
||||
|
||||
public class PeerPendingTransactionTracker implements EthPeer.DisconnectCallback { |
||||
private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 10_000; |
||||
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>(); |
||||
private final Map<EthPeer, Set<Hash>> transactionsToSend = new ConcurrentHashMap<>(); |
||||
private final PendingTransactions pendingTransactions; |
||||
|
||||
public PeerPendingTransactionTracker(final PendingTransactions pendingTransactions) { |
||||
this.pendingTransactions = pendingTransactions; |
||||
} |
||||
|
||||
public synchronized void markTransactionsHashesAsSeen( |
||||
final EthPeer peer, final Collection<Hash> transactions) { |
||||
final Set<Hash> seenTransactionsForPeer = getOrCreateSeenTransactionsForPeer(peer); |
||||
transactions.stream().forEach(seenTransactionsForPeer::add); |
||||
} |
||||
|
||||
public synchronized void addToPeerSendQueue(final EthPeer peer, final Hash hash) { |
||||
if (!hasPeerSeenTransaction(peer, hash)) { |
||||
transactionsToSend.computeIfAbsent(peer, key -> createTransactionsSet()).add(hash); |
||||
} |
||||
} |
||||
|
||||
public Iterable<EthPeer> getEthPeersWithUnsentTransactions() { |
||||
return transactionsToSend.keySet(); |
||||
} |
||||
|
||||
public synchronized Set<Hash> claimTransactionsToSendToPeer(final EthPeer peer) { |
||||
final Set<Hash> transactionsToSend = this.transactionsToSend.remove(peer); |
||||
if (transactionsToSend != null) { |
||||
markTransactionsHashesAsSeen( |
||||
peer, |
||||
transactionsToSend.stream() |
||||
.filter(h -> pendingTransactions.getTransactionByHash(h).isPresent()) |
||||
.collect(Collectors.toSet())); |
||||
return transactionsToSend; |
||||
} else { |
||||
return emptySet(); |
||||
} |
||||
} |
||||
|
||||
private Set<Hash> getOrCreateSeenTransactionsForPeer(final EthPeer peer) { |
||||
return seenTransactions.computeIfAbsent(peer, key -> createTransactionsSet()); |
||||
} |
||||
|
||||
private boolean hasPeerSeenTransaction(final EthPeer peer, final Hash hash) { |
||||
final Set<Hash> seenTransactionsForPeer = seenTransactions.get(peer); |
||||
return seenTransactionsForPeer != null && seenTransactionsForPeer.contains(hash); |
||||
} |
||||
|
||||
private <T> Set<T> createTransactionsSet() { |
||||
return Collections.newSetFromMap( |
||||
new LinkedHashMap<T, Boolean>(1 << 4, 0.75f, true) { |
||||
@Override |
||||
protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) { |
||||
return size() > MAX_TRACKED_SEEN_TRANSACTIONS; |
||||
} |
||||
}); |
||||
} |
||||
|
||||
@Override |
||||
public void onDisconnect(final EthPeer peer) { |
||||
seenTransactions.remove(peer); |
||||
transactionsToSend.remove(peer); |
||||
} |
||||
} |
@ -0,0 +1,50 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.transactions; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Transaction; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext; |
||||
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener; |
||||
|
||||
class PendingTransactionSender implements TransactionBatchAddedListener { |
||||
|
||||
private final PeerPendingTransactionTracker transactionTracker; |
||||
private final PendingTransactionsMessageSender transactionsMessageSender; |
||||
private final EthContext ethContext; |
||||
|
||||
public PendingTransactionSender( |
||||
final PeerPendingTransactionTracker transactionTracker, |
||||
final PendingTransactionsMessageSender transactionsMessageSender, |
||||
final EthContext ethContext) { |
||||
this.transactionTracker = transactionTracker; |
||||
this.transactionsMessageSender = transactionsMessageSender; |
||||
this.ethContext = ethContext; |
||||
} |
||||
|
||||
@Override |
||||
public void onTransactionsAdded(final Iterable<Transaction> transactions) { |
||||
ethContext |
||||
.getEthPeers() |
||||
.streamAvailablePeers() |
||||
.forEach( |
||||
peer -> |
||||
transactions.forEach( |
||||
transaction -> |
||||
transactionTracker.addToPeerSendQueue(peer, transaction.getHash()))); |
||||
ethContext |
||||
.getScheduler() |
||||
.scheduleSyncWorkerTask(transactionsMessageSender::sendTransactionsToPeers); |
||||
} |
||||
} |
@ -0,0 +1,52 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.transactions; |
||||
|
||||
import static java.time.Instant.now; |
||||
|
||||
import org.hyperledger.besu.ethereum.eth.manager.EthMessage; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthMessages; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; |
||||
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage; |
||||
|
||||
import java.time.Duration; |
||||
import java.time.Instant; |
||||
|
||||
class PendingTransactionsMessageHandler implements EthMessages.MessageCallback { |
||||
|
||||
private final PendingTransactionsMessageProcessor transactionsMessageProcessor; |
||||
private final EthScheduler scheduler; |
||||
private final Duration txMsgKeepAlive; |
||||
|
||||
public PendingTransactionsMessageHandler( |
||||
final EthScheduler scheduler, |
||||
final PendingTransactionsMessageProcessor transactionsMessageProcessor, |
||||
final int txMsgKeepAliveSeconds) { |
||||
this.scheduler = scheduler; |
||||
this.transactionsMessageProcessor = transactionsMessageProcessor; |
||||
this.txMsgKeepAlive = Duration.ofSeconds(txMsgKeepAliveSeconds); |
||||
} |
||||
|
||||
@Override |
||||
public void exec(final EthMessage message) { |
||||
final NewPooledTransactionHashesMessage transactionsMessage = |
||||
NewPooledTransactionHashesMessage.readFrom(message.getData()); |
||||
final Instant startedAt = now(); |
||||
scheduler.scheduleTxWorkerTask( |
||||
() -> |
||||
transactionsMessageProcessor.processNewPooledTransactionHashesMessage( |
||||
message.getPeer(), transactionsMessage, startedAt, txMsgKeepAlive)); |
||||
} |
||||
} |
@ -0,0 +1,120 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.transactions; |
||||
|
||||
import static java.time.Instant.now; |
||||
import static org.apache.logging.log4j.LogManager.getLogger; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.core.Transaction; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthContext; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
import org.hyperledger.besu.ethereum.eth.manager.task.GetPooledTransactionsFromPeerTask; |
||||
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; |
||||
import org.hyperledger.besu.ethereum.rlp.RLPException; |
||||
import org.hyperledger.besu.metrics.RunnableCounter; |
||||
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||
import org.hyperledger.besu.plugin.services.metrics.Counter; |
||||
|
||||
import java.time.Duration; |
||||
import java.time.Instant; |
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
|
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
class PendingTransactionsMessageProcessor { |
||||
|
||||
private static final int MAX_HASHES = 256; |
||||
private static final int SKIPPED_MESSAGES_LOGGING_THRESHOLD = 1000; |
||||
private static final Logger LOG = getLogger(); |
||||
private final PeerPendingTransactionTracker transactionTracker; |
||||
private final Counter totalSkippedTransactionsMessageCounter; |
||||
private final TransactionPool transactionPool; |
||||
private final EthContext ethContext; |
||||
private final MetricsSystem metricsSystem; |
||||
|
||||
PendingTransactionsMessageProcessor( |
||||
final PeerPendingTransactionTracker transactionTracker, |
||||
final TransactionPool transactionPool, |
||||
final Counter metricsCounter, |
||||
final EthContext ethContext, |
||||
final MetricsSystem metricsSystem) { |
||||
this.transactionTracker = transactionTracker; |
||||
this.transactionPool = transactionPool; |
||||
this.ethContext = ethContext; |
||||
this.metricsSystem = metricsSystem; |
||||
this.totalSkippedTransactionsMessageCounter = |
||||
new RunnableCounter( |
||||
metricsCounter, |
||||
() -> |
||||
LOG.warn( |
||||
"{} expired transaction messages have been skipped.", |
||||
SKIPPED_MESSAGES_LOGGING_THRESHOLD), |
||||
SKIPPED_MESSAGES_LOGGING_THRESHOLD); |
||||
} |
||||
|
||||
void processNewPooledTransactionHashesMessage( |
||||
final EthPeer peer, |
||||
final NewPooledTransactionHashesMessage transactionsMessage, |
||||
final Instant startedAt, |
||||
final Duration keepAlive) { |
||||
// Check if message not expired.
|
||||
if (startedAt.plus(keepAlive).isAfter(now())) { |
||||
this.processNewPooledTransactionHashesMessage(peer, transactionsMessage); |
||||
} else { |
||||
totalSkippedTransactionsMessageCounter.inc(); |
||||
} |
||||
} |
||||
|
||||
private void processNewPooledTransactionHashesMessage( |
||||
final EthPeer peer, final NewPooledTransactionHashesMessage transactionsMessage) { |
||||
try { |
||||
LOG.trace("Received pooled transaction hashes message from {}", peer); |
||||
|
||||
final List<Hash> pendingHashes = transactionsMessage.pendingTransactions(); |
||||
transactionTracker.markTransactionsHashesAsSeen(peer, pendingHashes); |
||||
List<Hash> toRequest = new ArrayList<>(); |
||||
for (Hash hash : pendingHashes) { |
||||
if (transactionPool.addTransactionHashes(hash)) { |
||||
toRequest.add(hash); |
||||
} |
||||
} |
||||
while (!toRequest.isEmpty()) { |
||||
List<Hash> messageHashes = toRequest.subList(0, Math.min(toRequest.size(), MAX_HASHES)); |
||||
GetPooledTransactionsFromPeerTask task = |
||||
GetPooledTransactionsFromPeerTask.forHashes(ethContext, messageHashes, metricsSystem); |
||||
task.assignPeer(peer); |
||||
ethContext |
||||
.getScheduler() |
||||
.scheduleSyncWorkerTask(task) |
||||
.thenAccept( |
||||
result -> { |
||||
List<Transaction> txs = result.getResult(); |
||||
transactionPool.addRemoteTransactions(txs); |
||||
}); |
||||
|
||||
toRequest.removeAll(messageHashes); |
||||
} |
||||
} catch (final RLPException ex) { |
||||
if (peer != null) { |
||||
LOG.debug( |
||||
"Malformed pooled transaction hashes message received, disconnecting: {}", peer, ex); |
||||
peer.disconnect(DisconnectReason.BREACH_OF_PROTOCOL); |
||||
} |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,52 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.transactions; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
import org.hyperledger.besu.ethereum.eth.messages.LimitedNewPooledTransactionHashesMessages; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection.PeerNotConnected; |
||||
|
||||
import java.util.Set; |
||||
import java.util.stream.StreamSupport; |
||||
|
||||
class PendingTransactionsMessageSender { |
||||
|
||||
private final PeerPendingTransactionTracker transactionTracker; |
||||
|
||||
public PendingTransactionsMessageSender(final PeerPendingTransactionTracker transactionTracker) { |
||||
this.transactionTracker = transactionTracker; |
||||
} |
||||
|
||||
public void sendTransactionsToPeers() { |
||||
StreamSupport.stream(transactionTracker.getEthPeersWithUnsentTransactions().spliterator(), true) |
||||
.parallel() |
||||
.forEach(this::sendTransactionsToPeer); |
||||
} |
||||
|
||||
private void sendTransactionsToPeer(final EthPeer peer) { |
||||
final Set<Hash> allTxToSend = transactionTracker.claimTransactionsToSendToPeer(peer); |
||||
while (!allTxToSend.isEmpty()) { |
||||
final LimitedNewPooledTransactionHashesMessages limitedTransactionsMessages = |
||||
LimitedNewPooledTransactionHashesMessages.createLimited(allTxToSend); |
||||
allTxToSend.removeAll(limitedTransactionsMessages.getIncludedTransactions()); |
||||
try { |
||||
peer.send(limitedTransactionsMessages.getTransactionsMessage()); |
||||
} catch (final PeerNotConnected e) { |
||||
return; |
||||
} |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,87 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.manager.task; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
import org.hyperledger.besu.crypto.SECP256K1; |
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.core.Transaction; |
||||
import org.hyperledger.besu.ethereum.core.TransactionTestFixture; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest; |
||||
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; |
||||
import org.hyperledger.besu.plugin.services.MetricsSystem; |
||||
|
||||
import java.util.ArrayList; |
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import com.google.common.collect.Lists; |
||||
|
||||
public class GetPooledTransactionsFromPeerTaskTest extends PeerMessageTaskTest<List<Transaction>> { |
||||
|
||||
private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); |
||||
|
||||
@Override |
||||
protected List<Transaction> generateDataToBeRequested() { |
||||
|
||||
final List<Transaction> requestedData = new ArrayList<>(); |
||||
SECP256K1.KeyPair keyPair = SECP256K1.KeyPair.generate(); |
||||
for (int i = 0; i < 3; i++) { |
||||
Transaction tx = |
||||
new TransactionTestFixture() |
||||
.nonce(i) |
||||
.gasLimit(100000) |
||||
.chainId(Optional.empty()) |
||||
.createTransaction(keyPair); |
||||
assertThat(transactionPool.getPendingTransactions().addLocalTransaction(tx)).isTrue(); |
||||
requestedData.add(tx); |
||||
} |
||||
return requestedData; |
||||
} |
||||
|
||||
@Override |
||||
protected EthTask<AbstractPeerTask.PeerTaskResult<List<Transaction>>> createTask( |
||||
final List<Transaction> requestedData) { |
||||
final List<Hash> hashes = |
||||
Lists.newArrayList(requestedData).stream() |
||||
.map(Transaction::getHash) |
||||
.collect(Collectors.toList()); |
||||
return GetPooledTransactionsFromPeerTask.forHashes(ethContext, hashes, metricsSystem); |
||||
} |
||||
|
||||
@Override |
||||
protected void assertPartialResultMatchesExpectation( |
||||
final List<Transaction> requestedData, final List<Transaction> partialResponse) { |
||||
assertThat(partialResponse.size()).isLessThanOrEqualTo(requestedData.size()); |
||||
assertThat(partialResponse.size()).isGreaterThan(0); |
||||
for (Transaction data : partialResponse) { |
||||
assertThat(requestedData).contains(data); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
protected void assertResultMatchesExpectation( |
||||
final List<Transaction> requestedData, |
||||
final AbstractPeerTask.PeerTaskResult<List<Transaction>> response, |
||||
final EthPeer respondingPeer) { |
||||
assertThat(response.getResult().size()).isEqualTo(requestedData.size()); |
||||
for (Transaction data : response.getResult()) { |
||||
assertThat(requestedData).contains(data); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,47 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.messages; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Assertions.assertThatExceptionOfType; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
|
||||
import org.apache.tuweni.bytes.Bytes; |
||||
import org.apache.tuweni.bytes.Bytes32; |
||||
import org.junit.Test; |
||||
|
||||
public class GetPooledTransactionsMessageTest { |
||||
|
||||
@Test |
||||
public void roundTripGetPooledTransactionsMessage() { |
||||
List<Hash> hashes = Arrays.asList(Hash.wrap(Bytes32.random())); |
||||
final GetPooledTransactionsMessage msg = GetPooledTransactionsMessage.create(hashes); |
||||
assertThat(msg.getCode()).isEqualTo(EthPV65.GET_POOLED_TRANSACTIONS); |
||||
assertThat(msg.pooledTransactions()).isEqualTo(hashes); |
||||
} |
||||
|
||||
@Test |
||||
public void readFromMessageWithWrongCodeThrows() { |
||||
final RawMessage rawMsg = new RawMessage(EthPV62.BLOCK_HEADERS, Bytes.of(0)); |
||||
|
||||
assertThatExceptionOfType(IllegalArgumentException.class) |
||||
.isThrownBy(() -> GetPooledTransactionsMessage.readFrom(rawMsg)); |
||||
} |
||||
} |
@ -0,0 +1,71 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.messages; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.BlockDataGenerator; |
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.core.Transaction; |
||||
|
||||
import java.util.List; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import org.junit.Test; |
||||
|
||||
public class LimitedNewPooledTransactionHashesMessagesTest { |
||||
|
||||
private final BlockDataGenerator generator = new BlockDataGenerator(); |
||||
private final List<Hash> sampleTxs = |
||||
generator.transactions(1).stream().map(Transaction::getHash).collect(Collectors.toList()); |
||||
private final NewPooledTransactionHashesMessage sampleTransactionMessages = |
||||
NewPooledTransactionHashesMessage.create(sampleTxs); |
||||
private final LimitedNewPooledTransactionHashesMessages sampleLimitedTransactionsMessages = |
||||
new LimitedNewPooledTransactionHashesMessages(sampleTransactionMessages, sampleTxs); |
||||
|
||||
@Test |
||||
public void createLimited() { |
||||
final List<Hash> txs = |
||||
generator.transactions(6000).stream() |
||||
.map(Transaction::getHash) |
||||
.collect(Collectors.toList()); |
||||
final LimitedNewPooledTransactionHashesMessages firstMessage = |
||||
LimitedNewPooledTransactionHashesMessages.createLimited(txs); |
||||
assertThat(firstMessage.getIncludedTransactions().size()).isEqualTo(4096); |
||||
|
||||
txs.removeAll(firstMessage.getIncludedTransactions()); |
||||
assertThat(txs.size()).isEqualTo(6000 - 4096); |
||||
final LimitedNewPooledTransactionHashesMessages secondMessage = |
||||
LimitedNewPooledTransactionHashesMessages.createLimited(txs); |
||||
assertThat(secondMessage.getIncludedTransactions().size()).isEqualTo(6000 - 4096); |
||||
txs.removeAll(secondMessage.getIncludedTransactions()); |
||||
assertThat(txs.size()).isEqualTo(0); |
||||
assertThat( |
||||
firstMessage.getTransactionsMessage().getSize() |
||||
+ secondMessage.getTransactionsMessage().getSize()) |
||||
.isLessThan(2 * LimitedTransactionsMessages.LIMIT); |
||||
} |
||||
|
||||
@Test |
||||
public void getTransactionsMessage() { |
||||
assertThat(sampleLimitedTransactionsMessages.getTransactionsMessage()) |
||||
.isEqualTo(sampleTransactionMessages); |
||||
} |
||||
|
||||
@Test |
||||
public void getIncludedTransactions() { |
||||
assertThat(sampleLimitedTransactionsMessages.getIncludedTransactions()).isEqualTo(sampleTxs); |
||||
} |
||||
} |
@ -0,0 +1,47 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.messages; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Assertions.assertThatExceptionOfType; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
|
||||
import org.apache.tuweni.bytes.Bytes; |
||||
import org.apache.tuweni.bytes.Bytes32; |
||||
import org.junit.Test; |
||||
|
||||
public class NewPooledTransactionHashesMessageTest { |
||||
|
||||
@Test |
||||
public void roundTripNewPooledTransactionHashesMessage() { |
||||
List<Hash> hashes = Arrays.asList(Hash.wrap(Bytes32.random())); |
||||
final NewPooledTransactionHashesMessage msg = NewPooledTransactionHashesMessage.create(hashes); |
||||
assertThat(msg.getCode()).isEqualTo(EthPV65.NEW_POOLED_TRANSACTION_HASHES); |
||||
assertThat(msg.pendingTransactions()).isEqualTo(hashes); |
||||
} |
||||
|
||||
@Test |
||||
public void readFromMessageWithWrongCodeThrows() { |
||||
final RawMessage rawMsg = new RawMessage(EthPV62.BLOCK_HEADERS, Bytes.of(0)); |
||||
|
||||
assertThatExceptionOfType(IllegalArgumentException.class) |
||||
.isThrownBy(() -> NewPooledTransactionHashesMessage.readFrom(rawMsg)); |
||||
} |
||||
} |
@ -0,0 +1,56 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.messages; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Assertions.assertThatExceptionOfType; |
||||
|
||||
import org.hyperledger.besu.crypto.SECP256K1; |
||||
import org.hyperledger.besu.ethereum.core.Transaction; |
||||
import org.hyperledger.besu.ethereum.core.Wei; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.RawMessage; |
||||
|
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
|
||||
import org.apache.tuweni.bytes.Bytes; |
||||
import org.junit.Test; |
||||
|
||||
public class PooledTransactionsMessageTest { |
||||
|
||||
@Test |
||||
public void roundTripPooledTransactionsMessage() { |
||||
List<Transaction> tx = |
||||
Arrays.asList( |
||||
Transaction.builder() |
||||
.nonce(42) |
||||
.gasLimit(654321) |
||||
.gasPrice(Wei.of(2)) |
||||
.value(Wei.of(1337)) |
||||
.payload(Bytes.EMPTY) |
||||
.signAndBuild(SECP256K1.KeyPair.generate())); |
||||
final PooledTransactionsMessage msg = PooledTransactionsMessage.create(tx); |
||||
assertThat(msg.getCode()).isEqualTo(EthPV65.POOLED_TRANSACTIONS); |
||||
assertThat(msg.transactions()).isEqualTo(tx); |
||||
} |
||||
|
||||
@Test |
||||
public void readFromMessageWithWrongCodeThrows() { |
||||
final RawMessage rawMsg = new RawMessage(EthPV62.BLOCK_HEADERS, Bytes.of(0)); |
||||
|
||||
assertThatExceptionOfType(IllegalArgumentException.class) |
||||
.isThrownBy(() -> PooledTransactionsMessage.readFrom(rawMsg)); |
||||
} |
||||
} |
@ -0,0 +1,106 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.transactions; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.BlockDataGenerator; |
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.core.Transaction; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
|
||||
import java.util.Optional; |
||||
|
||||
import com.google.common.collect.ImmutableSet; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
|
||||
public class PeerPendingTransactionTrackerTest { |
||||
|
||||
private final EthPeer ethPeer1 = mock(EthPeer.class); |
||||
private final EthPeer ethPeer2 = mock(EthPeer.class); |
||||
private final BlockDataGenerator generator = new BlockDataGenerator(); |
||||
private final PendingTransactions pendingTransactions = mock(PendingTransactions.class); |
||||
private final PeerPendingTransactionTracker tracker = |
||||
new PeerPendingTransactionTracker(pendingTransactions); |
||||
private final Hash hash1 = generator.transaction().getHash(); |
||||
private final Hash hash2 = generator.transaction().getHash(); |
||||
private final Hash hash3 = generator.transaction().getHash(); |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
Transaction tx = mock(Transaction.class); |
||||
when(pendingTransactions.getTransactionByHash(any())).thenReturn(Optional.of(tx)); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldTrackTransactionsToSendToPeer() { |
||||
tracker.addToPeerSendQueue(ethPeer1, hash1); |
||||
tracker.addToPeerSendQueue(ethPeer1, hash2); |
||||
tracker.addToPeerSendQueue(ethPeer2, hash3); |
||||
|
||||
assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer1, ethPeer2); |
||||
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(hash1, hash2); |
||||
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldExcludeAlreadySeenTransactionsFromTransactionsToSend() { |
||||
tracker.markTransactionsHashesAsSeen(ethPeer1, ImmutableSet.of(hash2)); |
||||
|
||||
tracker.addToPeerSendQueue(ethPeer1, hash1); |
||||
tracker.addToPeerSendQueue(ethPeer1, hash2); |
||||
tracker.addToPeerSendQueue(ethPeer2, hash3); |
||||
|
||||
assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer1, ethPeer2); |
||||
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(hash1); |
||||
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldExcludeAlreadySeenTransactionsAsACollectionFromTransactionsToSend() { |
||||
tracker.markTransactionsHashesAsSeen(ethPeer1, ImmutableSet.of(hash1, hash2)); |
||||
|
||||
tracker.addToPeerSendQueue(ethPeer1, hash1); |
||||
tracker.addToPeerSendQueue(ethPeer1, hash2); |
||||
tracker.addToPeerSendQueue(ethPeer2, hash3); |
||||
|
||||
assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer2); |
||||
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).isEmpty(); |
||||
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldClearDataWhenPeerDisconnects() { |
||||
tracker.markTransactionsHashesAsSeen(ethPeer1, ImmutableSet.of(hash3)); |
||||
|
||||
tracker.addToPeerSendQueue(ethPeer1, hash2); |
||||
tracker.addToPeerSendQueue(ethPeer2, hash3); |
||||
|
||||
tracker.onDisconnect(ethPeer1); |
||||
|
||||
assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer2); |
||||
|
||||
// Should have cleared data that ethPeer1 has already seen transaction1
|
||||
tracker.addToPeerSendQueue(ethPeer1, hash1); |
||||
|
||||
assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer1, ethPeer2); |
||||
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(hash1); |
||||
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(hash3); |
||||
} |
||||
} |
@ -0,0 +1,97 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.transactions; |
||||
|
||||
import static java.time.Duration.ofMillis; |
||||
import static java.time.Duration.ofMinutes; |
||||
import static java.time.Instant.now; |
||||
import static java.util.Arrays.asList; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.verifyZeroInteractions; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.BlockDataGenerator; |
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage; |
||||
import org.hyperledger.besu.plugin.services.metrics.Counter; |
||||
|
||||
import java.util.Arrays; |
||||
|
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.InjectMocks; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
|
||||
@RunWith(MockitoJUnitRunner.class) |
||||
public class PendingTransactionsMessageProcessorTest { |
||||
|
||||
@Mock private TransactionPool transactionPool; |
||||
@Mock private PeerPendingTransactionTracker transactionTracker; |
||||
@Mock private Counter totalSkippedTransactionsMessageCounter; |
||||
@Mock private EthPeer peer1; |
||||
@InjectMocks private PendingTransactionsMessageProcessor messageHandler; |
||||
|
||||
private final BlockDataGenerator generator = new BlockDataGenerator(); |
||||
private final Hash hash1 = generator.transaction().getHash(); |
||||
private final Hash hash2 = generator.transaction().getHash(); |
||||
private final Hash hash3 = generator.transaction().getHash(); |
||||
|
||||
@Test |
||||
public void shouldMarkAllReceivedTransactionsAsSeen() { |
||||
messageHandler.processNewPooledTransactionHashesMessage( |
||||
peer1, |
||||
NewPooledTransactionHashesMessage.create(asList(hash1, hash2, hash3)), |
||||
now(), |
||||
ofMinutes(1)); |
||||
|
||||
verify(transactionTracker) |
||||
.markTransactionsHashesAsSeen(peer1, Arrays.asList(hash1, hash2, hash3)); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldAddInitiatedRequestingTransactions() { |
||||
messageHandler.processNewPooledTransactionHashesMessage( |
||||
peer1, |
||||
NewPooledTransactionHashesMessage.create(asList(hash1, hash2, hash3)), |
||||
now(), |
||||
ofMinutes(1)); |
||||
verify(transactionPool).addTransactionHashes(hash1); |
||||
verify(transactionPool).addTransactionHashes(hash2); |
||||
verify(transactionPool).addTransactionHashes(hash3); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldNotMarkReceivedExpiredTransactionsAsSeen() { |
||||
messageHandler.processNewPooledTransactionHashesMessage( |
||||
peer1, |
||||
NewPooledTransactionHashesMessage.create(asList(hash1, hash2, hash3)), |
||||
now().minus(ofMinutes(1)), |
||||
ofMillis(1)); |
||||
verifyZeroInteractions(transactionTracker); |
||||
verify(totalSkippedTransactionsMessageCounter).inc(1); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldNotAddReceivedTransactionsToTransactionPoolIfExpired() { |
||||
messageHandler.processNewPooledTransactionHashesMessage( |
||||
peer1, |
||||
NewPooledTransactionHashesMessage.create(asList(hash1, hash2, hash3)), |
||||
now().minus(ofMinutes(1)), |
||||
ofMillis(1)); |
||||
verifyZeroInteractions(transactionPool); |
||||
verify(totalSkippedTransactionsMessageCounter).inc(1); |
||||
} |
||||
} |
@ -0,0 +1,127 @@ |
||||
/* |
||||
* Copyright ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
* |
||||
* SPDX-License-Identifier: Apache-2.0 |
||||
*/ |
||||
package org.hyperledger.besu.ethereum.eth.transactions; |
||||
|
||||
import static com.google.common.collect.Sets.newHashSet; |
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.ArgumentMatchers.argThat; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.times; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.verifyNoMoreInteractions; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import org.hyperledger.besu.ethereum.core.BlockDataGenerator; |
||||
import org.hyperledger.besu.ethereum.core.Hash; |
||||
import org.hyperledger.besu.ethereum.core.Transaction; |
||||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; |
||||
import org.hyperledger.besu.ethereum.eth.messages.EthPV65; |
||||
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage; |
||||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; |
||||
|
||||
import java.util.List; |
||||
import java.util.Optional; |
||||
import java.util.Set; |
||||
import java.util.stream.Collectors; |
||||
|
||||
import com.google.common.collect.Sets; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.mockito.ArgumentCaptor; |
||||
|
||||
public class PendingTransactionsMessageSenderTest { |
||||
|
||||
private final EthPeer peer1 = mock(EthPeer.class); |
||||
private final EthPeer peer2 = mock(EthPeer.class); |
||||
|
||||
private final BlockDataGenerator generator = new BlockDataGenerator(); |
||||
private final Hash transaction1 = generator.transaction().getHash(); |
||||
private final Hash transaction2 = generator.transaction().getHash(); |
||||
private final Hash transaction3 = generator.transaction().getHash(); |
||||
|
||||
private final PendingTransactions pendingTransactions = mock(PendingTransactions.class); |
||||
private final PeerPendingTransactionTracker transactionTracker = |
||||
new PeerPendingTransactionTracker(pendingTransactions); |
||||
private final PendingTransactionsMessageSender messageSender = |
||||
new PendingTransactionsMessageSender(transactionTracker); |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
Transaction tx = mock(Transaction.class); |
||||
when(pendingTransactions.getTransactionByHash(any())).thenReturn(Optional.of(tx)); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldSendPendingTransactionsToEachPeer() throws Exception { |
||||
|
||||
transactionTracker.addToPeerSendQueue(peer1, transaction1); |
||||
transactionTracker.addToPeerSendQueue(peer1, transaction2); |
||||
transactionTracker.addToPeerSendQueue(peer2, transaction3); |
||||
|
||||
messageSender.sendTransactionsToPeers(); |
||||
|
||||
verify(peer1).send(transactionsMessageContaining(transaction1, transaction2)); |
||||
verify(peer2).send(transactionsMessageContaining(transaction3)); |
||||
verifyNoMoreInteractions(peer1, peer2); |
||||
} |
||||
|
||||
@Test |
||||
public void shouldSendTransactionsInBatchesWithLimit() throws Exception { |
||||
final Set<Hash> transactions = |
||||
generator.transactions(6000).stream().map(Transaction::getHash).collect(Collectors.toSet()); |
||||
|
||||
transactions.forEach(transaction -> transactionTracker.addToPeerSendQueue(peer1, transaction)); |
||||
|
||||
messageSender.sendTransactionsToPeers(); |
||||
final ArgumentCaptor<MessageData> messageDataArgumentCaptor = |
||||
ArgumentCaptor.forClass(MessageData.class); |
||||
verify(peer1, times(2)).send(messageDataArgumentCaptor.capture()); |
||||
|
||||
final List<MessageData> sentMessages = messageDataArgumentCaptor.getAllValues(); |
||||
|
||||
assertThat(sentMessages).hasSize(2); |
||||
assertThat(sentMessages) |
||||
.allMatch(message -> message.getCode() == EthPV65.NEW_POOLED_TRANSACTION_HASHES); |
||||
final Set<Hash> firstBatch = getTransactionsFromMessage(sentMessages.get(0)); |
||||
final Set<Hash> secondBatch = getTransactionsFromMessage(sentMessages.get(1)); |
||||
|
||||
final int expectedFirstBatchSize = 4096, expectedSecondBatchSize = 1904, toleranceDelta = 0; |
||||
assertThat(firstBatch) |
||||
.hasSizeBetween( |
||||
expectedFirstBatchSize - toleranceDelta, expectedFirstBatchSize + toleranceDelta); |
||||
assertThat(secondBatch) |
||||
.hasSizeBetween( |
||||
expectedSecondBatchSize - toleranceDelta, expectedSecondBatchSize + toleranceDelta); |
||||
|
||||
assertThat(Sets.union(firstBatch, secondBatch)).isEqualTo(transactions); |
||||
} |
||||
|
||||
private MessageData transactionsMessageContaining(final Hash... transactions) { |
||||
return argThat( |
||||
message -> { |
||||
final Set<Hash> actualSentTransactions = getTransactionsFromMessage(message); |
||||
final Set<Hash> expectedTransactions = newHashSet(transactions); |
||||
return message.getCode() == EthPV65.NEW_POOLED_TRANSACTION_HASHES |
||||
&& actualSentTransactions.equals(expectedTransactions); |
||||
}); |
||||
} |
||||
|
||||
private Set<Hash> getTransactionsFromMessage(final MessageData message) { |
||||
final NewPooledTransactionHashesMessage transactionsMessage = |
||||
NewPooledTransactionHashesMessage.readFrom(message); |
||||
return newHashSet(transactionsMessage.pendingTransactions()); |
||||
} |
||||
} |
Loading…
Reference in new issue