@ -14,7 +14,7 @@
* /
package org.hyperledger.besu.ethereum.stratum ;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator ;
import org.hyperledger.besu.ethereum.blockcreation.PoW MiningCoordinator ;
import org.hyperledger.besu.ethereum.chain.PoWObserver ;
import org.hyperledger.besu.ethereum.mainnet.EthHash ;
import org.hyperledger.besu.ethereum.mainnet.PoWSolution ;
@ -24,18 +24,39 @@ import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter ;
import java.math.BigInteger ;
import java.nio.charset.StandardCharsets ;
import java.util.List ;
import java.util.Optional ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.concurrent.atomic.AtomicLong ;
import java.util.function.Function ;
import com.google.common.util.concurrent.AtomicDouble ;
import io.netty.buffer.ByteBuf ;
import io.netty.buffer.Unpooled ;
import io.netty.channel.ChannelHandlerContext ;
import io.netty.channel.ChannelPipeline ;
import io.netty.handler.codec.DecoderResultProvider ;
import io.netty.handler.codec.LineBasedFrameDecoder ;
import io.netty.handler.codec.MessageToMessageDecoder ;
import io.netty.handler.codec.http.DefaultFullHttpResponse ;
import io.netty.handler.codec.http.HttpContent ;
import io.netty.handler.codec.http.HttpMethod ;
import io.netty.handler.codec.http.HttpRequest ;
import io.netty.handler.codec.http.HttpRequestDecoder ;
import io.netty.handler.codec.http.HttpResponseEncoder ;
import io.netty.handler.codec.http.HttpResponseStatus ;
import io.netty.handler.codec.http.HttpVersion ;
import io.netty.handler.codec.http.LastHttpContent ;
import io.netty.handler.codec.string.StringEncoder ;
import io.netty.util.CharsetUtil ;
import io.vertx.core.Future ;
import io.vertx.core.Vertx ;
import io.vertx.core.buffer.Buffer ;
import io.vertx.core.net.NetServer ;
import io.vertx.core.net.NetServerOptions ;
import io.vertx.core.net.NetSocket ;
import io.vertx.core.net.impl.NetSocketInternal ;
import org.apache.tuweni.units.bigints.UInt256 ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
@ -47,6 +68,7 @@ import org.slf4j.LoggerFactory;
public class StratumServer implements PoWObserver {
private static final Logger logger = LoggerFactory . getLogger ( StratumServer . class ) ;
private static final String VERTX_HANDLER_NAME = "handler" ;
private final Vertx vertx ;
private final int port ;
@ -61,7 +83,7 @@ public class StratumServer implements PoWObserver {
public StratumServer (
final Vertx vertx ,
final MiningCoordinator miningCoordinator ,
final PoW MiningCoordinator miningCoordinator ,
final int port ,
final String networkInterface ,
final String extraNonce ,
@ -71,7 +93,7 @@ public class StratumServer implements PoWObserver {
this . networkInterface = networkInterface ;
protocols =
new StratumProtocol [ ] {
new GetWorkProtocol ( miningCoordinator ) ,
new GetWorkProtocol ( miningCoordinator . getEpochCalculator ( ) ) ,
new Stratum1Protocol ( extraNonce , miningCoordinator ) ,
new Stratum1EthProxyProtocol ( miningCoordinator )
} ;
@ -90,65 +112,196 @@ public class StratumServer implements PoWObserver {
BesuMetricCategory . STRATUM , "disconnections" , "Number of disconnections over time" ) ;
}
public Completable Future< ? > start ( ) {
public Future < NetServer > start ( ) {
if ( started . compareAndSet ( false , true ) ) {
logger . info ( "Starting stratum server on {}:{}" , networkInterface , port ) ;
server =
vertx . createNetServer (
new NetServerOptions ( ) . setPort ( port ) . setHost ( networkInterface ) . setTcpKeepAlive ( true ) ) ;
CompletableFuture < ? > result = new CompletableFuture < > ( ) ;
server . connectHandler ( this : : handle ) ;
server . listen (
res - > {
if ( res . failed ( ) ) {
result . completeExceptionally (
new StratumServerException (
String . format (
"Failed to bind Stratum Server listener to %s:%s: %s" ,
networkInterface , port , res . cause ( ) . getMessage ( ) ) ) ) ;
} else {
result . complete ( null ) ;
}
} ) ;
return result ;
return server
. listen ( )
. onSuccess (
v - >
logger . info ( "Stratum server started on {}:{}" , networkInterface , v . actualPort ( ) ) ) ;
}
return Completable Future. completedFuture ( null ) ;
return Future . succeededFuture ( server ) ;
}
private void handle ( final NetSocket socket ) {
connectionsCount . inc ( ) ;
numberOfMiners . incrementAndGet ( ) ;
NetSocketInternal internalSocket = ( NetSocketInternal ) socket ;
StratumConnection conn =
new StratumConnection (
protocols , socket : : close , bytes - > socket . write ( Buffer . buffer ( bytes ) ) ) ;
socket . handler ( conn : : handleBuffer ) ;
protocols , response - > sendLineBasedResponse ( internalSocket , response ) ) ;
ChannelPipeline pipeline = internalSocket . channelHandlerContext ( ) . pipeline ( ) ;
pipeline . addBefore (
VERTX_HANDLER_NAME ,
"stratumDecoder" ,
new HttpRequestDecoder ( ) {
@Override
protected void decode (
final ChannelHandlerContext ctx , final ByteBuf in , final List < Object > out )
throws Exception {
ByteBuf inputCopy = in . copy ( ) ;
int indexBeforeDecode = inputCopy . readerIndex ( ) ;
super . decode ( ctx , inputCopy , out ) ;
if ( out . isEmpty ( ) ) { // to process last chunk
super . decode ( ctx , inputCopy , out ) ;
}
DecoderResultProvider httpDecodingResult =
( DecoderResultProvider ) out . get ( out . size ( ) - 1 ) ;
ChannelPipeline pipeline = ctx . pipeline ( ) ;
if ( httpDecodingResult . decoderResult ( ) . isFailure ( ) ) {
logger . trace ( "Received non-HTTP request, switching to line-based protocol" ) ;
out . remove ( httpDecodingResult ) ;
pipeline . addBefore (
VERTX_HANDLER_NAME , "frameDecoder" , new LineBasedFrameDecoder ( 240 ) ) ;
pipeline . addBefore (
VERTX_HANDLER_NAME ,
"lineTrimmer" ,
new MessageToMessageDecoder < ByteBuf > ( ) {
@Override
protected void decode (
final ChannelHandlerContext ctx ,
final ByteBuf message ,
final List < Object > out ) {
if ( message . readableBytes ( ) > 0
& & message . getByte ( message . readableBytes ( ) - 1 ) = = '\n' ) {
if ( message . readableBytes ( ) > 1
& & message . getByte ( message . readableBytes ( ) - 2 ) = = '\r' ) {
out . add ( message . readRetainedSlice ( message . readableBytes ( ) - 2 ) ) ;
} else {
out . add ( message . readRetainedSlice ( message . readableBytes ( ) - 1 ) ) ;
}
} else {
out . add ( message . retain ( ) ) ;
}
}
} ) ;
pipeline . addBefore (
VERTX_HANDLER_NAME , "stringEncoder" , new StringEncoder ( CharsetUtil . UTF_8 ) ) ;
pipeline . remove ( this ) ;
} else {
String httpEncoderHandlerName = "httpEncoder" ;
if ( pipeline . get ( httpEncoderHandlerName ) = = null ) {
logger . trace ( "Received HTTP request, switching to HTTP protocol" ) ;
pipeline . addBefore (
VERTX_HANDLER_NAME , httpEncoderHandlerName , new HttpResponseEncoder ( ) ) ;
}
if ( httpDecodingResult instanceof HttpRequest request
& & ! request . method ( ) . equals ( HttpMethod . POST ) ) {
logger . debug ( "Received non-POST request" ) ;
in . skipBytes ( in . readableBytes ( ) ) ; // skip body decode
} else {
in . skipBytes ( inputCopy . readerIndex ( ) - indexBeforeDecode ) ;
}
}
}
} ) ;
Buffer requestBody = Buffer . buffer ( ) ;
internalSocket . messageHandler (
obj - > {
try {
if ( obj instanceof HttpRequest request & & ! request . method ( ) . equals ( HttpMethod . POST ) ) {
sendHttpResponse (
internalSocket , HttpResponseStatus . METHOD_NOT_ALLOWED , Unpooled . EMPTY_BUFFER ) ;
} else if ( obj instanceof HttpContent httpContent ) {
requestBody . appendBuffer ( Buffer . buffer ( httpContent . content ( ) ) ) ;
if ( obj instanceof LastHttpContent ) {
try {
conn . handleBuffer (
requestBody ,
response - >
sendHttpResponse (
internalSocket ,
HttpResponseStatus . OK ,
Unpooled . wrappedBuffer ( response . getBytes ( CharsetUtil . UTF_8 ) ) ) ) ;
} catch ( IllegalArgumentException e ) {
logger . warn ( "Invalid message {}" , requestBody ) ;
sendHttpResponse (
internalSocket , HttpResponseStatus . BAD_REQUEST , Unpooled . EMPTY_BUFFER ) ;
} catch ( Exception e ) {
logger . warn ( "Unexpected error" , e ) ;
sendHttpResponse (
internalSocket ,
HttpResponseStatus . INTERNAL_SERVER_ERROR ,
Unpooled . EMPTY_BUFFER ) ;
} finally {
conn . close ( ) ;
}
}
} else if ( obj instanceof ByteBuf value & & value . readableBytes ( ) > 0 ) {
try {
conn . handleBuffer (
Buffer . buffer ( value ) ,
response - > sendLineBasedResponse ( internalSocket , response ) ) ;
} catch ( Exception e ) {
logger . error ( "Error handling request" , e ) ;
internalSocket . close ( ) . onFailure ( ex - > logger . error ( "Error closing socket" , ex ) ) ;
}
}
} catch ( Exception e ) {
pipeline . fireExceptionCaught ( e ) ;
}
} ) ;
internalSocket . exceptionHandler (
ex - > {
logger . error ( "Unknown error" , ex ) ;
internalSocket
. close ( )
. onFailure (
closingException - > logger . error ( "Error closing socket" , closingException ) ) ;
} ) ;
socket . closeHandler (
( aVoid ) - > {
logger . debug ( "Socket closed" ) ;
conn . close ( ) ;
numberOfMiners . decrementAndGet ( ) ;
disconnectionsCount . inc ( ) ;
} ) ;
}
public CompletableFuture < ? > stop ( ) {
private static void sendHttpResponse (
final NetSocketInternal internalSocket ,
final HttpResponseStatus responseStatus ,
final ByteBuf content ) {
ByteBuf contentResponse = content ;
if ( logger . isTraceEnabled ( ) ) {
contentResponse = content . copy ( ) ;
}
internalSocket
. writeMessage (
new DefaultFullHttpResponse ( HttpVersion . HTTP_1_1 , responseStatus , contentResponse ) )
. onFailure ( ex - > logger . error ( "Failed to write response" , ex ) )
. onSuccess (
writeResult - >
internalSocket
. close ( )
. onSuccess (
v - >
logger . trace (
"<< {}" ,
content . isReadable ( )
? content . toString ( StandardCharsets . UTF_8 )
: "no content" ) )
. onFailure ( ex - > logger . error ( "Failed to close socket" , ex ) ) ) ;
}
private static Future < Void > sendLineBasedResponse (
final NetSocketInternal internalSocket , final String response ) {
return internalSocket
. writeMessage ( response + '\n' ) // response is delimited by a newline
. onSuccess ( v - > logger . trace ( "<< {}" , response ) )
. onFailure ( ex - > logger . error ( "Failed to send response: {}" , response , ex ) ) ;
}
public Future < Void > stop ( ) {
if ( started . compareAndSet ( true , false ) ) {
CompletableFuture < ? > result = new CompletableFuture < > ( ) ;
server . close (
res - > {
if ( res . failed ( ) ) {
result . completeExceptionally (
new StratumServerException (
String . format (
"Failed to bind Stratum Server listener to %s:%s: %s" ,
networkInterface , port , res . cause ( ) . getMessage ( ) ) ) ) ;
} else {
result . complete ( null ) ;
}
} ) ;
return result ;
return server . close ( ) ;
}
logger . debug ( "Stopping StratumServer that was not running" ) ;
return CompletableFuture . completedFuture ( null ) ;
return Future . succeededFuture ( ) ;
}
@Override