Skip to content

Commit

Permalink
rls: fix child lb leak when client channel is shutdown (#8750)
Browse files Browse the repository at this point in the history
When client channel is shutting down, the RlsLoadBalancer is shutting down. However, the child loadbalancers of RlsLoadBalancer are not shut down. This is causing the issue b/209831670
  • Loading branch information
dapengzhang0 authored Jan 12, 2022
1 parent 26f0d61 commit 7a23fb2
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 74 deletions.
64 changes: 13 additions & 51 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,14 @@
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
Expand All @@ -51,7 +48,6 @@
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
import io.grpc.rls.LbPolicyConfiguration.ChildLoadBalancingPolicy;
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
import io.grpc.rls.LruCache.EvictionListener;
Expand Down Expand Up @@ -138,7 +134,8 @@ private CachingRlsLbClient(Builder builder) {
rlsConfig.getCacheSizeBytes(),
builder.evictionListener,
scheduledExecutorService,
timeProvider);
timeProvider,
lock);
logger = helper.getChannelLogger();
String serverHost = null;
try {
Expand Down Expand Up @@ -181,7 +178,9 @@ private CachingRlsLbClient(Builder builder) {
new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
refCountedChildPolicyWrapperFactory =
new RefCountedChildPolicyWrapperFactory(
childLbHelperProvider, new BackoffRefreshListener());
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
childLbHelperProvider,
new BackoffRefreshListener());
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
}

Expand Down Expand Up @@ -536,6 +535,7 @@ final class DataCacheEntry extends CacheEntry {
private final long staleTime;
private final ChildPolicyWrapper childPolicyWrapper;

// GuardedBy CachingRlsLbClient.lock
DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
super(request);
this.response = checkNotNull(response, "response");
Expand All @@ -546,29 +546,6 @@ final class DataCacheEntry extends CacheEntry {
long now = timeProvider.currentTimeNanos();
expireTime = now + maxAgeNanos;
staleTime = now + staleAgeNanos;

if (childPolicyWrapper.getPicker() != null) {
childPolicyWrapper.refreshState();
} else {
createChildLbPolicy();
}
}

private void createChildLbPolicy() {
ChildLoadBalancingPolicy childPolicy = lbPolicyConfig.getLoadBalancingPolicy();
LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider();
ConfigOrError lbConfig =
lbProvider
.parseLoadBalancingPolicyConfig(
childPolicy.getEffectiveChildPolicy(childPolicyWrapper.getTarget()));

LoadBalancer lb = lbProvider.newLoadBalancer(childPolicyWrapper.getHelper());
logger.log(
ChannelLogLevel.DEBUG,
"RLS child lb created. config: {0}",
lbConfig.getConfig());
lb.handleResolvedAddresses(childLbResolvedAddressFactory.create(lbConfig.getConfig()));
lb.requestConnection();
}

/**
Expand Down Expand Up @@ -637,7 +614,9 @@ boolean isStaled(long now) {

@Override
void cleanup() {
refCountedChildPolicyWrapperFactory.release(childPolicyWrapper);
synchronized (lock) {
refCountedChildPolicyWrapperFactory.release(childPolicyWrapper);
}
}

@Override
Expand Down Expand Up @@ -856,14 +835,15 @@ private static final class RlsAsyncLruCache

RlsAsyncLruCache(long maxEstimatedSizeBytes,
@Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
ScheduledExecutorService ses, TimeProvider timeProvider) {
ScheduledExecutorService ses, TimeProvider timeProvider, Object lock) {
super(
maxEstimatedSizeBytes,
new AutoCleaningEvictionListener(evictionListener),
1,
TimeUnit.MINUTES,
ses,
timeProvider);
timeProvider,
lock);
}

@Override
Expand Down Expand Up @@ -985,27 +965,9 @@ private void startFallbackChildPolicy() {
}
fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
}
LoadBalancerProvider lbProvider =
lbPolicyConfig.getLoadBalancingPolicy().getEffectiveLbProvider();
final LoadBalancer lb =
lbProvider.newLoadBalancer(fallbackChildPolicyWrapper.getHelper());
final ConfigOrError lbConfig =
lbProvider
.parseLoadBalancingPolicyConfig(
lbPolicyConfig
.getLoadBalancingPolicy()
.getEffectiveChildPolicy(defaultTarget));
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
lb.handleResolvedAddresses(
childLbResolvedAddressFactory.create(lbConfig.getConfig()));
lb.requestConnection();
}
});
}

// GuardedBy CachingRlsLbClient.lock
void close() {
if (fallbackChildPolicyWrapper != null) {
refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
Expand Down
68 changes: 63 additions & 5 deletions rls/src/main/java/io/grpc/rls/LbPolicyConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.internal.ObjectPool;
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
Expand Down Expand Up @@ -191,33 +194,49 @@ public String toString() {

/** Factory for {@link ChildPolicyWrapper}. */
static final class RefCountedChildPolicyWrapperFactory {
// GuardedBy CachingRlsLbClient.lock
@VisibleForTesting
final Map<String /* target */, RefCountedChildPolicyWrapper> childPolicyMap =
new HashMap<>();

private final ChildLoadBalancerHelperProvider childLbHelperProvider;
private final ChildLbStatusListener childLbStatusListener;
private final ChildLoadBalancingPolicy childPolicy;
private final ResolvedAddressFactory childLbResolvedAddressFactory;

public RefCountedChildPolicyWrapperFactory(
ChildLoadBalancingPolicy childPolicy,
ResolvedAddressFactory childLbResolvedAddressFactory,
ChildLoadBalancerHelperProvider childLbHelperProvider,
ChildLbStatusListener childLbStatusListener) {
this.childPolicy = checkNotNull(childPolicy, "childPolicy");
this.childLbResolvedAddressFactory =
checkNotNull(childLbResolvedAddressFactory, "childLbResolvedAddressFactory");
this.childLbHelperProvider = checkNotNull(childLbHelperProvider, "childLbHelperProvider");
this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener");
}

// GuardedBy CachingRlsLbClient.lock
ChildPolicyWrapper createOrGet(String target) {
// TODO(creamsoup) check if the target is valid or not
RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
if (pooledChildPolicyWrapper == null) {
ChildPolicyWrapper childPolicyWrapper =
new ChildPolicyWrapper(target, childLbHelperProvider, childLbStatusListener);
ChildPolicyWrapper childPolicyWrapper = new ChildPolicyWrapper(
target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider,
childLbStatusListener);
pooledChildPolicyWrapper = RefCountedChildPolicyWrapper.of(childPolicyWrapper);
childPolicyMap.put(target, pooledChildPolicyWrapper);
return pooledChildPolicyWrapper.getObject();
} else {
ChildPolicyWrapper childPolicyWrapper = pooledChildPolicyWrapper.getObject();
if (childPolicyWrapper.getPicker() != null) {
childPolicyWrapper.refreshState();
}
return childPolicyWrapper;
}

return pooledChildPolicyWrapper.getObject();
}

// GuardedBy CachingRlsLbClient.lock
void release(ChildPolicyWrapper childPolicyWrapper) {
checkNotNull(childPolicyWrapper, "childPolicyWrapper");
String target = childPolicyWrapper.getTarget();
Expand All @@ -238,16 +257,36 @@ static final class ChildPolicyWrapper {

private final String target;
private final ChildPolicyReportingHelper helper;
private final LoadBalancer lb;
private volatile SubchannelPicker picker;
private ConnectivityState state;

public ChildPolicyWrapper(
String target,
ChildLoadBalancingPolicy childPolicy,
final ResolvedAddressFactory childLbResolvedAddressFactory,
ChildLoadBalancerHelperProvider childLbHelperProvider,
ChildLbStatusListener childLbStatusListener) {
this.target = target;
this.helper =
new ChildPolicyReportingHelper(childLbHelperProvider, childLbStatusListener);
LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider();
final ConfigOrError lbConfig =
lbProvider
.parseLoadBalancingPolicyConfig(
childPolicy.getEffectiveChildPolicy(target));
this.lb = lbProvider.newLoadBalancer(helper);
helper.getChannelLogger().log(
ChannelLogLevel.DEBUG, "RLS child lb created. config: {0}", lbConfig.getConfig());
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
lb.handleResolvedAddresses(
childLbResolvedAddressFactory.create(lbConfig.getConfig()));
lb.requestConnection();
}
});
}

String getTarget() {
Expand All @@ -263,7 +302,25 @@ ChildPolicyReportingHelper getHelper() {
}

void refreshState() {
helper.updateBalancingState(state, picker);
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
helper.updateBalancingState(state, picker);
}
}
);
}

void shutdown() {
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
lb.shutdown();
}
}
);
}

@Override
Expand Down Expand Up @@ -346,6 +403,7 @@ public ChildPolicyWrapper returnObject(Object object) {
long newCnt = refCnt.decrementAndGet();
checkState(newCnt != -1, "Cannot return never pooled childPolicyWrapper");
if (newCnt == 0) {
childPolicyWrapper.shutdown();
childPolicyWrapper = null;
}
return null;
Expand Down
24 changes: 12 additions & 12 deletions rls/src/main/java/io/grpc/rls/LinkedHashLruCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
@ThreadSafe
abstract class LinkedHashLruCache<K, V> implements LruCache<K, V> {

private final Object lock = new Object();
private final Object lock;

@GuardedBy("lock")
private final LinkedHashMap<K, SizedValue> delegate;
Expand All @@ -64,9 +64,11 @@ abstract class LinkedHashLruCache<K, V> implements LruCache<K, V> {
int cleaningInterval,
TimeUnit cleaningIntervalUnit,
ScheduledExecutorService ses,
final TimeProvider timeProvider) {
final TimeProvider timeProvider,
Object lock) {
checkState(estimatedMaxSizeBytes > 0, "max estimated cache size should be positive");
this.estimatedMaxSizeBytes = estimatedMaxSizeBytes;
this.lock = checkNotNull(lock, "lock");
this.evictionListener = new SizeHandlingEvictionListener(evictionListener);
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
delegate = new LinkedHashMap<K, SizedValue>(
Expand Down Expand Up @@ -200,14 +202,15 @@ private V invalidate(K key, EvictionType cause) {
}

@Override
public final void invalidateAll(Iterable<K> keys) {
checkNotNull(keys, "keys");
public final void invalidateAll() {
synchronized (lock) {
for (K key : keys) {
SizedValue existing = delegate.remove(key);
if (existing != null) {
evictionListener.onEviction(key, existing, EvictionType.EXPLICIT);
Iterator<Map.Entry<K, SizedValue>> iterator = delegate.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<K, SizedValue> entry = iterator.next();
if (entry.getValue() != null) {
evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.EXPLICIT);
}
iterator.remove();
}
}
}
Expand Down Expand Up @@ -291,13 +294,10 @@ private boolean cleanupExpiredEntries(int maxExpiredEntries, long now) {
public final void close() {
synchronized (lock) {
periodicCleaner.stop();
doClose();
delegate.clear();
invalidateAll();
}
}

protected void doClose() {}

/** Periodically cleans up the AsyncRequestCache. */
private final class PeriodicCleaner {

Expand Down
4 changes: 2 additions & 2 deletions rls/src/main/java/io/grpc/rls/LruCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ interface LruCache<K, V> {
V invalidate(K key);

/**
* Invalidates cache entries for given keys. This operation will trigger {@link EvictionListener}
* Invalidates cache entries for all keys. This operation will trigger {@link EvictionListener}
* with {@link EvictionType#EXPLICIT}.
*/
void invalidateAll(Iterable<K> keys);
void invalidateAll();

/** Returns {@code true} if given key is cached. */
@CheckReturnValue
Expand Down
Loading

0 comments on commit 7a23fb2

Please sign in to comment.