Fix flaky CausalClusteringStressIT (#800) (#806)
Fix flaky CausalClusteringStressIT by removing read distribution checks from stress tests
gjmwoods authored Jan 14, 2021
1 parent 5092f1f commit 69e10e9
Showing 5 changed files with 4 additions and 211 deletions.
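
The checks being deleted required every follower and every read replica to have served read queries, and required the per-member counts to agree to within one order of magnitude (see the helpers removed from CausalClusteringStressIT below). A minimal, self-contained sketch of why that assertion can fail on a perfectly healthy cluster; the counts are made up, and orderOfMagnitude is copied from the removed helper:

class ReadDistributionFlakinessSketch
{
    // Copied from the helper removed below: returns 1 for 0-9, 2 for 10-99, 3 for 100-999, ...
    static long orderOfMagnitude( long number )
    {
        long result = 1;
        while ( number >= 10 )
        {
            number /= 10;
            result++;
        }
        return result;
    }

    public static void main( String[] args )
    {
        long followerA = 8;   // a short run plus routing skew can leave one member nearly idle
        long followerB = 120; // while another member serves most of the reads

        long difference = Math.abs( orderOfMagnitude( followerA ) - orderOfMagnitude( followerB ) );
        // The removed assertion tolerated a difference of at most 1, so this run would fail: 3 - 1 = 2.
        System.out.println( "order-of-magnitude difference = " + difference + " -> " + ( difference <= 1 ? "pass" : "fail" ) );
    }
}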
@@ -65,11 +65,8 @@ public final long getCreatedNodesCount()
public final void readCompleted( ResultSummary summary )
{
readNodesCount.incrementAndGet();
processSummary( summary );
}

public abstract void processSummary( ResultSummary summary );

public long getReadNodesCount()
{
return readNodesCount.get();
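
Net effect of this hunk: the processSummary hook disappears and readCompleted only keeps the aggregate counter. A minimal reconstruction from the surviving lines, assuming nothing else changes in the elided context:

    public final void readCompleted( ResultSummary summary )
    {
        readNodesCount.incrementAndGet();
    }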
@@ -225,8 +225,6 @@ Config config()

abstract boolean handleWriteFailure( Throwable error, C context );

abstract void assertExpectedReadQueryDistribution( C context );

abstract <A extends C> void printStats( A context );

private List<Future<?>> launchBlockingWorkerThreads( C context )
@@ -437,7 +435,6 @@ private void verifyResults( C context, ResourcesInfo resourcesInfo )
assertNoFileDescriptorLeak( resourcesInfo.openFileDescriptorCount );
assertNoLoggersLeak( resourcesInfo.acquiredLoggerNames );
assertExpectedNumberOfNodesCreated( context.getCreatedNodesCount() );
assertExpectedReadQueryDistribution( context );
}

private void assertNoFileDescriptorLeak( long previousOpenFileDescriptors )
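
Net effect here: verifyResults keeps only the deterministic checks (resource leaks and node counts) and no longer asserts anything about how reads were distributed. Reconstructed from the surviving lines, assuming the elided context is unchanged:

    private void verifyResults( C context, ResourcesInfo resourcesInfo )
    {
        assertNoFileDescriptorLeak( resourcesInfo.openFileDescriptorCount );
        assertNoLoggersLeak( resourcesInfo.acquiredLoggerNames );
        assertExpectedNumberOfNodesCreated( context.getCreatedNodesCount() );
    }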
@@ -22,33 +22,14 @@

import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.driver.AuthToken;
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.util.cc.ClusterMemberRole;
import org.neo4j.driver.util.cc.ClusterMemberRoleDiscoveryFactory;
import org.neo4j.driver.util.cc.LocalOrRemoteClusterExtension;

import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.junit.MatcherAssert.assertThat;

class CausalClusteringStressIT extends AbstractStressTestBase<CausalClusteringStressIT.Context>
{
@RegisterExtension
@@ -104,27 +85,6 @@ boolean handleWriteFailure( Throwable error, Context context )
return false;
}

@Override
void assertExpectedReadQueryDistribution( Context context )
{
Map<String,Long> readQueriesByServer = context.getReadQueriesByServer();
ClusterAddresses clusterAddresses = fetchClusterAddresses( driver );

// expect all followers to serve more than zero read queries
assertAllAddressesServedReadQueries( "Follower", clusterAddresses.followers, readQueriesByServer );

// expect all read replicas to serve more than zero read queries
assertAllAddressesServedReadQueries( "Read replica", clusterAddresses.readReplicas, readQueriesByServer );

// expect all followers to serve same order of magnitude read queries
assertAllAddressesServedSimilarAmountOfReadQueries( "Followers", clusterAddresses.followers,
readQueriesByServer, clusterAddresses );

// expect all read replicas to serve same order of magnitude read queries
assertAllAddressesServedSimilarAmountOfReadQueries( "Read replicas", clusterAddresses.readReplicas,
readQueriesByServer, clusterAddresses );
}

@Override
void printStats( Context context )
{
@@ -141,115 +101,10 @@ void dumpLogs()
clusterRule.dumpClusterLogs();
}

private static ClusterAddresses fetchClusterAddresses( Driver driver )
{
Set<String> followers = new HashSet<>();
Set<String> readReplicas = new HashSet<>();

final ClusterMemberRoleDiscoveryFactory.ClusterMemberRoleDiscovery discovery =
ClusterMemberRoleDiscoveryFactory.newInstance( ServerVersion.version( driver ) );
final Map<BoltServerAddress,ClusterMemberRole> clusterOverview = discovery.findClusterOverview( driver );

for ( BoltServerAddress address : clusterOverview.keySet() )
{
String boltAddress = String.format( "%s:%s", address.host(), address.port() );
ClusterMemberRole role = clusterOverview.get( address );
if ( role == ClusterMemberRole.FOLLOWER )
{
followers.add( boltAddress );
}
else if ( role == ClusterMemberRole.READ_REPLICA )
{
readReplicas.add( boltAddress );
}
}

return new ClusterAddresses( followers, readReplicas );
}

private static void assertAllAddressesServedReadQueries( String addressType, Set<String> addresses,
Map<String,Long> readQueriesByServer )
{
for ( String address : addresses )
{
Long queries = readQueriesByServer.get( address );
assertThat( addressType + " did not serve any read queries", queries, greaterThan( 0L ) );
}
}

private static void assertAllAddressesServedSimilarAmountOfReadQueries( String addressesType, Set<String> addresses,
Map<String,Long> readQueriesByServer, ClusterAddresses allAddresses )
{
long expectedOrderOfMagnitude = -1;
for ( String address : addresses )
{
long queries = readQueriesByServer.get( address );
long orderOfMagnitude = orderOfMagnitude( queries );
if ( expectedOrderOfMagnitude == -1 )
{
expectedOrderOfMagnitude = orderOfMagnitude;
}
else
{
assertThat( addressesType + " are expected to serve similar amount of queries. " +
"Addresses: " + allAddresses + ", " +
"read queries served: " + readQueriesByServer,
orderOfMagnitude,
both( greaterThanOrEqualTo( expectedOrderOfMagnitude - 1 ) )
.and( lessThanOrEqualTo( expectedOrderOfMagnitude + 1 ) ) );
}
}
}

private static long orderOfMagnitude( long number )
{
long result = 1;
while ( number >= 10 )
{
number /= 10;
result++;
}
return result;
}

static class Context extends AbstractContext
{
final ConcurrentMap<String,AtomicLong> readQueriesByServer = new ConcurrentHashMap<>();
final AtomicInteger leaderSwitches = new AtomicInteger();

@Override
public void processSummary( ResultSummary summary )
{
if ( summary == null )
{
return;
}

String serverAddress = summary.server().address();

AtomicLong count = readQueriesByServer.get( serverAddress );
if ( count == null )
{
count = new AtomicLong();
AtomicLong existingCounter = readQueriesByServer.putIfAbsent( serverAddress, count );
if ( existingCounter != null )
{
count = existingCounter;
}
}
count.incrementAndGet();
}

Map<String,Long> getReadQueriesByServer()
{
Map<String,Long> result = new HashMap<>();
for ( Map.Entry<String,AtomicLong> entry : readQueriesByServer.entrySet() )
{
result.put( entry.getKey(), entry.getValue().get() );
}
return result;
}

void leaderSwitch()
{
leaderSwitches.incrementAndGet();
@@ -261,24 +116,4 @@ int getLeaderSwitchCount()
}
}

private static class ClusterAddresses
{
final Set<String> followers;
final Set<String> readReplicas;

ClusterAddresses( Set<String> followers, Set<String> readReplicas )
{
this.followers = followers;
this.readReplicas = readReplicas;
}

@Override
public String toString()
{
return "ClusterAddresses{" +
"followers=" + followers +
", readReplicas=" + readReplicas +
'}';
}
}
}
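
An aside on the counting code removed above: the putIfAbsent dance in processSummary can be written in one line with ConcurrentMap.computeIfAbsent if a per-server read counter is ever reintroduced. A sketch, not part of this commit:

    // readQueriesByServer is a ConcurrentMap<String,AtomicLong>, as in the removed Context field
    readQueriesByServer.computeIfAbsent( serverAddress, address -> new AtomicLong() ).incrementAndGet();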
Expand Up @@ -23,18 +23,12 @@
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.driver.AuthToken;
import org.neo4j.driver.Config;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.junit.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

@ParallelizableIT
class SingleInstanceStressIT extends AbstractStressTestBase<SingleInstanceStressIT.Context>
{
@@ -62,7 +56,7 @@ Config.ConfigBuilder config( Config.ConfigBuilder builder )
@Override
Context createContext()
{
-        return new Context( neo4j.address().toString() );
+        return new Context();
}

@Override
@@ -78,12 +72,6 @@ boolean handleWriteFailure( Throwable error, Context context )
return false;
}

@Override
void assertExpectedReadQueryDistribution( Context context )
{
assertThat( context.getReadQueryCount(), greaterThan( 0L ) );
}

@Override
<A extends Context> void printStats( A context )
{
@@ -95,31 +83,6 @@ <A extends Context> void printStats( A context )

static class Context extends AbstractContext
{
final String expectedAddress;
final AtomicLong readQueries = new AtomicLong();

Context( String expectedAddress )
{
this.expectedAddress = expectedAddress;
}

@Override
public void processSummary( ResultSummary summary )
{
if ( summary == null )
{
return;
}

String address = summary.server().address();
assertEquals( expectedAddress, address );
readQueries.incrementAndGet();
}

long getReadQueryCount()
{
return readQueries.get();
}
}

@Override
5 changes: 3 additions & 2 deletions driver/src/test/java/org/neo4j/driver/util/TemporalUtil.java
@@ -60,8 +60,9 @@ public final class TemporalUtil
"Africa/Casablanca",
"tzid",
"Asia/Qostanay",
"America/Santiago" // Can cause flakyness on windows, see https://stackoverflow.com/questions/37533796/java-calendar-returns-wrong-hour-in-ms-windows-for-america-santiago-zone.
);
"America/Santiago",// Can cause flakyness on windows, see https://stackoverflow.com/questions/37533796/java-calendar-returns-wrong-hour-in-ms-windows-for-america-santiago-zone.
"Pacific/Easter"
);

private TemporalUtil()
{
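
The TemporalUtil change only extends the list of zone ids flagged as problematic (Pacific/Easter joins America/Santiago and the others), which appears to be an exclusion list for randomly generated temporal test values. A hypothetical illustration of how such a list is typically applied when picking a random zone id; the class, method, and field names here are assumptions, not the driver's actual code:

import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

class RandomZoneIdSketch
{
    // Zone ids copied from the list in the diff above, treated as "do not use in tests".
    private static final Set<String> EXCLUDED_ZONE_IDS = new HashSet<>( Arrays.asList(
            "Africa/Casablanca", "tzid", "Asia/Qostanay", "America/Santiago", "Pacific/Easter" ) );

    static ZoneId randomSafeZoneId()
    {
        List<String> candidates = ZoneId.getAvailableZoneIds().stream()
                .filter( id -> !EXCLUDED_ZONE_IDS.contains( id ) )
                .collect( Collectors.toList() );
        return ZoneId.of( candidates.get( ThreadLocalRandom.current().nextInt( candidates.size() ) ) );
    }
}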
