Add totalDiffculty to BlockPropagated events. (#97)

* Add totalDiffculty to BlockPropagated events.
The chain head block can be contentious with many fork blocks (ommers) propagating on the network.  We should add a totalDifficulty to make it easier to see which block is most likely the current head.
- added `BlockPropagated` interface in `plugin-api`.
- updated `BesuEvents.onBlockPropagated` method to take a `BlockPropagated` instead of a `BlockHeader`.
- created `BlockPropagatedSubscriber` in `BlockBroadcaster`.
- changed type of `BlockBroadcaster.blockPropagatedSubscribers` from `Consumer<Block>` to `BlockPropagatedSubscriber`.
- updated unit tests accordingly to all changes.
- updated known hash in `build.gradle` file of `plugin-api`: new value is `4SAeaZIJMsDvUK5Wp2RzU8TlHacslALnM/4yvVhsMtY=`

Signed-off-by: Abdelhamid Bakhta <abdelhamid.bakhta@consensys.net>
pull/101/head
Abdelhamid Bakhta 5 years ago committed by GitHub
parent b4f1becc05
commit 5e819febbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      besu/src/main/java/org/hyperledger/besu/services/BesuEventsImpl.java
  2. 4
      besu/src/test/java/org/hyperledger/besu/plugins/TestBesuEventsPlugin.java
  3. 17
      besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java
  4. 8
      build.gradle
  5. 14
      ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockBroadcaster.java
  6. 2
      plugin-api/build.gradle
  7. 36
      plugin-api/src/main/java/org/hyperledger/besu/plugin/data/PropagatedBlockContext.java
  8. 6
      plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java

@ -17,8 +17,13 @@ package org.hyperledger.besu.services;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster; import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.plugin.data.BlockHeader;
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.Quantity;
import org.hyperledger.besu.plugin.services.BesuEvents; import org.hyperledger.besu.plugin.services.BesuEvents;
import java.util.function.Supplier;
public class BesuEventsImpl implements BesuEvents { public class BesuEventsImpl implements BesuEvents {
private final BlockBroadcaster blockBroadcaster; private final BlockBroadcaster blockBroadcaster;
private final TransactionPool transactionPool; private final TransactionPool transactionPool;
@ -36,7 +41,9 @@ public class BesuEventsImpl implements BesuEvents {
@Override @Override
public long addBlockPropagatedListener(final BlockPropagatedListener listener) { public long addBlockPropagatedListener(final BlockPropagatedListener listener) {
return blockBroadcaster.subscribePropagateNewBlocks( return blockBroadcaster.subscribePropagateNewBlocks(
block -> listener.onBlockPropagated(block.getHeader())); (block, totalDifficulty) ->
listener.onBlockPropagated(
blockPropagatedContext(block::getHeader, () -> totalDifficulty)));
} }
@Override @Override
@ -75,4 +82,20 @@ public class BesuEventsImpl implements BesuEvents {
public void removeSyncStatusListener(final long listenerIdentifier) { public void removeSyncStatusListener(final long listenerIdentifier) {
syncState.removeSyncStatusListener(listenerIdentifier); syncState.removeSyncStatusListener(listenerIdentifier);
} }
private static PropagatedBlockContext blockPropagatedContext(
final Supplier<BlockHeader> blockHeaderSupplier,
final Supplier<Quantity> totalDifficultySupplier) {
return new PropagatedBlockContext() {
@Override
public BlockHeader getBlockHeader() {
return blockHeaderSupplier.get();
}
@Override
public Quantity getTotalDifficulty() {
return totalDifficultySupplier.get();
}
};
}
} }

@ -17,6 +17,7 @@ package org.hyperledger.besu.plugins;
import org.hyperledger.besu.plugin.BesuContext; import org.hyperledger.besu.plugin.BesuContext;
import org.hyperledger.besu.plugin.BesuPlugin; import org.hyperledger.besu.plugin.BesuPlugin;
import org.hyperledger.besu.plugin.data.BlockHeader; import org.hyperledger.besu.plugin.data.BlockHeader;
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.services.BesuEvents; import org.hyperledger.besu.plugin.services.BesuEvents;
import java.io.File; import java.io.File;
@ -66,7 +67,8 @@ public class TestBesuEventsPlugin implements BesuPlugin {
LOG.info("No longer listening with ID#" + subscriptionId); LOG.info("No longer listening with ID#" + subscriptionId);
} }
private void onBlockAnnounce(final BlockHeader header) { private void onBlockAnnounce(final PropagatedBlockContext propagatedBlockContext) {
final BlockHeader header = propagatedBlockContext.getBlockHeader();
final int blockCount = blockCounter.incrementAndGet(); final int blockCount = blockCounter.incrementAndGet();
LOG.info("I got a new block! (I've seen {}) - {}", blockCount, header); LOG.info("I got a new block! (I've seen {}) - {}", blockCount, header);
try { try {

@ -43,7 +43,7 @@ import org.hyperledger.besu.ethereum.mainnet.TransactionValidator;
import org.hyperledger.besu.ethereum.mainnet.ValidationResult; import org.hyperledger.besu.ethereum.mainnet.ValidationResult;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.data.BlockHeader; import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.data.Transaction; import org.hyperledger.besu.plugin.data.Transaction;
import org.hyperledger.besu.testutil.TestClock; import org.hyperledger.besu.testutil.TestClock;
@ -144,24 +144,29 @@ public class BesuEventsImplTest {
@Test @Test
public void newBlockEventFiresAfterSubscribe() { public void newBlockEventFiresAfterSubscribe() {
final AtomicReference<BlockHeader> result = new AtomicReference<>(); final AtomicReference<PropagatedBlockContext> result = new AtomicReference<>();
serviceImpl.addBlockPropagatedListener(result::set); serviceImpl.addBlockPropagatedListener(result::set);
final Block block = generateBlock();
assertThat(result.get()).isNull(); assertThat(result.get()).isNull();
blockBroadcaster.propagate(generateBlock(), UInt256.of(1)); blockBroadcaster.propagate(block, UInt256.of(1));
assertThat(result.get()).isNotNull(); assertThat(result.get()).isNotNull();
assertThat(result.get().getBlockHeader()).isEqualTo(block.getHeader());
assertThat(result.get().getTotalDifficulty()).isEqualTo(UInt256.of(1));
} }
@Test @Test
public void newBlockEventDoesNotFireAfterUnsubscribe() { public void newBlockEventDoesNotFireAfterUnsubscribe() {
final AtomicReference<BlockHeader> result = new AtomicReference<>(); final AtomicReference<PropagatedBlockContext> result = new AtomicReference<>();
final long id = serviceImpl.addBlockPropagatedListener(result::set); final long id = serviceImpl.addBlockPropagatedListener(result::set);
assertThat(result.get()).isNull(); assertThat(result.get()).isNull();
blockBroadcaster.propagate(generateBlock(), UInt256.of(1)); final Block block = generateBlock();
blockBroadcaster.propagate(block, UInt256.of(2));
assertThat(result.get()).isNotNull(); assertThat(result.get()).isNotNull();
assertThat(result.get().getBlockHeader()).isEqualTo(block.getHeader());
assertThat(result.get().getTotalDifficulty()).isEqualTo(UInt256.of(2));
serviceImpl.removeBlockPropagatedListener(id); serviceImpl.removeBlockPropagatedListener(id);
result.set(null); result.set(null);

@ -268,7 +268,7 @@ allprojects {
task deploy() {} task deploy() {}
task checkMavenCoordianteCollisions { task checkMavenCoordinateCollisions {
doLast { doLast {
def coordinates = [:] def coordinates = [:]
getAllprojects().forEach { getAllprojects().forEach {
@ -276,8 +276,8 @@ task checkMavenCoordianteCollisions {
def coordinate = it.publishing?.publications[0].coordinates def coordinate = it.publishing?.publications[0].coordinates
if (coordinates.containsKey(coordinate)) { if (coordinates.containsKey(coordinate)) {
throw new GradleException("Duplicate maven coordinates detected, ${coordinate} is used by " + throw new GradleException("Duplicate maven coordinates detected, ${coordinate} is used by " +
"both ${coordinates[coordinate]} and ${it.path}.\n" + "both ${coordinates[coordinate]} and ${it.path}.\n" +
"Please add a `publishing` script block to one or both subprojects.") "Please add a `publishing` script block to one or both subprojects.")
} }
coordinates[coordinate] = it.path coordinates[coordinate] = it.path
} }
@ -287,7 +287,7 @@ task checkMavenCoordianteCollisions {
tasks.register('checkPluginAPIChanges', DefaultTask) { } tasks.register('checkPluginAPIChanges', DefaultTask) { }
checkPluginAPIChanges.dependsOn(':plugin-api:checkAPIChanges') checkPluginAPIChanges.dependsOn(':plugin-api:checkAPIChanges')
check.dependsOn('checkPluginAPIChanges', 'checkMavenCoordianteCollisions') check.dependsOn('checkPluginAPIChanges', 'checkMavenCoordinateCollisions')
subprojects { subprojects {

@ -21,8 +21,6 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.util.Subscribers; import org.hyperledger.besu.util.Subscribers;
import org.hyperledger.besu.util.uint.UInt256; import org.hyperledger.besu.util.uint.UInt256;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -30,13 +28,14 @@ public class BlockBroadcaster {
private static final Logger LOG = LogManager.getLogger(); private static final Logger LOG = LogManager.getLogger();
private final EthContext ethContext; private final EthContext ethContext;
private final Subscribers<Consumer<Block>> blockPropagatedSubscribers = Subscribers.create(); private final Subscribers<BlockPropagatedSubscriber> blockPropagatedSubscribers =
Subscribers.create();
public BlockBroadcaster(final EthContext ethContext) { public BlockBroadcaster(final EthContext ethContext) {
this.ethContext = ethContext; this.ethContext = ethContext;
} }
public long subscribePropagateNewBlocks(final Consumer<Block> callback) { public long subscribePropagateNewBlocks(final BlockPropagatedSubscriber callback) {
return blockPropagatedSubscribers.subscribe(callback); return blockPropagatedSubscribers.subscribe(callback);
} }
@ -45,7 +44,7 @@ public class BlockBroadcaster {
} }
public void propagate(final Block block, final UInt256 totalDifficulty) { public void propagate(final Block block, final UInt256 totalDifficulty) {
blockPropagatedSubscribers.forEach(listener -> listener.accept(block)); blockPropagatedSubscribers.forEach(listener -> listener.accept(block, totalDifficulty));
final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty); final NewBlockMessage newBlockMessage = NewBlockMessage.create(block, totalDifficulty);
ethContext ethContext
.getEthPeers() .getEthPeers()
@ -61,4 +60,9 @@ public class BlockBroadcaster {
} }
}); });
} }
@FunctionalInterface
public interface BlockPropagatedSubscriber {
void accept(Block block, UInt256 totalDifficulty);
}
} }

@ -56,7 +56,7 @@ Calculated : ${currentHash}
tasks.register('checkAPIChanges', FileStateChecker) { tasks.register('checkAPIChanges', FileStateChecker) {
description = "Checks that the API for the Plugin-API project does not change without deliberate thought" description = "Checks that the API for the Plugin-API project does not change without deliberate thought"
files = sourceSets.main.allJava.files files = sourceSets.main.allJava.files
knownHash = 'b8BCiNvy9vSYsTtS1NfszsVT0EMV8tiNc7igzXzTrak=' knownHash = 'XMBMdVNS2VAa+WN3lyEAKmXznu4ExSIHA2arYUeo0eo='
} }
check.dependsOn('checkAPIChanges') check.dependsOn('checkAPIChanges')

@ -0,0 +1,36 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.plugin.data;
import org.hyperledger.besu.plugin.Unstable;
/** The minimum set of data for a PropagatedBlockContext. */
@Unstable
public interface PropagatedBlockContext {
/**
* A {@link BlockHeader} object.
*
* @return A {@link BlockHeader}
*/
BlockHeader getBlockHeader();
/**
* A scalar value corresponding to the total difficulty.
*
* @return A scalar value corresponding to the total difficulty.
*/
Quantity getTotalDifficulty();
}

@ -15,7 +15,7 @@
package org.hyperledger.besu.plugin.services; package org.hyperledger.besu.plugin.services;
import org.hyperledger.besu.plugin.Unstable; import org.hyperledger.besu.plugin.Unstable;
import org.hyperledger.besu.plugin.data.BlockHeader; import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.data.Transaction; import org.hyperledger.besu.plugin.data.Transaction;
@ -109,9 +109,9 @@ public interface BesuEvents {
* <p>The block may not have been imported to the local chain yet and may fail later * <p>The block may not have been imported to the local chain yet and may fail later
* validations. * validations.
* *
* @param blockHeader the new block header. * @param propagatedBlockContext block being propagated.
*/ */
void onBlockPropagated(BlockHeader blockHeader); void onBlockPropagated(PropagatedBlockContext propagatedBlockContext);
} }
/** The listener interface for receiving new transaction added events. */ /** The listener interface for receiving new transaction added events. */

Loading…
Cancel
Save