Skip to content

Commit

Permalink
Fixed SSL handling
Browse files Browse the repository at this point in the history
This update fixes a number of SSL-related tests in testkit and CausalClusteringIT.shouldDropBrokenOldConnections test.
The connection pooling strategy has been updated to use the same connection pool when the connection host is unambiguous.
Removed hardcoded domain name resolution from the BoltServerAddress and moved the logic to ChannelConnectorImpl that uses the DomainNameResolver.
  • Loading branch information
injectives committed Mar 18, 2021
1 parent b98b697 commit 8727aae
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@
*/
package org.neo4j.driver.internal;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;

import org.neo4j.driver.net.ServerAddress;

Expand All @@ -39,11 +34,10 @@ public class BoltServerAddress implements ServerAddress
public static final int DEFAULT_PORT = 7687;
public static final BoltServerAddress LOCAL_DEFAULT = new BoltServerAddress( "localhost", DEFAULT_PORT );

private final String host; // This could either be the same as originalHost or it is an IP address resolved from the original host.
private final int port;
protected final String host; // Host or IP address.
private final String connectionHost; // Either is equal to the host or is explicitly provided on creation and is expected to be a resolved IP address.
protected final int port;
private final String stringValue;

private final Set<BoltServerAddress> resolved;

public BoltServerAddress( String address )
{
Expand All @@ -57,15 +51,17 @@ public BoltServerAddress( URI uri )

public BoltServerAddress( String host, int port )
{
this( host, port, Collections.emptySet() );
this( host, host, port );
}

public BoltServerAddress( String host, int port, Set<BoltServerAddress> resolved )
public BoltServerAddress( String host, String connectionHost, int port )
{
this.host = requireNonNull( host, "host" );
this.connectionHost = requireNonNull( connectionHost, "connectionHost" );
this.port = requireValidPort( port );
this.stringValue = String.format( "%s:%d", host, port );
this.resolved = Collections.unmodifiableSet( new LinkedHashSet<>( resolved ) );
this.stringValue = host.equals( connectionHost )
? String.format( "%s:%d", host, port )
: String.format( "%s(%s):%d", host, connectionHost, port );
}

public static BoltServerAddress from( ServerAddress address )
Expand All @@ -86,14 +82,14 @@ public boolean equals( Object o )
{
return false;
}
BoltServerAddress that = (BoltServerAddress) o;
return port == that.port && host.equals( that.host );
BoltServerAddress address = (BoltServerAddress) o;
return port == address.port && host.equals( address.host ) && connectionHost.equals( address.connectionHost );
}

@Override
public int hashCode()
{
return Objects.hash( host, port );
return Objects.hash( host, connectionHost, port );
}

@Override
Expand All @@ -102,18 +98,6 @@ public String toString()
return stringValue;
}

/**
* Create a {@link SocketAddress} from this bolt address. This method always attempts to resolve the hostname into
* an {@link InetAddress}.
*
* @return new socket address.
* @see InetSocketAddress
*/
public SocketAddress toSocketAddress()
{
return new InetSocketAddress( host, port );
}

@Override
public String host()
{
Expand All @@ -126,9 +110,21 @@ public int port()
return port;
}

public Set<BoltServerAddress> resolved()
public String connectionHost()
{
return connectionHost;
}

/**
* Create a stream of unicast addresses.
* <p>
* While this implementation just returns a stream of itself, the subclasses may provide multiple addresses.
*
* @return stream of unicast addresses.
*/
public Stream<BoltServerAddress> unicastStream()
{
return this.resolved;
return Stream.of( this );
}

private static String hostFrom( URI uri )
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

/**
* An explicitly resolved version of {@link BoltServerAddress} that always contains one or more resolved IP addresses.
*/
public class ResolvedBoltServerAddress extends BoltServerAddress
{
private static final String HOST_ADDRESSES_FORMAT = "%s%s:%d";
private static final int MAX_HOST_ADDRESSES_IN_STRING_VALUE = 5;
private static final String HOST_ADDRESS_DELIMITER = ",";
private static final String HOST_ADDRESSES_PREFIX = "(";
private static final String HOST_ADDRESSES_SUFFIX = ")";
private static final String TRIMMED_HOST_ADDRESSES_SUFFIX = ",..." + HOST_ADDRESSES_SUFFIX;

private final Set<InetAddress> resolvedAddresses;
private final String stringValue;

public ResolvedBoltServerAddress( String host, int port, InetAddress[] resolvedAddressesArr )
{
super( host, port );
requireNonNull( resolvedAddressesArr, "resolvedAddressesArr" );
if ( resolvedAddressesArr.length == 0 )
{
throw new IllegalArgumentException(
"The resolvedAddressesArr must not be empty, check your DomainNameResolver is compliant with the interface contract" );
}
resolvedAddresses = Collections.unmodifiableSet( new LinkedHashSet<>( Arrays.asList( resolvedAddressesArr ) ) );
stringValue = createStringRepresentation();
}

/**
* Create a stream of unicast addresses.
* <p>
* The stream is created from the list of resolved IP addresses. Each unicast address is given a unique IP address as the connectionHost value.
*
* @return stream of unicast addresses.
*/
@Override
public Stream<BoltServerAddress> unicastStream()
{
return resolvedAddresses.stream().map( address -> new BoltServerAddress( host, address.getHostAddress(), port ) );
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}
if ( !super.equals( o ) )
{
return false;
}
ResolvedBoltServerAddress that = (ResolvedBoltServerAddress) o;
return resolvedAddresses.equals( that.resolvedAddresses );
}

@Override
public int hashCode()
{
return Objects.hash( super.hashCode(), resolvedAddresses );
}

@Override
public String toString()
{
return stringValue;
}

private String createStringRepresentation()
{
String hostAddresses = resolvedAddresses.stream()
.limit( MAX_HOST_ADDRESSES_IN_STRING_VALUE )
.map( InetAddress::getHostAddress )
.collect( joining( HOST_ADDRESS_DELIMITER, HOST_ADDRESSES_PREFIX,
resolvedAddresses.size() > MAX_HOST_ADDRESSES_IN_STRING_VALUE
? TRIMMED_HOST_ADDRESSES_SUFFIX
: HOST_ADDRESSES_SUFFIX ) );
return String.format( HOST_ADDRESSES_FORMAT, host, hostAddresses, port );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.resolver.AddressResolverGroup;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

import org.neo4j.driver.AuthToken;
import org.neo4j.driver.AuthTokens;
Expand All @@ -53,6 +54,7 @@ public class ChannelConnectorImpl implements ChannelConnector
private final int connectTimeoutMillis;
private final Logging logging;
private final Clock clock;
private final DomainNameResolver domainNameResolver;
private final AddressResolverGroup<InetSocketAddress> addressResolverGroup;

public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging,
Expand All @@ -73,7 +75,8 @@ public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan
this.pipelineBuilder = pipelineBuilder;
this.logging = requireNonNull( logging );
this.clock = requireNonNull( clock );
this.addressResolverGroup = new NettyDomainNameResolverGroup( requireNonNull( domainNameResolver ) );
this.domainNameResolver = requireNonNull( domainNameResolver );
this.addressResolverGroup = new NettyDomainNameResolverGroup( this.domainNameResolver );
}

@Override
Expand All @@ -83,7 +86,17 @@ public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
bootstrap.handler( new NettyChannelInitializer( address, securityPlan, connectTimeoutMillis, clock, logging ) );
bootstrap.resolver( addressResolverGroup );

ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() );
SocketAddress socketAddress;
try
{
socketAddress = new InetSocketAddress( domainNameResolver.resolve( address.connectionHost() )[0], address.port() );
}
catch ( Throwable t )
{
socketAddress = InetSocketAddress.createUnresolved( address.connectionHost(), address.port() );
}

ChannelFuture channelConnected = bootstrap.connect( socketAddress );

Channel channel = channelConnected.channel();
ChannelPromise handshakeCompleted = channel.newPromise();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,15 @@
import io.netty.util.concurrent.EventExecutorGroup;

import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logger;
Expand All @@ -42,6 +39,7 @@
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DomainNameResolver;
import org.neo4j.driver.internal.ResolvedBoltServerAddress;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.net.ServerAddress;
Expand Down Expand Up @@ -308,7 +306,7 @@ public List<BoltServerAddress> resolve() throws UnknownHostException
{
try
{
resolvedAddresses.addAll( resolveAllByDomainName( BoltServerAddress.from( serverAddress ) ) );
resolveAllByDomainName( serverAddress ).unicastStream().forEach( resolvedAddresses::add );
}
catch ( UnknownHostException e )
{
Expand Down Expand Up @@ -345,21 +343,22 @@ private BoltServerAddress resolveByDomainNameOrThrowCompletionException( BoltSer
{
try
{
Set<BoltServerAddress> resolvedAddresses = resolveAllByDomainName( address );
routingTable.replaceRouterIfPresent( address, new BoltServerAddress( address.host(), address.port(), resolvedAddresses ) );
return resolvedAddresses.stream().findFirst().orElseThrow(
() -> new IllegalStateException( "Domain name resolution returned empty result set and has not thrown an exception" ) );
ResolvedBoltServerAddress resolvedAddress = resolveAllByDomainName( address );
routingTable.replaceRouterIfPresent( address, resolvedAddress );
return resolvedAddress.unicastStream()
.findFirst()
.orElseThrow(
() -> new IllegalStateException(
"Unexpected condition, the ResolvedBoltServerAddress must always have at least one unicast address" ) );
}
catch ( Throwable e )
{
throw new CompletionException( e );
}
}

private Set<BoltServerAddress> resolveAllByDomainName( BoltServerAddress address ) throws UnknownHostException
private ResolvedBoltServerAddress resolveAllByDomainName( ServerAddress address ) throws UnknownHostException
{
return Arrays.stream( domainNameResolver.resolve( address.host() ) )
.map( inetAddress -> new BoltServerAddress( inetAddress.getHostAddress(), address.port() ) )
.collect( Collectors.toCollection( LinkedHashSet::new ) );
return new ResolvedBoltServerAddress( address.host(), address.port(), domainNameResolver.resolve( address.host() ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,18 @@ else if ( routingTable.isStaleFor( context.mode() ) )
}
}

private synchronized void freshClusterCompositionFetched( ClusterCompositionLookupResult composition )
private synchronized void freshClusterCompositionFetched( ClusterCompositionLookupResult compositionLookupResult )
{
try
{
routingTable.update( composition.getClusterComposition() );
routingTable.update( compositionLookupResult.getClusterComposition() );
routingTableRegistry.removeAged();

Set<BoltServerAddress> addressesToRetain = new LinkedHashSet<>();
for ( BoltServerAddress address : routingTableRegistry.allServers() )
{
addressesToRetain.add( address );
addressesToRetain.addAll( address.resolved() );
}
composition.getResolvedInitialRouters().ifPresent(
routingTableRegistry.allServers().stream()
.flatMap( BoltServerAddress::unicastStream )
.forEach( addressesToRetain::add );
compositionLookupResult.getResolvedInitialRouters().ifPresent(
addresses ->
{
resolvedInitialRouters.clear();
Expand Down
Loading

0 comments on commit 8727aae

Please sign in to comment.