|
|
|
@ -54,7 +54,6 @@ import java.util.concurrent.atomic.AtomicReference; |
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
import java.util.stream.Stream; |
|
|
|
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting; |
|
|
|
|
import org.apache.tuweni.bytes.Bytes; |
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
@ -76,7 +75,7 @@ public class RlpxAgent { |
|
|
|
|
// without allowing the counterparty to play nodeId farming games
|
|
|
|
|
private final Bytes nodeIdMask = Bytes.random(SECPPublicKey.BYTE_LENGTH); |
|
|
|
|
|
|
|
|
|
@VisibleForTesting final Map<Bytes, RlpxConnection> connectionsById = new ConcurrentHashMap<>(); |
|
|
|
|
final Map<Bytes, RlpxConnection> connectionsById = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
|
|
private final AtomicBoolean started = new AtomicBoolean(false); |
|
|
|
|
private final AtomicBoolean stopped = new AtomicBoolean(false); |
|
|
|
@ -182,7 +181,7 @@ public class RlpxAgent { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
public void disconnect(final Bytes peerId, final DisconnectReason reason) { |
|
|
|
|
RlpxConnection connection = connectionsById.remove(peerId); |
|
|
|
|
final RlpxConnection connection = connectionsById.remove(peerId); |
|
|
|
|
if (connection != null) { |
|
|
|
|
connection.disconnect(reason); |
|
|
|
|
} |
|
|
|
@ -240,7 +239,8 @@ public class RlpxAgent { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Initiate connection or return existing connection
|
|
|
|
|
AtomicReference<CompletableFuture<PeerConnection>> connectionFuture = new AtomicReference<>(); |
|
|
|
|
final AtomicReference<CompletableFuture<PeerConnection>> connectionFuture = |
|
|
|
|
new AtomicReference<>(); |
|
|
|
|
connectionsById.compute( |
|
|
|
|
peer.getId(), |
|
|
|
|
(id, existingConnection) -> { |
|
|
|
@ -252,7 +252,7 @@ public class RlpxAgent { |
|
|
|
|
// We're initiating a new connection
|
|
|
|
|
final CompletableFuture<PeerConnection> future = initiateOutboundConnection(peer); |
|
|
|
|
connectionFuture.set(future); |
|
|
|
|
RlpxConnection newConnection = RlpxConnection.outboundConnection(peer, future); |
|
|
|
|
final RlpxConnection newConnection = RlpxConnection.outboundConnection(peer, future); |
|
|
|
|
newConnection.subscribeConnectionEstablished( |
|
|
|
|
(conn) -> { |
|
|
|
|
this.dispatchConnect(conn.getPeerConnection()); |
|
|
|
|