-
Notifications
You must be signed in to change notification settings - Fork 926
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add TraceAbleRequestContextStorage for ease of detect context leaks #4232
Changes from 20 commits
1d64827
c79b852
443e740
c431103
06c728e
0f5e5e5
bcf92df
82bb1a8
a7fd563
ea2e675
cb8dcf4
50d4127
818c166
5207a4c
d24e0a2
61e767d
449f051
4dabd01
8909ace
fdd8490
5562cfe
f124fc8
a71a725
c4a534f
3b00c2a
af31a6e
ab4de84
9f681d4
a72a9ca
d43b052
4f8a8f2
6cbcfab
89ddc46
6b4b9f9
083d560
4fb9575
7b3f3d1
9d929bf
90e6e78
a0540c8
9156898
50384a2
0fe21fd
caba3d6
1302481
9e8e10e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Copyright 2022 LINE Corporation | ||
* | ||
* LINE Corporation licenses this file to you under the Apache License, | ||
* version 2.0 (the "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at: | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package com.linecorp.armeria.common; | ||
|
||
import org.openjdk.jmh.annotations.Benchmark; | ||
|
||
import com.linecorp.armeria.common.util.Sampler; | ||
import com.linecorp.armeria.server.ServiceRequestContext; | ||
|
||
/** | ||
* Microbenchmarks for LeakTracingRequestContextStorage. | ||
*/ | ||
public class LeakTracingRequestContextStorageBenchmark { | ||
|
||
private static final RequestContextStorage threadLocalReqCtxStorage = | ||
RequestContextStorage.threadLocal(); | ||
private static final RequestContextStorage neverSample = | ||
new LeakTracingRequestContextStorage(threadLocalReqCtxStorage, Sampler.never()); | ||
private static final RequestContextStorage rateLimited1 = | ||
new LeakTracingRequestContextStorage(threadLocalReqCtxStorage, Sampler.of("rate-limited=1")); | ||
private static final RequestContextStorage rateLimited10 = | ||
new LeakTracingRequestContextStorage(threadLocalReqCtxStorage, Sampler.of("rate-limited=10")); | ||
private static final RequestContextStorage random1 = | ||
new LeakTracingRequestContextStorage(threadLocalReqCtxStorage, Sampler.of("random=0.01")); | ||
private static final RequestContextStorage random10 = | ||
new LeakTracingRequestContextStorage(threadLocalReqCtxStorage, Sampler.of("random=0.10")); | ||
private static final RequestContextStorage alwaysSample = | ||
new LeakTracingRequestContextStorage(threadLocalReqCtxStorage, Sampler.always()); | ||
private static final RequestContext reqCtx = newCtx("/"); | ||
|
||
private static ServiceRequestContext newCtx(String path) { | ||
return ServiceRequestContext.builder(HttpRequest.of(HttpMethod.GET, path)) | ||
.build(); | ||
} | ||
|
||
@Benchmark | ||
public void baseline_threadLocal() { | ||
final RequestContext oldCtx = threadLocalReqCtxStorage.push(reqCtx); | ||
threadLocalReqCtxStorage.pop(reqCtx, oldCtx); | ||
} | ||
|
||
@Benchmark | ||
public void leakTracing_never_sample() { | ||
final RequestContext oldCtx = neverSample.push(reqCtx); | ||
neverSample.pop(reqCtx, oldCtx); | ||
} | ||
|
||
@Benchmark | ||
public void leakTracing_rateLimited_1() { | ||
final RequestContext oldCtx = rateLimited1.push(reqCtx); | ||
rateLimited1.pop(reqCtx, oldCtx); | ||
} | ||
|
||
@Benchmark | ||
public void leakTracing_rateLimited_10() { | ||
final RequestContext oldCtx = rateLimited10.push(reqCtx); | ||
rateLimited10.pop(reqCtx, oldCtx); | ||
} | ||
|
||
@Benchmark | ||
public void leakTracing_random_1() { | ||
final RequestContext oldCtx = random1.push(reqCtx); | ||
random1.pop(reqCtx, oldCtx); | ||
} | ||
|
||
@Benchmark | ||
public void leakTracing_random_10() { | ||
final RequestContext oldCtx = random10.push(reqCtx); | ||
random10.pop(reqCtx, oldCtx); | ||
} | ||
|
||
@Benchmark | ||
public void leakTracing_always_sample() { | ||
final RequestContext oldCtx = alwaysSample.push(reqCtx); | ||
alwaysSample.pop(reqCtx, oldCtx); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -960,4 +960,19 @@ default Boolean allowDoubleDotsInQueryString() { | |||||
default Path defaultMultipartUploadsLocation() { | ||||||
return null; | ||||||
} | ||||||
|
||||||
/** | ||||||
* Returns the {@link Sampler} that determines whether to trace the stack trace of request contexts leaks | ||||||
* and how frequently to keeps stack trace. A sampled exception will have the stack trace while the others | ||||||
* will have an empty stack trace to eliminate the cost of capturing the stack trace. | ||||||
* | ||||||
* <p>The default value of this flag is {@link Sampler#never()}. | ||||||
* Specify the {@code -Dcom.linecorp.armeria.requestContextLeakDetectionSampler=<specification>} JVM option | ||||||
* to override the default. This feature is disabled if users provide the {@link Sampler#never()}. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
* See {@link Sampler#of(String)} for the specification string format.</p> | ||||||
*/ | ||||||
@Nullable | ||||||
ikhoon marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
default Sampler<? super RequestContext> requestContextLeakDetectionSampler() { | ||||||
return null; | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,116 @@ | ||||||||
/* | ||||||||
* Copyright 2022 LINE Corporation | ||||||||
* | ||||||||
* LINE Corporation licenses this file to you under the Apache License, | ||||||||
* version 2.0 (the "License"); you may not use this file except in compliance | ||||||||
* with the License. You may obtain a copy of the License at: | ||||||||
* | ||||||||
* https://www.apache.org/licenses/LICENSE-2.0 | ||||||||
* | ||||||||
* Unless required by applicable law or agreed to in writing, software | ||||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||||||||
* License for the specific language governing permissions and limitations | ||||||||
* under the License. | ||||||||
*/ | ||||||||
|
||||||||
package com.linecorp.armeria.common; | ||||||||
|
||||||||
import static com.linecorp.armeria.internal.common.RequestContextUtil.newIllegalContextPushingException; | ||||||||
import static java.lang.Thread.currentThread; | ||||||||
import static java.util.Objects.requireNonNull; | ||||||||
|
||||||||
import com.linecorp.armeria.client.ClientRequestContext; | ||||||||
import com.linecorp.armeria.common.annotation.Nullable; | ||||||||
import com.linecorp.armeria.common.annotation.UnstableApi; | ||||||||
import com.linecorp.armeria.common.util.Sampler; | ||||||||
import com.linecorp.armeria.server.ServiceRequestContext; | ||||||||
|
||||||||
import io.netty.util.concurrent.FastThreadLocal; | ||||||||
|
||||||||
/** | ||||||||
* A {@link RequestContextStorage} which keeps track of {@link RequestContext}s, reporting pushed thread | ||||||||
* information if a {@link RequestContext} is leaked. | ||||||||
*/ | ||||||||
@UnstableApi | ||||||||
public final class LeakTracingRequestContextStorage implements RequestContextStorage { | ||||||||
ikhoon marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: Is there any reason this class can't be under There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously, I didn't add Flag |
||||||||
|
||||||||
private final RequestContextStorage delegate; | ||||||||
private final FastThreadLocal<PendingRequestContextStackTrace> pendingRequestCtx; | ||||||||
private final Sampler<Object> sampler; | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
/** | ||||||||
* Creates a new instance. | ||||||||
* @param delegate the underlying {@link RequestContextStorage} that stores {@link RequestContext} | ||||||||
*/ | ||||||||
public LeakTracingRequestContextStorage(RequestContextStorage delegate) { | ||||||||
this(delegate, Flags.verboseExceptionSampler()); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now, we can use
Suggested change
|
||||||||
} | ||||||||
|
||||||||
/** | ||||||||
* Creates a new instance. | ||||||||
* @param delegate the underlying {@link RequestContextStorage} that stores {@link RequestContext} | ||||||||
* @param sampler the {@link Sampler} that determines whether to retain the stacktrace of the context leaks | ||||||||
*/ | ||||||||
public LeakTracingRequestContextStorage(RequestContextStorage delegate, | ||||||||
Sampler<?> sampler) { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
this.delegate = requireNonNull(delegate, "delegate"); | ||||||||
ikhoon marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
pendingRequestCtx = new FastThreadLocal<>(); | ||||||||
this.sampler = (Sampler<Object>) requireNonNull(sampler, "sampler"); | ||||||||
} | ||||||||
|
||||||||
@Nullable | ||||||||
@Override | ||||||||
public <T extends RequestContext> T push(RequestContext toPush) { | ||||||||
requireNonNull(toPush, "toPush"); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we just call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think if (sampler == Sampler.never()) {
requestContextStorage = provider.newStorage();
} else {
requestContextStorage = new LeakTracingRequestContextStorage(provider.newStorage(), sampler);
} Also, I'll move There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. If it is moved to internal, we don't have to double-check it. |
||||||||
|
||||||||
final RequestContext prevContext = delegate.currentOrNull(); | ||||||||
if (prevContext != null) { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a little concerned that this validation logic is:
e.g.
I guess this PR is a good start, but I'm wondering if it would be more robust to "attach" the caller thread/stacktrace to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I see. Thanks for pointing this out 🙏.
Hi @trustin, Could help elaborate on this option? thanks. Another option that I can think of is to store a stack of stack traces. Actually, @minwoox mentioned in the issue that stack is needed because context can be pushed multiple times before getting popped. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yeah, I think we need a stack to store stack traces. A request context could be pushed multiple times before it's popped, so the stack is the right data structure for this case. private final FastThreadLocal<Deque<ContextAndStackTrace>> threadLocalStack = new FastThreadLocal<>();
@Nullable
@Override
public <T extends RequestContext> T push(RequestContext toPush) {
...
final RequestContext prevContext = delegate.currentOrNull();
if (prevContext != null) {
if (...) {
// valid
} else {
throw newIllegalContextPushingException(toPush, prevContext, threadLocalStack.get());
}
}
...
threadLocalStack.get().push(new ContextAndStackTrace(toPush, currentThread().getStackTrace()));
...
}
@Override
public void pop(RequestContext current, @Nullable RequestContext toRestore) {
final ContextAndStackTrace last = threadLocalStack.get().peekLast();
if (last == null || last.ctx != current) {
// exception!
}
threadLocalStack.get().pop();
delegate.pop(current, toRestore);
}
private static class ContextAndStackTrace {
private final RequestContext ctx;
private final StackTraceElement[] stackTrace;
ContextAndStackTrace(RequestContext ctx, StackTraceElement[] stackTrace) {
this.ctx = ctx;
this.stackTrace = stackTrace;
}
}
Let's suppose this situation:
Because b and c are not popped we need to provide the stack traces of those contexts. However, if b is sampled and c isn't sampled we cannot provide them. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I got it, Thanks for the explanation 🙏 |
||||||||
if (prevContext == toPush) { | ||||||||
// Re-entrance | ||||||||
} else if (toPush instanceof ServiceRequestContext && | ||||||||
prevContext.root() == toPush) { | ||||||||
// The delegate has the ServiceRequestContext whose root() is toPush | ||||||||
} else if (toPush instanceof ClientRequestContext && | ||||||||
prevContext.root() == toPush.root()) { | ||||||||
// The delegate has the ClientRequestContext whose root() is the same as toPush.root() | ||||||||
} else { | ||||||||
throw newIllegalContextPushingException(prevContext, toPush, pendingRequestCtx.get()); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
if (sampler.isSampled(PendingRequestContextStackTrace.class)) { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
pendingRequestCtx.set(new PendingRequestContextStackTrace(toPush, true)); | ||||||||
} else { | ||||||||
pendingRequestCtx.set(new PendingRequestContextStackTrace(toPush, false)); | ||||||||
} | ||||||||
|
||||||||
return delegate.push(toPush); | ||||||||
} | ||||||||
|
||||||||
@Override | ||||||||
public void pop(RequestContext current, @Nullable RequestContext toRestore) { | ||||||||
try { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. Should we just call |
||||||||
delegate.pop(current, toRestore); | ||||||||
} finally { | ||||||||
pendingRequestCtx.remove(); | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
@Nullable | ||||||||
@Override | ||||||||
public <T extends RequestContext> T currentOrNull() { | ||||||||
return delegate.currentOrNull(); | ||||||||
} | ||||||||
|
||||||||
static class PendingRequestContextStackTrace extends RuntimeException { | ||||||||
|
||||||||
private static final long serialVersionUID = -689451606253441556L; | ||||||||
|
||||||||
PendingRequestContextStackTrace(RequestContext context, boolean isSample) { | ||||||||
super("At thread [" + currentThread().getName() + "], previous RequestContext didn't popped : " + | ||||||||
context + (isSample ? ", It is pushed at the following stacktrace" : ""), null, | ||||||||
true, isSample); | ||||||||
ikhoon marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
} | ||||||||
} | ||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -438,6 +438,26 @@ public Path defaultMultipartUploadsLocation() { | |
return getAndParse("defaultMultipartUploadsLocation", Paths::get); | ||
} | ||
|
||
@Override | ||
public Sampler<? super RequestContext> requestContextLeakDetectionSampler() { | ||
final String spec = getNormalized("requestContextLeakDetectionSampler"); | ||
if (spec == null) { | ||
return null; | ||
} | ||
if ("true".equals(spec) || "always".equals(spec)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about modifying |
||
return Sampler.always(); | ||
} | ||
if ("false".equals(spec) || "never".equals(spec)) { | ||
return Sampler.never(); | ||
} | ||
try { | ||
return Sampler.of(spec); | ||
} catch (Exception e) { | ||
// Invalid sampler specification | ||
throw new IllegalArgumentException("invalid sampler spec: " + spec, e); | ||
} | ||
} | ||
|
||
@Nullable | ||
private static Long getLong(String name) { | ||
return getAndParse(name, Long::parseLong); | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -33,11 +33,13 @@ | |||||
import com.linecorp.armeria.client.DefaultClientRequestContext; | ||||||
import com.linecorp.armeria.common.Flags; | ||||||
import com.linecorp.armeria.common.HttpRequest; | ||||||
import com.linecorp.armeria.common.LeakTracingRequestContextStorage; | ||||||
import com.linecorp.armeria.common.RequestContext; | ||||||
import com.linecorp.armeria.common.RequestContextStorage; | ||||||
import com.linecorp.armeria.common.RequestContextStorageProvider; | ||||||
import com.linecorp.armeria.common.annotation.Nullable; | ||||||
import com.linecorp.armeria.common.util.SafeCloseable; | ||||||
import com.linecorp.armeria.common.util.Sampler; | ||||||
import com.linecorp.armeria.server.DefaultServiceRequestContext; | ||||||
|
||||||
import io.netty.channel.ChannelFuture; | ||||||
|
@@ -63,8 +65,14 @@ public final class RequestContextUtil { | |||||
|
||||||
static { | ||||||
final RequestContextStorageProvider provider = Flags.requestContextStorageProvider(); | ||||||
final Sampler<? super RequestContext> sampler = Flags.requestContextLeakDetectionSampler(); | ||||||
try { | ||||||
requestContextStorage = provider.newStorage(); | ||||||
if (sampler.equals(Sampler. never())) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
requestContextStorage = provider.newStorage(); | ||||||
} else { | ||||||
requestContextStorage = | ||||||
new LeakTracingRequestContextStorage(provider.newStorage(), sampler); | ||||||
} | ||||||
} catch (Throwable t) { | ||||||
throw new IllegalStateException("Failed to create context storage. provider: " + provider, t); | ||||||
} | ||||||
|
@@ -89,12 +97,22 @@ public static SafeCloseable noopSafeCloseable() { | |||||
*/ | ||||||
public static IllegalStateException newIllegalContextPushingException( | ||||||
RequestContext newCtx, RequestContext oldCtx) { | ||||||
return newIllegalContextPushingException(newCtx, oldCtx, null); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Returns an {@link IllegalStateException} which is raised when pushing a context from | ||||||
* the unexpected thread or forgetting to close the previous context. | ||||||
*/ | ||||||
public static IllegalStateException newIllegalContextPushingException( | ||||||
RequestContext newCtx, RequestContext oldCtx, @Nullable Throwable cause) { | ||||||
requireNonNull(newCtx, "newCtx"); | ||||||
requireNonNull(oldCtx, "oldCtx"); | ||||||
final IllegalStateException ex = new IllegalStateException( | ||||||
"Trying to call object wrapped with context " + newCtx + ", but context is currently " + | ||||||
"set to " + oldCtx + ". This means the callback was called from " + | ||||||
"unexpected thread or forgetting to close previous context."); | ||||||
final String message = "Trying to call object wrapped with context " + newCtx + ", but context is " + | ||||||
"currently set to " + oldCtx + ". This means the callback was called from " + | ||||||
"unexpected thread or forgetting to close previous context."; | ||||||
final IllegalStateException ex = cause == null ? new IllegalStateException(message) | ||||||
: new IllegalStateException(message, cause); | ||||||
if (REPORTED_THREADS.add(Thread.currentThread())) { | ||||||
logger.warn("An error occurred while pushing a context", ex); | ||||||
} | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.