From 33ac7e1ec9744eaefb28b5162d3071e72c70debe Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Tue, 20 Nov 2018 16:59:11 +1000 Subject: [PATCH] Switch back to Xerial Snappy Library (#284) --- ethereum/p2p/build.gradle | 2 +- .../ethereum/p2p/rlpx/framing/Framer.java | 9 +-- .../p2p/rlpx/framing/SnappyCompressor.java | 22 ++++++-- .../ethereum/p2p/netty/DeFramerTest.java | 55 +++++++++++++++++++ .../ethereum/p2p/rlpx/framing/FramerTest.java | 30 +++++++++- .../rlpx/framing/SnappyCompressorTest.java | 43 +++++++++++---- gradle/versions.gradle | 2 +- 7 files changed, 137 insertions(+), 26 deletions(-) create mode 100644 ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java diff --git a/ethereum/p2p/build.gradle b/ethereum/p2p/build.gradle index 9ae06a9b4d..7635d41e2b 100644 --- a/ethereum/p2p/build.gradle +++ b/ethereum/p2p/build.gradle @@ -33,7 +33,7 @@ dependencies { implementation 'com.google.guava:guava' implementation 'io.vertx:vertx-core' implementation 'org.apache.logging.log4j:log4j-api' - implementation 'org.iq80.snappy:snappy' + implementation 'org.xerial.snappy:snappy-java' runtime 'org.apache.logging.log4j:log4j-core' diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/Framer.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/Framer.java index 3199673c65..4a216d535d 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/Framer.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/Framer.java @@ -34,7 +34,6 @@ import org.bouncycastle.crypto.engines.AESEngine; import org.bouncycastle.crypto.modes.SICBlockCipher; import org.bouncycastle.crypto.params.KeyParameter; import org.bouncycastle.crypto.params.ParametersWithIV; -import org.iq80.snappy.CorruptionException; /** * This component is responsible for reading and composing RLPx protocol frames, conformant to the @@ -260,12 +259,8 @@ public class Framer { if (uncompressedLength >= LENGTH_MAX_MESSAGE_FRAME) { throw error("Message size %s in excess of maximum length.", uncompressedLength); } - try { - final byte[] decompressedMessageData = compressor.decompress(compressedMessageData); - data = BytesValue.wrap(decompressedMessageData); - } catch (final CorruptionException e) { - throw new FramingException("Decompression failed", e); - } + final byte[] decompressedMessageData = compressor.decompress(compressedMessageData); + data = BytesValue.wrap(decompressedMessageData); } else { // Move data to a ByteBuf final int messageLength = frameSize - LENGTH_MESSAGE_ID; diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/SnappyCompressor.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/SnappyCompressor.java index 281f236f8b..20179d3682 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/SnappyCompressor.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/SnappyCompressor.java @@ -14,7 +14,9 @@ package tech.pegasys.pantheon.ethereum.p2p.rlpx.framing; import static com.google.common.base.Preconditions.checkNotNull; -import org.iq80.snappy.Snappy; +import java.io.IOException; + +import org.xerial.snappy.Snappy; /** * A strategy for compressing and decompressing data with the Snappy algorithm. @@ -25,16 +27,28 @@ public class SnappyCompressor { public byte[] compress(final byte[] uncompressed) { checkNotNull(uncompressed, "input data must not be null"); - return Snappy.compress(uncompressed); + try { + return Snappy.compress(uncompressed); + } catch (final IOException e) { + throw new FramingException("Snappy compression failed", e); + } } public byte[] decompress(final byte[] compressed) { checkNotNull(compressed, "input data must not be null"); - return Snappy.uncompress(compressed, 0, compressed.length); + try { + return Snappy.uncompress(compressed); + } catch (final IOException e) { + throw new FramingException("Snappy decompression failed", e); + } } public int uncompressedLength(final byte[] compressed) { checkNotNull(compressed, "input data must not be null"); - return Snappy.getUncompressedLength(compressed, 0); + try { + return Snappy.uncompressedLength(compressed); + } catch (final IOException e) { + throw new FramingException("Snappy uncompressedLength failed", e); + } } } diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java new file mode 100644 index 0000000000..4700311e30 --- /dev/null +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/netty/DeFramerTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.p2p.netty; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection; +import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.Framer; +import tech.pegasys.pantheon.ethereum.p2p.rlpx.framing.FramingException; +import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.DecoderException; +import org.junit.Test; + +public class DeFramerTest { + + private final ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + private final Framer framer = mock(Framer.class); + private final Callbacks callbacks = mock(Callbacks.class); + private final PeerConnection peerConnection = mock(PeerConnection.class); + private final CompletableFuture connectFuture = new CompletableFuture<>(); + private final DeFramer deFramer = + new DeFramer( + framer, + Collections.emptyList(), + new PeerInfo(5, "abc", Collections.emptyList(), 0, BytesValue.fromHexString("0x01")), + callbacks, + connectFuture); + + @Test + public void shouldDisconnectForBreachOfProtocolWhenFramingExceptionThrown() throws Exception { + connectFuture.complete(peerConnection); + + deFramer.exceptionCaught(ctx, new DecoderException(new FramingException("Test"))); + + verify(peerConnection).disconnect(DisconnectReason.BREACH_OF_PROTOCOL); + } +} diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/FramerTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/FramerTest.java index 785e475eba..3f58eb8f0e 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/FramerTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/FramerTest.java @@ -20,10 +20,13 @@ import static java.util.stream.StreamSupport.stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import tech.pegasys.pantheon.ethereum.p2p.api.MessageData; import tech.pegasys.pantheon.ethereum.p2p.rlpx.handshake.HandshakeSecrets; import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage; +import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.io.IOException; @@ -34,8 +37,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import org.iq80.snappy.Snappy; import org.junit.Test; +import org.xerial.snappy.Snappy; public class FramerTest { private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -195,6 +198,31 @@ public class FramerTest { } } + @Test + public void shouldThrowFramingExceptionWhenMessageIsNotCompressedButShouldBe() { + final HandshakeSecrets secrets = + new HandshakeSecrets( + BytesValue.fromHexString( + "0x75b3ee95adff0c529a05efd7612aa1dbe5057eb9facdde0dfc837ad143da1d43") + .extractArray(), + BytesValue.fromHexString( + "0x030dfd1566f4800c4842c177f7d476b64ae2b99a2aa0ab5600aa2f41a8710575") + .extractArray(), + BytesValue.fromHexString( + "0xc9d3385b1588a5969cba312f8c29bedb4cb9d56ec0cf825436addc1ec644f1d6") + .extractArray()); + final Framer receivingFramer = new Framer(secrets); + final Framer sendingFramer = new Framer(secrets); + + // Write a disconnect message with compression disabled. + final ByteBuf out = Unpooled.buffer(); + sendingFramer.frame(DisconnectMessage.create(DisconnectReason.TIMEOUT), out); + + // Then read it with compression enabled. + receivingFramer.enableCompression(); + assertThatThrownBy(() -> receivingFramer.deframe(out)).isInstanceOf(FramingException.class); + } + private HandshakeSecrets secretsFrom(final JsonNode td, final boolean swap) { final byte[] aes = decodeHexDump(td.get("aes_secret").asText()); final byte[] mac = decodeHexDump(td.get("mac_secret").asText()); diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/SnappyCompressorTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/SnappyCompressorTest.java index 76a804b4fa..21d4fb2c0b 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/SnappyCompressorTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/rlpx/framing/SnappyCompressorTest.java @@ -14,6 +14,9 @@ package tech.pegasys.pantheon.ethereum.p2p.rlpx.framing; import static io.netty.buffer.ByteBufUtil.decodeHexDump; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import tech.pegasys.pantheon.util.bytes.BytesValue; import java.nio.charset.StandardCharsets; @@ -21,38 +24,55 @@ import org.junit.Test; public class SnappyCompressorTest { + private final SnappyCompressor snappy = new SnappyCompressor(); + @Test public void roundTrip() { String input = "Uncompressed sample text for round-trip compression/decompression"; input = input + input + input + input; // Give it some repetition for good sample data final byte[] data = input.getBytes(StandardCharsets.UTF_8); - final SnappyCompressor snappy = new SnappyCompressor(); assertThat(snappy.decompress(snappy.compress(data))).isEqualTo(data); } @Test public void roundTripEmptyByteArray() { final byte[] data = new byte[0]; - final SnappyCompressor snappy = new SnappyCompressor(); assertThat(snappy.decompress(snappy.compress(data))).isEqualTo(data); } - @Test(expected = NullPointerException.class) + @Test public void compressNull() { - final SnappyCompressor snappy = new SnappyCompressor(); - snappy.compress(null); + assertThatThrownBy(() -> snappy.compress(null)).isInstanceOf(NullPointerException.class); } - @Test(expected = NullPointerException.class) + @Test public void decompressNull() { - final SnappyCompressor snappy = new SnappyCompressor(); - snappy.decompress(null); + assertThatThrownBy(() -> snappy.decompress(null)).isInstanceOf(NullPointerException.class); } - @Test(expected = NullPointerException.class) + @Test public void uncompressedLengthNull() { - final SnappyCompressor snappy = new SnappyCompressor(); - snappy.uncompressedLength(null); + assertThatThrownBy(() -> snappy.uncompressedLength(null)) + .isInstanceOf(NullPointerException.class); + } + + @Test + public void shouldDetermineLengthOfEmptyMessage() { + assertThatThrownBy(() -> snappy.uncompressedLength(new byte[0])) + .isInstanceOf(FramingException.class); + } + + @Test + public void shouldDecompressEmptyMessage() { + final byte[] compressed = snappy.compress(new byte[0]); + assertThat(snappy.decompress(compressed)).isEmpty(); + } + + @Test + public void shouldRejectMessageWithNonZeroDeclaredLengthButNoCompressedData() { + assertThatThrownBy( + () -> snappy.decompress(BytesValue.fromHexString("0xFFFFFF01").extractArray())) + .isInstanceOf(FramingException.class); } @Test @@ -74,7 +94,6 @@ public class SnappyCompressorTest { + "d94062719ae58d392b253268da005a4fb2d8d33b19ec84a7312a34ecbfc2a0055c660cc59f5dad52ae" + "4d6fd5f2fc081d706ee0bce4195ecfff07a1f85d1bd6"); - final SnappyCompressor snappy = new SnappyCompressor(); assertThat(snappy.compress(decompressed)).isEqualTo(compressed); assertThat(snappy.decompress(compressed)).isEqualTo(decompressed); assertThat(snappy.decompress(snappy.compress(decompressed))).isEqualTo(decompressed); diff --git a/gradle/versions.gradle b/gradle/versions.gradle index 65e10aae2f..f767e5706c 100644 --- a/gradle/versions.gradle +++ b/gradle/versions.gradle @@ -41,7 +41,7 @@ dependencyManagement { dependency('io.pkts:pkts-core:3.0.2') - dependency("org.iq80.snappy:snappy:0.4") + dependency('org.xerial.snappy:snappy-java:1.1.7.2') dependency('com.github.docker-java:docker-java:3.0.14')