From 953c909b685f6a53e967a8a6c70f6d19b9e09456 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 16 Apr 2019 09:45:01 +0200 Subject: [PATCH 1/6] Recycle via reference counting --- .../apm/agent/impl/ElasticApmTracer.java | 36 ++-- .../async/SpanInScopeCallableWrapper.java | 2 + .../async/SpanInScopeRunnableWrapper.java | 2 + .../apm/agent/impl/error/ErrorPayload.java | 7 - .../apm/agent/impl/payload/Payload.java | 1 - .../impl/payload/TransactionPayload.java | 9 - .../agent/impl/transaction/AbstractSpan.java | 163 ++++++++++++++---- .../apm/agent/impl/transaction/Span.java | 43 ++++- .../agent/impl/transaction/TraceContext.java | 5 + .../impl/transaction/TraceContextHolder.java | 2 + .../agent/impl/transaction/Transaction.java | 25 ++- .../apm/agent/report/ApmServerReporter.java | 4 +- .../report/IntakeV2ReportingEventHandler.java | 4 +- .../co/elastic/apm/agent/MockReporter.java | 22 +++ .../apm/agent/impl/ElasticApmTracerTest.java | 9 +- .../apm/agent/impl/ScopeManagementTest.java | 43 +++-- .../api/ElasticApmApiInstrumentationTest.java | 4 +- ...asticsearchClientAsyncInstrumentation.java | 2 - ...asticsearchClientAsyncInstrumentation.java | 2 - .../concurrent/ExecutorInstrumentation.java | 3 +- .../FailingExecutorInstrumentationTest.java | 8 +- .../OkHttp3ClientAsyncInstrumentation.java | 2 +- .../OkHttpClientAsyncInstrumentation.java | 2 +- .../agent/servlet/AsyncInstrumentation.java | 1 - .../opentracing/OpenTracingBridgeTest.java | 8 +- .../pom.xml | 5 + ...stractServletContainerIntegrationTest.java | 4 +- .../java/co/elastic/apm/servlet/JettyIT.java | 3 +- 28 files changed, 297 insertions(+), 124 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java index c6e9eb2554..2c4c044019 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/ElasticApmTracer.java @@ -210,7 +210,7 @@ public Transaction startTransaction(TraceContext.ChildContextCreator chil if (!coreConfiguration.isActive()) { transaction = noopTransaction(); } else { - transaction = transactionPool.createInstance().start(childContextCreator, parent, epochMicros, sampler); + transaction = createTransaction().start(childContextCreator, parent, epochMicros, sampler); } if (logger.isDebugEnabled()) { logger.debug("startTransaction {} {", transaction); @@ -227,7 +227,16 @@ public Transaction startTransaction(TraceContext.ChildContextCreator chil } public Transaction noopTransaction() { - return transactionPool.createInstance().startNoop(); + return createTransaction().startNoop(); + } + + private Transaction createTransaction() { + Transaction transaction = transactionPool.createInstance(); + while (transaction.getReferenceCount() != 0) { + logger.warn("Tried to start a transaction with a non-zero reference count {} {}", transaction.getReferenceCount(), transaction); + transaction = transactionPool.createInstance(); + } + return transaction; } @Nullable @@ -270,7 +279,7 @@ public Span startSpan(AbstractSpan parent, long epochMicros) { * @see #startSpan(TraceContext.ChildContextCreator, Object) */ public Span startSpan(TraceContext.ChildContextCreator childContextCreator, T parentContext, long epochMicros) { - Span span = spanPool.createInstance(); + Span span = createSpan(); final boolean dropped; Transaction transaction = currentTransaction(); if (transaction != null) { @@ -288,6 +297,15 @@ public Span startSpan(TraceContext.ChildContextCreator childContextCreato return span; } + private Span createSpan() { + Span span = spanPool.createInstance(); + while (span.getReferenceCount() != 0) { + logger.warn("Tried to start a span with a non-zero reference count {} {}", span.getReferenceCount(), span); + span = spanPool.createInstance(); + } + return span; + } + private boolean isTransactionSpanLimitReached(Transaction transaction) { return coreConfiguration.getTransactionMaxSpans() <= transaction.getSpanCount().getStarted().get(); } @@ -347,7 +365,7 @@ public void endTransaction(Transaction transaction) { // we do report non-sampled transactions (without the context) reporter.report(transaction); } else { - transaction.recycle(); + transaction.decrementReferences(); } } @@ -356,13 +374,13 @@ public void endSpan(Span span) { if (span.isSampled()) { long spanFramesMinDurationMs = stacktraceConfiguration.getSpanFramesMinDurationMs(); if (spanFramesMinDurationMs != 0 && span.isSampled()) { - if (span.getDuration() >= spanFramesMinDurationMs) { + if (span.getDurationMs() >= spanFramesMinDurationMs) { span.withStacktrace(new Throwable()); } } reporter.report(span); } else { - span.recycle(); + span.decrementReferences(); } } @@ -475,12 +493,6 @@ public void deactivate(TraceContextHolder holder) { } final Deque> stack = activeStack.get(); assertIsActive(holder, stack.poll()); - if (holder == stack.peekLast()) { - // if this is the bottom of the stack - // clear to avoid potential leaks in case some spans didn't deactivate properly - // makes all leaked spans eligible for GC - stack.clear(); - } } private void assertIsActive(Object span, @Nullable Object currentlyActive) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java index 4bfe6e7ec5..0f1008e631 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeCallableWrapper.java @@ -46,6 +46,7 @@ public SpanInScopeCallableWrapper(ElasticApmTracer tracer) { public SpanInScopeCallableWrapper wrap(Callable delegate, AbstractSpan span) { this.delegate = delegate; this.span = span; + span.incrementReferences(); return this; } @@ -74,6 +75,7 @@ public V call() throws Exception { try { if (localSpan != null) { localSpan.deactivate(); + span.decrementReferences(); } tracer.recycle(this); } catch (Throwable t) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java index 4eff4f6658..3844e10227 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/async/SpanInScopeRunnableWrapper.java @@ -45,6 +45,7 @@ public SpanInScopeRunnableWrapper(ElasticApmTracer tracer) { public SpanInScopeRunnableWrapper wrap(Runnable delegate, AbstractSpan span) { this.delegate = delegate; this.span = span; + span.incrementReferences(); return this; } @@ -73,6 +74,7 @@ public void run() { try { if (localSpan != null) { localSpan.deactivate(); + localSpan.decrementReferences(); } tracer.recycle(this); } catch (Throwable t) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java index ee4341f444..0f94656ff3 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/error/ErrorPayload.java @@ -63,13 +63,6 @@ public int getPayloadSize() { return errors.size(); } - @Override - public void recycle() { - for (ErrorCapture error : errors) { - error.recycle(); - } - } - @Override public void resetState() { errors.clear(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java index 5685bd5d33..3f39e7154c 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/Payload.java @@ -76,5 +76,4 @@ public SystemInfo getSystem() { public abstract int getPayloadSize(); - public abstract void recycle(); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java index d26e8ae44c..90c25aa0f2 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/payload/TransactionPayload.java @@ -73,13 +73,4 @@ public int getPayloadSize() { return transactions.size() + spans.size(); } - @Override - public void recycle() { - for (int i = 0; i < transactions.size(); i++) { - transactions.get(i).recycle(); - } - for (int i = 0; i < spans.size(); i++) { - spans.get(i).recycle(); - } - } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java index 7f3a03c0cf..19f6791fde 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/AbstractSpan.java @@ -20,47 +20,115 @@ package co.elastic.apm.agent.impl.transaction; import co.elastic.apm.agent.impl.ElasticApmTracer; +import co.elastic.apm.agent.objectpool.Recyclable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractSpan extends TraceContextHolder { private static final Logger logger = LoggerFactory.getLogger(AbstractSpan.class); protected static final double MS_IN_MICROS = TimeUnit.MILLISECONDS.toMicros(1); protected final TraceContext traceContext; - // used to mark this span as expected to switch lifecycle-managing-thread, eg span created by one thread and ended by another - private volatile boolean isLifecycleManagingThreadSwitch; - /** * Generic designation of a transaction in the scope of a single service (eg: 'GET /users/:id') */ protected final StringBuilder name = new StringBuilder(); private long timestamp; - /** - * How long the transaction took to complete, in ms with 3 decimal points - * (Required) - */ - protected double duration; - private volatile boolean finished = true; + // in microseconds + protected long duration; + protected ReentrantTimer childDurations = new ReentrantTimer(); + protected AtomicInteger references = new AtomicInteger(); + protected volatile boolean finished = true; + + public int getReferenceCount() { + return references.get(); + } + + public static class ReentrantTimer implements Recyclable { + + private AtomicInteger nestingLevel = new AtomicInteger(); + private AtomicLong start = new AtomicLong(); + private AtomicLong duration = new AtomicLong(); + + /** + * Starts the timer if it has not been started already. + * + * @param startTimestamp + */ + public void start(long startTimestamp) { + if (nestingLevel.incrementAndGet() == 1) { + start.set(startTimestamp); + } + } + + /** + * Stops the timer and increments the duration if no other direct children are still running + * @param endTimestamp + */ + public void stop(long endTimestamp) { + if (nestingLevel.decrementAndGet() == 0) { + incrementDuration(endTimestamp); + } + } + + /** + * Stops the timer and increments the duration even if there are direct children which are still running + * + * @param endTimestamp + */ + public void forceStop(long endTimestamp) { + if (nestingLevel.getAndSet(0) != 0) { + incrementDuration(endTimestamp); + } + } + + private void incrementDuration(long epochMicros) { + duration.addAndGet(epochMicros - start.get()); + } + + @Override + public void resetState() { + nestingLevel.set(0); + start.set(0); + duration.set(0); + } + + public long getDuration() { + return duration.get(); + } + } public AbstractSpan(ElasticApmTracer tracer) { super(tracer); traceContext = TraceContext.with64BitId(this.tracer); } + public boolean isReferenced() { + return references.get() > 0; + } + /** - * How long the transaction took to complete, in ms with 3 decimal points - * (Required) + * How long the transaction took to complete, in µs */ - public double getDuration() { + public long getDuration() { return duration; } + public long getSelfDuration() { + return duration - childDurations.getDuration(); + } + + public double getDurationMs() { + return duration / AbstractSpan.MS_IN_MICROS; + } + /** * Generic designation of a transaction in the scope of a single service (eg: 'GET /users/:id') */ @@ -113,8 +181,9 @@ public void resetState() { name.setLength(0); timestamp = 0; duration = 0; - isLifecycleManagingThreadSwitch = false; traceContext.resetState(); + childDurations.resetState(); + references.set(0); } public boolean isChildOf(AbstractSpan parent) { @@ -126,8 +195,11 @@ public Span createSpan() { return createSpan(traceContext.getClock().getEpochMicros()); } + @Override public Span createSpan(long epochMicros) { - return tracer.startSpan(this, epochMicros); + final Span span = tracer.startSpan(this, epochMicros); + onChildStart(span, epochMicros); + return span; } public abstract void addLabel(String key, String value); @@ -136,8 +208,15 @@ public Span createSpan(long epochMicros) { public abstract void addLabel(String key, Boolean value); - protected void onStart() { + /** + * Called after the span has been started and its parent references are set + */ + protected void onAfterStart() { this.finished = false; + // this final reference is decremented when the span is reported + // or even after its reported and the last child span is ended + references.set(0); + incrementReferences(); } public void end() { @@ -146,12 +225,14 @@ public void end() { public final void end(long epochMicros) { if (!finished) { - this.finished = true; - this.duration = (epochMicros - timestamp) / AbstractSpan.MS_IN_MICROS; + this.duration = (epochMicros - timestamp); if (name.length() == 0) { name.append("unnamed"); } + childDurations.forceStop(epochMicros); doEnd(epochMicros); + // has to be set last so doEnd callbacks don't think it has already been finished + this.finished = true; } else { logger.warn("End has already been called: {}", this); assert false; @@ -165,20 +246,19 @@ public boolean isChildOf(TraceContextHolder other) { return getTraceContext().isChildOf(other); } - public void markLifecycleManagingThreadSwitchExpected() { - isLifecycleManagingThreadSwitch = true; + @Override + public T activate() { + incrementReferences(); + return super.activate(); } @Override - public T activate() { - if (isLifecycleManagingThreadSwitch) { - // This serves two goals: - // 1. resets the lifecycle management flag, so that the executing thread will remain in charge until set otherwise - // by setting this flag once more - // 2. reading this volatile field when span is activated on a new thread ensures proper visibility of other span data - isLifecycleManagingThreadSwitch = false; + public T deactivate() { + try { + return super.deactivate(); + } finally { + decrementReferences(); } - return super.activate(); } /** @@ -191,11 +271,7 @@ public T activate() { */ @Override public Runnable withActive(Runnable runnable) { - if (isLifecycleManagingThreadSwitch) { - return tracer.wrapRunnable(runnable, this); - } else { - return tracer.wrapRunnable(runnable, traceContext); - } + return tracer.wrapRunnable(runnable, this); } /** @@ -208,15 +284,28 @@ public Runnable withActive(Runnable runnable) { */ @Override public Callable withActive(Callable callable) { - if (isLifecycleManagingThreadSwitch) { - return tracer.wrapCallable(callable, this); - } else { - return tracer.wrapCallable(callable, traceContext); - } + return tracer.wrapCallable(callable, this); } public void setStartTimestamp(long epochMicros) { timestamp = epochMicros; } + private void onChildStart(Span span, long epochMicros) { + incrementReferences(); + childDurations.start(epochMicros); + } + + void onChildEnd(Span span, long epochMicros) { + childDurations.stop(epochMicros); + decrementReferences(); + } + + public void incrementReferences() { + references.incrementAndGet(); + logger.trace("increment references to {} ({})", this, references); + } + + public abstract void decrementReferences(); + } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java index 0456ca8a46..afa17f9cc3 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Span.java @@ -58,14 +58,25 @@ public class Span extends AbstractSpan implements Recyclable { private final SpanContext context = new SpanContext(); @Nullable private Throwable stacktrace; + @Nullable + private AbstractSpan parent; + @Nullable + private Transaction transaction; public Span(ElasticApmTracer tracer) { super(tracer); } public Span start(TraceContext.ChildContextCreator childContextCreator, T parentContext, long epochMicros, boolean dropped) { - onStart(); childContextCreator.asChildOf(traceContext, parentContext); + if (parentContext instanceof Transaction) { + this.transaction = (Transaction) parentContext; + this.parent = this.transaction; + } else if (parentContext instanceof Span) { + final Span parentSpan = (Span) parentContext; + this.parent = parentSpan; + this.transaction = parentSpan.transaction; + } if (dropped) { traceContext.setRecorded(false); } @@ -81,6 +92,7 @@ public Span start(TraceContext.ChildContextCreator childContextCreator, T new RuntimeException("this exception is just used to record where the span has been started from")); } } + onAfterStart(); return this; } @@ -178,6 +190,9 @@ public void doEnd(long epochMicros) { if (type == null) { type = "custom"; } + if (parent != null) { + parent.onChildEnd(this, epochMicros); + } this.tracer.endSpan(this); } @@ -189,6 +204,8 @@ public void resetState() { type = null; subtype = null; action = null; + parent = null; + transaction = null; } @Override @@ -206,10 +223,6 @@ public void addLabel(String key, Boolean value) { context.addLabel(key, value); } - public void recycle() { - tracer.recycle(this); - } - @Override public String toString() { return String.format("'%s' %s", name, traceContext); @@ -219,4 +232,24 @@ public Span withStacktrace(Throwable stacktrace) { this.stacktrace = stacktrace; return this; } + + @Override + public void incrementReferences() { + if (transaction != null) { + transaction.incrementReferences(); + } + super.incrementReferences(); + } + + @Override + public void decrementReferences() { + if (transaction != null) { + transaction.decrementReferences(); + } + final int referenceCount = references.decrementAndGet(); + if (referenceCount == 0) { + tracer.recycle(this); + } + logger.trace("decrement references to {} ({})", this, referenceCount); + } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java index 5b7c88128a..0d97b1f786 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContext.java @@ -382,6 +382,11 @@ public Span createSpan() { return tracer.startSpan(fromParent(), this); } + @Override + public Span createSpan(long epochMicros) { + return tracer.startSpan(fromParent(), this, epochMicros); + } + public interface ChildContextCreator { boolean asChildOf(TraceContext child, T parent); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java index f0f229e90f..96ad68788e 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/TraceContextHolder.java @@ -70,6 +70,8 @@ public TraceContextHolder asExit() { public abstract Span createSpan(); + public abstract Span createSpan(long epochMicros); + /** * Creates a child Span representing a remote call event, unless this TraceContextHolder already represents an exit event. * If current TraceContextHolder is representing an Exit- returns null diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java index fb3ad2d32e..9e170d95c4 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/impl/transaction/Transaction.java @@ -22,6 +22,8 @@ import co.elastic.apm.agent.impl.ElasticApmTracer; import co.elastic.apm.agent.impl.context.TransactionContext; import co.elastic.apm.agent.impl.sampling.Sampler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -30,6 +32,8 @@ */ public class Transaction extends AbstractSpan { + private static final Logger logger = LoggerFactory.getLogger(Transaction.class); + public static final String TYPE_REQUEST = "request"; /** @@ -63,7 +67,6 @@ public Transaction(ElasticApmTracer tracer) { } public Transaction start(TraceContext.ChildContextCreator childContextCreator, @Nullable T parent, long epochMicros, Sampler sampler) { - onStart(); if (parent == null || !childContextCreator.asChildOf(traceContext, parent)) { traceContext.asRootSpan(sampler); } @@ -72,13 +75,14 @@ public Transaction start(TraceContext.ChildContextCreator childContextCre } else { setStartTimestamp(traceContext.getClock().getEpochMicros()); } + onAfterStart(); return this; } public Transaction startNoop() { - onStart(); this.name.append("noop"); this.noop = true; + onAfterStart(); return this; } @@ -189,10 +193,6 @@ public void resetState() { type = null; } - public void recycle() { - tracer.recycle(this); - } - public boolean isNoop() { return noop; } @@ -213,4 +213,17 @@ public String getType() { public String toString() { return String.format("'%s' %s", name, traceContext); } + + @Override + public void incrementReferences() { + super.incrementReferences(); + } + + public void decrementReferences() { + final int referenceCount = this.references.decrementAndGet(); + logger.trace("decrement references to {} ({})", this, referenceCount); + if (referenceCount == 0) { + tracer.recycle(this); + } + } } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java index f85f6771f5..a649782705 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java @@ -115,7 +115,7 @@ public Thread newThread(Runnable r) { @Override public void report(Transaction transaction) { if (!tryAddEventToRingBuffer(transaction, TRANSACTION_EVENT_TRANSLATOR)) { - transaction.recycle(); + transaction.decrementReferences(); } if (syncReport) { waitForFlush(); @@ -125,7 +125,7 @@ public void report(Transaction transaction) { @Override public void report(Span span) { if (!tryAddEventToRingBuffer(span, SPAN_EVENT_TRANSLATOR)) { - span.recycle(); + span.decrementReferences(); } if (syncReport) { waitForFlush(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java index c7c7758dfc..a14e545888 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java @@ -187,11 +187,11 @@ private void writeEvent(ReportingEvent event) { if (event.getTransaction() != null) { currentlyTransmitting++; payloadSerializer.serializeTransactionNdJson(event.getTransaction()); - event.getTransaction().recycle(); + event.getTransaction().decrementReferences(); } else if (event.getSpan() != null) { currentlyTransmitting++; payloadSerializer.serializeSpanNdJson(event.getSpan()); - event.getSpan().recycle(); + event.getSpan().decrementReferences(); } else if (event.getError() != null) { currentlyTransmitting++; payloadSerializer.serializeErrorNdJson(event.getError()); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java index 6446cad677..e6ef61b720 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/MockReporter.java @@ -24,10 +24,13 @@ import co.elastic.apm.agent.impl.payload.PayloadUtils; import co.elastic.apm.agent.impl.payload.TransactionPayload; import co.elastic.apm.agent.impl.stacktrace.StacktraceConfiguration; +import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.Transaction; import co.elastic.apm.agent.metrics.MetricRegistry; +import co.elastic.apm.agent.report.IntakeV2ReportingEventHandler; import co.elastic.apm.agent.report.Reporter; +import co.elastic.apm.agent.report.ReportingEvent; import co.elastic.apm.agent.report.serialize.DslJsonSerializer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -243,4 +246,23 @@ public void reset() { errors.clear(); spans.clear(); } + + /** + * Calls {@link AbstractSpan#decrementReferences()} for all reported transactions and spans to emulate the references being decremented + * after reporting to the APM Server. + * See {@link IntakeV2ReportingEventHandler#writeEvent(ReportingEvent)} + */ + public void decrementReferences() { + transactions.forEach(Transaction::decrementReferences); + spans.forEach(Span::decrementReferences); + } + + public void assertRecycledAfterDecrementingReferences() { + transactions.forEach(t -> assertThat(t.getTraceContext().getId().isEmpty()).isFalse()); + spans.forEach(s -> assertThat(s.getTraceContext().getId().isEmpty()).isFalse()); + transactions.forEach(Transaction::decrementReferences); + spans.forEach(Span::decrementReferences); + transactions.forEach(t -> assertThat(t.getTraceContext().getId().isEmpty()).isTrue()); + spans.forEach(s -> assertThat(s.getTraceContext().getId().isEmpty()).isTrue()); + } } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java index da5bb3067f..31eaa935c9 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ElasticApmTracerTest.java @@ -62,7 +62,7 @@ void testThreadLocalStorage() { Transaction transaction = tracerImpl.startTransaction(TraceContext.asRoot(), null, getClass().getClassLoader()); try (Scope scope = transaction.activateInScope()) { assertThat(tracerImpl.currentTransaction()).isSameAs(transaction); - Span span = tracerImpl.getActive().createSpan(); + Span span = tracerImpl.getActive().createSpan().withType("app"); try (Scope spanScope = span.activateInScope()) { assertThat(tracerImpl.currentTransaction()).isSameAs(transaction); assertThat(tracerImpl.getActive()).isSameAs(span); @@ -346,9 +346,9 @@ void testTimestamps() { transaction.end(30); assertThat(transaction.getTimestamp()).isEqualTo(0); - assertThat(transaction.getDuration()).isEqualTo(0.03); + assertThat(transaction.getDuration()).isEqualTo(30); assertThat(span.getTimestamp()).isEqualTo(10); - assertThat(span.getDuration()).isEqualTo(0.01); + assertThat(span.getDuration()).isEqualTo(10); } @Test @@ -356,7 +356,8 @@ void testStartSpanAfterTransactionHasEnded() { final Transaction transaction = tracerImpl.startTransaction(TraceContext.asRoot(), null, getClass().getClassLoader()); final TraceContext transactionTraceContext = transaction.getTraceContext().copy(); transaction.end(); - transaction.resetState(); + + reporter.assertRecycledAfterDecrementingReferences(); tracerImpl.activate(transactionTraceContext); try { diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java index fb69b34efd..372fa6a250 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/impl/ScopeManagementTest.java @@ -93,21 +93,18 @@ void testActivateTwice() { } @Test - void testMissingDeactivation() { - runTestWithAssertionsDisabled(() -> { - final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.createSpan().activate(); - transaction.deactivate(); + void testRedundantActivation() { + tracer.startTransaction(TraceContext.asRoot(), null, null) + .activate().activate() + .deactivate().deactivate(); - assertThat(tracer.getActive()).isNull(); - }); + assertThat(tracer.getActive()).isNull(); } @Test void testContextAndSpanRunnableActivation() { runTestWithAssertionsDisabled(() -> { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); transaction.withActive(transaction.withActive((Runnable) () -> assertThat(tracer.getActive()).isSameAs(transaction))).run(); transaction.deactivate(); @@ -120,7 +117,6 @@ void testContextAndSpanRunnableActivation() { void testContextAndSpanCallableActivation() { runTestWithAssertionsDisabled(() -> { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); try { assertThat(transaction.withActive(transaction.withActive(() -> tracer.currentTransaction())).call()).isSameAs(transaction); } catch (Exception e) { @@ -138,7 +134,6 @@ void testSpanAndContextRunnableActivation() { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Runnable runnable = transaction.withActive((Runnable) () -> assertThat(tracer.currentTransaction()).isSameAs(transaction)); - transaction.markLifecycleManagingThreadSwitchExpected(); transaction.withActive(runnable).run(); transaction.deactivate(); @@ -151,7 +146,6 @@ void testSpanAndContextCallableActivation() { runTestWithAssertionsDisabled(() -> { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Callable callable = transaction.withActive(() -> tracer.currentTransaction()); - transaction.markLifecycleManagingThreadSwitchExpected(); try { assertThat(transaction.withActive(callable).call()).isSameAs(transaction); } catch (Exception e) { @@ -166,7 +160,6 @@ void testSpanAndContextCallableActivation() { @Test void testContextAndSpanRunnableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); Executors.newSingleThreadExecutor().submit(transaction.withActive(transaction.withActive(() -> { assertThat(tracer.getActive()).isSameAs(transaction); assertThat(tracer.currentTransaction()).isSameAs(transaction); @@ -179,7 +172,6 @@ void testContextAndSpanRunnableActivationInDifferentThread() throws Exception { @Test void testContextAndSpanCallableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); - transaction.markLifecycleManagingThreadSwitchExpected(); Future transactionFuture = Executors.newSingleThreadExecutor().submit(transaction.withActive(transaction.withActive(() -> { assertThat(tracer.getActive()).isSameAs(transaction); return tracer.currentTransaction(); @@ -195,9 +187,8 @@ void testSpanAndContextRunnableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Runnable runnable = transaction.withActive(() -> { assertThat(tracer.currentTransaction()).isSameAs(transaction); - assertThat(tracer.getActive()).isInstanceOf(TraceContext.class); + assertThat(tracer.getActive()).isSameAs(transaction); }); - transaction.markLifecycleManagingThreadSwitchExpected(); Executors.newSingleThreadExecutor().submit(transaction.withActive(runnable)).get(); transaction.deactivate(); @@ -208,13 +199,31 @@ void testSpanAndContextRunnableActivationInDifferentThread() throws Exception { void testSpanAndContextCallableActivationInDifferentThread() throws Exception { final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); Callable callable = transaction.withActive(() -> { - assertThat(tracer.getActive()).isInstanceOf(TraceContext.class); + assertThat(tracer.currentTransaction()).isSameAs(transaction); return tracer.currentTransaction(); }); - transaction.markLifecycleManagingThreadSwitchExpected(); assertThat(Executors.newSingleThreadExecutor().submit(transaction.withActive(callable)).get()).isSameAs(transaction); transaction.deactivate(); assertThat(tracer.getActive()).isNull(); } + + @Test + void testAsyncActivationAfterEnd() throws Exception { + final Transaction transaction = tracer.startTransaction(TraceContext.asRoot(), null, null).activate(); + Callable callable = transaction.withActive(() -> { + assertThat(tracer.getActive()).isSameAs(transaction); + return tracer.currentTransaction(); + }); + transaction.deactivate().end(); + reporter.decrementReferences(); + assertThat(transaction.isReferenced()).isTrue(); + + assertThat(Executors.newSingleThreadExecutor().submit(callable).get()).isSameAs(transaction); + assertThat(transaction.isReferenced()).isFalse(); + // recycled because the transaction is finished, reported and the reference counter is 0 + assertThat(transaction.getTraceContext().getTraceId().isEmpty()).isTrue(); + + assertThat(tracer.getActive()).isNull(); + } } diff --git a/apm-agent-plugins/apm-api-plugin/src/test/java/co/elastic/apm/api/ElasticApmApiInstrumentationTest.java b/apm-agent-plugins/apm-api-plugin/src/test/java/co/elastic/apm/api/ElasticApmApiInstrumentationTest.java index 9f99c728d6..58beb65101 100644 --- a/apm-agent-plugins/apm-api-plugin/src/test/java/co/elastic/apm/api/ElasticApmApiInstrumentationTest.java +++ b/apm-agent-plugins/apm-api-plugin/src/test/java/co/elastic/apm/api/ElasticApmApiInstrumentationTest.java @@ -277,8 +277,8 @@ void testManualTimestamps() { transaction.startSpan().setStartTimestamp(1000).end(2000); transaction.end(3000); - assertThat(reporter.getFirstTransaction().getDuration()).isEqualTo(3); - assertThat(reporter.getFirstSpan().getDuration()).isEqualTo(1); + assertThat(reporter.getFirstTransaction().getDuration()).isEqualTo(3000); + assertThat(reporter.getFirstSpan().getDuration()).isEqualTo(1000); } @Test diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java index f9ab768a7a..183d72af78 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/agent/es/restclient/v5_6/ElasticsearchClientAsyncInstrumentation.java @@ -83,8 +83,6 @@ private static void onBeforeExecute(@Advice.Argument(0) String method, span = helper.createClientSpan(method, endpoint, entity); if (span != null) { responseListener = helper.wrapResponseListener(responseListener, span); - // write to the span's volatile field to ensure proper visibility on the executing thread - span.markLifecycleManagingThreadSwitchExpected(); wrapped = true; } } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java index b327d13d26..c1bfd55211 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/agent/es/restclient/v6_4/ElasticsearchClientAsyncInstrumentation.java @@ -75,8 +75,6 @@ private static void onBeforeExecute(@Advice.Argument(0) Request request, span = helper.createClientSpan(request.getMethod(), request.getEndpoint(), request.getEntity()); if (span != null) { responseListener = helper.wrapResponseListener(responseListener, span); - // write to the span's volatile field to ensure proper visibility on the executing thread - span.markLifecycleManagingThreadSwitchExpected(); wrapped = true; } } diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java index b77e670e5b..4fdc03ff24 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentation.java @@ -61,7 +61,8 @@ public ElementMatcher getTypeMatcherPreFilter() { public ElementMatcher getTypeMatcher() { return hasSuperType(named("java.util.concurrent.Executor")) // hazelcast tries to serialize the Runnables/Callables to execute them on remote JVMs - .and(not(nameStartsWith("com.hazelcast"))); + .and(not(nameStartsWith("com.hazelcast"))) + .and(not(nameStartsWith("org.apache.felix.resolver"))); } @Override diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java index e7ca8c2504..d2ef892006 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/FailingExecutorInstrumentationTest.java @@ -20,8 +20,8 @@ package co.elastic.apm.agent.concurrent; import co.elastic.apm.agent.AbstractInstrumentationTest; -import co.elastic.apm.agent.impl.async.ContextInScopeCallableWrapper; -import co.elastic.apm.agent.impl.async.ContextInScopeRunnableWrapper; +import co.elastic.apm.agent.impl.async.SpanInScopeCallableWrapper; +import co.elastic.apm.agent.impl.async.SpanInScopeRunnableWrapper; import co.elastic.apm.agent.impl.transaction.TraceContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -47,7 +47,7 @@ void setUp() { executor = ExecutorServiceWrapper.wrap(new ForkJoinPool() { @Override public ForkJoinTask submit(Runnable task) { - if (task instanceof ContextInScopeRunnableWrapper) { + if (task instanceof SpanInScopeRunnableWrapper) { submitWithWrapperCounter.incrementAndGet(); throw new ClassCastException(); } @@ -56,7 +56,7 @@ public ForkJoinTask submit(Runnable task) { @Override public ForkJoinTask submit(Callable task) { - if (task instanceof ContextInScopeCallableWrapper) { + if (task instanceof SpanInScopeCallableWrapper) { submitWithWrapperCounter.incrementAndGet(); throw new IllegalArgumentException(); } diff --git a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java index dfe852207e..274c534d8e 100644 --- a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttp3ClientAsyncInstrumentation.java @@ -90,7 +90,7 @@ private static void onBeforeEnqueue(@Advice.Origin Class clazz, okhttp3.Request request = originalRequest; span = HttpClientHelper.startHttpClientSpan(parent, request.method(), request.url().toString(), request.url().host()); if (span != null) { - span.activate().markLifecycleManagingThreadSwitchExpected(); + span.activate(); originalRequest = originalRequest.newBuilder().addHeader(TraceContext.TRACE_PARENT_HEADER, span.getTraceContext().getOutgoingTraceParentHeader().toString()).build(); callback = wrapperCreator.wrap(callback, span); } diff --git a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java index 2a62707d5f..5e1ac6fda3 100644 --- a/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java +++ b/apm-agent-plugins/apm-okhttp-plugin/src/main/java/co/elastic/apm/agent/okhttp/OkHttpClientAsyncInstrumentation.java @@ -89,7 +89,7 @@ private static void onBeforeEnqueue(@Advice.Origin Class clazz, Request request = originalRequest; span = HttpClientHelper.startHttpClientSpan(parent, request.method(), request.url().toString(), request.url().getHost()); if (span != null) { - span.activate().markLifecycleManagingThreadSwitchExpected(); + span.activate(); originalRequest = originalRequest.newBuilder().addHeader(TraceContext.TRACE_PARENT_HEADER, span.getTraceContext().getOutgoingTraceParentHeader().toString()).build(); callback = wrapperCreator.wrap(callback, span); } diff --git a/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java b/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java index 92a1ad31d0..0ac53fe4cf 100644 --- a/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java +++ b/apm-agent-plugins/apm-servlet-plugin/src/main/java/co/elastic/apm/agent/servlet/AsyncInstrumentation.java @@ -175,7 +175,6 @@ private static void onEnterAsyncContextStart(@Advice.Argument(value = 0, readOnl if (tracer != null && runnable != null) { final Transaction transaction = tracer.currentTransaction(); if (transaction != null) { - transaction.markLifecycleManagingThreadSwitchExpected(); runnable = transaction.withActive(runnable); } } diff --git a/apm-opentracing/src/test/java/co/elastic/apm/opentracing/OpenTracingBridgeTest.java b/apm-opentracing/src/test/java/co/elastic/apm/opentracing/OpenTracingBridgeTest.java index 600ed1d85f..2edb07714d 100644 --- a/apm-opentracing/src/test/java/co/elastic/apm/opentracing/OpenTracingBridgeTest.java +++ b/apm-opentracing/src/test/java/co/elastic/apm/opentracing/OpenTracingBridgeTest.java @@ -63,7 +63,7 @@ void testCreateNonActiveTransaction() { span.finish(TimeUnit.MILLISECONDS.toMicros(1)); assertThat(reporter.getTransactions()).hasSize(1); - assertThat(reporter.getFirstTransaction().getDuration()).isEqualTo(1); + assertThat(reporter.getFirstTransaction().getDuration()).isEqualTo(1000); assertThat(reporter.getFirstTransaction().getName().toString()).isEqualTo("test"); } @@ -75,11 +75,11 @@ void sanityCheckRealTimestamps() { final long epochMicros = System.currentTimeMillis() * 1000; assertThat(reporter.getTransactions()).hasSize(1); - assertThat(reporter.getFirstTransaction().getDuration()).isLessThan(MINUTES.toMillis(1)); + assertThat(reporter.getFirstTransaction().getDuration()).isLessThan(MINUTES.toMicros(1)); assertThat(reporter.getFirstTransaction().getTimestamp()).isCloseTo(epochMicros, offset(MINUTES.toMicros(1))); assertThat(reporter.getSpans()).hasSize(1); - assertThat(reporter.getFirstSpan().getDuration()).isLessThan(MINUTES.toMillis(1)); + assertThat(reporter.getFirstSpan().getDuration()).isLessThan(MINUTES.toMicros(1)); assertThat(reporter.getFirstSpan().getTimestamp()).isCloseTo(epochMicros, offset(MINUTES.toMicros(1))); } @@ -184,7 +184,7 @@ void testCreateActiveTransaction() { assertThat(reporter.getTransactions()).hasSize(0); // manually finish span - scope.span().finish(TimeUnit.MILLISECONDS.toMicros(1)); + scope.span().finish(1); assertThat(reporter.getTransactions()).hasSize(1); assertThat(reporter.getFirstTransaction().getDuration()).isEqualTo(1); assertThat(reporter.getFirstTransaction().getName().toString()).isEqualTo("test"); diff --git a/integration-tests/application-server-integration-tests/pom.xml b/integration-tests/application-server-integration-tests/pom.xml index a10e100493..5e195ede47 100644 --- a/integration-tests/application-server-integration-tests/pom.xml +++ b/integration-tests/application-server-integration-tests/pom.xml @@ -85,6 +85,11 @@ commons-io 1.3.2 + + com.github.terma + javaniotcpproxy + 1.5 +