Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recycle via reference counting #587

Merged
merged 8 commits into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public <T> Transaction startTransaction(TraceContext.ChildContextCreator<T> chil
if (!coreConfiguration.isActive()) {
transaction = noopTransaction();
} else {
transaction = transactionPool.createInstance().start(childContextCreator, parent, epochMicros, sampler);
transaction = createTransaction().start(childContextCreator, parent, epochMicros, sampler);
}
if (logger.isDebugEnabled()) {
logger.debug("startTransaction {} {", transaction);
Expand All @@ -252,7 +252,16 @@ public <T> Transaction startTransaction(TraceContext.ChildContextCreator<T> chil
}

public Transaction noopTransaction() {
return transactionPool.createInstance().startNoop();
return createTransaction().startNoop();
}

private Transaction createTransaction() {
Transaction transaction = transactionPool.createInstance();
while (transaction.getReferenceCount() != 0) {
logger.warn("Tried to start a transaction with a non-zero reference count {} {}", transaction.getReferenceCount(), transaction);
eyalkoren marked this conversation as resolved.
Show resolved Hide resolved
transaction = transactionPool.createInstance();
}
return transaction;
}

@Nullable
Expand Down Expand Up @@ -295,7 +304,7 @@ public Span startSpan(AbstractSpan<?> parent, long epochMicros) {
* @see #startSpan(TraceContext.ChildContextCreator, Object)
*/
public <T> Span startSpan(TraceContext.ChildContextCreator<T> childContextCreator, T parentContext, long epochMicros) {
Span span = spanPool.createInstance();
Span span = createSpan();
final boolean dropped;
Transaction transaction = currentTransaction();
if (transaction != null) {
Expand All @@ -313,6 +322,15 @@ public <T> Span startSpan(TraceContext.ChildContextCreator<T> childContextCreato
return span;
}

private Span createSpan() {
Span span = spanPool.createInstance();
while (span.getReferenceCount() != 0) {
logger.warn("Tried to start a span with a non-zero reference count {} {}", span.getReferenceCount(), span);
span = spanPool.createInstance();
eyalkoren marked this conversation as resolved.
Show resolved Hide resolved
}
return span;
}

private boolean isTransactionSpanLimitReached(Transaction transaction) {
return coreConfiguration.getTransactionMaxSpans() <= transaction.getSpanCount().getStarted().get();
}
Expand Down Expand Up @@ -372,7 +390,7 @@ public void endTransaction(Transaction transaction) {
// we do report non-sampled transactions (without the context)
reporter.report(transaction);
} else {
transaction.recycle();
transaction.decrementReferences();
}
}

Expand All @@ -387,7 +405,7 @@ public void endSpan(Span span) {
}
reporter.report(span);
} else {
span.recycle();
span.decrementReferences();
}
}

Expand Down Expand Up @@ -489,14 +507,14 @@ public List<ActivationListener> getActivationListeners() {

public void activate(TraceContextHolder<?> holder) {
if (logger.isDebugEnabled()) {
logger.debug("Activating {} on thread {}", holder.getTraceContext(), Thread.currentThread().getId());
logger.debug("Activating {} on thread {}", holder, Thread.currentThread().getId());
}
activeStack.get().push(holder);
}

public void deactivate(TraceContextHolder<?> holder) {
if (logger.isDebugEnabled()) {
logger.debug("Deactivating {} on thread {}", holder.getTraceContext(), Thread.currentThread().getId());
logger.debug("Deactivating {} on thread {}", holder, Thread.currentThread().getId());
}
final Deque<TraceContextHolder<?>> stack = activeStack.get();
assertIsActive(holder, stack.poll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ protected boolean beforeDelegation(final AbstractSpan<?> localSpan) {

protected void afterDelegation(final AbstractSpan<?> localSpan, boolean activated) {
try {
if (localSpan != null && activated) {
localSpan.deactivate();
if (localSpan != null) {
if (activated) {
localSpan.deactivate();
}
localSpan.decrementReferences();
}
doRecycle();
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public SpanInScopeCallableWrapper(ElasticApmTracer tracer) {
public SpanInScopeCallableWrapper<V> wrap(Callable<V> delegate, AbstractSpan<?> span) {
this.delegate = delegate;
this.span = span;
span.incrementReferences();
eyalkoren marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public SpanInScopeRunnableWrapper(ElasticApmTracer tracer) {
public SpanInScopeRunnableWrapper wrap(Runnable delegate, AbstractSpan<?> span) {
this.delegate = delegate;
this.span = span;
span.incrementReferences();
felixbarny marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,6 @@ public int getPayloadSize() {
return errors.size();
}

@Override
public void recycle() {
for (ErrorCapture error : errors) {
error.recycle();
}
}

@Override
public void resetState() {
errors.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,4 @@ public SystemInfo getSystem() {

public abstract int getPayloadSize();

public abstract void recycle();
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,4 @@ public int getPayloadSize() {
return transactions.size() + spans.size();
}

@Override
public void recycle() {
for (int i = 0; i < transactions.size(); i++) {
transactions.get(i).recycle();
}
for (int i = 0; i < spans.size(); i++) {
spans.get(i).recycle();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,40 @@
import javax.annotation.Nullable;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class AbstractSpan<T extends AbstractSpan> extends TraceContextHolder<T> {
private static final Logger logger = LoggerFactory.getLogger(AbstractSpan.class);
protected static final double MS_IN_MICROS = TimeUnit.MILLISECONDS.toMicros(1);
protected final TraceContext traceContext;

// used to mark this span as expected to switch lifecycle-managing-thread, eg span created by one thread and ended by another
private volatile boolean isLifecycleManagingThreadSwitch;

/**
eyalkoren marked this conversation as resolved.
Show resolved Hide resolved
* Generic designation of a transaction in the scope of a single service (eg: 'GET /users/:id')
*/
protected final StringBuilder name = new StringBuilder();
private long timestamp;

/**
* How long the transaction took to complete, in ms with 3 decimal points
* (Required)
*/
protected double duration;
protected AtomicInteger references = new AtomicInteger();
protected volatile boolean finished = true;

private volatile boolean finished = true;
public int getReferenceCount() {
return references.get();
}

public AbstractSpan(ElasticApmTracer tracer) {
super(tracer);
traceContext = TraceContext.with64BitId(this.tracer);
}

public boolean isReferenced() {
return references.get() > 0;
}

/**
* How long the transaction took to complete, in ms with 3 decimal points
* (Required)
Expand Down Expand Up @@ -116,8 +123,8 @@ public void resetState() {
name.setLength(0);
timestamp = 0;
duration = 0;
isLifecycleManagingThreadSwitch = false;
traceContext.resetState();
references.set(0);
eyalkoren marked this conversation as resolved.
Show resolved Hide resolved
}

public boolean isChildOf(AbstractSpan<?> parent) {
Expand All @@ -129,6 +136,7 @@ public Span createSpan() {
return createSpan(traceContext.getClock().getEpochMicros());
}

@Override
public Span createSpan(long epochMicros) {
return tracer.startSpan(this, epochMicros);
}
Expand All @@ -153,8 +161,14 @@ public void addLabel(String key, Boolean value) {

public abstract AbstractContext getContext();

protected void onStart() {
/**
* Called after the span has been started and its parent references are set
*/
protected void onAfterStart() {
this.finished = false;
// this final reference is decremented when the span is reported
// or even after its reported and the last child span is ended
incrementReferences();
}

public void end() {
Expand All @@ -163,12 +177,13 @@ public void end() {

public final void end(long epochMicros) {
if (!finished) {
this.finished = true;
this.duration = (epochMicros - timestamp) / AbstractSpan.MS_IN_MICROS;
this.duration = (epochMicros - timestamp) / AbstractSpan.MS_IN_MICROS;
if (name.length() == 0) {
name.append("unnamed");
}
doEnd(epochMicros);
// has to be set last so doEnd callbacks don't think it has already been finished
this.finished = true;
} else {
logger.warn("End has already been called: {}", this);
assert false;
Expand All @@ -182,20 +197,19 @@ public boolean isChildOf(TraceContextHolder other) {
return getTraceContext().isChildOf(other);
}

public void markLifecycleManagingThreadSwitchExpected() {
isLifecycleManagingThreadSwitch = true;
@Override
public T activate() {
incrementReferences();
return super.activate();
}

@Override
public T activate() {
if (isLifecycleManagingThreadSwitch) {
// This serves two goals:
// 1. resets the lifecycle management flag, so that the executing thread will remain in charge until set otherwise
// by setting this flag once more
// 2. reading this volatile field when span is activated on a new thread ensures proper visibility of other span data
isLifecycleManagingThreadSwitch = false;
public T deactivate() {
try {
return super.deactivate();
} finally {
decrementReferences();
}
return super.activate();
}

/**
Expand All @@ -208,11 +222,7 @@ public T activate() {
*/
@Override
public Runnable withActive(Runnable runnable) {
if (isLifecycleManagingThreadSwitch) {
return tracer.wrapRunnable(runnable, this);
} else {
return tracer.wrapRunnable(runnable, traceContext);
}
return tracer.wrapRunnable(runnable, this);
}

/**
Expand All @@ -225,15 +235,32 @@ public Runnable withActive(Runnable runnable) {
*/
@Override
public <V> Callable<V> withActive(Callable<V> callable) {
if (isLifecycleManagingThreadSwitch) {
return tracer.wrapCallable(callable, this);
} else {
return tracer.wrapCallable(callable, traceContext);
}
return tracer.wrapCallable(callable, this);
}

public void setStartTimestamp(long epochMicros) {
timestamp = epochMicros;
}

public void incrementReferences() {
references.incrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("increment references to {} ({})", this, references);
if (logger.isTraceEnabled()) {
logger.trace("incrementing references at",
new RuntimeException("This is an expected exception. Is just used to record where the reference count has been incremented."));
}
}
}

public void decrementReferences() {
if (logger.isDebugEnabled()) {
logger.debug("decrement references to {} ({})", this, references);
if (logger.isTraceEnabled()) {
logger.trace("decrementing references at",
new RuntimeException("This is an expected exception. Is just used to record where the reference count has been decremented."));
}
}
}

}
Loading