Skip to content

Commit

Permalink
Allow client provided custom endpoint as override router, subject to …
Browse files Browse the repository at this point in the history
…configuration

    Pseudo Cherry pick of PR 26748
  • Loading branch information
hugofirth committed Nov 29, 2024
1 parent 62d9790 commit 5ab6674
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static org.neo4j.configuration.SettingConstraints.is;
import static org.neo4j.configuration.SettingConstraints.max;
import static org.neo4j.configuration.SettingConstraints.min;
import static org.neo4j.configuration.SettingConstraints.minSize;
import static org.neo4j.configuration.SettingConstraints.range;
import static org.neo4j.configuration.SettingImpl.newBuilder;
import static org.neo4j.configuration.SettingValueParsers.BOOL;
Expand Down Expand Up @@ -856,4 +857,38 @@ public enum SplittingTopBehavior
.addConstraint( min( 0 ) )
.build();

@Internal
@Description(
"Enables returning the client address provided by a driver as the sole router in routing tables. This makes it easier for networking "
+ "middleware to migrate workload to new Neo4j servers, or even a new DBMS." )
public static final Setting<Boolean> client_provided_router_enabled = newBuilder(
"unsupported.dbms.routing.client_provided_router_enabled", BOOL, false)
.build();

@Internal
@Description(
"A list of prefixes to append to the client provided router address when the server cycles through addresses to force a routing table "
+ "update." )
public static final Setting<List<String>> client_provided_router_prefixes = newBuilder(
"unsupported.dbms.routing.client_provided_router_prefixes",
listOf(STRING),
List.of("a", "b", "c", "d"))
.addConstraint(minSize(2))
.build();

@Internal
@Description(
"The period of time for the server to wait between cycling client provided router address prefixes to force a routing table update." )
public static final Setting<Duration> client_provided_router_prefix_rotation_period = newBuilder(
"unsupported.dbms.routing.client_provided_router_rotation_period", DURATION, Duration.ofMinutes(1))
.addConstraint(range(Duration.ofSeconds(10), Duration.ofDays(1)))
.build();

@Internal
@Description(
"The suffix we expect on the client provided address in order to trigger the behaviour of client_provided_router_enabled and return said "
+ "address as the sole router in routing tables." )
public static final Setting<String> client_provided_router_suffix = newBuilder(
"unsupported.dbms.routing.client_provided_address_suffix", STRING, "endpoints.neo4j.io")
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,32 @@ public String getDescription()
};
}

public static <T> SettingConstraint<List<T>> minSize( final int size )
{
return new SettingConstraint<>()
{
@Override
public void validate( List<T> value, Configuration config )
{
if ( value == null )
{
throw new IllegalArgumentException( "can not be null" );
}

if ( value.size() <= size )
{
throw new IllegalArgumentException( format( "needs to be greater than size of %s", size ) );
}
}

@Override
public String getDescription()
{
return format( "has minimum size `%s`", size );
}
};
}

public static final SettingConstraint<SocketAddress> HOSTNAME_ONLY = new SettingConstraint<>()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ protected AbstractRoutingProcedureInstaller createRoutingProcedureInstaller( Glo
var logProvider = globalModule.getLogService().getInternalLogProvider();
var databaseAvailabilityChecker = new DefaultDatabaseAvailabilityChecker( databaseManager );
return new SingleInstanceRoutingProcedureInstaller( databaseAvailabilityChecker, clientRoutingDomainChecker,
portRegister, config, logProvider, databaseReferenceRepo, defaultDatabaseResolver );
portRegister, config, logProvider, databaseReferenceRepo, defaultDatabaseResolver,
globalModule.getGlobalClock() );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*/
package org.neo4j.procedure.builtin.routing;

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
Expand All @@ -41,6 +43,7 @@
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.procedure.Mode;
import org.neo4j.time.Clocks;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.TextValue;
import org.neo4j.values.virtual.MapValue;
Expand Down Expand Up @@ -74,19 +77,25 @@ public final class GetRoutingTableProcedure implements CallableProcedure
private final Supplier<Boolean> boltEnabled;
private final boolean logInfoRoutingTableResult;
private final DefaultDatabaseResolver defaultDatabaseResolver;
private final boolean clientProvidedRouterEnabled;
private final List<String> clientProvidedRouterPrefixes;
private final Duration clientProvidedRouterPrefixRotationPeriod;
private final String clientProvidedRouterSuffix;
private final Clock clock;

public GetRoutingTableProcedure( List<String> namespace, String description, DatabaseReferenceRepository databaseReferenceRepo,
RoutingTableProcedureValidator validator, SingleAddressRoutingTableProvider routingTableProvider,
ClientRoutingDomainChecker clientRoutingDomainChecker, Config config, LogProvider logProvider, DefaultDatabaseResolver defaultDatabaseResolver )
RoutingTableProcedureValidator validator, SingleAddressRoutingTableProvider routingTableProvider,
ClientRoutingDomainChecker clientRoutingDomainChecker, Config config, LogProvider logProvider,
DefaultDatabaseResolver defaultDatabaseResolver, Clock clock )
{
this( namespace, description, databaseReferenceRepo, validator, routingTableProvider,
routingTableProvider, clientRoutingDomainChecker, config, logProvider, defaultDatabaseResolver );
routingTableProvider, clientRoutingDomainChecker, config, logProvider, defaultDatabaseResolver, clock );
}

public GetRoutingTableProcedure( List<String> namespace, String description, DatabaseReferenceRepository databaseReferenceRepo,
RoutingTableProcedureValidator validator, ClientSideRoutingTableProvider clientSideRoutingTableProvider,
ServerSideRoutingTableProvider serverSideRoutingTableProvider, ClientRoutingDomainChecker clientRoutingDomainChecker,
Config config, LogProvider logProvider, DefaultDatabaseResolver defaultDatabaseResolver )
RoutingTableProcedureValidator validator, ClientSideRoutingTableProvider clientSideRoutingTableProvider,
ServerSideRoutingTableProvider serverSideRoutingTableProvider, ClientRoutingDomainChecker clientRoutingDomainChecker,
Config config, LogProvider logProvider, DefaultDatabaseResolver defaultDatabaseResolver, Clock clock )
{
this.signature = buildSignature( namespace, description );
this.databaseReferenceRepo = databaseReferenceRepo;
Expand All @@ -99,6 +108,12 @@ public GetRoutingTableProcedure( List<String> namespace, String description, Dat
this.boltEnabled = () -> config.get( BoltConnector.enabled );
this.logInfoRoutingTableResult = config.get( GraphDatabaseInternalSettings.pagecache_warmup_blocking );
this.defaultDatabaseResolver = defaultDatabaseResolver;
this.clientProvidedRouterEnabled = config.get(GraphDatabaseInternalSettings.client_provided_router_enabled);
this.clientProvidedRouterPrefixes = config.get(GraphDatabaseInternalSettings.client_provided_router_prefixes);
this.clientProvidedRouterSuffix = config.get(GraphDatabaseInternalSettings.client_provided_router_suffix);
this.clientProvidedRouterPrefixRotationPeriod =
config.get(GraphDatabaseInternalSettings.client_provided_router_prefix_rotation_period);
this.clock = clock;
}

@Override
Expand Down Expand Up @@ -143,22 +158,34 @@ private RoutingResult invoke( DatabaseReference databaseReference, MapValue rout
{
var clientProvidedAddress = RoutingTableProcedureHelpers.findClientProvidedAddress( routingContext, BoltConnector.DEFAULT_PORT, log );
var isInternalRef = databaseReference instanceof DatabaseReference.Internal;
RoutingResult result;

if ( !isInternalRef )
{
return serverSideRoutingTableProvider.getServerSideRoutingTable( clientProvidedAddress );
result = serverSideRoutingTableProvider.getServerSideRoutingTable( clientProvidedAddress );
}

var defaultRouter = defaultRouterSupplier.get();
if ( configAllowsForClientSideRouting( defaultRouter, clientProvidedAddress ) )
else if ( configAllowsForClientSideRouting(defaultRouterSupplier.get(), clientProvidedAddress ) )
{
validator.isValidForClientSideRouting( (DatabaseReference.Internal) databaseReference );
return clientSideRoutingTableProvider.getRoutingResultForClientSideRouting( (DatabaseReference.Internal) databaseReference, routingContext );
result = clientSideRoutingTableProvider.getRoutingResultForClientSideRouting( (DatabaseReference.Internal) databaseReference, routingContext );
}
else
{
validator.isValidForServerSideRouting( (DatabaseReference.Internal) databaseReference );
return serverSideRoutingTableProvider.getServerSideRoutingTable( clientProvidedAddress );
result = serverSideRoutingTableProvider.getServerSideRoutingTable( clientProvidedAddress );
}

var validClientProvidedRouterExists = clientProvidedAddress
.map(a -> a.getHostname().endsWith(clientProvidedRouterSuffix))
.orElse(false);
var shouldReplaceRouter = clientProvidedRouterEnabled && validClientProvidedRouterExists;

if ( shouldReplaceRouter )
{
result = replaceRouterWithClientProvidedAddress(result, clientProvidedAddress.get());
}

return result;
}

private DatabaseReference extractDatabaseReference( AnyValue[] input, String user ) throws ProcedureException
Expand Down Expand Up @@ -268,4 +295,28 @@ private void assertNotIllegalAliasChain( DatabaseReference databaseReference, Ma
}
}

private RoutingResult replaceRouterWithClientProvidedAddress( RoutingResult oldResult, SocketAddress clientProvidedAddress )
{

var millisSinceEpoch = clock.instant().toEpochMilli();
var prefix = calculateClientProvidedRouterPrefix(
this.clientProvidedRouterPrefixes,
this.clientProvidedRouterPrefixRotationPeriod.toMillis(),
millisSinceEpoch );

return new RoutingResult(
List.of( new SocketAddress(
String.format("%s-%s", prefix, clientProvidedAddress.getHostname()),
clientProvidedAddress.getPort() ) ),
oldResult.writeEndpoints(),
oldResult.readEndpoints(),
oldResult.ttlMillis() );
}

public static String calculateClientProvidedRouterPrefix( List<String> prefixes, long rotationPeriodMills, long millisSinceEpoch )
{
var periodsSinceEpoch = millisSinceEpoch / rotationPeriodMills;
var prefixToSelect = (int) ( periodsSinceEpoch % prefixes.size() );
return prefixes.get( prefixToSelect );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
*/
package org.neo4j.procedure.builtin.routing;

import java.time.Clock;
import java.util.List;

import org.neo4j.configuration.Config;
import org.neo4j.configuration.connectors.ConnectorPortRegister;
import org.neo4j.kernel.database.DatabaseReferenceRepository;
import org.neo4j.kernel.database.DefaultDatabaseResolver;
import org.neo4j.logging.LogProvider;
import org.neo4j.time.Clocks;

import static org.neo4j.procedure.builtin.routing.RoutingTableTTLProvider.ttlFromConfig;

Expand All @@ -40,10 +42,11 @@ public final class SingleInstanceRoutingProcedureInstaller extends AbstractRouti
private final Config config;
private final LogProvider logProvider;
private final DefaultDatabaseResolver defaultDatabaseResolver;
private final Clock clock;

public SingleInstanceRoutingProcedureInstaller( DatabaseAvailabilityChecker databaseAvailabilityChecker,
ClientRoutingDomainChecker clientRoutingDomainChecker, ConnectorPortRegister portRegister, Config config, LogProvider logProvider,
DatabaseReferenceRepository databaseReferenceRepo, DefaultDatabaseResolver defaultDatabaseResolver )
DatabaseReferenceRepository databaseReferenceRepo, DefaultDatabaseResolver defaultDatabaseResolver, Clock clock )
{
this.databaseAvailabilityChecker = databaseAvailabilityChecker;
this.clientRoutingDomainChecker = clientRoutingDomainChecker;
Expand All @@ -52,6 +55,7 @@ public SingleInstanceRoutingProcedureInstaller( DatabaseAvailabilityChecker data
this.logProvider = logProvider;
this.databaseReferenceRepo = databaseReferenceRepo;
this.defaultDatabaseResolver = defaultDatabaseResolver;
this.clock = clock;
}

@Override
Expand All @@ -62,6 +66,6 @@ public GetRoutingTableProcedure createProcedure( List<String> namespace )
portRegister, RoutingOption.ROUTE_WRITE_AND_READ, config, logProvider, ttlFromConfig( config ) );

return new GetRoutingTableProcedure( namespace, DESCRIPTION, databaseReferenceRepo, validator, routingTableProvider, clientRoutingDomainChecker,
config, logProvider, defaultDatabaseResolver );
config, logProvider, defaultDatabaseResolver, clock );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.time.Clocks;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.Values;
import org.neo4j.values.virtual.MapValue;
Expand Down Expand Up @@ -515,7 +516,7 @@ protected GetRoutingTableProcedure newProcedure( DatabaseAvailabilityChecker dat
var clientRoutingDomainChecker = SimpleClientRoutingDomainChecker.fromConfig( config, logProvider );
var defaultDatabaseResolver = mock( DefaultDatabaseResolver.class );
return new SingleInstanceRoutingProcedureInstaller( databaseAvailabilityChecker, clientRoutingDomainChecker,
portRegister, config, logProvider, databaseReferenceRepo, defaultDatabaseResolver )
portRegister, config, logProvider, databaseReferenceRepo, defaultDatabaseResolver, Clocks.fakeClock() )
.createProcedure( DEFAULT_NAMESPACE );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.kernel.database.DefaultDatabaseResolver;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.procedure.impl.GlobalProceduresRegistry;
import org.neo4j.time.Clocks;

import static java.util.stream.Collectors.toSet;
import static org.eclipse.collections.impl.set.mutable.UnifiedSet.newSetWith;
Expand All @@ -52,8 +53,9 @@ void shouldRegisterRoutingProcedures() throws Exception
var logProvider = NullLogProvider.getInstance();
var defaultDatabaseResolver = mock( DefaultDatabaseResolver.class );

var installer = new SingleInstanceRoutingProcedureInstaller( databaseAvailabilityChecker, clientRoutingDomainChecker,
portRegister, config, logProvider, databaseReferenceRepo, defaultDatabaseResolver );
var installer = new SingleInstanceRoutingProcedureInstaller( databaseAvailabilityChecker,
clientRoutingDomainChecker, portRegister, config, logProvider, databaseReferenceRepo,
defaultDatabaseResolver, Clocks.fakeClock() );
var procedures = spy( new GlobalProceduresRegistry() );

installer.install( procedures );
Expand Down

0 comments on commit 5ab6674

Please sign in to comment.