@ -16,6 +16,7 @@ package org.hyperledger.besu.ethereum.p2p.discovery;
import static com.google.common.base.Preconditions.checkArgument ;
import static com.google.common.base.Preconditions.checkArgument ;
import static com.google.common.base.Preconditions.checkNotNull ;
import static com.google.common.base.Preconditions.checkNotNull ;
import static java.nio.charset.StandardCharsets.UTF_8 ;
import static org.apache.tuweni.bytes.Bytes.wrapBuffer ;
import static org.apache.tuweni.bytes.Bytes.wrapBuffer ;
import org.hyperledger.besu.crypto.NodeKey ;
import org.hyperledger.besu.crypto.NodeKey ;
@ -30,8 +31,12 @@ import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL;
import org.hyperledger.besu.ethereum.p2p.peers.Peer ;
import org.hyperledger.besu.ethereum.p2p.peers.Peer ;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId ;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId ;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions ;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions ;
import org.hyperledger.besu.ethereum.storage.StorageProvider ;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier ;
import org.hyperledger.besu.nat.NatService ;
import org.hyperledger.besu.nat.NatService ;
import org.hyperledger.besu.plugin.services.MetricsSystem ;
import org.hyperledger.besu.plugin.services.MetricsSystem ;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage ;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction ;
import org.hyperledger.besu.util.NetworkUtility ;
import org.hyperledger.besu.util.NetworkUtility ;
import org.hyperledger.besu.util.Subscribers ;
import org.hyperledger.besu.util.Subscribers ;
@ -49,6 +54,12 @@ import com.google.common.net.InetAddresses;
import org.apache.logging.log4j.LogManager ;
import org.apache.logging.log4j.LogManager ;
import org.apache.logging.log4j.Logger ;
import org.apache.logging.log4j.Logger ;
import org.apache.tuweni.bytes.Bytes ;
import org.apache.tuweni.bytes.Bytes ;
import org.apache.tuweni.units.bigints.UInt64 ;
import org.ethereum.beacon.discovery.schema.EnrField ;
import org.ethereum.beacon.discovery.schema.IdentitySchema ;
import org.ethereum.beacon.discovery.schema.IdentitySchemaInterpreter ;
import org.ethereum.beacon.discovery.schema.NodeRecord ;
import org.ethereum.beacon.discovery.schema.NodeRecordFactory ;
/ * *
/ * *
* The peer discovery agent is the network component that sends and receives peer discovery messages
* The peer discovery agent is the network component that sends and receives peer discovery messages
@ -56,6 +67,7 @@ import org.apache.tuweni.bytes.Bytes;
* /
* /
public abstract class PeerDiscoveryAgent {
public abstract class PeerDiscoveryAgent {
private static final Logger LOG = LogManager . getLogger ( ) ;
private static final Logger LOG = LogManager . getLogger ( ) ;
private static final String SEQ_NO_STORE_KEY = "local-enr-seqno" ;
// The devp2p specification says only accept packets up to 1280, but some
// The devp2p specification says only accept packets up to 1280, but some
// clients ignore that, so we add in a little extra padding.
// clients ignore that, so we add in a little extra padding.
@ -83,12 +95,15 @@ public abstract class PeerDiscoveryAgent {
private boolean isActive = false ;
private boolean isActive = false ;
protected final Subscribers < PeerBondedObserver > peerBondedObservers = Subscribers . create ( ) ;
protected final Subscribers < PeerBondedObserver > peerBondedObservers = Subscribers . create ( ) ;
private final StorageProvider storageProvider ;
protected PeerDiscoveryAgent (
protected PeerDiscoveryAgent (
final NodeKey nodeKey ,
final NodeKey nodeKey ,
final DiscoveryConfiguration config ,
final DiscoveryConfiguration config ,
final PeerPermissions peerPermissions ,
final PeerPermissions peerPermissions ,
final NatService natService ,
final NatService natService ,
final MetricsSystem metricsSystem ) {
final MetricsSystem metricsSystem ,
final StorageProvider storageProvider ) {
this . metricsSystem = metricsSystem ;
this . metricsSystem = metricsSystem ;
checkArgument ( nodeKey ! = null , "nodeKey cannot be null" ) ;
checkArgument ( nodeKey ! = null , "nodeKey cannot be null" ) ;
checkArgument ( config ! = null , "provided configuration cannot be null" ) ;
checkArgument ( config ! = null , "provided configuration cannot be null" ) ;
@ -104,6 +119,8 @@ public abstract class PeerDiscoveryAgent {
this . nodeKey = nodeKey ;
this . nodeKey = nodeKey ;
id = nodeKey . getPublicKey ( ) . getEncodedBytes ( ) ;
id = nodeKey . getPublicKey ( ) . getEncodedBytes ( ) ;
this . storageProvider = storageProvider ;
}
}
protected abstract TimerUtil createTimer ( ) ;
protected abstract TimerUtil createTimer ( ) ;
@ -144,6 +161,7 @@ public abstract class PeerDiscoveryAgent {
isActive = true ;
isActive = true ;
LOG . info ( "P2P peer discovery agent started and listening on {}" , localAddress ) ;
LOG . info ( "P2P peer discovery agent started and listening on {}" , localAddress ) ;
startController ( ourNode ) ;
startController ( ourNode ) ;
addLocalNodeRecord ( id , advertisedAddress , tcpPort , discoveryPort ) ;
return discoveryPort ;
return discoveryPort ;
} ) ;
} ) ;
} else {
} else {
@ -152,6 +170,45 @@ public abstract class PeerDiscoveryAgent {
}
}
}
}
private void addLocalNodeRecord (
final Bytes nodeId ,
final String advertisedAddress ,
final Integer tcpPort ,
final Integer udpPort ) {
final KeyValueStorage keyValueStorage =
storageProvider . getStorageBySegmentIdentifier ( KeyValueSegmentIdentifier . BLOCKCHAIN ) ;
final NodeRecordFactory nodeRecordFactory = new NodeRecordFactory ( IdentitySchemaInterpreter . V4 ) ;
final Optional < NodeRecord > existingNodeRecord =
keyValueStorage
. get ( Bytes . of ( SEQ_NO_STORE_KEY . getBytes ( UTF_8 ) ) . toArray ( ) )
. map ( Bytes : : of )
. flatMap ( b - > Optional . of ( nodeRecordFactory . fromBytes ( b ) ) ) ;
final Bytes addressBytes = Bytes . of ( advertisedAddress . getBytes ( UTF_8 ) ) ;
if ( existingNodeRecord . isEmpty ( )
| | ! existingNodeRecord . get ( ) . getNodeId ( ) . equals ( nodeId )
| | ! addressBytes . equals ( existingNodeRecord . get ( ) . get ( EnrField . IP_V4 ) )
| | ! tcpPort . equals ( existingNodeRecord . get ( ) . get ( EnrField . TCP ) )
| | ! udpPort . equals ( existingNodeRecord . get ( ) . get ( EnrField . UDP ) ) ) {
final UInt64 sequenceNumber =
existingNodeRecord . map ( NodeRecord : : getSeq ) . orElse ( UInt64 . ZERO ) . add ( 1 ) ;
final NodeRecord nodeRecord =
nodeRecordFactory . createFromValues (
sequenceNumber ,
new EnrField ( EnrField . ID , IdentitySchema . V4 ) ,
new EnrField ( EnrField . PKEY_SECP256K1 , nodeId ) ,
new EnrField ( EnrField . IP_V4 , addressBytes ) ,
new EnrField ( EnrField . TCP , tcpPort ) ,
new EnrField ( EnrField . UDP , udpPort ) ) ;
final KeyValueStorageTransaction keyValueStorageTransaction =
keyValueStorage . startTransaction ( ) ;
keyValueStorageTransaction . put (
Bytes . wrap ( SEQ_NO_STORE_KEY . getBytes ( UTF_8 ) ) . toArray ( ) , nodeRecord . serialize ( ) . toArray ( ) ) ;
keyValueStorageTransaction . commit ( ) ;
}
}
public void addPeerRequirement ( final PeerRequirement peerRequirement ) {
public void addPeerRequirement ( final PeerRequirement peerRequirement ) {
this . peerRequirements . add ( peerRequirement ) ;
this . peerRequirements . add ( peerRequirement ) ;
}
}