Skip to content

Commit

Permalink
Force flush after finishing lambda function. (#1204)
Browse files Browse the repository at this point in the history
* Force span flush after lambda invocation.

* Force flush after finishing lambda function.

* bleh

* Better revert

* Accessor

* OpenTelemetrySdkAccess
  • Loading branch information
Anuraag Agrawal authored Sep 17, 2020
1 parent d952354 commit 9635a5e
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opentelemetry.instrumentation.auto.api;

import java.util.concurrent.TimeUnit;

/**
* A helper to facilitate accessing OpenTelemetry SDK methods from instrumentation. Because
* instrumentation runs in the app classloader, they do not have access to our SDK in the agent
* classloader. So we use this class in the bootstrap classloader to bridge between the two - the
* agent classloader will register implementations of needed SDK functions that can be called from
* instrumentation.
*/
public class OpenTelemetrySdkAccess {

/**
* Interface matching {@link io.opentelemetry.sdk.trace.TracerSdkProvider#forceFlush()} to allow
* holding a reference to it.
*/
public interface ForceFlusher {
/** Executes force flush. */
void run(int timeout, TimeUnit unit);
}

private static volatile ForceFlusher FORCE_FLUSH;

/** Forces flush of pending spans and metrics. */
public static void forceFlush(int timeout, TimeUnit unit) {
FORCE_FLUSH.run(timeout, unit);
}

/**
* Sets the {@link Runnable} to execute when instrumentation needs to force flush. This is called
* from the agent classloader to execute the SDK's force flush mechanism. Instrumentation must not
* call this.
*/
public static void internalSetForceFlush(ForceFlusher forceFlush) {
if (FORCE_FLUSH != null) {
// Only possible by misuse of this API, just ignore.
return;
}
FORCE_FLUSH = forceFlush;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import com.amazonaws.services.lambda.runtime.Context;
import com.google.auto.service.AutoService;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.auto.api.OpenTelemetrySdkAccess;
import io.opentelemetry.javaagent.tooling.Instrumenter;
import io.opentelemetry.trace.Span;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -81,6 +83,7 @@ public static void stopSpan(
} else {
TRACER.end(span);
}
OpenTelemetrySdkAccess.forceFlush(1, TimeUnit.SECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import io.opentelemetry.instrumentation.awslambda.v1_0.AbstractAwsLambdaTest

class AwsLambdaTest extends AbstractAwsLambdaTest implements AgentTestTrait {

def cleanup() {
assert testWriter.forceFlushCalled()
}

class TestRequestHandler implements RequestHandler<String, String> {
@Override
String handleRequest(String input, Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ apply from: "$rootDir/gradle/instrumentation-library.gradle"

dependencies {
library group: 'com.amazonaws', name: 'aws-lambda-java-core', version: '1.0.0'
compileOnly deps.opentelemetrySdk

testImplementation project(':instrumentation:aws-lambda-1.0:testing')
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;
import java.util.concurrent.TimeUnit;

/**
* A base class similar to {@link RequestHandler} but will automatically trace invocations of {@link
Expand Down Expand Up @@ -65,6 +67,7 @@ public final O handleRequest(I input, Context context) {
} else {
tracer.end(span);
}
OpenTelemetrySdk.getTracerProvider().forceFlush().join(1, TimeUnit.SECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import io.opentelemetry.auto.test.InstrumentationTestTrait

class AwsLambdaTest extends AbstractAwsLambdaTest implements InstrumentationTestTrait {

def cleanup() {
assert testWriter.forceFlushCalled()
}

class TestRequestHandler extends TracingRequestHandler<String, String> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@

import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.auto.api.OpenTelemetrySdkAccess;
import io.opentelemetry.instrumentation.auto.api.OpenTelemetrySdkAccess.ForceFlusher;
import io.opentelemetry.instrumentation.auto.api.SafeServiceLoader;
import io.opentelemetry.javaagent.tooling.context.FieldBackedProvider;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import java.lang.instrument.Instrumentation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.agent.builder.ResettableClassFileTransformer;
import net.bytebuddy.description.type.TypeDefinition;
Expand Down Expand Up @@ -95,6 +99,14 @@ public static ResettableClassFileTransformer installBytebuddyAgent(
Thread.currentThread().setContextClassLoader(savedContextClassLoader);
}

OpenTelemetrySdkAccess.internalSetForceFlush(
new ForceFlusher() {
@Override
public void run(int timeout, TimeUnit unit) {
OpenTelemetrySdk.getTracerProvider().forceFlush().join(timeout, unit);
}
});

INSTRUMENTATION = inst;

addByteBuddyRawSetting();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class InMemoryExporter implements SpanProcessor {
private final Map<String, Integer> spanOrders = new ConcurrentHashMap<>();
private final AtomicInteger nextSpanOrder = new AtomicInteger();

private volatile boolean forceFlushCalled;

@Override
public void onStart(ReadWriteSpan readWriteSpan) {
SpanData sd = readWriteSpan.toSpanData();
Expand Down Expand Up @@ -227,6 +229,7 @@ public void clear() {
traces.clear();
spanOrders.clear();
}
forceFlushCalled = false;
}

@Override
Expand All @@ -236,9 +239,14 @@ public CompletableResultCode shutdown() {

@Override
public CompletableResultCode forceFlush() {
forceFlushCalled = true;
return CompletableResultCode.ofSuccess();
}

public boolean forceFlushCalled() {
return forceFlushCalled;
}

// must be called under tracesLock
private void sortTraces() {
Collections.sort(
Expand Down

0 comments on commit 9635a5e

Please sign in to comment.