@ -22,12 +22,9 @@ import org.hyperledger.besu.nat.core.domain.NatServiceType;
import org.hyperledger.besu.nat.core.domain.NetworkProtocol ;
import org.hyperledger.besu.nat.core.exception.NatInitializationException ;
import java.nio.charset.Charset ;
import java.nio.file.Files ;
import java.nio.file.Paths ;
import java.net.InetAddress ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Optional ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.TimeUnit ;
@ -35,9 +32,11 @@ import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.ApiClient ;
import io.kubernetes.client.Configuration ;
import io.kubernetes.client.apis.CoreV1Api ;
import io.kubernetes.client.models.V1LoadBalancerIngress ;
import io.kubernetes.client.models.V1Service ;
import io.kubernetes.client.util.ClientBuilder ;
import io.kubernetes.client.util.KubeConfig ;
import io.kubernetes.client.util.authenticators.GCPAuthenticator ;
import org.apache.logging.log4j.LogManager ;
import org.apache.logging.log4j.Logger ;
@ -48,8 +47,6 @@ import org.apache.logging.log4j.Logger;
public class KubernetesNatManager extends AbstractNatManager {
protected static final Logger LOG = LogManager . getLogger ( ) ;
private static final String KUBE_CONFIG_PATH_ENV = "KUBE_CONFIG_PATH" ;
private static final String DEFAULT_KUBE_CONFIG_PATH = "~/.kube/config" ;
private static final String DEFAULT_BESU_POD_NAME_FILTER = "besu" ;
private String internalAdvertisedHost ;
@ -63,34 +60,27 @@ public class KubernetesNatManager extends AbstractNatManager {
protected void doStart ( ) throws NatInitializationException {
LOG . info ( "Starting kubernetes NAT manager." ) ;
try {
KubeConfig . registerAuthenticator ( new GCPAuthenticator ( ) ) ;
LOG . debug ( "Trying to update information using Kubernetes client SDK." ) ;
final String kubeConfigPath =
Optional . ofNullable ( System . getenv ( KUBE_CONFIG_PATH_ENV ) ) . orElse ( DEFAULT_KUBE_CONFIG_PATH ) ;
LOG . debug (
"Checking if Kubernetes config file is present on file system: {}." , kubeConfigPath ) ;
if ( ! Files . exists ( Paths . get ( kubeConfigPath ) ) ) {
throw new NatInitializationException ( "Cannot locate Kubernetes config file." ) ;
}
// loading the out-of-cluster config, a kubeconfig from file-system
final ApiClient client =
ClientBuilder . kubeconfig (
KubeConfig . loadKubeConfig (
Files . newBufferedReader ( Paths . get ( kubeConfigPath ) , Charset . defaultCharset ( ) ) ) )
. build ( ) ;
final ApiClient client = ClientBuilder . cluster ( ) . build ( ) ;
// set the global default api-client to the in-cluster one from above
Configuration . setDefaultApiClient ( client ) ;
// the CoreV1Api loads default api-client from global configuration.
CoreV1Api api = new CoreV1Api ( ) ;
final CoreV1Api api = new CoreV1Api ( ) ;
// invokes the CoreV1Api client
api . listServiceForAllNamespaces ( null , null , null , null , null , null , null , null , null )
. getItems ( ) . stream ( )
. filter (
v1Service - > v1Service . getMetadata ( ) . getName ( ) . contains ( DEFAULT_BESU_POD_NAME_FILTER ) )
. findFirst ( )
. ifPresent ( this : : updateUsingBesuService ) ;
final V1Service service =
api . listServiceForAllNamespaces ( null , null , null , null , null , null , null , null , null )
. getItems ( ) . stream ( )
. filter (
v1Service - >
v1Service . getMetadata ( ) . getName ( ) . contains ( DEFAULT_BESU_POD_NAME_FILTER ) )
. findFirst ( )
. orElseThrow ( ( ) - > new NatInitializationException ( "Service not found" ) ) ;
updateUsingBesuService ( service ) ;
} catch ( Exception e ) {
throw new NatInitializationException (
"Failed update information using Kubernetes client SDK." , e ) ;
@ -101,8 +91,25 @@ public class KubernetesNatManager extends AbstractNatManager {
void updateUsingBesuService ( final V1Service service ) throws RuntimeException {
try {
LOG . info ( "Found Besu service: {}" , service . getMetadata ( ) . getName ( ) ) ;
LOG . info ( "Setting host IP to: {}." , service . getSpec ( ) . getClusterIP ( ) ) ;
internalAdvertisedHost = service . getSpec ( ) . getClusterIP ( ) ;
final V1LoadBalancerIngress v1LoadBalancerIngress =
service . getStatus ( ) . getLoadBalancer ( ) . getIngress ( ) . stream ( )
. filter (
v1LoadBalancerIngress1 - >
v1LoadBalancerIngress1 . getHostname ( ) ! = null
| | v1LoadBalancerIngress1 . getIp ( ) ! = null )
. findFirst ( )
. orElseThrow ( ( ) - > new NatInitializationException ( "Ingress not found" ) ) ;
if ( v1LoadBalancerIngress . getHostname ( ) ! = null ) {
internalAdvertisedHost =
InetAddress . getByName ( v1LoadBalancerIngress . getHostname ( ) ) . getHostAddress ( ) ;
} else {
internalAdvertisedHost = v1LoadBalancerIngress . getIp ( ) ;
}
LOG . info ( "Setting host IP to: {}." , internalAdvertisedHost ) ;
final String internalHost = queryLocalIPAddress ( ) . get ( TIMEOUT_SECONDS , TimeUnit . SECONDS ) ;
service
. getSpec ( )