mirror of https://github.com/hyperledger/besu
Events Plugin - Add initial "NewBlockPropagated" event message (#1463)
* It returns for any block we would broadcast to other peers, when we would broadcast them. * It returns a JSON String containing hash, number, and timestamp * This event data is not set in stone, it may change in type or content. * Acceptance tests and unit tests got a re-work away from the assumption that there is only one plugin type. Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>pull/2/head
parent
a8fb76ba63
commit
8d1e224c96
@ -0,0 +1,61 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.tests.acceptance.plugins; |
||||
|
||||
import tech.pegasys.pantheon.tests.acceptance.dsl.AcceptanceTestBase; |
||||
import tech.pegasys.pantheon.tests.acceptance.dsl.node.PantheonNode; |
||||
|
||||
import java.io.File; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.Path; |
||||
import java.util.Collections; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.stream.Stream; |
||||
|
||||
import org.awaitility.Awaitility; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
|
||||
public class PantheonEventsPluginTest extends AcceptanceTestBase { |
||||
private PantheonNode pluginNode; |
||||
private PantheonNode minerNode; |
||||
|
||||
@Before |
||||
public void setUp() throws Exception { |
||||
minerNode = pantheon.createMinerNode("minerNode"); |
||||
pluginNode = |
||||
pantheon.createPluginsNode( |
||||
"node1", Collections.singletonList("testPlugin"), Collections.emptyList()); |
||||
cluster.start(pluginNode, minerNode); |
||||
} |
||||
|
||||
@Test |
||||
public void blockIsAnnounded() { |
||||
waitForFile(pluginNode.homeDirectory().resolve("plugins/newBlock.2")); |
||||
} |
||||
|
||||
private void waitForFile(final Path path) { |
||||
final File file = path.toFile(); |
||||
Awaitility.waitAtMost(30, TimeUnit.SECONDS) |
||||
.until( |
||||
() -> { |
||||
if (file.exists()) { |
||||
try (final Stream<String> s = Files.lines(path)) { |
||||
return s.count() > 0; |
||||
} |
||||
} else { |
||||
return false; |
||||
} |
||||
}); |
||||
} |
||||
} |
@ -0,0 +1,53 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.core.Block; |
||||
import tech.pegasys.pantheon.ethereum.eth.sync.BlockBroadcaster; |
||||
import tech.pegasys.pantheon.plugins.services.PantheonEvents; |
||||
|
||||
import com.google.common.collect.ImmutableMap; |
||||
import io.vertx.core.json.Json; |
||||
|
||||
public class PantheonEventsImpl implements PantheonEvents { |
||||
private final BlockBroadcaster blockBroadcaster; |
||||
|
||||
public PantheonEventsImpl(final BlockBroadcaster blockBroadcaster) { |
||||
this.blockBroadcaster = blockBroadcaster; |
||||
} |
||||
|
||||
@Override |
||||
public Object addNewBlockPropagatedListener(final NewBlockPropagatedListener listener) { |
||||
return blockBroadcaster.subscribePropagateNewBlocks( |
||||
block -> dispatchNewBlockPropagatedMessage(block, listener)); |
||||
} |
||||
|
||||
@Override |
||||
public void removeNewBlockPropagatedListener(final Object listenerIdentifier) { |
||||
if (listenerIdentifier instanceof Long) { |
||||
blockBroadcaster.unsubscribePropagateNewBlocks((Long) listenerIdentifier); |
||||
} |
||||
} |
||||
|
||||
private void dispatchNewBlockPropagatedMessage( |
||||
final Block block, final NewBlockPropagatedListener listener) { |
||||
final ImmutableMap<Object, Object> result = |
||||
new ImmutableMap.Builder<>() |
||||
.put("type", "NewBlock") |
||||
.put("blockHash", block.getHash().toString()) |
||||
.put("blockNumber", block.getHeader().getNumber()) |
||||
.put("timestamp", block.getHeader().getTimestamp()) |
||||
.build(); |
||||
listener.newBlockPropagated(Json.encode(result)); |
||||
} |
||||
} |
@ -0,0 +1,31 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services; |
||||
|
||||
import tech.pegasys.pantheon.plugins.services.PicoCLIOptions; |
||||
|
||||
import picocli.CommandLine; |
||||
|
||||
public class PicoCLIOptionsImpl implements PicoCLIOptions { |
||||
|
||||
private final CommandLine commandLine; |
||||
|
||||
public PicoCLIOptionsImpl(final CommandLine commandLine) { |
||||
this.commandLine = commandLine; |
||||
} |
||||
|
||||
@Override |
||||
public void addPicoCLIOptions(final String namespace, final Object optionObject) { |
||||
commandLine.addMixin("Plugin " + namespace, optionObject); |
||||
} |
||||
} |
@ -0,0 +1,95 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
import tech.pegasys.pantheon.ethereum.core.Block; |
||||
import tech.pegasys.pantheon.ethereum.core.BlockBody; |
||||
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; |
||||
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; |
||||
import tech.pegasys.pantheon.ethereum.eth.sync.BlockBroadcaster; |
||||
import tech.pegasys.pantheon.util.uint.UInt256; |
||||
|
||||
import java.util.Collections; |
||||
import java.util.concurrent.atomic.AtomicReference; |
||||
import java.util.stream.Stream; |
||||
|
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.Mock; |
||||
import org.mockito.junit.MockitoJUnitRunner; |
||||
|
||||
@RunWith(MockitoJUnitRunner.class) |
||||
public class PantheonEventsImplTest { |
||||
|
||||
@Mock private EthPeers ethPeers; |
||||
@Mock private EthContext mockEthContext; |
||||
private BlockBroadcaster blockBroadcaster; |
||||
private PantheonEventsImpl serviceImpl; |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
when(ethPeers.streamAvailablePeers()).thenReturn(Stream.empty()).thenReturn(Stream.empty()); |
||||
when(mockEthContext.getEthPeers()).thenReturn(ethPeers); |
||||
|
||||
blockBroadcaster = new BlockBroadcaster(mockEthContext); |
||||
serviceImpl = new PantheonEventsImpl(blockBroadcaster); |
||||
} |
||||
|
||||
@Test |
||||
public void eventFiresAfterSubscribe() { |
||||
final AtomicReference<String> result = new AtomicReference<>(); |
||||
serviceImpl.addNewBlockPropagatedListener(result::set); |
||||
|
||||
assertThat(result.get()).isNull(); |
||||
blockBroadcaster.propagate(generateBlock(), UInt256.of(1)); |
||||
|
||||
assertThat(result.get()).isNotEmpty(); |
||||
} |
||||
|
||||
@Test |
||||
public void eventDoesNotFireAfterUnsubscribe() { |
||||
final AtomicReference<String> result = new AtomicReference<>(); |
||||
final Object id = serviceImpl.addNewBlockPropagatedListener(result::set); |
||||
|
||||
assertThat(result.get()).isNull(); |
||||
blockBroadcaster.propagate(generateBlock(), UInt256.of(1)); |
||||
|
||||
serviceImpl.removeNewBlockPropagatedListener(id); |
||||
result.set(null); |
||||
|
||||
blockBroadcaster.propagate(generateBlock(), UInt256.of(1)); |
||||
assertThat(result.get()).isNull(); |
||||
} |
||||
|
||||
@Test |
||||
public void propagationWithoutSubscriptionsCompletes() { |
||||
blockBroadcaster.propagate(generateBlock(), UInt256.of(1)); |
||||
} |
||||
|
||||
@Test |
||||
public void uselessUnsubscribesCompletes() { |
||||
serviceImpl.removeNewBlockPropagatedListener("doesNotExist"); |
||||
serviceImpl.removeNewBlockPropagatedListener(5); |
||||
serviceImpl.removeNewBlockPropagatedListener(5L); |
||||
} |
||||
|
||||
private Block generateBlock() { |
||||
final BlockBody body = new BlockBody(Collections.emptyList(), Collections.emptyList()); |
||||
return new Block(new BlockHeaderTestFixture().buildHeader(), body); |
||||
} |
||||
} |
@ -0,0 +1,80 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.services; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
import static org.assertj.core.api.Assertions.assertThatExceptionOfType; |
||||
|
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import picocli.CommandLine; |
||||
import picocli.CommandLine.Command; |
||||
import picocli.CommandLine.Option; |
||||
import picocli.CommandLine.UnmatchedArgumentException; |
||||
|
||||
public class PicoCLIOptionsImplTest { |
||||
|
||||
@Command |
||||
static final class SimpleCommand { |
||||
|
||||
@Option(names = "--existing") |
||||
String existingOption = "defaultexisting"; |
||||
} |
||||
|
||||
static final class MixinOptions { |
||||
@Option(names = "--mixin") |
||||
String mixinOption = "defaultmixin"; |
||||
} |
||||
|
||||
private SimpleCommand command; |
||||
private MixinOptions mixin; |
||||
private CommandLine commandLine; |
||||
private PicoCLIOptionsImpl serviceImpl; |
||||
|
||||
@Before |
||||
public void setUp() throws Exception { |
||||
command = new SimpleCommand(); |
||||
mixin = new MixinOptions(); |
||||
commandLine = new CommandLine(command); |
||||
serviceImpl = new PicoCLIOptionsImpl(commandLine); |
||||
|
||||
serviceImpl.addPicoCLIOptions("Test 1", mixin); |
||||
} |
||||
|
||||
@Test |
||||
public void testSimpleOptionParse() { |
||||
commandLine.parseArgs("--existing", "1", "--mixin", "2"); |
||||
assertThat(command.existingOption).isEqualTo("1"); |
||||
assertThat(mixin.mixinOption).isEqualTo("2"); |
||||
} |
||||
|
||||
@Test |
||||
public void testUnsetOptionLeavesDefault() { |
||||
commandLine.parseArgs("--existing", "1"); |
||||
assertThat(command.existingOption).isEqualTo("1"); |
||||
assertThat(mixin.mixinOption).isEqualTo("defaultmixin"); |
||||
} |
||||
|
||||
@Test |
||||
public void testMixinOptionOnly() { |
||||
commandLine.parseArgs("--mixin", "2"); |
||||
assertThat(command.existingOption).isEqualTo("defaultexisting"); |
||||
assertThat(mixin.mixinOption).isEqualTo("2"); |
||||
} |
||||
|
||||
@Test |
||||
public void testNotExistantOptionsFail() { |
||||
assertThatExceptionOfType(UnmatchedArgumentException.class) |
||||
.isThrownBy(() -> commandLine.parseArgs("--does-not-exist", "1")); |
||||
} |
||||
} |
@ -0,0 +1,77 @@ |
||||
/* |
||||
* Copyright 2019 ConsenSys AG. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
||||
* specific language governing permissions and limitations under the License. |
||||
*/ |
||||
package tech.pegasys.pantheon.plugins; |
||||
|
||||
import tech.pegasys.pantheon.plugins.services.PantheonEvents; |
||||
|
||||
import java.io.File; |
||||
import java.io.IOException; |
||||
import java.nio.file.Files; |
||||
import java.util.Collections; |
||||
import java.util.Optional; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
import org.apache.logging.log4j.LogManager; |
||||
import org.apache.logging.log4j.Logger; |
||||
|
||||
@AutoService(PantheonPlugin.class) |
||||
public class TestPantheonEventsPlugin implements PantheonPlugin { |
||||
private static final Logger LOG = LogManager.getLogger(); |
||||
private Optional<Object> subscriptionId; |
||||
|
||||
private PantheonContext context; |
||||
private final AtomicInteger blockCounter = new AtomicInteger(); |
||||
|
||||
@Override |
||||
public void register(final PantheonContext context) { |
||||
this.context = context; |
||||
LOG.info("Regisgered"); |
||||
} |
||||
|
||||
@Override |
||||
public void start() { |
||||
subscriptionId = |
||||
context |
||||
.getService(PantheonEvents.class) |
||||
.map(events -> events.addNewBlockPropagatedListener(this::onBlockAnnounce)); |
||||
LOG.info("Listening with ID#" + subscriptionId); |
||||
} |
||||
|
||||
@Override |
||||
public void stop() { |
||||
subscriptionId.ifPresent( |
||||
id -> |
||||
context |
||||
.getService(PantheonEvents.class) |
||||
.ifPresent(pantheonEvents -> pantheonEvents.removeNewBlockPropagatedListener(id))); |
||||
LOG.info("No longer listening with ID#" + subscriptionId); |
||||
} |
||||
|
||||
private void onBlockAnnounce(final String json) { |
||||
final int blockCount = blockCounter.incrementAndGet(); |
||||
LOG.info("I got a new block! (I've seen {}) - {}", blockCount, json); |
||||
try { |
||||
final File callbackFile = |
||||
new File(System.getProperty("pantheon.plugins.dir", "plugins"), "newBlock." + blockCount); |
||||
if (!callbackFile.getParentFile().exists()) { |
||||
callbackFile.getParentFile().mkdirs(); |
||||
callbackFile.getParentFile().deleteOnExit(); |
||||
} |
||||
Files.write(callbackFile.toPath(), Collections.singletonList(json)); |
||||
callbackFile.deleteOnExit(); |
||||
} catch (final IOException ioe) { |
||||
throw new RuntimeException(ioe); |
||||
} |
||||
} |
||||
} |
Loading…
Reference in new issue