Refactor PropertyImpl#get() to avoid locks.
This introduces a chance for duplicate work in cache updating. We choose to accept the risk, under the assumption that users will not register interpolators or decoders that are non-idempotent or overly expensive.
rgallardo-netflix committed Oct 1, 2024
1 parent 3cfa681 commit f83adeb
Showing 3 changed files with 46 additions and 55 deletions.
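Before the diff, a minimal standalone sketch of the pattern the new get() adopts: read the current version, optimistically compute a fresh value outside any lock, then publish it with a single compare-and-set. The names below (OptimisticCache, Versioned, masterVersion, invalidate) are hypothetical and only illustrate the idea; the real change lives in DefaultPropertyFactory.PropertyImpl, shown further down, and uses an AtomicReferenceFieldUpdater on a volatile field rather than a separate AtomicReference.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical, simplified model of the lock-free caching in this commit.
// Stale readers may recompute concurrently; that duplicate work is accepted
// as long as the supplier is idempotent and cheap.
final class OptimisticCache<T> {
    private static final class Versioned<T> {
        final T value;
        final int version;
        Versioned(T value, int version) { this.value = value; this.version = version; }
    }

    private final AtomicInteger masterVersion = new AtomicInteger(1); // bumped on config updates
    private final AtomicReference<Versioned<T>> cached = new AtomicReference<>();
    private final Supplier<T> supplier;

    OptimisticCache(Supplier<T> supplier) { this.supplier = supplier; }

    void invalidate() { masterVersion.incrementAndGet(); }

    T get() {
        int version = masterVersion.get();
        Versioned<T> current = cached.get();
        if (current != null && current.version >= version) {
            return current.value;        // happy path: cache is fresh
        }
        T newValue = supplier.get();     // possibly duplicated across racing threads
        // Publish only if nobody else got there first; losing the race is fine.
        cached.compareAndSet(current, new Versioned<>(newValue, version));
        return newValue;
    }
}

Keeping the cached value and its version inside one immutable object is what lets a single CAS publish both atomically.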
Decoder.java
@@ -20,6 +20,9 @@
/**
* API for decoding properties to arbitrary types.
*
* @implSpec Implementations of this interface MUST be idempotent. Failing to do so will result in correctness errors.
* Implementations of this interface SHOULD also be cheap to execute. Expensive or blocking operations are to be
* avoided since they can potentially cause large delays in property resolution.
* @author spencergibb
*/
public interface Decoder {
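As a loose illustration of the new @implSpec (deliberately not tied to Decoder's method signatures, which this excerpt does not show): a decoder must return the same result every time it sees the same input, because get() may now invoke it more than once for a single update. The plain String-to-Integer functions below are a hypothetical stand-in.

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical decoders expressed as plain String -> Integer functions.
final class DecoderIdempotencyExample {

    // OK: a pure function of its input. Calling it twice for the same update
    // just wastes a little work, which this commit explicitly accepts.
    static final Function<String, Integer> idempotent = Integer::parseInt;

    // NOT OK under the new contract: the result depends on how many times it has
    // been called, so two racing threads in get() could cache different values.
    static final Function<String, Integer> statefulCounter = new Function<String, Integer>() {
        private final AtomicLong calls = new AtomicLong();
        @Override
        public Integer apply(String encoded) {
            return Integer.parseInt(encoded) + (int) calls.incrementAndGet();
        }
    };

    public static void main(String[] args) {
        System.out.println(idempotent.apply("41"));      // 41, every time
        System.out.println(statefulCounter.apply("41")); // 42 the first time...
        System.out.println(statefulCounter.apply("41")); // ...43 the second time
    }
}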
StrInterpolator.java
@@ -26,7 +26,11 @@
* interpolator.create(lookup).resolve("123-${foo}") -> 123-abc
* }
* </pre>
*
*
* @implSpec Implementations of this interface MUST be idempotent (as long as the backing Config remains unchanged).
* Failing to do so will result in correctness errors.
* Implementations of this interface SHOULD also be cheap to execute. Expensive or blocking operations are to be
* avoided since they can potentially cause large delays in property resolution.
* @author elandau
*
*/
@@ -38,7 +42,7 @@ public interface StrInterpolator {
*
* @author elandau
*/
public interface Lookup {
interface Lookup {
String lookup(String key);
}

@@ -47,14 +51,11 @@ public interface Lookup {
* @author elandau
*
*/
public interface Context {
interface Context {
/**
* Resolve a string with replaceable variables using the provided map to lookup replacement
* values. The implementation should deal with nested replacements and throw an exception
* for infinite recursion.
*
* @param value
* @return
*/
String resolve(String value);
}
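The same contract applies to interpolation. Below is a sketch of a well-behaved lookup for the Lookup interface shown above (reproduced locally so the snippet stands alone; the real one is nested in StrInterpolator): it returns the same value for the same key for as long as the backing data is unchanged, and it never blocks.

import java.util.Map;

// Local copy of the Lookup shape from the diff above, so this sketch compiles on its own.
interface Lookup {
    String lookup(String key);
}

// Idempotent and cheap: backed by an immutable snapshot, so repeated lookups during a
// racy cache refresh all see the same data.
final class MapLookup implements Lookup {
    private final Map<String, String> values;

    MapLookup(Map<String, String> values) {
        this.values = Map.copyOf(values); // defensive, immutable snapshot
    }

    @Override
    public String lookup(String key) {
        return values.get(key);
    }
}

With Map.of("foo", "abc") this lookup resolves "foo" to "abc", matching the 123-${foo} -> 123-abc example in the class javadoc. By contrast, a lookup that issued a remote call per key, or rotated through candidate values, would violate the new @implSpec, since get() may now call it more than once per update.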
DefaultPropertyFactory.java
@@ -21,13 +21,17 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class DefaultPropertyFactory implements PropertyFactory, ConfigListener {
private static final Logger LOG = LoggerFactory.getLogger(DefaultPropertyFactory.class);

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<PropertyImpl, CachedValue> CACHED_VALUE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(PropertyImpl.class, CachedValue.class, "cachedValue");

/**
* Create a Property factory that is attached to a specific config
@@ -136,9 +140,8 @@ private final class PropertyImpl<T> implements Property<T> {
private final KeyAndType<T> keyAndType;
private final Supplier<T> supplier;

// Caching machinery. Writes to cachedValue are guarded by updateLock. Reads can be unguarded because the field is volatile.
private final ReentrantLock updateLock = new ReentrantLock();
private volatile CachedValue<T> cachedValue;
// This field cannot be private because it's accessed via reflection in the CACHED_VALUE_UPDATER :-(
volatile CachedValue<T> cachedValue;

// Keep track of old-style listeners so we can unsubscribe them when they are removed
// Field is initialized on demand only if it's actually needed.
@@ -167,62 +170,46 @@ public PropertyImpl(KeyAndType<T> keyAndType, Supplier<T> supplier) {
@Override
public T get() {
int currentMasterVersion = masterVersion.get();
CachedValue<T> currentCachedValue = this.cachedValue;

if (cachedValue != null) {
// Happy path. We have an up-to-date cached value, so just return that
if (cachedValue.version >= currentMasterVersion) {
return cachedValue.value;
}

// The cached value is stale, someone needs to update it
if (!updateLock.tryLock()) {
// The lock is taken, so another thread is already working on the update.
// We can just return the stale value. Since cachedValue is a volatile, it may even have gotten updated
// by now. That's fine too. :-)
return cachedValue.value;
}

// If we're here we have the lock and we skip past the else ...

} else {

// There is no cached value at all, not even stale, so we MUST wait until it's updated, either by us
// or by another thread. So no tryLock here, we HAVE to block until the lock is available.
updateLock.lock();

// Done waiting, got the lock. But maybe someone did the update while we were waiting for the lock?
if (cachedValue != null) {
// Yes! We can just return the value that was computed by that other thread (and release the lock before leaving!)
try {
return cachedValue.value;
} finally {
updateLock.unlock();
}
}

// Nope, still null. Let's proceed.
// Happy path. We have an up-to-date cached value, so just return that.
// We check for >= in case an upstream update happened between getting the version and the cached value AND
// another thread came and updated the cache.
if (currentCachedValue != null && currentCachedValue.version >= currentMasterVersion) {
return currentCachedValue.value;
}

// At this point, we have the lock AND there's no valid cache. The job of updating falls on us.
// No valid cache, let's try to update it. Multiple threads may get here and try to update. That's fine,
// the worst case is wasted effort. A hidden assumption here is that the supplier is idempotent and relatively
// cheap, which should be true unless the user installed badly behaving interpolators or converters in
// the Config object.
// The tricky edge case is if another update came in between the check above to get the version and
// the call to the supplier. In that case we'll tag the updated value with an old version number. That's fine,
// since the next call to get() will see the old version and try again.
try {
// Get the new value from the supplier. This call could fail.
T newValue = supplier.get();

// We successfully got the new value!
// Atomically update both the value and the version
cachedValue = new CachedValue<>(newValue, currentMasterVersion);
// And return
return newValue;
CachedValue<T> newValue = new CachedValue<>(supplier.get(), currentMasterVersion);

/*
* We successfully got the new value, so now we update the cache. We use an atomic CAS operation to guard
* against edge cases where another thread could have updated to a higher version than we have, in a flow like this:
* Assume the master version starts at 1 and the property cache is at version 1 too.
* 1. Upstream update bumps the version to 2.
* 2. Thread A reads the version at 2, cachedValue at 1, proceeds to start an update, gets interrupted and yields the cpu.
* 3. Thread C bumps the version to 3, yields the cpu.
* 4. Thread B is scheduled, reads the version at 3, cachedValue still at 1, proceeds to start an update.
* 5. Thread B keeps running, updates the cache to 3, yields.
* 6. Thread A resumes and tries to write the cache with version 2, but its CAS fails because the cached value
*    is no longer the one it originally read, so the newer version-3 value is preserved.
*/
CACHED_VALUE_UPDATER.compareAndSet(this, currentCachedValue, newValue);

return newValue.value;

} catch (RuntimeException e) {
// Oh, no, something went wrong while trying to get the new value. Log the error and rethrow the exception
// so our caller knows there's a problem. We leave the cache unchanged. Next caller will try again.
LOG.error("Unable to get current version of property '{}'", keyAndType.key, e);
throw e;
} finally {

// Success or not, release the lock before leaving.
updateLock.unlock();
}
}

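To make the interleaving described in that comment concrete, here is a small self-contained demonstration of why the losing writer's compareAndSet is harmless. The Holder and CachedValue names below are hypothetical stand-ins; the real code does the same thing through CACHED_VALUE_UPDATER on PropertyImpl.cachedValue.

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for PropertyImpl, showing a stale CAS losing gracefully.
public class StaleCasDemo {
    static final class CachedValue {
        final String value;
        final int version;
        CachedValue(String value, int version) { this.value = value; this.version = version; }
    }

    static class Holder {
        // Must be volatile and non-private so the field updater can reach it, as in the diff.
        volatile CachedValue cachedValue;
    }

    static final AtomicReferenceFieldUpdater<Holder, CachedValue> UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(Holder.class, CachedValue.class, "cachedValue");

    public static void main(String[] args) {
        Holder holder = new Holder();
        CachedValue v1 = new CachedValue("old", 1);
        holder.cachedValue = v1;

        // Thread A reads the cache at version 1 and goes off to compute version 2...
        CachedValue seenByA = holder.cachedValue;

        // ...meanwhile thread B installs version 3.
        UPDATER.compareAndSet(holder, v1, new CachedValue("newer", 3));

        // Thread A's late CAS fails because cachedValue is no longer the object it read,
        // so the fresher version-3 value survives.
        boolean aWon = UPDATER.compareAndSet(holder, seenByA, new CachedValue("stale", 2));
        System.out.println(aWon);                        // false
        System.out.println(holder.cachedValue.version);  // 3
    }
}

Using a field updater rather than wrapping the cache in an AtomicReference keeps the hot field a plain volatile on each PropertyImpl, avoiding an extra object per property, at the cost of the field having to be non-private as the diff notes.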
