[PAN-2630] Synchronizer should disconnect the sync target peer on invalid block data (#1578)

* [PAN-2630] Synchronizer should disconnect the sync target peer on invalid block data

- check if headers are sequential
- if sequential, check if they form a chain, if not, disconnect the peer

* update behaviour and add tests

- change checks order
- add log message
- write tests

* disconnect sync target when InvalidBlockException

* remove power mockito

- remove powermockito
- write tests in subclasses of the `AbstractGetHeadersFromPeerTaskTest`

* fix Exception check bug

* fix PR discussion first pass

- add assertion to check when the peer is disconnected
- add assertion to check the peer has not been disonnected
- undo remove final on class
- remove sync target field

* remove spy invocation

* spotlessApply

* shouldDisconnectPeerIfInvalidBlockException

Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
pull/2/head
Abdelhamid Bakhta 6 years ago committed by GitHub
parent 041a99ffc4
commit a6c678944b
  1. 1
      ethereum/eth/build.gradle
  2. 20
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java
  3. 11
      ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java
  4. 75
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByHashTaskTest.java
  5. 57
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetHeadersFromPeerByNumberTaskTest.java
  6. 20
      ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloaderTest.java

@ -55,7 +55,6 @@ dependencies {
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'
testImplementation 'org.mockito:mockito-core'
jmhImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts')
integrationTestImplementation project(path: ':config', configuration: 'testSupportArtifacts')

@ -21,6 +21,7 @@ import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import java.util.ArrayList;
@ -86,16 +87,27 @@ public abstract class AbstractGetHeadersFromPeerTask
final List<BlockHeader> headersList = new ArrayList<>();
headersList.add(firstHeader);
long prevNumber = firstHeader.getNumber();
BlockHeader prevBlockHeader = firstHeader;
final int expectedDelta = reverse ? -(skip + 1) : (skip + 1);
for (int i = 1; i < headers.size(); i++) {
final BlockHeader header = headers.get(i);
if (header.getNumber() != prevNumber + expectedDelta) {
if (header.getNumber() != prevBlockHeader.getNumber() + expectedDelta) {
// Skip doesn't match, this isn't our data
return Optional.empty();
}
prevNumber = header.getNumber();
// if headers are supposed to be sequential check if a chain is formed
if (Math.abs(expectedDelta) == 1) {
final BlockHeader parent = reverse ? header : prevBlockHeader;
final BlockHeader child = reverse ? prevBlockHeader : header;
if (!parent.getHash().equals(child.getParentHash())) {
LOG.debug(
"Sequential headers must form a chain through hashes, disconnecting peer: {}",
peer.toString());
peer.disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
return Optional.empty();
}
}
prevBlockHeader = header;
headersList.add(header);
}

@ -21,6 +21,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricsSystem;
@ -121,6 +122,16 @@ public class PipelineChainDownloader<C> implements ChainDownloader {
// Allowing the normal looping logic to retry after a brief delay.
return scheduler.scheduleFutureTask(() -> completedFuture(null), PAUSE_AFTER_ERROR_DURATION);
}
if (ExceptionUtils.rootCause(error) instanceof InvalidBlockException) {
syncState
.syncTarget()
.ifPresent(
syncTarget ->
syncTarget
.peer()
.disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL));
}
logDownloadFailure("Chain download failed.", error);
// Propagate the error out, terminating this chain download.
return completedExceptionally(error);

@ -13,23 +13,43 @@
package tech.pegasys.pantheon.ethereum.eth.manager.task;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static tech.pegasys.pantheon.ethereum.vm.TestBlockchain.generateTestBlockHash;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest;
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class GetHeadersFromPeerByHashTaskTest extends PeerMessageTaskTest<List<BlockHeader>> {
@Mock private EthPeer peerMock;
@Override
protected void assertPartialResultMatchesExpectation(
@ -124,4 +144,59 @@ public class GetHeadersFromPeerByHashTaskTest extends PeerMessageTaskTest<List<B
assertThat(actualResult.get().getPeer()).isEqualTo(respondingPeer.getEthPeer());
assertThat(actualResult.get().getResult()).isEqualTo(expectedHeaders);
}
@Test
public void checkThatSequentialHeadersFormingAChainWorks() {
final int startNumber = 1;
final BlockHeader block1 =
new BlockHeaderTestFixture().number(1).parentHash(generateTestBlockHash(0)).buildHeader();
final BlockHeader block2 =
new BlockHeaderTestFixture().number(2).parentHash(block1.getHash()).buildHeader();
final List<BlockHeader> headers = Arrays.asList(block1, block2);
final AbstractGetHeadersFromPeerTask task =
new GetHeadersFromPeerByHashTask(
protocolSchedule,
ethContext,
block1.getHash(),
startNumber,
2,
0,
false,
metricsSystem);
Optional<List<BlockHeader>> optionalBlockHeaders =
task.processResponse(false, BlockHeadersMessage.create(headers), peerMock);
assertNotNull(optionalBlockHeaders);
assertTrue(optionalBlockHeaders.isPresent());
List<BlockHeader> blockHeaders = optionalBlockHeaders.get();
MatcherAssert.assertThat(blockHeaders, hasSize(2));
verify(peerMock, times(0)).disconnect(any());
}
@Test
public void checkThatSequentialHeadersNotFormingAChainFails() {
final int startNumber = 1;
final BlockHeader block1 =
new BlockHeaderTestFixture().number(1).parentHash(generateTestBlockHash(0)).buildHeader();
final BlockHeader block2 =
new BlockHeaderTestFixture().number(2).parentHash(generateTestBlockHash(1)).buildHeader();
final List<BlockHeader> headers = Arrays.asList(block1, block2);
final AbstractGetHeadersFromPeerTask task =
new GetHeadersFromPeerByHashTask(
protocolSchedule,
ethContext,
block1.getHash(),
startNumber,
2,
0,
false,
metricsSystem);
Optional<List<BlockHeader>> optionalBlockHeaders =
task.processResponse(false, BlockHeadersMessage.create(headers), peerMock);
assertNotNull(optionalBlockHeaders);
assertFalse(optionalBlockHeaders.isPresent());
verify(peerMock).disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
}
}

@ -13,21 +13,41 @@
package tech.pegasys.pantheon.ethereum.eth.manager.task;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static tech.pegasys.pantheon.ethereum.vm.TestBlockchain.generateTestBlockHash;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.PeerMessageTaskTest;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class GetHeadersFromPeerByNumberTaskTest extends PeerMessageTaskTest<List<BlockHeader>> {
@Mock private EthPeer peerMock;
@Override
protected void assertPartialResultMatchesExpectation(
@ -111,4 +131,41 @@ public class GetHeadersFromPeerByNumberTaskTest extends PeerMessageTaskTest<List
assertThat(actualResult.get().getPeer()).isEqualTo(respondingPeer.getEthPeer());
assertThat(actualResult.get().getResult()).isEqualTo(expectedHeaders);
}
@Test
public void checkThatSequentialHeadersFormingAChainWorks() {
final BlockHeader block1 =
new BlockHeaderTestFixture().number(1).parentHash(generateTestBlockHash(0)).buildHeader();
final BlockHeader block2 =
new BlockHeaderTestFixture().number(2).parentHash(block1.getHash()).buildHeader();
final List<BlockHeader> headers = Arrays.asList(block1, block2);
final AbstractGetHeadersFromPeerTask task =
new GetHeadersFromPeerByNumberTask(
protocolSchedule, ethContext, block1.getNumber(), 2, 0, false, metricsSystem);
Optional<List<BlockHeader>> optionalBlockHeaders =
task.processResponse(false, BlockHeadersMessage.create(headers), peerMock);
assertNotNull(optionalBlockHeaders);
assertTrue(optionalBlockHeaders.isPresent());
List<BlockHeader> blockHeaders = optionalBlockHeaders.get();
MatcherAssert.assertThat(blockHeaders, hasSize(2));
verify(peerMock, times(0)).disconnect(any());
}
@Test
public void checkThatSequentialHeadersNotFormingAChainFails() {
final BlockHeader block1 =
new BlockHeaderTestFixture().number(1).parentHash(generateTestBlockHash(0)).buildHeader();
final BlockHeader block2 =
new BlockHeaderTestFixture().number(2).parentHash(generateTestBlockHash(1)).buildHeader();
final List<BlockHeader> headers = Arrays.asList(block1, block2);
final AbstractGetHeadersFromPeerTask task =
new GetHeadersFromPeerByNumberTask(
protocolSchedule, ethContext, block1.getNumber(), 2, 0, false, metricsSystem);
Optional<List<BlockHeader>> optionalBlockHeaders =
task.processResponse(false, BlockHeadersMessage.create(headers), peerMock);
assertNotNull(optionalBlockHeaders);
assertFalse(optionalBlockHeaders.isPresent());
verify(peerMock).disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
}
}

@ -31,6 +31,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;
@ -44,6 +46,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@ -251,6 +254,23 @@ public class PipelineChainDownloaderTest {
assertCancelled(result);
}
@Test
public void shouldDisconnectPeerIfInvalidBlockException() {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.shouldContinueDownloading()).thenReturn(false);
when(syncTargetManager.findSyncTarget(Optional.empty()))
.thenReturn(selectTargetFuture)
.thenReturn(new CompletableFuture<>());
final EthPeer ethPeer = Mockito.mock(EthPeer.class);
final BlockHeader commonAncestor = Mockito.mock(BlockHeader.class);
final SyncTarget target = new SyncTarget(ethPeer, commonAncestor);
when(syncState.syncTarget()).thenReturn(Optional.of(target));
chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
selectTargetFuture.completeExceptionally(new InvalidBlockException("", 1, null));
verify(ethPeer).disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
}
private CompletableFuture<Void> expectPipelineStarted(final SyncTarget syncTarget) {
return expectPipelineStarted(syncTarget, downloadPipeline);
}

Loading…
Cancel
Save