Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Don't perform async initialization on calling thread (#5746)
Browse files Browse the repository at this point in the history
  • Loading branch information
pkoenig10 authored Nov 10, 2021
1 parent fbd20bf commit 4e73663
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -50,15 +51,19 @@ public final void initialize(boolean initializeAsync) {

initializationStartTime = System.currentTimeMillis();

if (!initializeAsync) {
if (initializeAsync) {
scheduleInitialization(Duration.ZERO);
} else {
tryInitializeInternal();
return;
}

tryInitializationLoop();
}

private void tryInitializationLoop() {
if (state.isCancelled()) {
singleThreadedExecutor.shutdown();
return;
}

try {
tryInitializeInternal();
log.info(
Expand All @@ -82,23 +87,13 @@ private void tryInitializationLoop() {
SafeArg.of("initializationDuration", System.currentTimeMillis() - initializationStartTime),
cleanupThrowable);
}
scheduleInitialization();
scheduleInitialization(sleepInterval());
}
}

// Not final for tests.
void scheduleInitialization() {
singleThreadedExecutor.schedule(
() -> {
if (state.isCancelled()) {
singleThreadedExecutor.shutdown();
return;
}

tryInitializationLoop();
},
sleepIntervalInMillis(),
TimeUnit.MILLISECONDS);
void scheduleInitialization(Duration delay) {
singleThreadedExecutor.schedule(this::tryInitializationLoop, delay.toMillis(), TimeUnit.MILLISECONDS);
}

// Not final for tests
Expand All @@ -116,8 +111,8 @@ void assertNeverCalledInitialize() {
}
}

protected int sleepIntervalInMillis() {
return 10_000;
protected Duration sleepInterval() {
return Duration.ofSeconds(10);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,25 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.AbstractObjectAssert;
import org.jmock.lib.concurrent.DeterministicScheduler;
import org.junit.Test;
import org.mockito.Mockito;

public class AsyncInitializerTest {
private static final int ASYNC_INIT_DELAY = 10;
private static final int FIVE = 5;

private static class AlwaysFailingInitializer extends AsyncInitializer {
final AtomicInteger initializationAttempts = new AtomicInteger(0);
Expand All @@ -46,8 +48,8 @@ public void tryInitialize() {
}

@Override
protected int sleepIntervalInMillis() {
return ASYNC_INIT_DELAY;
protected Duration sleepInterval() {
return Duration.ofMillis(10);
}

@Override
Expand Down Expand Up @@ -79,16 +81,16 @@ public void synchronousInitializationPropagatesExceptionsAndDoesNotRetry() throw
.isInstanceOf(RuntimeException.class)
.hasMessage("Failed initializing");
verify(initializer).tryInitialize();
verify(initializer, never()).scheduleInitialization();
verify(initializer, never()).scheduleInitialization(any());
}

@Test
public void asyncInitializationCatchesExceptionAndRetries() {
AsyncInitializer initializer = getMockedInitializer();

initializer.initialize(true);
verify(initializer).tryInitialize();
verify(initializer).scheduleInitialization();
verify(initializer, never()).tryInitialize();
verify(initializer).scheduleInitialization(any());
}

@Test
Expand All @@ -101,8 +103,7 @@ public void initializationAlwaysFailsAfterTheFirstSynchronousTry() {
assertThatThrownBy(() -> initializer.initialize(false))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Multiple calls tried to initialize the same instance.");
tickSchedulerFiveTimes(initializer);
assertThat(initializer.initializationAttempts.get()).isEqualTo(1);
assertInitializer(initializer).isNotInitialized().hasAttempts(1);
}

@Test
Expand All @@ -113,25 +114,32 @@ public void initializationAlwaysFailsAfterTheFirstAsynchronousTry() {
assertThatThrownBy(() -> initializer.initialize(false))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Multiple calls tried to initialize the same instance.");
tickSchedulerFiveTimes(initializer);
assertThat(initializer.initializationAttempts.get()).isEqualTo(1 + FIVE);
assertInitializer(initializer).isNotInitialized().hasAttempts(0);

tickScheduler(initializer, 0);
assertInitializer(initializer).isNotInitialized().hasAttempts(1);

tickScheduler(initializer, 5);
assertInitializer(initializer).isNotInitialized().hasAttempts(6);
}

@Test
public void asyncInitializationKeepsRetryingAndEventuallySucceeds() throws InterruptedException {
AlwaysFailingInitializer eventuallySuccessfulInitializer = new AlwaysFailingInitializer() {
AlwaysFailingInitializer initializer = new AlwaysFailingInitializer() {
@Override
public void tryInitialize() {
if (initializationAttempts.get() < FIVE) {
super.tryInitialize();
if (initializationAttempts.incrementAndGet() < 5) {
throw new RuntimeException("Failed initializing");
}
}
};

eventuallySuccessfulInitializer.initialize(true);
assertThat(eventuallySuccessfulInitializer.isInitialized()).isFalse();
tickSchedulerFiveTimes(eventuallySuccessfulInitializer);
assertThat(eventuallySuccessfulInitializer.isInitialized()).isTrue();
initializer.initialize(true);
tickScheduler(initializer, 3);
assertInitializer(initializer).isNotInitialized().hasAttempts(4);

tickScheduler(initializer, 1);
assertInitializer(initializer).isInitialized().hasAttempts(5);
}

@Test
Expand All @@ -140,10 +148,12 @@ public void canCancelInitializationAndNoCleanupIfNotInitializedBetweenIterations
Runnable cleanupTask = mock(Runnable.class);

initializer.initialize(true);
fiveTicksAndAssertNumberOfShutdownsAndAttempts(initializer, 0, 6);
tickScheduler(initializer, 5);
assertInitializer(initializer).isNotInitialized().hasAttempts(6).hasShutdowns(0);

initializer.cancelInitialization(cleanupTask);
fiveTicksAndAssertNumberOfShutdownsAndAttempts(initializer, 1, 6);
tickScheduler(initializer, 5);
assertInitializer(initializer).isNotInitialized().hasAttempts(6).hasShutdowns(1);

verify(cleanupTask, never()).run();
}
Expand All @@ -152,28 +162,30 @@ public void canCancelInitializationAndNoCleanupIfNotInitializedBetweenIterations
public void canCancelInitializationAndCleanupIfInitializedAfterCancel() throws InterruptedException {
Runnable cleanupTask = mock(Runnable.class);
doNothing().when(cleanupTask).run();
AlwaysFailingInitializer selfCancellingInitializer = new AlwaysFailingInitializer() {
AlwaysFailingInitializer initializer = new AlwaysFailingInitializer() {
@Override
public void tryInitialize() {
if (initializationAttempts.incrementAndGet() <= 6) {
throw new RuntimeException("Fail 6 times");
if (initializationAttempts.incrementAndGet() < 5) {
throw new RuntimeException("Failed initializing");
}
cancelInitialization(cleanupTask);
}
};

selfCancellingInitializer.initialize(true);
fiveTicksAndAssertNumberOfShutdownsAndAttempts(selfCancellingInitializer, 0, 6);
initializer.initialize(true);
tickScheduler(initializer, 3);
assertInitializer(initializer).isNotInitialized().hasAttempts(4).hasShutdowns(0);

// cancellation is called during the next run of tryInitialize
fiveTicksAndAssertNumberOfShutdownsAndAttempts(selfCancellingInitializer, 1, 7);
tickScheduler(initializer, 1);
assertInitializer(initializer).isNotInitialized().hasAttempts(5).hasShutdowns(1);

verify(cleanupTask, times(1)).run();
}

@Test
public void canCancelInitializationAndCleanupIfAlreadyInitialized() throws InterruptedException {
AlwaysFailingInitializer successfulInitializer = new AlwaysFailingInitializer() {
AlwaysFailingInitializer initializer = new AlwaysFailingInitializer() {
@Override
public void tryInitialize() {
initializationAttempts.incrementAndGet();
Expand All @@ -182,32 +194,59 @@ public void tryInitialize() {
Runnable cleanupTask = mock(Runnable.class);
doNothing().when(cleanupTask).run();

successfulInitializer.initialize(true);
fiveTicksAndAssertNumberOfShutdownsAndAttempts(successfulInitializer, 1, 1);
initializer.initialize(true);
tickScheduler(initializer, 5);
assertInitializer(initializer).isInitialized().hasAttempts(1).hasShutdowns(1);

successfulInitializer.cancelInitialization(cleanupTask);
fiveTicksAndAssertNumberOfShutdownsAndAttempts(successfulInitializer, 1, 1);
initializer.cancelInitialization(cleanupTask);
tickScheduler(initializer, 1);
assertInitializer(initializer).isInitialized().hasAttempts(1).hasShutdowns(1);

verify(cleanupTask, times(1)).run();
}

private void fiveTicksAndAssertNumberOfShutdownsAndAttempts(
AlwaysFailingInitializer initializer, int shutdowns, int attempts) {
tickSchedulerFiveTimes(initializer);
assertThat(initializer.deterministicScheduler.numberOfTimesShutdownCalled.get())
.isEqualTo(shutdowns);
assertThat(initializer.initializationAttempts.get()).isEqualTo(attempts);
private static AlwaysFailingInitializerAssert assertInitializer(AlwaysFailingInitializer initializer) {
return new AlwaysFailingInitializerAssert(initializer);
}

private static final class AlwaysFailingInitializerAssert
extends AbstractObjectAssert<AlwaysFailingInitializerAssert, AlwaysFailingInitializer> {

private AlwaysFailingInitializerAssert(AlwaysFailingInitializer actual) {
super(actual, AlwaysFailingInitializerAssert.class);
}

private AlwaysFailingInitializerAssert isInitialized() {
assertThat(actual.isInitialized()).isTrue();
return this;
}

private AlwaysFailingInitializerAssert isNotInitialized() {
assertThat(actual.isInitialized()).isFalse();
return this;
}

private AlwaysFailingInitializerAssert hasAttempts(int attempts) {
assertThat(actual.initializationAttempts.get()).isEqualTo(attempts);
return this;
}

private AlwaysFailingInitializerAssert hasShutdowns(int shutdowns) {
assertThat(actual.deterministicScheduler.numberOfTimesShutdownCalled.get())
.isEqualTo(shutdowns);
return this;
}
}

private AsyncInitializer getMockedInitializer() {
private static AsyncInitializer getMockedInitializer() {
AsyncInitializer initializer = mock(AsyncInitializer.class, Mockito.CALLS_REAL_METHODS);
doNothing().when(initializer).assertNeverCalledInitialize();
doThrow(new RuntimeException("Failed initializing")).when(initializer).tryInitialize();
doNothing().when(initializer).scheduleInitialization();
doNothing().when(initializer).scheduleInitialization(any());
return initializer;
}

private void tickSchedulerFiveTimes(AlwaysFailingInitializer initializer) {
initializer.deterministicScheduler.tick(ASYNC_INIT_DELAY * FIVE + 1, TimeUnit.MILLISECONDS);
private static void tickScheduler(AlwaysFailingInitializer initializer, long times) {
initializer.deterministicScheduler.tick(ASYNC_INIT_DELAY * times, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.palantir.atlasdb.keyvalue.api.CheckAndSetCompatibility;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.logsafe.Preconditions;
import java.time.Duration;

public final class AsyncInitializeableInMemoryKvs extends AsyncInitializer implements AutoDelegate_KeyValueService {
private final InMemoryKeyValueService delegate;
Expand Down Expand Up @@ -70,8 +71,8 @@ public CheckAndSetCompatibility getCheckAndSetCompatibility() {
}

@Override
protected int sleepIntervalInMillis() {
return 1_000;
protected Duration sleepInterval() {
return Duration.ofSeconds(1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.palantir.timestamp.InMemoryTimestampService;
import com.palantir.timestamp.ManagedTimestampService;
import com.palantir.timestamp.TimestampService;
import java.time.Duration;

public final class AsyncInitializeableInMemoryTimestampService extends AsyncInitializer
implements AutoDelegate_TimestampService, ManagedTimestampService {
Expand Down Expand Up @@ -63,8 +64,8 @@ protected String getInitializingClassName() {
}

@Override
protected int sleepIntervalInMillis() {
return 1_000;
protected Duration sleepInterval() {
return Duration.ofSeconds(1);
}

@Override
Expand Down
Loading

0 comments on commit 4e73663

Please sign in to comment.