From 8a6840273252af81e44b6b245d9a81d8f11389af Mon Sep 17 00:00:00 2001 From: Karim T Date: Wed, 19 Feb 2020 10:34:36 +0100 Subject: [PATCH] [BOUNTY-4] Add NAT Kubernetes Support (#410) * add kubernetes support Signed-off-by: Karim TAAM * fix review issues Signed-off-by: Karim TAAM --- .../org/hyperledger/besu/RunnerBuilder.java | 11 +- .../hyperledger/besu/cli/BesuCommandTest.java | 5 +- gradle/versions.gradle | 2 + nat/build.gradle | 1 + .../org/hyperledger/besu/nat/NatMethod.java | 1 + .../nat/kubernetes/KubernetesDetector.java | 45 ++++++ .../nat/kubernetes/KubernetesNatManager.java | 150 +++++++++++++++++ .../kubernetes/KubernetesNatManagerTest.java | 153 ++++++++++++++++++ 8 files changed, 366 insertions(+), 2 deletions(-) create mode 100644 nat/src/main/java/org/hyperledger/besu/nat/kubernetes/KubernetesDetector.java create mode 100644 nat/src/main/java/org/hyperledger/besu/nat/kubernetes/KubernetesNatManager.java create mode 100644 nat/src/test/java/org/hyperledger/besu/nat/kubernetes/KubernetesNatManagerTest.java diff --git a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java index 243da5cc07..c7abf140fa 100644 --- a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java @@ -93,6 +93,8 @@ import org.hyperledger.besu.nat.NatService; import org.hyperledger.besu.nat.core.NatManager; import org.hyperledger.besu.nat.docker.DockerDetector; import org.hyperledger.besu.nat.docker.DockerNatManager; +import org.hyperledger.besu.nat.kubernetes.KubernetesDetector; +import org.hyperledger.besu.nat.kubernetes.KubernetesNatManager; import org.hyperledger.besu.nat.manual.ManualNatManager; import org.hyperledger.besu.nat.upnp.UpnpNatManager; import org.hyperledger.besu.plugin.BesuPlugin; @@ -114,10 +116,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import graphql.GraphQL; import io.vertx.core.Vertx; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes; public class RunnerBuilder { + protected static final Logger LOG = LogManager.getLogger(); + private Vertx vertx; private BesuController besuController; @@ -346,6 +352,7 @@ public class RunnerBuilder { .map(nodePerms -> PeerPermissions.combine(nodePerms, bannedNodes)) .orElse(bannedNodes); + LOG.info("Detecting NAT service."); final NatService natService = new NatService(buildNatManager(natMethod)); final NetworkBuilder inactiveNetwork = (caps) -> new NoopP2PNetwork(); final NetworkBuilder activeNetwork = @@ -590,7 +597,7 @@ public class RunnerBuilder { final NatMethod detectedNatMethod = Optional.of(natMethod) .filter(not(isEqual(NatMethod.AUTO))) - .orElse(NatService.autoDetectNatMethod(new DockerDetector())); + .orElse(NatService.autoDetectNatMethod(new DockerDetector(), new KubernetesDetector())); switch (detectedNatMethod) { case UPNP: return Optional.of(new UpnpNatManager()); @@ -600,6 +607,8 @@ public class RunnerBuilder { case DOCKER: return Optional.of( new DockerNatManager(p2pAdvertisedHost, p2pListenPort, jsonRpcConfiguration.getPort())); + case KUBERNETES: + return Optional.of(new KubernetesNatManager()); case NONE: default: return Optional.empty(); diff --git a/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java b/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java index b65bcfd418..e794940a4c 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java @@ -1395,6 +1395,9 @@ public class BesuCommandTest extends CommandTestAbstract { parseCommand("--nat-method", "DOCKER"); verify(mockRunnerBuilder).natMethod(eq(NatMethod.DOCKER)); + parseCommand("--nat-method", "KUBERNETES"); + verify(mockRunnerBuilder).natMethod(eq(NatMethod.KUBERNETES)); + assertThat(commandOutput.toString()).isEmpty(); assertThat(commandErrorOutput.toString()).isEmpty(); } @@ -1407,7 +1410,7 @@ public class BesuCommandTest extends CommandTestAbstract { assertThat(commandOutput.toString()).isEmpty(); assertThat(commandErrorOutput.toString()) .contains( - "Invalid value for option '--nat-method': expected one of [UPNP, MANUAL, DOCKER, AUTO, NONE] (case-insensitive) but was 'invalid'"); + "Invalid value for option '--nat-method': expected one of [UPNP, MANUAL, DOCKER, KUBERNETES, AUTO, NONE] (case-insensitive) but was 'invalid'"); } @Test diff --git a/gradle/versions.gradle b/gradle/versions.gradle index 4521ec2031..df1c686570 100644 --- a/gradle/versions.gradle +++ b/gradle/versions.gradle @@ -102,6 +102,8 @@ dependencyManagement { dependency 'org.xerial.snappy:snappy-java:1.1.7.3' + dependency 'io.kubernetes:client-java:5.0.0' + dependency 'tech.pegasys.ethsigner.internal:core:0.4.0' dependency 'tech.pegasys.ethsigner.internal:file-based:0.4.0' dependency 'tech.pegasys.ethsigner.internal:signing-api:0.4.0' diff --git a/nat/build.gradle b/nat/build.gradle index 099fa2721e..a5967c6374 100644 --- a/nat/build.gradle +++ b/nat/build.gradle @@ -33,6 +33,7 @@ dependencies { implementation 'org.apache.logging.log4j:log4j-api' implementation 'org.jupnp:org.jupnp' implementation 'org.jupnp:org.jupnp.support' + implementation 'io.kubernetes:client-java' runtimeOnly 'org.apache.logging.log4j:log4j-core' diff --git a/nat/src/main/java/org/hyperledger/besu/nat/NatMethod.java b/nat/src/main/java/org/hyperledger/besu/nat/NatMethod.java index 2b72c68feb..3877a923e4 100644 --- a/nat/src/main/java/org/hyperledger/besu/nat/NatMethod.java +++ b/nat/src/main/java/org/hyperledger/besu/nat/NatMethod.java @@ -18,6 +18,7 @@ public enum NatMethod { UPNP, MANUAL, DOCKER, + KUBERNETES, AUTO, NONE; diff --git a/nat/src/main/java/org/hyperledger/besu/nat/kubernetes/KubernetesDetector.java b/nat/src/main/java/org/hyperledger/besu/nat/kubernetes/KubernetesDetector.java new file mode 100644 index 0000000000..823e066042 --- /dev/null +++ b/nat/src/main/java/org/hyperledger/besu/nat/kubernetes/KubernetesDetector.java @@ -0,0 +1,45 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.hyperledger.besu.nat.kubernetes; + +import org.hyperledger.besu.nat.NatMethod; +import org.hyperledger.besu.nat.core.NatMethodDetector; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; + +public class KubernetesDetector implements NatMethodDetector { + + // When a Pod runs on a Node, the kubelet adds a set of environment variables for each active + // Service. + // https://kubernetes.io/docs/concepts/services-networking/connect-applications-service/#environment-variables + private static final Optional KUBERNETES_SERVICE_HOST = + Optional.ofNullable(System.getenv("KUBERNETES_SERVICE_HOST")); + private static final Path KUBERNETES_WATERMARK_FILE = Paths.get("var/run/secrets/kubernetes.io"); + + @Override + public Optional detect() { + return KUBERNETES_SERVICE_HOST + .map(__ -> NatMethod.KUBERNETES) + .or( + () -> + Files.exists(KUBERNETES_WATERMARK_FILE) + ? Optional.of(NatMethod.KUBERNETES) + : Optional.empty()); + } +} diff --git a/nat/src/main/java/org/hyperledger/besu/nat/kubernetes/KubernetesNatManager.java b/nat/src/main/java/org/hyperledger/besu/nat/kubernetes/KubernetesNatManager.java new file mode 100644 index 0000000000..6f5e4f0e70 --- /dev/null +++ b/nat/src/main/java/org/hyperledger/besu/nat/kubernetes/KubernetesNatManager.java @@ -0,0 +1,150 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.hyperledger.besu.nat.kubernetes; + +import org.hyperledger.besu.nat.NatMethod; +import org.hyperledger.besu.nat.core.AbstractNatManager; +import org.hyperledger.besu.nat.core.domain.NatPortMapping; +import org.hyperledger.besu.nat.core.domain.NatServiceType; +import org.hyperledger.besu.nat.core.domain.NetworkProtocol; + +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import io.kubernetes.client.ApiClient; +import io.kubernetes.client.Configuration; +import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.models.V1Service; +import io.kubernetes.client.util.ClientBuilder; +import io.kubernetes.client.util.KubeConfig; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * This class describes the behaviour of the Kubernetes NAT manager. Kubernetes Nat manager add + * support for Kubernetes’s NAT implementation when Besu is being run from a Kubernetes cluster + */ +public class KubernetesNatManager extends AbstractNatManager { + protected static final Logger LOG = LogManager.getLogger(); + + private static final String KUBE_CONFIG_PATH_ENV = "KUBE_CONFIG_PATH"; + private static final String DEFAULT_KUBE_CONFIG_PATH = "~/.kube/config"; + private static final String DEFAULT_BESU_POD_NAME_FILTER = "besu"; + + private String internalAdvertisedHost; + private final List forwardedPorts = new ArrayList<>(); + + public KubernetesNatManager() { + super(NatMethod.KUBERNETES); + } + + @Override + protected void doStart() { + LOG.info("Starting kubernetes NAT manager."); + update(); + } + + private void update() { + try { + LOG.debug("Trying to update information using Kubernetes client SDK."); + final String kubeConfigPath = + Optional.ofNullable(System.getenv(KUBE_CONFIG_PATH_ENV)).orElse(DEFAULT_KUBE_CONFIG_PATH); + LOG.debug( + "Checking if Kubernetes config file is present on file system: {}.", kubeConfigPath); + if (!Files.exists(Paths.get(kubeConfigPath))) { + throw new IllegalStateException("Cannot locate Kubernetes config file."); + } + // loading the out-of-cluster config, a kubeconfig from file-system + final ApiClient client = + ClientBuilder.kubeconfig( + KubeConfig.loadKubeConfig( + Files.newBufferedReader(Paths.get(kubeConfigPath), Charset.defaultCharset()))) + .build(); + + // set the global default api-client to the in-cluster one from above + Configuration.setDefaultApiClient(client); + + // the CoreV1Api loads default api-client from global configuration. + CoreV1Api api = new CoreV1Api(); + // invokes the CoreV1Api client + api.listServiceForAllNamespaces(null, null, null, null, null, null, null, null, null) + .getItems().stream() + .filter( + v1Service -> v1Service.getMetadata().getName().contains(DEFAULT_BESU_POD_NAME_FILTER)) + .findFirst() + .ifPresent(this::updateUsingBesuService); + + } catch (Exception e) { + LOG.warn("Failed update information using Kubernetes client SDK.", e); + } + } + + @VisibleForTesting + void updateUsingBesuService(final V1Service service) { + try { + LOG.info("Found Besu service: {}", service.getMetadata().getName()); + LOG.info("Setting host IP to: {}.", service.getSpec().getClusterIP()); + internalAdvertisedHost = service.getSpec().getClusterIP(); + final String internalHost = queryLocalIPAddress().get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + service + .getSpec() + .getPorts() + .forEach( + v1ServicePort -> { + try { + final NatServiceType natServiceType = + NatServiceType.fromString(v1ServicePort.getName()); + forwardedPorts.add( + new NatPortMapping( + natServiceType, + natServiceType.equals(NatServiceType.DISCOVERY) + ? NetworkProtocol.UDP + : NetworkProtocol.TCP, + internalHost, + internalAdvertisedHost, + v1ServicePort.getPort(), + v1ServicePort.getTargetPort().getIntValue())); + } catch (IllegalStateException e) { + LOG.warn("Ignored unknown Besu port: {}", e.getMessage()); + } + }); + } catch (Exception e) { + LOG.warn("Failed update information using pod metadata.", e); + } + } + + @Override + protected void doStop() { + LOG.info("Stopping kubernetes NAT manager."); + } + + @Override + protected CompletableFuture retrieveExternalIPAddress() { + return CompletableFuture.completedFuture(internalAdvertisedHost); + } + + @Override + public CompletableFuture> getPortMappings() { + return CompletableFuture.completedFuture(forwardedPorts); + } +} diff --git a/nat/src/test/java/org/hyperledger/besu/nat/kubernetes/KubernetesNatManagerTest.java b/nat/src/test/java/org/hyperledger/besu/nat/kubernetes/KubernetesNatManagerTest.java new file mode 100644 index 0000000000..030256d7e6 --- /dev/null +++ b/nat/src/test/java/org/hyperledger/besu/nat/kubernetes/KubernetesNatManagerTest.java @@ -0,0 +1,153 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.nat.kubernetes; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +import org.hyperledger.besu.nat.core.domain.NatPortMapping; +import org.hyperledger.besu.nat.core.domain.NatServiceType; +import org.hyperledger.besu.nat.core.domain.NetworkProtocol; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.concurrent.ExecutionException; + +import io.kubernetes.client.custom.IntOrString; +import io.kubernetes.client.models.V1ObjectMeta; +import io.kubernetes.client.models.V1Service; +import io.kubernetes.client.models.V1ServicePort; +import io.kubernetes.client.models.V1ServiceSpec; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public final class KubernetesNatManagerTest { + + private final String detectedAdvertisedHost = "199.45.69.12"; + + private final int p2pPort = 1; + private final int rpcHttpPort = 2; + + @Mock private V1Service v1Service; + + private KubernetesNatManager natManager; + + @Before + public void initialize() throws IOException { + when(v1Service.getSpec()) + .thenReturn( + new V1ServiceSpec() + .clusterIP(detectedAdvertisedHost) + .ports( + Arrays.asList( + new V1ServicePort() + .name(NatServiceType.JSON_RPC.getValue()) + .port(rpcHttpPort) + .targetPort(new IntOrString(rpcHttpPort)), + new V1ServicePort() + .name(NatServiceType.RLPX.getValue()) + .port(p2pPort) + .targetPort(new IntOrString(p2pPort)), + new V1ServicePort() + .name(NatServiceType.DISCOVERY.getValue()) + .port(p2pPort) + .targetPort(new IntOrString(p2pPort))))); + when(v1Service.getMetadata()).thenReturn(new V1ObjectMeta().name("besu")); + natManager = new KubernetesNatManager(); + try { + natManager.start(); + } catch (Exception ignored) { + System.err.println("Ignored missing Kube config file in testing context."); + } + natManager.updateUsingBesuService(v1Service); + } + + @Test + public void assertThatExternalIPIsEqualToRemoteHost() + throws ExecutionException, InterruptedException { + + assertThat(natManager.queryExternalIPAddress().get()).isEqualTo(detectedAdvertisedHost); + } + + @Test + public void assertThatLocalIPIsEqualToLocalHost() + throws ExecutionException, InterruptedException, UnknownHostException { + final String internalHost = InetAddress.getLocalHost().getHostAddress(); + assertThat(natManager.queryLocalIPAddress().get()).isEqualTo(internalHost); + } + + @Test + public void assertThatMappingForDiscoveryWorks() throws UnknownHostException { + final String internalHost = InetAddress.getLocalHost().getHostAddress(); + + final NatPortMapping mapping = + natManager.getPortMapping(NatServiceType.DISCOVERY, NetworkProtocol.UDP); + + final NatPortMapping expectedMapping = + new NatPortMapping( + NatServiceType.DISCOVERY, + NetworkProtocol.UDP, + internalHost, + detectedAdvertisedHost, + p2pPort, + p2pPort); + + assertThat(mapping).isEqualToComparingFieldByField(expectedMapping); + } + + @Test + public void assertThatMappingForJsonRpcWorks() throws UnknownHostException { + final String internalHost = InetAddress.getLocalHost().getHostAddress(); + + final NatPortMapping mapping = + natManager.getPortMapping(NatServiceType.JSON_RPC, NetworkProtocol.TCP); + + final NatPortMapping expectedMapping = + new NatPortMapping( + NatServiceType.JSON_RPC, + NetworkProtocol.TCP, + internalHost, + detectedAdvertisedHost, + rpcHttpPort, + rpcHttpPort); + + assertThat(mapping).isEqualToComparingFieldByField(expectedMapping); + } + + @Test + public void assertThatMappingForRlpxWorks() throws UnknownHostException { + final String internalHost = InetAddress.getLocalHost().getHostAddress(); + + final NatPortMapping mapping = + natManager.getPortMapping(NatServiceType.RLPX, NetworkProtocol.TCP); + + final NatPortMapping expectedMapping = + new NatPortMapping( + NatServiceType.RLPX, + NetworkProtocol.TCP, + internalHost, + detectedAdvertisedHost, + p2pPort, + p2pPort); + + assertThat(mapping).isEqualToComparingFieldByField(expectedMapping); + } +}