@ -49,7 +49,6 @@ final class NettyPeerConnection implements PeerConnection {
private final PeerInfo peerInfo ;
private final PeerInfo peerInfo ;
private final Set < Capability > agreedCapabilities ;
private final Set < Capability > agreedCapabilities ;
private final Map < String , Capability > protocolToCapability = new HashMap < > ( ) ;
private final Map < String , Capability > protocolToCapability = new HashMap < > ( ) ;
private final AtomicBoolean disconnectDispatched = new AtomicBoolean ( false ) ;
private final AtomicBoolean disconnected = new AtomicBoolean ( false ) ;
private final AtomicBoolean disconnected = new AtomicBoolean ( false ) ;
private final Callbacks callbacks ;
private final Callbacks callbacks ;
private final CapabilityMultiplexer multiplexer ;
private final CapabilityMultiplexer multiplexer ;
@ -81,6 +80,10 @@ final class NettyPeerConnection implements PeerConnection {
if ( isDisconnected ( ) ) {
if ( isDisconnected ( ) ) {
throw new PeerNotConnected ( "Attempt to send message to a closed peer connection" ) ;
throw new PeerNotConnected ( "Attempt to send message to a closed peer connection" ) ;
}
}
doSend ( capability , message ) ;
}
private void doSend ( final Capability capability , final MessageData message ) {
if ( capability ! = null ) {
if ( capability ! = null ) {
// Validate message is valid for this capability
// Validate message is valid for this capability
final SubProtocol subProtocol = multiplexer . subProtocol ( capability ) ;
final SubProtocol subProtocol = multiplexer . subProtocol ( capability ) ;
@ -133,10 +136,9 @@ final class NettyPeerConnection implements PeerConnection {
@Override
@Override
public void terminateConnection ( final DisconnectReason reason , final boolean peerInitiated ) {
public void terminateConnection ( final DisconnectReason reason , final boolean peerInitiated ) {
if ( disconnectDispatch ed . compareAndSet ( false , true ) ) {
if ( disconnected . compareAndSet ( false , true ) ) {
LOG . debug ( "Disconnected ({}) from {}" , reason , peerInfo ) ;
LOG . debug ( "Disconnected ({}) from {}" , reason , peerInfo ) ;
callbacks . invokeDisconnect ( this , reason , peerInitiated ) ;
callbacks . invokeDisconnect ( this , reason , peerInitiated ) ;
disconnected . set ( true ) ;
}
}
// Always ensure the context gets closed immediately even if we previously sent a disconnect
// Always ensure the context gets closed immediately even if we previously sent a disconnect
// message and are waiting to close.
// message and are waiting to close.
@ -145,16 +147,10 @@ final class NettyPeerConnection implements PeerConnection {
@Override
@Override
public void disconnect ( final DisconnectReason reason ) {
public void disconnect ( final DisconnectReason reason ) {
if ( disconnectDispatch ed . compareAndSet ( false , true ) ) {
if ( disconnected . compareAndSet ( false , true ) ) {
LOG . debug ( "Disconnecting ({}) from {}" , reason , peerInfo ) ;
LOG . debug ( "Disconnecting ({}) from {}" , reason , peerInfo ) ;
callbacks . invokeDisconnect ( this , reason , false ) ;
callbacks . invokeDisconnect ( this , reason , false ) ;
try {
doSend ( null , DisconnectMessage . create ( reason ) ) ;
send ( null , DisconnectMessage . create ( reason ) ) ;
} catch ( final PeerNotConnected e ) {
// The connection has already been closed - nothing left to do
return ;
}
disconnected . set ( true ) ;
ctx . channel ( ) . eventLoop ( ) . schedule ( ( Callable < ChannelFuture > ) ctx : : close , 2L , SECONDS ) ;
ctx . channel ( ) . eventLoop ( ) . schedule ( ( Callable < ChannelFuture > ) ctx : : close , 2L , SECONDS ) ;
}
}
}
}