Add cluster ip for Kubernetes Nat Manager (#1156)

- Add compatibility with ClusterIP services in the kubernetes nat manager
- Add new Xnat-method-fallback-enabled flag

Signed-off-by: Karim TAAM <karim.t2am@gmail.com>
pull/1187/head
Karim T 4 years ago committed by GitHub
parent 174e6e29ad
commit 50db46f855
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
  2. 28
      besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
  3. 62
      besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java
  4. 3
      besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java
  5. 3
      besu/src/test/resources/everything_config.toml
  6. 22
      nat/src/main/java/org/hyperledger/besu/nat/NatService.java
  7. 4
      nat/src/main/java/org/hyperledger/besu/nat/core/IpDetector.java
  8. 82
      nat/src/main/java/org/hyperledger/besu/nat/docker/DockerNatManager.java
  9. 4
      nat/src/main/java/org/hyperledger/besu/nat/docker/HostBasedIpDetector.java
  10. 49
      nat/src/main/java/org/hyperledger/besu/nat/kubernetes/KubernetesNatManager.java
  11. 36
      nat/src/main/java/org/hyperledger/besu/nat/kubernetes/service/KubernetesServiceType.java
  12. 51
      nat/src/main/java/org/hyperledger/besu/nat/kubernetes/service/LoadBalancerBasedDetector.java
  13. 41
      nat/src/test/java/org/hyperledger/besu/nat/NatServiceTest.java
  14. 13
      nat/src/test/java/org/hyperledger/besu/nat/docker/DockerNatManagerTest.java
  15. 157
      nat/src/test/java/org/hyperledger/besu/nat/kubernetes/KubernetesClusterIpNatManagerTest.java
  16. 10
      nat/src/test/java/org/hyperledger/besu/nat/kubernetes/KubernetesLoadManagerNatManagerTest.java
  17. 54
      nat/src/test/java/org/hyperledger/besu/nat/kubernetes/KubernetesUnknownNatManagerTest.java

@ -136,7 +136,8 @@ public class RunnerBuilder {
private String p2pListenInterface = NetworkUtility.INADDR_ANY;
private int p2pListenPort;
private NatMethod natMethod = NatMethod.AUTO;
private String natManagerPodName;
private String natManagerServiceName;
private boolean natMethodFallbackEnabled;
private int maxPeers;
private boolean limitRemoteWireConnectionsEnabled = false;
private float fractionRemoteConnectionsAllowed;
@ -207,8 +208,13 @@ public class RunnerBuilder {
return this;
}
public RunnerBuilder natManagerPodName(final String natManagerPodName) {
this.natManagerPodName = natManagerPodName;
public RunnerBuilder natManagerServiceName(final String natManagerServiceName) {
this.natManagerServiceName = natManagerServiceName;
return this;
}
public RunnerBuilder natMethodFallbackEnabled(final boolean natMethodFallbackEnabled) {
this.natMethodFallbackEnabled = natMethodFallbackEnabled;
return this;
}
@ -366,7 +372,8 @@ public class RunnerBuilder {
.orElse(bannedNodes);
LOG.info("Detecting NAT service.");
final NatService natService = new NatService(buildNatManager(natMethod));
final boolean fallbackEnabled = natMethod == NatMethod.AUTO || natMethodFallbackEnabled;
final NatService natService = new NatService(buildNatManager(natMethod), fallbackEnabled);
final NetworkBuilder inactiveNetwork = (caps) -> new NoopP2PNetwork();
final NetworkBuilder activeNetwork =
(caps) ->
@ -642,7 +649,7 @@ public class RunnerBuilder {
return Optional.of(
new DockerNatManager(p2pAdvertisedHost, p2pListenPort, jsonRpcConfiguration.getPort()));
case KUBERNETES:
return Optional.of(new KubernetesNatManager(natManagerPodName));
return Optional.of(new KubernetesNatManager(natManagerServiceName));
case NONE:
default:
return Optional.empty();

@ -31,7 +31,7 @@ import static org.hyperledger.besu.ethereum.core.MiningParameters.DEFAULT_REMOTE
import static org.hyperledger.besu.metrics.BesuMetricCategory.DEFAULT_METRIC_CATEGORIES;
import static org.hyperledger.besu.metrics.prometheus.MetricsConfiguration.DEFAULT_METRICS_PORT;
import static org.hyperledger.besu.metrics.prometheus.MetricsConfiguration.DEFAULT_METRICS_PUSH_PORT;
import static org.hyperledger.besu.nat.kubernetes.KubernetesNatManager.DEFAULT_BESU_POD_NAME_FILTER;
import static org.hyperledger.besu.nat.kubernetes.KubernetesNatManager.DEFAULT_BESU_SERVICE_NAME_FILTER;
import org.hyperledger.besu.BesuInfo;
import org.hyperledger.besu.Runner;
@ -422,10 +422,17 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
@SuppressWarnings({"FieldCanBeFinal", "FieldMayBeFinal"}) // PicoCLI requires non-final Strings.
@Option(
names = {"--Xnat-kube-pod-name"},
names = {"--Xnat-kube-service-name"},
description =
"Specify the name of the pod that will be used by the nat manager in Kubernetes. (default: ${DEFAULT-VALUE})")
private String natManagerPodName = DEFAULT_BESU_POD_NAME_FILTER;
"Specify the name of the service that will be used by the nat manager in Kubernetes. (default: ${DEFAULT-VALUE})")
private String natManagerServiceName = DEFAULT_BESU_SERVICE_NAME_FILTER;
@Option(
names = {"--Xnat-method-fallback-enabled"},
description =
"Enable fallback to NONE for the nat manager in case of failure. If False BESU will exit on failure. (default: ${DEFAULT-VALUE})",
arity = "1")
private final Boolean natMethodFallbackEnabled = true;
@Option(
names = {"--network-id"},
@ -1332,12 +1339,18 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
@SuppressWarnings("ConstantConditions")
private void validateNatParams() {
if (!(natMethod.equals(NatMethod.AUTO) || natMethod.equals(NatMethod.KUBERNETES))
&& !natManagerPodName.equals(DEFAULT_BESU_POD_NAME_FILTER)) {
&& !natManagerServiceName.equals(DEFAULT_BESU_SERVICE_NAME_FILTER)) {
throw new ParameterException(
this.commandLine,
"The `--Xnat-kube-pod-name` parameter is only used in kubernetes mode. Either remove --Xnat-kube-pod-name"
"The `--Xnat-kube-service-name` parameter is only used in kubernetes mode. Either remove --Xnat-kube-service-name"
+ " or select the KUBERNETES mode (via --nat--method=KUBERNETES)");
}
if (natMethod.equals(NatMethod.AUTO) && !natMethodFallbackEnabled) {
throw new ParameterException(
this.commandLine,
"The `--Xnat-method-fallback-enabled` parameter cannot be used in AUTO mode. Either remove --Xnat-method-fallback-enabled"
+ " or select another mode (via --nat--method=XXXX)");
}
}
private void issueOptionWarnings() {
@ -1982,7 +1995,8 @@ public class BesuCommand implements DefaultCommandValues, Runnable {
.besuController(controller)
.p2pEnabled(p2pEnabled)
.natMethod(natMethod)
.natManagerPodName(natManagerPodName)
.natManagerServiceName(natManagerServiceName)
.natMethodFallbackEnabled(natMethodFallbackEnabled)
.discovery(peerDiscoveryEnabled)
.ethNetworkConfig(ethNetworkConfig)
.p2pAdvertisedHost(p2pAdvertisedHost)

@ -31,7 +31,7 @@ import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.NET;
import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.PERM;
import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.WEB3;
import static org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration.MAINNET_BOOTSTRAP_NODES;
import static org.hyperledger.besu.nat.kubernetes.KubernetesNatManager.DEFAULT_BESU_POD_NAME_FILTER;
import static org.hyperledger.besu.nat.kubernetes.KubernetesNatManager.DEFAULT_BESU_SERVICE_NAME_FILTER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNotNull;
@ -1347,7 +1347,7 @@ public class BesuCommandTest extends CommandTestAbstract {
public void natManagerPodNamePropertyDefaultIsBesu() {
parseCommand();
verify(mockRunnerBuilder).natManagerPodName(eq(DEFAULT_BESU_POD_NAME_FILTER));
verify(mockRunnerBuilder).natManagerServiceName(eq(DEFAULT_BESU_SERVICE_NAME_FILTER));
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();
@ -1356,9 +1356,9 @@ public class BesuCommandTest extends CommandTestAbstract {
@Test
public void natManagerPodNamePropertyIsCorrectlyUpdated() {
final String podName = "besu-updated";
parseCommand("--Xnat-kube-pod-name", podName);
parseCommand("--Xnat-kube-service-name", podName);
verify(mockRunnerBuilder).natManagerPodName(eq(podName));
verify(mockRunnerBuilder).natManagerServiceName(eq(podName));
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();
@ -1366,22 +1366,68 @@ public class BesuCommandTest extends CommandTestAbstract {
@Test
public void natManagerPodNameCannotBeUsedWithNatDockerMethod() {
parseCommand("--nat-method", "DOCKER", "--Xnat-kube-pod-name", "besu-updated");
parseCommand("--nat-method", "DOCKER", "--Xnat-kube-service-name", "besu-updated");
Mockito.verifyZeroInteractions(mockRunnerBuilder);
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString())
.contains(
"The `--Xnat-kube-pod-name` parameter is only used in kubernetes mode. Either remove --Xnat-kube-pod-name or select the KUBERNETES mode (via --nat--method=KUBERNETES)");
"The `--Xnat-kube-service-name` parameter is only used in kubernetes mode. Either remove --Xnat-kube-service-name or select the KUBERNETES mode (via --nat--method=KUBERNETES)");
}
@Test
public void natManagerPodNameCannotBeUsedWithNatNoneMethod() {
parseCommand("--nat-method", "NONE", "--Xnat-kube-pod-name", "besu-updated");
parseCommand("--nat-method", "NONE", "--Xnat-kube-service-name", "besu-updated");
Mockito.verifyZeroInteractions(mockRunnerBuilder);
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString())
.contains(
"The `--Xnat-kube-pod-name` parameter is only used in kubernetes mode. Either remove --Xnat-kube-pod-name or select the KUBERNETES mode (via --nat--method=KUBERNETES)");
"The `--Xnat-kube-service-name` parameter is only used in kubernetes mode. Either remove --Xnat-kube-service-name or select the KUBERNETES mode (via --nat--method=KUBERNETES)");
}
@Test
public void natMethodFallbackEnabledPropertyIsCorrectlyUpdatedWithKubernetes() {
parseCommand("--nat-method", "KUBERNETES", "--Xnat-method-fallback-enabled", "false");
verify(mockRunnerBuilder).natMethodFallbackEnabled(eq(false));
parseCommand("--nat-method", "KUBERNETES", "--Xnat-method-fallback-enabled", "true");
verify(mockRunnerBuilder).natMethodFallbackEnabled(eq(true));
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();
}
@Test
public void natMethodFallbackEnabledPropertyIsCorrectlyUpdatedWithDocker() {
parseCommand("--nat-method", "DOCKER", "--Xnat-method-fallback-enabled", "false");
verify(mockRunnerBuilder).natMethodFallbackEnabled(eq(false));
parseCommand("--nat-method", "DOCKER", "--Xnat-method-fallback-enabled", "true");
verify(mockRunnerBuilder).natMethodFallbackEnabled(eq(true));
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();
}
@Test
public void natMethodFallbackEnabledPropertyIsCorrectlyUpdatedWithUpnp() {
parseCommand("--nat-method", "UPNP", "--Xnat-method-fallback-enabled", "false");
verify(mockRunnerBuilder).natMethodFallbackEnabled(eq(false));
parseCommand("--nat-method", "UPNP", "--Xnat-method-fallback-enabled", "true");
verify(mockRunnerBuilder).natMethodFallbackEnabled(eq(true));
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString()).isEmpty();
}
@Test
public void natMethodFallbackEnabledCannotBeUsedWithAutoMethod() {
parseCommand("--nat-method", "AUTO", "--Xnat-method-fallback-enabled", "false");
Mockito.verifyZeroInteractions(mockRunnerBuilder);
assertThat(commandOutput.toString()).isEmpty();
assertThat(commandErrorOutput.toString())
.contains(
"The `--Xnat-method-fallback-enabled` parameter cannot be used in AUTO mode. Either remove --Xnat-method-fallback-enabled or select another mode (via --nat--method=XXXX)");
}
@Test

@ -213,7 +213,8 @@ public abstract class CommandTestAbstract {
.thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.p2pEnabled(anyBoolean())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.natMethod(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.natManagerPodName(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.natManagerServiceName(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.natMethodFallbackEnabled(anyBoolean())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.jsonRpcConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.graphQLConfiguration(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.webSocketConfiguration(any())).thenReturn(mockRunnerBuilder);

@ -21,7 +21,8 @@ security-module="localfile"
identity="PegaSysEng"
p2p-enabled=true
nat-method="NONE"
Xnat-kube-pod-name="besu"
Xnat-kube-service-name="besu"
Xnat-method-fallback-enabled=true
discovery-enabled=false
bootnodes=[
"enode://6f8a80d14311c39f35f516fa664deaaaa13e85b2f7493f37f6144d86991ec012937307647bd3b9a82abe2974e1407241d54947bbb39763a4cac9f77166ad92a0@192.168.0.1:4567",

@ -33,12 +33,20 @@ public class NatService {
private static final Logger LOG = LogManager.getLogger();
private static final boolean DEFAULT_FALLBACK_STATUS = true;
private NatMethod currentNatMethod;
private Optional<NatManager> currentNatManager;
private final boolean fallbackEnabled;
public NatService(final Optional<NatManager> natManager) {
public NatService(final Optional<NatManager> natManager, final boolean fallbackEnabled) {
this.currentNatMethod = retrieveNatMethod(natManager);
this.currentNatManager = natManager;
this.fallbackEnabled = fallbackEnabled;
}
public NatService(final Optional<NatManager> natManager) {
this(natManager, DEFAULT_FALLBACK_STATUS);
}
/**
@ -88,10 +96,14 @@ public class NatService {
getNatManager().orElseThrow().start();
} catch (Exception e) {
LOG.debug(
"Nat manager failed to configure itself automatically due to the following reason "
+ e.getMessage()
+ ". NONE mode will be used");
disableNatManager();
"Nat manager failed to configure itself automatically due to the following reason : {}. {}",
e.getMessage(),
(fallbackEnabled) ? "NONE mode will be used" : "");
if (fallbackEnabled) {
disableNatManager();
} else {
throw new IllegalStateException(e.getMessage(), e);
}
}
} else {
LOG.info("No NAT environment detected so no service could be started");

@ -13,11 +13,11 @@
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.nat.docker;
package org.hyperledger.besu.nat.core;
import java.util.Optional;
public interface IpDetector {
Optional<String> detectExternalIp();
Optional<String> detectAdvertisedIp() throws Exception;
}

@ -17,12 +17,13 @@ package org.hyperledger.besu.nat.docker;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.core.AbstractNatManager;
import org.hyperledger.besu.nat.core.IpDetector;
import org.hyperledger.besu.nat.core.domain.NatPortMapping;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
import org.hyperledger.besu.nat.core.domain.NetworkProtocol;
import org.hyperledger.besu.nat.core.exception.NatInitializationException;
import java.util.Arrays;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -42,11 +43,11 @@ public class DockerNatManager extends AbstractNatManager {
private final IpDetector ipDetector;
private final String internalAdvertisedHost;
private final int internalP2pPort;
private final int internalRpcHttpPort;
private final List<NatPortMapping> forwardedPorts;
private String internalAdvertisedHost;
private final List<NatPortMapping> forwardedPorts = new ArrayList<>();
public DockerNatManager(final String advertisedHost, final int p2pPort, final int rpcHttpPort) {
this(new HostBasedIpDetector(), advertisedHost, p2pPort, rpcHttpPort);
@ -62,29 +63,62 @@ public class DockerNatManager extends AbstractNatManager {
this.internalAdvertisedHost = advertisedHost;
this.internalP2pPort = p2pPort;
this.internalRpcHttpPort = rpcHttpPort;
this.forwardedPorts = buildForwardedPorts();
}
private List<NatPortMapping> buildForwardedPorts() {
@Override
protected void doStart() throws NatInitializationException {
LOG.info("Starting docker NAT manager.");
try {
ipDetector.detectAdvertisedIp().ifPresent(ipFound -> internalAdvertisedHost = ipFound);
buildForwardedPorts();
} catch (Exception e) {
throw new NatInitializationException("Unable to retrieve IP from docker");
}
}
@Override
protected void doStop() {
LOG.info("Stopping docker NAT manager.");
}
@Override
protected CompletableFuture<String> retrieveExternalIPAddress() {
return CompletableFuture.completedFuture(internalAdvertisedHost);
}
@Override
public CompletableFuture<List<NatPortMapping>> getPortMappings() {
return CompletableFuture.completedFuture(forwardedPorts);
}
private int getExternalPort(final int defaultValue) {
return Optional.ofNullable(System.getenv(PORT_MAPPING_TAG + defaultValue))
.map(Integer::valueOf)
.orElse(defaultValue);
}
private void buildForwardedPorts() {
try {
final String internalHost = queryLocalIPAddress().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
final String advertisedHost =
retrieveExternalIPAddress().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
return Arrays.asList(
forwardedPorts.add(
new NatPortMapping(
NatServiceType.DISCOVERY,
NetworkProtocol.UDP,
internalHost,
advertisedHost,
internalP2pPort,
getExternalPort(internalP2pPort)),
getExternalPort(internalP2pPort)));
forwardedPorts.add(
new NatPortMapping(
NatServiceType.RLPX,
NetworkProtocol.TCP,
internalHost,
advertisedHost,
internalP2pPort,
getExternalPort(internalP2pPort)),
getExternalPort(internalP2pPort)));
forwardedPorts.add(
new NatPortMapping(
NatServiceType.JSON_RPC,
NetworkProtocol.TCP,
@ -95,35 +129,5 @@ public class DockerNatManager extends AbstractNatManager {
} catch (Exception e) {
LOG.warn("Failed to create forwarded port list", e);
}
return Collections.emptyList();
}
@Override
protected void doStart() {
LOG.info("Starting docker NAT manager.");
}
@Override
protected void doStop() {
LOG.info("Stopping docker NAT manager.");
}
@Override
protected CompletableFuture<String> retrieveExternalIPAddress() {
return ipDetector
.detectExternalIp()
.map(CompletableFuture::completedFuture)
.orElse(CompletableFuture.completedFuture(internalAdvertisedHost));
}
@Override
public CompletableFuture<List<NatPortMapping>> getPortMappings() {
return CompletableFuture.completedFuture(forwardedPorts);
}
private int getExternalPort(final int defaultValue) {
return Optional.ofNullable(System.getenv(PORT_MAPPING_TAG + defaultValue))
.map(Integer::valueOf)
.orElse(defaultValue);
}
}

@ -15,6 +15,8 @@
package org.hyperledger.besu.nat.docker;
import org.hyperledger.besu.nat.core.IpDetector;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;
@ -24,7 +26,7 @@ public class HostBasedIpDetector implements IpDetector {
private static final String HOSTNAME = "HOST_IP";
@Override
public Optional<String> detectExternalIp() {
public Optional<String> detectAdvertisedIp() {
try {
return Optional.of(InetAddress.getByName(HOSTNAME).getHostAddress());
} catch (final UnknownHostException e) {

@ -17,14 +17,17 @@ package org.hyperledger.besu.nat.kubernetes;
import org.hyperledger.besu.nat.NatMethod;
import org.hyperledger.besu.nat.core.AbstractNatManager;
import org.hyperledger.besu.nat.core.IpDetector;
import org.hyperledger.besu.nat.core.domain.NatPortMapping;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
import org.hyperledger.besu.nat.core.domain.NetworkProtocol;
import org.hyperledger.besu.nat.core.exception.NatInitializationException;
import org.hyperledger.besu.nat.kubernetes.service.KubernetesServiceType;
import org.hyperledger.besu.nat.kubernetes.service.LoadBalancerBasedDetector;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@ -32,7 +35,6 @@ import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1LoadBalancerIngress;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
@ -47,15 +49,15 @@ import org.apache.logging.log4j.Logger;
public class KubernetesNatManager extends AbstractNatManager {
private static final Logger LOG = LogManager.getLogger();
public static final String DEFAULT_BESU_POD_NAME_FILTER = "besu";
public static final String DEFAULT_BESU_SERVICE_NAME_FILTER = "besu";
private String internalAdvertisedHost;
private final String besuPodNameFilter;
private final String besuServiceNameFilter;
private final List<NatPortMapping> forwardedPorts = new ArrayList<>();
public KubernetesNatManager(final String besuPodNameFilter) {
public KubernetesNatManager(final String besuServiceNameFilter) {
super(NatMethod.KUBERNETES);
this.besuPodNameFilter = besuPodNameFilter;
this.besuServiceNameFilter = besuServiceNameFilter;
}
@Override
@ -77,7 +79,8 @@ public class KubernetesNatManager extends AbstractNatManager {
final V1Service service =
api.listServiceForAllNamespaces(null, null, null, null, null, null, null, null, null)
.getItems().stream()
.filter(v1Service -> v1Service.getMetadata().getName().contains(besuPodNameFilter))
.filter(
v1Service -> v1Service.getMetadata().getName().contains(besuServiceNameFilter))
.findFirst()
.orElseThrow(() -> new NatInitializationException("Service not found"));
updateUsingBesuService(service);
@ -91,21 +94,11 @@ public class KubernetesNatManager extends AbstractNatManager {
try {
LOG.info("Found Besu service: {}", service.getMetadata().getName());
final V1LoadBalancerIngress v1LoadBalancerIngress =
service.getStatus().getLoadBalancer().getIngress().stream()
.filter(
v1LoadBalancerIngress1 ->
v1LoadBalancerIngress1.getHostname() != null
|| v1LoadBalancerIngress1.getIp() != null)
.findFirst()
.orElseThrow(() -> new NatInitializationException("Ingress not found"));
if (v1LoadBalancerIngress.getHostname() != null) {
internalAdvertisedHost =
InetAddress.getByName(v1LoadBalancerIngress.getHostname()).getHostAddress();
} else {
internalAdvertisedHost = v1LoadBalancerIngress.getIp();
}
internalAdvertisedHost =
getIpDetector(service)
.detectAdvertisedIp()
.orElseThrow(
() -> new NatInitializationException("Unable to retrieve IP from service"));
LOG.info("Setting host IP to: {}.", internalAdvertisedHost);
@ -152,4 +145,16 @@ public class KubernetesNatManager extends AbstractNatManager {
public CompletableFuture<List<NatPortMapping>> getPortMappings() {
return CompletableFuture.completedFuture(forwardedPorts);
}
private IpDetector getIpDetector(final V1Service v1Service) throws NatInitializationException {
final String serviceType = v1Service.getSpec().getType();
switch (KubernetesServiceType.fromName(serviceType)) {
case CLUSTER_IP:
return () -> Optional.ofNullable(v1Service.getSpec().getClusterIP());
case LOAD_BALANCER:
return new LoadBalancerBasedDetector(v1Service);
default:
throw new NatInitializationException(String.format("%s is not implemented", serviceType));
}
}
}

@ -0,0 +1,36 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.nat.kubernetes.service;
public enum KubernetesServiceType {
CLUSTER_IP("ClusterIP"),
LOAD_BALANCER("LoadBalancer"),
UNKNOWN("");
String name;
KubernetesServiceType(final String name) {
this.name = name;
}
public static KubernetesServiceType fromName(final String name) {
for (KubernetesServiceType value : values()) {
if (value.name.equals(name)) {
return value;
}
}
return UNKNOWN;
}
}

@ -0,0 +1,51 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.nat.kubernetes.service;
import org.hyperledger.besu.nat.core.IpDetector;
import org.hyperledger.besu.nat.core.exception.NatInitializationException;
import java.net.InetAddress;
import java.util.Optional;
import io.kubernetes.client.models.V1LoadBalancerIngress;
import io.kubernetes.client.models.V1Service;
public class LoadBalancerBasedDetector implements IpDetector {
private final V1Service v1Service;
public LoadBalancerBasedDetector(final V1Service v1Service) {
this.v1Service = v1Service;
}
@Override
public Optional<String> detectAdvertisedIp() throws Exception {
final V1LoadBalancerIngress v1LoadBalancerIngress =
v1Service.getStatus().getLoadBalancer().getIngress().stream()
.filter(
v1LoadBalancerIngress1 ->
v1LoadBalancerIngress1.getHostname() != null
|| v1LoadBalancerIngress1.getIp() != null)
.findFirst()
.orElseThrow(() -> new NatInitializationException("Ingress not found"));
if (v1LoadBalancerIngress.getHostname() != null) {
return Optional.ofNullable(
InetAddress.getByName(v1LoadBalancerIngress.getHostname()).getHostAddress());
} else {
return Optional.ofNullable(v1LoadBalancerIngress.getIp());
}
}
}

@ -16,6 +16,7 @@
package org.hyperledger.besu.nat;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@ -41,24 +42,24 @@ public class NatServiceTest {
@Test
public void assertThatGetNatManagerReturnValidManager() {
final NatService natService = new NatService(Optional.of(new UpnpNatManager()));
final NatService natService = new NatService(Optional.of(new UpnpNatManager()), true);
assertThat(natService.getNatMethod()).isEqualTo(NatMethod.UPNP);
assertThat(natService.getNatManager()).containsInstanceOf(UpnpNatManager.class);
}
@Test
public void assertThatGetNatManagerNotReturnManagerWhenNatMethodIsNone() {
final NatService natService = new NatService(Optional.empty());
final NatService natService = new NatService(Optional.empty(), true);
assertThat(natService.getNatMethod()).isEqualTo(NatMethod.NONE);
assertThat(natService.getNatManager()).isNotPresent();
}
@Test
public void assertThatIsNatEnvironmentReturnCorrectStatus() {
final NatService nonNatService = new NatService(Optional.empty());
final NatService nonNatService = new NatService(Optional.empty(), true);
assertThat(nonNatService.isNatEnvironment()).isFalse();
final NatService upnpNatService = new NatService(Optional.of(new UpnpNatManager()));
final NatService upnpNatService = new NatService(Optional.of(new UpnpNatManager()), true);
assertThat(upnpNatService.isNatEnvironment()).isTrue();
}
@ -74,7 +75,7 @@ public class NatServiceTest {
.thenReturn(natPortMapping);
when(natManager.getNatMethod()).thenReturn(NatMethod.UPNP);
final NatService natService = new NatService(Optional.of(natManager));
final NatService natService = new NatService(Optional.of(natManager), true);
final Optional<NatPortMapping> portMapping =
natService.getPortMapping(natPortMapping.getNatServiceType(), natPortMapping.getProtocol());
@ -88,7 +89,7 @@ public class NatServiceTest {
@Test
public void assertThatGetPortMappingWorksProperlyWithoutNat() {
final NatService natService = new NatService(Optional.empty());
final NatService natService = new NatService(Optional.empty(), true);
final Optional<NatPortMapping> portMapping =
natService.getPortMapping(NatServiceType.DISCOVERY, NetworkProtocol.TCP);
@ -105,7 +106,7 @@ public class NatServiceTest {
.thenReturn(CompletableFuture.completedFuture(externalIp));
when(natManager.getNatMethod()).thenReturn(NatMethod.UPNP);
final NatService natService = new NatService(Optional.of(natManager));
final NatService natService = new NatService(Optional.of(natManager), true);
final String resultIp = natService.queryExternalIPAddress(fallbackExternalIp);
@ -119,7 +120,7 @@ public class NatServiceTest {
final String fallbackExternalIp = "127.0.0.1";
final NatService natService = new NatService(Optional.empty());
final NatService natService = new NatService(Optional.empty(), true);
final String resultIp = natService.queryExternalIPAddress(fallbackExternalIp);
@ -135,7 +136,7 @@ public class NatServiceTest {
.thenReturn(CompletableFuture.completedFuture(externalIp));
when(natManager.getNatMethod()).thenReturn(NatMethod.UPNP);
final NatService natService = new NatService(Optional.of(natManager));
final NatService natService = new NatService(Optional.of(natManager), true);
final String resultIp = natService.queryLocalIPAddress(fallbackExternalIp);
@ -149,7 +150,7 @@ public class NatServiceTest {
final String fallbackValue = "1.2.3.4";
final NatService natService = new NatService(Optional.empty());
final NatService natService = new NatService(Optional.empty(), true);
final String resultIp = natService.queryLocalIPAddress(fallbackValue);
@ -176,7 +177,7 @@ public class NatServiceTest {
NatServiceType.DISCOVERY, NetworkProtocol.UDP, localIp, externalIp, 1111, 1111));
when(natManager.getNatMethod()).thenReturn(NatMethod.UPNP);
final NatService natService = new NatService(Optional.of(natManager));
final NatService natService = new NatService(Optional.of(natManager), true);
assertThat(natService.getNatMethod()).isEqualTo(NatMethod.UPNP);
assertThat(natService.isNatEnvironment()).isTrue();
@ -197,6 +198,24 @@ public class NatServiceTest {
assertThat(natService.queryLocalIPAddress(fallbackLocalIp)).isEqualTo(fallbackLocalIp);
}
@Test
public void assertThatManagerSwitchToNoneForInvalidNatEnvironmentIfFallbackDisabled()
throws NatInitializationException {
final NatManager natManager = mock(NatManager.class);
doThrow(NatInitializationException.class).when(natManager).start();
when(natManager.getNatMethod()).thenReturn(NatMethod.UPNP);
final NatService natService = new NatService(Optional.of(natManager), false);
assertThat(natService.getNatMethod()).isEqualTo(NatMethod.UPNP);
assertThat(natService.isNatEnvironment()).isTrue();
assertThat(natService.getNatManager()).contains(natManager);
assertThatThrownBy(natService::start);
}
@Test
public void givenOneAutoDetectionWorksWhenAutoDetectThenReturnCorrectNatMethod() {
final NatMethod natMethod = NatService.autoDetectNatMethod(() -> Optional.of(NatMethod.UPNP));

@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.nat.core.NatManager;
import org.hyperledger.besu.nat.core.domain.NatPortMapping;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
import org.hyperledger.besu.nat.core.domain.NetworkProtocol;
@ -28,6 +29,7 @@ import java.net.UnknownHostException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@ -47,7 +49,7 @@ public final class DockerNatManagerTest {
@Before
public void initialize() throws NatInitializationException {
hostBasedIpDetector = mock(HostBasedIpDetector.class);
when(hostBasedIpDetector.detectExternalIp()).thenReturn(Optional.of(detectedAdvertisedHost));
when(hostBasedIpDetector.detectAdvertisedIp()).thenReturn(Optional.of(detectedAdvertisedHost));
natManager = new DockerNatManager(hostBasedIpDetector, advertisedHost, p2pPort, rpcHttpPort);
natManager.start();
}
@ -61,7 +63,14 @@ public final class DockerNatManagerTest {
@Test
public void assertThatExternalIPIsEqualToDefaultHostIfIpDetectorCannotRetrieveIP()
throws ExecutionException, InterruptedException {
when(hostBasedIpDetector.detectExternalIp()).thenReturn(Optional.empty());
final NatManager natManager =
new DockerNatManager(hostBasedIpDetector, advertisedHost, p2pPort, rpcHttpPort);
when(hostBasedIpDetector.detectAdvertisedIp()).thenReturn(Optional.empty());
try {
natManager.start();
} catch (NatInitializationException e) {
Assertions.fail(e.getMessage());
}
assertThat(natManager.queryExternalIPAddress().get()).isEqualTo(advertisedHost);
}

@ -0,0 +1,157 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.nat.kubernetes;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.nat.kubernetes.KubernetesNatManager.DEFAULT_BESU_SERVICE_NAME_FILTER;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.nat.core.domain.NatPortMapping;
import org.hyperledger.besu.nat.core.domain.NatServiceType;
import org.hyperledger.besu.nat.core.domain.NetworkProtocol;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import io.kubernetes.client.custom.IntOrString;
import io.kubernetes.client.models.V1ObjectMeta;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.models.V1ServicePort;
import io.kubernetes.client.models.V1ServiceSpec;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public final class KubernetesClusterIpNatManagerTest {
private final String detectedAdvertisedHost = "199.45.69.12";
private final int p2pPort = 1;
private final int rpcHttpPort = 2;
@Mock private V1Service v1Service;
private KubernetesNatManager natManager;
@Before
public void initialize() throws IOException {
when(v1Service.getSpec())
.thenReturn(
new V1ServiceSpec()
.type("ClusterIP")
.clusterIP(detectedAdvertisedHost)
.ports(
Arrays.asList(
new V1ServicePort()
.name(NatServiceType.JSON_RPC.getValue())
.port(rpcHttpPort)
.targetPort(new IntOrString(rpcHttpPort)),
new V1ServicePort()
.name(NatServiceType.RLPX.getValue())
.port(p2pPort)
.targetPort(new IntOrString(p2pPort)),
new V1ServicePort()
.name(NatServiceType.DISCOVERY.getValue())
.port(p2pPort)
.targetPort(new IntOrString(p2pPort)))));
when(v1Service.getMetadata())
.thenReturn(new V1ObjectMeta().name(DEFAULT_BESU_SERVICE_NAME_FILTER));
natManager = new KubernetesNatManager(DEFAULT_BESU_SERVICE_NAME_FILTER);
try {
natManager.start();
} catch (Exception ignored) {
System.err.println("Ignored missing Kube config file in testing context.");
}
natManager.updateUsingBesuService(v1Service);
}
@Test
public void assertThatExternalIPIsEqualToRemoteHost()
throws ExecutionException, InterruptedException {
assertThat(natManager.queryExternalIPAddress().get()).isEqualTo(detectedAdvertisedHost);
}
@Test
public void assertThatLocalIPIsEqualToLocalHost()
throws ExecutionException, InterruptedException, UnknownHostException {
final String internalHost = InetAddress.getLocalHost().getHostAddress();
assertThat(natManager.queryLocalIPAddress().get()).isEqualTo(internalHost);
}
@Test
public void assertThatMappingForDiscoveryWorks() throws UnknownHostException {
final String internalHost = InetAddress.getLocalHost().getHostAddress();
final NatPortMapping mapping =
natManager.getPortMapping(NatServiceType.DISCOVERY, NetworkProtocol.UDP);
final NatPortMapping expectedMapping =
new NatPortMapping(
NatServiceType.DISCOVERY,
NetworkProtocol.UDP,
internalHost,
detectedAdvertisedHost,
p2pPort,
p2pPort);
assertThat(mapping).isEqualToComparingFieldByField(expectedMapping);
}
@Test
public void assertThatMappingForJsonRpcWorks() throws UnknownHostException {
final String internalHost = InetAddress.getLocalHost().getHostAddress();
final NatPortMapping mapping =
natManager.getPortMapping(NatServiceType.JSON_RPC, NetworkProtocol.TCP);
final NatPortMapping expectedMapping =
new NatPortMapping(
NatServiceType.JSON_RPC,
NetworkProtocol.TCP,
internalHost,
detectedAdvertisedHost,
rpcHttpPort,
rpcHttpPort);
assertThat(mapping).isEqualToComparingFieldByField(expectedMapping);
}
@Test
public void assertThatMappingForRlpxWorks() throws UnknownHostException {
final String internalHost = InetAddress.getLocalHost().getHostAddress();
final NatPortMapping mapping =
natManager.getPortMapping(NatServiceType.RLPX, NetworkProtocol.TCP);
final NatPortMapping expectedMapping =
new NatPortMapping(
NatServiceType.RLPX,
NetworkProtocol.TCP,
internalHost,
detectedAdvertisedHost,
p2pPort,
p2pPort);
assertThat(mapping).isEqualToComparingFieldByField(expectedMapping);
}
}

@ -15,7 +15,7 @@
package org.hyperledger.besu.nat.kubernetes;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.nat.kubernetes.KubernetesNatManager.DEFAULT_BESU_POD_NAME_FILTER;
import static org.hyperledger.besu.nat.kubernetes.KubernetesNatManager.DEFAULT_BESU_SERVICE_NAME_FILTER;
import static org.mockito.Mockito.when;
import org.hyperledger.besu.nat.core.domain.NatPortMapping;
@ -43,7 +43,7 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public final class KubernetesNatManagerTest {
public final class KubernetesLoadManagerNatManagerTest {
private final String detectedAdvertisedHost = "199.45.69.12";
@ -67,6 +67,7 @@ public final class KubernetesNatManagerTest {
when(v1Service.getSpec())
.thenReturn(
new V1ServiceSpec()
.type("LoadBalancer")
.ports(
Arrays.asList(
new V1ServicePort()
@ -81,8 +82,9 @@ public final class KubernetesNatManagerTest {
.name(NatServiceType.DISCOVERY.getValue())
.port(p2pPort)
.targetPort(new IntOrString(p2pPort)))));
when(v1Service.getMetadata()).thenReturn(new V1ObjectMeta().name(DEFAULT_BESU_POD_NAME_FILTER));
natManager = new KubernetesNatManager(DEFAULT_BESU_POD_NAME_FILTER);
when(v1Service.getMetadata())
.thenReturn(new V1ObjectMeta().name(DEFAULT_BESU_SERVICE_NAME_FILTER));
natManager = new KubernetesNatManager(DEFAULT_BESU_SERVICE_NAME_FILTER);
try {
natManager.start();
} catch (Exception ignored) {

@ -0,0 +1,54 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.nat.kubernetes;
import static org.hyperledger.besu.nat.kubernetes.KubernetesNatManager.DEFAULT_BESU_SERVICE_NAME_FILTER;
import static org.mockito.Mockito.when;
import io.kubernetes.client.models.V1ObjectMeta;
import io.kubernetes.client.models.V1Service;
import io.kubernetes.client.models.V1ServiceSpec;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public final class KubernetesUnknownNatManagerTest {
@Mock private V1Service v1Service;
private KubernetesNatManager natManager;
@Before
public void initialize() {
when(v1Service.getSpec()).thenReturn(new V1ServiceSpec().type("Unknown"));
when(v1Service.getMetadata())
.thenReturn(new V1ObjectMeta().name(DEFAULT_BESU_SERVICE_NAME_FILTER));
natManager = new KubernetesNatManager(DEFAULT_BESU_SERVICE_NAME_FILTER);
try {
natManager.start();
} catch (Exception ignored) {
System.err.println("Ignored missing Kube config file in testing context.");
}
}
@Test(expected = RuntimeException.class)
public void assertThatNatExceptionIsThrownWithUnknownServiceType() {
natManager.updateUsingBesuService(v1Service);
}
}
Loading…
Cancel
Save