From 2bd6ce74ba786f882bca4493046d9551762cedb5 Mon Sep 17 00:00:00 2001 From: Roberto Saltini <38655434+saltiniroberto@users.noreply.github.com> Date: Thu, 18 Oct 2018 10:27:52 +1100 Subject: [PATCH] [NC-1508] Added iBFT 2.0 BlockTimerExpiry Event and realated timer management class (#58) --- .../pantheon/consensus/ibft/BlockTimer.java | 94 +++++++ .../pantheon/consensus/ibft/IbftEvents.java | 3 +- .../ibft/ibftevent/BlockTimerExpiry.java | 66 +++++ .../consensus/ibft/BlockTimerTest.java | 239 ++++++++++++++++++ 4 files changed, 401 insertions(+), 1 deletion(-) create mode 100644 consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/BlockTimer.java create mode 100644 consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/BlockTimerExpiry.java create mode 100644 consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/BlockTimerTest.java diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/BlockTimer.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/BlockTimer.java new file mode 100644 index 0000000000..ec311cc0ca --- /dev/null +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/BlockTimer.java @@ -0,0 +1,94 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft; + +import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.util.time.Clock; + +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** Class for starting and keeping organised block timers */ +public class BlockTimer { + private final ScheduledExecutorService timerExecutor; + private Optional> currentTimerTask; + private final IbftEventQueue queue; + private final long minimumTimeBetweenBlocksMillis; + private final Clock clock; + + /** + * Construct a BlockTimer with primed executor service ready to start timers + * + * @param queue The queue in which to put block expiry events + * @param minimumTimeBetweenBlocksMillis Minimum timestamp difference between blocks + * @param timerExecutor Executor service that timers can be scheduled with + * @param clock System clock + */ + public BlockTimer( + final IbftEventQueue queue, + final long minimumTimeBetweenBlocksMillis, + final ScheduledExecutorService timerExecutor, + final Clock clock) { + this.queue = queue; + this.timerExecutor = timerExecutor; + this.currentTimerTask = Optional.empty(); + this.minimumTimeBetweenBlocksMillis = minimumTimeBetweenBlocksMillis; + this.clock = clock; + } + + /** Cancels the current running round timer if there is one */ + public synchronized void cancelTimer() { + currentTimerTask.ifPresent(t -> t.cancel(false)); + currentTimerTask = Optional.empty(); + } + + /** + * Whether there is a timer currently running or not + * + * @return boolean of whether a timer is ticking or not + */ + public synchronized boolean isRunning() { + return currentTimerTask.map(t -> !t.isDone()).orElse(false); + } + + /** + * Starts a timer for the supplied round cancelling any previously active block timer + * + * @param round The round identifier which this timer is tracking + * @param chainHeadHeader The header of the chain head + */ + public synchronized void startTimer( + final ConsensusRoundIdentifier round, final BlockHeader chainHeadHeader) { + cancelTimer(); + + final long now = clock.millisecondsSinceEpoch(); + + // absolute time when the timer is supposed to expire + final long expiryTime = chainHeadHeader.getTimestamp() * 1_000 + minimumTimeBetweenBlocksMillis; + + if (expiryTime > now) { + final long delay = expiryTime - now; + + final Runnable newTimerRunnable = () -> queue.add(new BlockTimerExpiry(round)); + + final ScheduledFuture newTimerTask = + timerExecutor.schedule(newTimerRunnable, delay, TimeUnit.MILLISECONDS); + currentTimerTask = Optional.of(newTimerTask); + } else { + queue.add(new BlockTimerExpiry(round)); + } + } +} diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftEvents.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftEvents.java index f4ec228f8b..dc6cf9a50c 100644 --- a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftEvents.java +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/IbftEvents.java @@ -22,6 +22,7 @@ public class IbftEvents { public enum Type { ROUND_EXPIRY, - NEW_CHAIN_HEAD_HEADER + NEW_CHAIN_HEAD_HEADER, + BLOCK_TIMER_EXPIRY } } diff --git a/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/BlockTimerExpiry.java b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/BlockTimerExpiry.java new file mode 100644 index 0000000000..a63079fd78 --- /dev/null +++ b/consensus/ibft/src/main/java/tech/pegasys/pantheon/consensus/ibft/ibftevent/BlockTimerExpiry.java @@ -0,0 +1,66 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft.ibftevent; + +import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier; +import tech.pegasys.pantheon.consensus.ibft.IbftEvent; +import tech.pegasys.pantheon.consensus.ibft.IbftEvents.Type; + +import java.util.Objects; + +import com.google.common.base.MoreObjects; + +/** Event indicating a block timer has expired */ +public final class BlockTimerExpiry implements IbftEvent { + private final ConsensusRoundIdentifier roundIdentifier; + + /** + * Constructor for a BlockTimerExpiry event + * + * @param roundIdentifier The roundIdentifier that the expired timer belonged to + */ + public BlockTimerExpiry(final ConsensusRoundIdentifier roundIdentifier) { + this.roundIdentifier = roundIdentifier; + } + + @Override + public Type getType() { + return Type.BLOCK_TIMER_EXPIRY; + } + + public ConsensusRoundIdentifier getRoundIndentifier() { + return roundIdentifier; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("Round Identifier", roundIdentifier).toString(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final BlockTimerExpiry that = (BlockTimerExpiry) o; + return Objects.equals(roundIdentifier, that.roundIdentifier); + } + + @Override + public int hashCode() { + return Objects.hash(roundIdentifier); + } +} diff --git a/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/BlockTimerTest.java b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/BlockTimerTest.java new file mode 100644 index 0000000000..c164f099f5 --- /dev/null +++ b/consensus/ibft/src/test/java/tech/pegasys/pantheon/consensus/ibft/BlockTimerTest.java @@ -0,0 +1,239 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.consensus.ibft; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; +import tech.pegasys.pantheon.util.time.Clock; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) +public class BlockTimerTest { + private ScheduledExecutorService mockExecutorService; + private IbftEventQueue mockQueue; + private Clock mockClock; + + @Before + public void initialise() { + mockExecutorService = mock(ScheduledExecutorService.class); + mockQueue = mock(IbftEventQueue.class); + mockClock = mock(Clock.class); + } + + @Test + public void cancelTimerCancelsWhenNoTimer() { + final BlockTimer timer = new BlockTimer(mockQueue, 15_000, mockExecutorService, mockClock); + // Starts with nothing running + assertThat(timer.isRunning()).isFalse(); + // cancel shouldn't die if there's nothing running + timer.cancelTimer(); + // there is still nothing running + assertThat(timer.isRunning()).isFalse(); + } + + @Test + public void startTimerSchedulesCorrectlyWhenExpiryIsInTheFuture() { + final long MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS = 15_000; + final long NOW_MILLIS = 505_000L; + final long BLOCK_TIME_STAMP = 500L; + final long EXPECTED_DELAY = 10_000L; + + final BlockTimer timer = + new BlockTimer( + mockQueue, MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS, mockExecutorService, mockClock); + + when(mockClock.millisecondsSinceEpoch()).thenReturn(NOW_MILLIS); + + BlockHeader header = new BlockHeaderTestFixture().timestamp(BLOCK_TIME_STAMP).buildHeader(); + final ConsensusRoundIdentifier round = + new ConsensusRoundIdentifier(0xFEDBCA9876543210L, 0x12345678); + + final ScheduledFuture mockedFuture = mock(ScheduledFuture.class); + Mockito.>when( + mockExecutorService.schedule(any(Runnable.class), anyLong(), any())) + .thenReturn(mockedFuture); + + timer.startTimer(round, header); + verify(mockExecutorService) + .schedule(any(Runnable.class), eq(EXPECTED_DELAY), eq(TimeUnit.MILLISECONDS)); + } + + @Test + public void aBlockTimerExpiryEventIsAddedToTheQueueOnExpiry() { + + final long MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS = 1_000; + final long NOW_MILLIS = 300_500L; + final long BLOCK_TIME_STAMP = 300; + final long EXPECTED_DELAY = 500; + + when(mockClock.millisecondsSinceEpoch()).thenReturn(NOW_MILLIS); + + BlockHeader header = new BlockHeaderTestFixture().timestamp(BLOCK_TIME_STAMP).buildHeader(); + final ConsensusRoundIdentifier round = + new ConsensusRoundIdentifier(0xFEDBCA9876543210L, 0x12345678); + + BlockTimer timer = + new BlockTimer( + mockQueue, + MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS, + Executors.newSingleThreadScheduledExecutor(), + mockClock); + timer.startTimer(round, header); + + // Verify that the event will not be added to the queue immediately + verify(mockQueue, never()).add(any()); + + await() + .atMost(EXPECTED_DELAY + 200, TimeUnit.MILLISECONDS) + .atLeast(EXPECTED_DELAY - 200, TimeUnit.MILLISECONDS) + .until(timer::isRunning, equalTo(false)); + + ArgumentCaptor ibftEventCaptor = ArgumentCaptor.forClass(IbftEvent.class); + verify(mockQueue).add(ibftEventCaptor.capture()); + + assertThat(ibftEventCaptor.getValue() instanceof BlockTimerExpiry).isTrue(); + assertThat(((BlockTimerExpiry) ibftEventCaptor.getValue()).getRoundIndentifier()) + .isEqualToComparingFieldByField(round); + } + + @Test + public void eventIsImmediatelyAddedToTheQueueIfAbsoluteExpiryIsEqualToNow() { + final long MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS = 15_000; + final long NOW_MILLIS = 515_000L; + final long BLOCK_TIME_STAMP = 500; + + final BlockTimer timer = + new BlockTimer( + mockQueue, MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS, mockExecutorService, mockClock); + + when(mockClock.millisecondsSinceEpoch()).thenReturn(NOW_MILLIS); + + BlockHeader header = new BlockHeaderTestFixture().timestamp(BLOCK_TIME_STAMP).buildHeader(); + final ConsensusRoundIdentifier round = + new ConsensusRoundIdentifier(0xFEDBCA9876543210L, 0x12345678); + + timer.startTimer(round, header); + verify(mockExecutorService, never()).schedule(any(Runnable.class), anyLong(), any()); + + ArgumentCaptor ibftEventCaptor = ArgumentCaptor.forClass(IbftEvent.class); + verify(mockQueue).add(ibftEventCaptor.capture()); + + assertThat(ibftEventCaptor.getValue() instanceof BlockTimerExpiry).isTrue(); + assertThat(((BlockTimerExpiry) ibftEventCaptor.getValue()).getRoundIndentifier()) + .isEqualToComparingFieldByField(round); + } + + @Test + public void eventIsImmediatelyAddedToTheQueueIfAbsoluteExpiryIsInThePast() { + final long MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS = 15_000; + final long NOW_MILLIS = 520_000L; + final long BLOCK_TIME_STAMP = 500L; + + final BlockTimer timer = + new BlockTimer( + mockQueue, MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS, mockExecutorService, mockClock); + + when(mockClock.millisecondsSinceEpoch()).thenReturn(NOW_MILLIS); + + BlockHeader header = new BlockHeaderTestFixture().timestamp(BLOCK_TIME_STAMP).buildHeader(); + final ConsensusRoundIdentifier round = + new ConsensusRoundIdentifier(0xFEDBCA9876543210L, 0x12345678); + + timer.startTimer(round, header); + verify(mockExecutorService, never()).schedule(any(Runnable.class), anyLong(), any()); + + ArgumentCaptor ibftEventCaptor = ArgumentCaptor.forClass(IbftEvent.class); + verify(mockQueue).add(ibftEventCaptor.capture()); + + assertThat(ibftEventCaptor.getValue() instanceof BlockTimerExpiry).isTrue(); + assertThat(((BlockTimerExpiry) ibftEventCaptor.getValue()).getRoundIndentifier()) + .isEqualToComparingFieldByField(round); + } + + @Test + public void startTimerCancelsExistingTimer() { + final long MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS = 15_0000; + final long NOW_MILLIS = 500_000L; + final long BLOCK_TIME_STAMP = 500L; + + final BlockTimer timer = + new BlockTimer( + mockQueue, MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS, mockExecutorService, mockClock); + + when(mockClock.millisecondsSinceEpoch()).thenReturn(NOW_MILLIS); + + BlockHeader header = new BlockHeaderTestFixture().timestamp(BLOCK_TIME_STAMP).buildHeader(); + final ConsensusRoundIdentifier round = + new ConsensusRoundIdentifier(0xFEDBCA9876543210L, 0x12345678); + final ScheduledFuture mockedFuture = mock(ScheduledFuture.class); + + Mockito.>when( + mockExecutorService.schedule(any(Runnable.class), anyLong(), eq(TimeUnit.MILLISECONDS))) + .thenReturn(mockedFuture); + timer.startTimer(round, header); + verify(mockedFuture, times(0)).cancel(false); + timer.startTimer(round, header); + verify(mockedFuture, times(1)).cancel(false); + } + + @Test + public void runningFollowsTheStateOfTheTimer() { + final long MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS = 15_0000; + final long NOW_MILLIS = 500_000L; + final long BLOCK_TIME_STAMP = 500L; + + final BlockTimer timer = + new BlockTimer( + mockQueue, MINIMAL_TIME_BETWEEN_BLOCKS_MILLIS, mockExecutorService, mockClock); + + when(mockClock.millisecondsSinceEpoch()).thenReturn(NOW_MILLIS); + + BlockHeader header = new BlockHeaderTestFixture().timestamp(BLOCK_TIME_STAMP).buildHeader(); + final ConsensusRoundIdentifier round = + new ConsensusRoundIdentifier(0xFEDBCA9876543210L, 0x12345678); + + final ScheduledFuture mockedFuture = mock(ScheduledFuture.class); + Mockito.>when( + mockExecutorService.schedule(any(Runnable.class), anyLong(), eq(TimeUnit.MILLISECONDS))) + .thenReturn(mockedFuture); + timer.startTimer(round, header); + when(mockedFuture.isDone()).thenReturn(false); + assertThat(timer.isRunning()).isTrue(); + when(mockedFuture.isDone()).thenReturn(true); + assertThat(timer.isRunning()).isFalse(); + } +}