Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Merge branch 'develop' into snanda/gradle7-iv
Browse files Browse the repository at this point in the history
  • Loading branch information
sudiksha27 committed Jan 20, 2022
2 parents fc1e03e + 69f78a0 commit 7cd0314
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* (c) Copyright 2022 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra.pool;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.palantir.common.concurrent.PTExecutors;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;

/**
* Supplier that will evaluate a delegate supplier asynchronously, returning {@link Optional#empty()} until
* the underlying result is complete, at which point it will be returned. This computation is only initiated after
* the first call to get, but will be memoized infinitely after that. If the delegate throws, it will not be retried
* and {@link Optional#empty()} will be returned for the lifecycle of this supplier. Note that this class is
* responsible for creating and shutting down any executors.
*
* This class is thread safe by synchronising on {@link #get()}.
*/
@ThreadSafe
public final class AsyncSupplier<T> implements Supplier<Optional<T>> {
private static final SafeLogger log = SafeLoggerFactory.get(AsyncSupplier.class);
private final Supplier<Optional<T>> delegate;
private final ExecutorService executorService;
private Future<Optional<T>> result;

@VisibleForTesting
AsyncSupplier(Supplier<Optional<T>> delegate, ExecutorService executorService) {
this.delegate = delegate;
this.executorService = executorService;
}

@Override
public synchronized Optional<T> get() {
if (result == null) {
result = executorService.submit(delegate::get);
executorService.shutdown();
} else if (result.isDone()) {
try {
return result.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SafeRuntimeException("Interrupted while attempting to compute supplier value", e);
} catch (ExecutionException e) {
log.warn("Failed to evaluate delegate supplier asynchronously; returning empty", e);
result = Futures.immediateFuture(Optional.empty());
}
}
return Optional.empty();
}

public static <T> AsyncSupplier<T> create(Supplier<Optional<T>> delegate) {
return new AsyncSupplier<>(delegate, PTExecutors.newSingleThreadExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public CassandraService(
CassandraClientPoolMetrics poolMetrics) {
this.metricsManager = metricsManager;
this.config = config;
this.myLocationSupplier = HostLocationSupplier.create(this::getSnitch, config.overrideHostLocation());
this.myLocationSupplier =
AsyncSupplier.create(HostLocationSupplier.create(this::getSnitch, config.overrideHostLocation()));
this.blacklist = blacklist;
this.poolMetrics = poolMetrics;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* (c) Copyright 2022 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra.pool;

import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.util.concurrent.Uninterruptibles;
import com.palantir.common.concurrent.PTExecutors;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.junit.Test;

public final class AsyncSupplierTests {
@Test
public void supplierMemoizesResult() {
AtomicInteger calls = new AtomicInteger(0);
AsyncSupplier<Integer> supplier = AsyncSupplier.create(() -> Optional.of(calls.incrementAndGet()));

Awaitility.await("wait for computation to complete")
.atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertThat(supplier.get()).hasValue(1));

assertThat(supplier.get()).hasValue(1);
}

@Test
public void supplierDoesNotBlockIfDelegateBlocks() {
CountDownLatch taskExecuting = new CountDownLatch(1);
CountDownLatch begunExecution = new CountDownLatch(1);
AsyncSupplier<Integer> supplier = AsyncSupplier.create(() -> {
begunExecution.countDown();
Uninterruptibles.awaitUninterruptibly(taskExecuting);
return Optional.of(1);
});

assertThat(supplier.get()).isEmpty();
Uninterruptibles.awaitUninterruptibly(begunExecution);
assertThat(supplier.get()).isEmpty();

taskExecuting.countDown();
Awaitility.await("wait for computation to complete")
.atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertThat(supplier.get()).hasValue(1));
}

@Test
public void throwingSupplierOnlyThrowsOnceAndReturnsEmptyForeverMore() {
AtomicInteger calls = new AtomicInteger(0);
CountDownLatch taskDone = new CountDownLatch(1);
AsyncSupplier<Integer> supplier = AsyncSupplier.create(() -> {
calls.incrementAndGet();
try {
throw new RuntimeException();
} finally {
taskDone.countDown();
}
});

assertThat(supplier.get()).isEmpty();
Uninterruptibles.awaitUninterruptibly(taskDone);

for (int attempt = 0; attempt < 100; attempt++) {
assertThat(supplier.get()).isEmpty();
}

assertThat(calls).hasValue(1);
}

@Test
public void executorIsShutdownAfterExecution() {
ExecutorService executor = PTExecutors.newSingleThreadExecutor();
AsyncSupplier<Integer> supplier = new AsyncSupplier<>(() -> Optional.of(1), executor);

Awaitility.await("wait for computation to complete")
.atMost(Duration.ofSeconds(1))
.untilAsserted(() -> assertThat(supplier.get()).hasValue(1));
assertThat(executor.isShutdown()).isTrue();
}
}
8 changes: 8 additions & 0 deletions changelog/0.520.0/pr-5865.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
type: improvement
improvement:
description: The host location supplier is now run in a separate thread, and will
return empty while evaluating. This is particularly important when the `Ec2HostLocationSupplier`
repeatedly (and slowly) fails to connect, as this is on the creation path of the
`CassandraClientPoolImpl` and thus the transaction manager creation path.
links:
- https://github.com/palantir/atlasdb/pull/5865

0 comments on commit 7cd0314

Please sign in to comment.