Skip to content

Commit

Permalink
Prevent ANR during SDK initialization
Browse files Browse the repository at this point in the history
Description
When initializing the OpenTelemetry Android SDK with disk buffering enabled, we
discovered that synchronous disk space checks were causing ANRs in production.
These checks occur during the creation of disk buffering exporters, specifically
in `DiskManager.getMaxFolderSize()`, which makes blocking IPC calls through
`StorageManager.getAllocatableBytes()` on the main thread. The issue manifests
in the following ANR stacktrace:
```
android.os.BinderProxy.transact (BinderProxy.java:662)
android.os.storage.IStorageManager$Stub$Proxy.getAllocatableBytes (IStorageManager.java:2837)
android.os.storage.StorageManager.getAllocatableBytes (StorageManager.java:2414)
android.os.storage.StorageManager.getAllocatableBytes (StorageManager.java:2404)
io.opentelemetry.android.internal.services.CacheStorage.getAvailableSpace (CacheStorage.java:66)
io.opentelemetry.android.internal.services.CacheStorage.ensureCacheSpaceAvailable (CacheStorage.java:50)
io.opentelemetry.android.internal.features.persistence.DiskManager.getMaxFolderSize (DiskManager.kt:58)
io.opentelemetry.android.OpenTelemetryRumBuilder.createStorageConfiguration (OpenTelemetryRumBuilder.java:338)
io.opentelemetry.android.OpenTelemetryRumBuilder.build (OpenTelemetryRumBuilder.java:286)
```

Our Solution
To fix this we moved initialization to run on a background executor and
buffer the data in memory until it completes.

The process works like this:
1. Initialize the SDK with `BufferDelegatingExporter` instances that can
   immediately accept telemetry data.
2. Move exporter initialization off the main thread.
3. Once async initialization completes, flush buffered signals to initialized
   exporters and delegate all future signals.

The primary goal of this solution is to be unobtrusive and prevent ANRs caused
by initialization of disk exporters, while preventing signals from being
dropped.

Testing
We have added unit tests to cover the buffering, delevation, and RUM
building. We've also verified this with both disk enabled and disk
disabled.
  • Loading branch information
ansman committed Nov 22, 2024
1 parent c10adba commit 6f1e90f
Show file tree
Hide file tree
Showing 7 changed files with 527 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
import android.app.Application;
import android.util.Log;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import io.opentelemetry.android.common.RumConstants;
import io.opentelemetry.android.config.OtelRumConfig;
import io.opentelemetry.android.export.BufferDelegatingLogExporter;
import io.opentelemetry.android.export.BufferDelegatingSpanExporter;
import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration;
import io.opentelemetry.android.features.diskbuffering.SignalFromDiskExporter;
import io.opentelemetry.android.features.diskbuffering.scheduler.DefaultExportScheduleHandler;
Expand Down Expand Up @@ -63,7 +66,6 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import kotlin.jvm.functions.Function0;

/**
Expand Down Expand Up @@ -94,8 +96,13 @@ public final class OpenTelemetryRumBuilder {

private Resource resource;

@Nullable private ServiceManager serviceManager;
@Nullable private ExportScheduleHandler exportScheduleHandler;
private final Object lock = new Object();

// Writes guarded by "lock"
@Nullable private volatile ServiceManager serviceManager;

// Writes guarded by "lock"
@Nullable private volatile ExportScheduleHandler exportScheduleHandler;

private static TextMapPropagator buildDefaultPropagator() {
return TextMapPropagator.composite(
Expand Down Expand Up @@ -279,6 +286,56 @@ public OpenTelemetryRum build() {
InitializationEvents initializationEvents = InitializationEvents.get();
applyConfiguration(initializationEvents);

BufferDelegatingLogExporter bufferDelegatingLogExporter = new BufferDelegatingLogExporter();

BufferDelegatingSpanExporter bufferDelegatingSpanExporter =
new BufferDelegatingSpanExporter();

SessionManager sessionManager =
SessionManager.create(timeoutHandler, config.getSessionTimeout().toNanos());

OpenTelemetrySdk sdk =
OpenTelemetrySdk.builder()
.setTracerProvider(
buildTracerProvider(
sessionManager, application, bufferDelegatingSpanExporter))
.setLoggerProvider(
buildLoggerProvider(
sessionManager, application, bufferDelegatingLogExporter))
.setMeterProvider(buildMeterProvider(application))
.setPropagators(buildFinalPropagators())
.build();

otelSdkReadyListeners.forEach(listener -> listener.accept(sdk));

SdkPreconfiguredRumBuilder delegate =
new SdkPreconfiguredRumBuilder(
application,
sdk,
timeoutHandler,
sessionManager,
config.shouldDiscoverInstrumentations(),
getServiceManager());

// AsyncTask is deprecated but the thread pool is still used all over the Android SDK
// and it provides a way to get a background thread without having to create a new one.
android.os.AsyncTask.THREAD_POOL_EXECUTOR.execute(
() ->
initializeExporters(
initializationEvents,
bufferDelegatingSpanExporter,
bufferDelegatingLogExporter));

instrumentations.forEach(delegate::addInstrumentation);

return delegate.build();
}

private void initializeExporters(
InitializationEvents initializationEvents,
BufferDelegatingSpanExporter bufferDelegatingSpanExporter,
BufferDelegatingLogExporter bufferedDelegatingLogExporter) {

DiskBufferingConfiguration diskBufferingConfiguration =
config.getDiskBufferingConfiguration();
SpanExporter spanExporter = buildSpanExporter();
Expand Down Expand Up @@ -306,45 +363,31 @@ public OpenTelemetryRum build() {
}
initializationEvents.spanExporterInitialized(spanExporter);

SessionManager sessionManager =
SessionManager.create(timeoutHandler, config.getSessionTimeout().toNanos());
bufferedDelegatingLogExporter.setDelegate(logsExporter);

OpenTelemetrySdk sdk =
OpenTelemetrySdk.builder()
.setTracerProvider(
buildTracerProvider(sessionManager, application, spanExporter))
.setLoggerProvider(
buildLoggerProvider(sessionManager, application, logsExporter))
.setMeterProvider(buildMeterProvider(application))
.setPropagators(buildFinalPropagators())
.build();

otelSdkReadyListeners.forEach(listener -> listener.accept(sdk));
bufferDelegatingSpanExporter.setDelegate(spanExporter);

scheduleDiskTelemetryReader(signalFromDiskExporter);

SdkPreconfiguredRumBuilder delegate =
new SdkPreconfiguredRumBuilder(
application,
sdk,
timeoutHandler,
sessionManager,
config.shouldDiscoverInstrumentations(),
getServiceManager());
instrumentations.forEach(delegate::addInstrumentation);
return delegate.build();
}

@NonNull
private ServiceManager getServiceManager() {
if (serviceManager == null) {
serviceManager = ServiceManagerImpl.Companion.create(application);
synchronized (lock) {
if (serviceManager == null) {
serviceManager = ServiceManagerImpl.Companion.create(application);
}
}
}
return serviceManager;
// This can never be null since we never write `null` to it
return requireNonNull(serviceManager);
}

public OpenTelemetryRumBuilder setServiceManager(ServiceManager serviceManager) {
this.serviceManager = serviceManager;
public OpenTelemetryRumBuilder setServiceManager(@NonNull ServiceManager serviceManager) {
requireNonNull(serviceManager, "serviceManager cannot be null");
synchronized (lock) {
this.serviceManager = serviceManager;
}
return this;
}

Expand All @@ -353,8 +396,11 @@ public OpenTelemetryRumBuilder setServiceManager(ServiceManager serviceManager)
* If not specified, the default schedule exporter will be used.
*/
public OpenTelemetryRumBuilder setExportScheduleHandler(
ExportScheduleHandler exportScheduleHandler) {
this.exportScheduleHandler = exportScheduleHandler;
@NonNull ExportScheduleHandler exportScheduleHandler) {
requireNonNull(exportScheduleHandler, "exportScheduleHandler cannot be null");
synchronized (lock) {
this.exportScheduleHandler = exportScheduleHandler;
}
return this;
}

Expand All @@ -376,17 +422,24 @@ private StorageConfiguration createStorageConfiguration() throws IOException {
}

private void scheduleDiskTelemetryReader(@Nullable SignalFromDiskExporter signalExporter) {

if (exportScheduleHandler == null) {
ServiceManager serviceManager = getServiceManager();
// TODO: Is it safe to get the work service yet here? If so, we can
// avoid all this lazy supplier stuff....
Function0<PeriodicWorkService> getWorkService = serviceManager::getPeriodicWorkService;
exportScheduleHandler =
new DefaultExportScheduleHandler(
new DefaultExportScheduler(getWorkService), getWorkService);
synchronized (lock) {
if (exportScheduleHandler == null) {
ServiceManager serviceManager = getServiceManager();
// TODO: Is it safe to get the work service yet here? If so, we can
// avoid all this lazy supplier stuff....
Function0<PeriodicWorkService> getWorkService =
serviceManager::getPeriodicWorkService;
exportScheduleHandler =
new DefaultExportScheduleHandler(
new DefaultExportScheduler(getWorkService), getWorkService);
}
}
}

final ExportScheduleHandler exportScheduleHandler =
requireNonNull(this.exportScheduleHandler);

if (signalExporter == null) {
// Disabling here allows to cancel previously scheduled exports using tools that
// can run even after the app has been terminated (such as WorkManager).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.export

import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.logs.data.LogRecordData
import io.opentelemetry.sdk.logs.export.LogRecordExporter

/**
* An in-memory buffer delegating log exporter that buffers log records in memory until a delegate is set.
* Once a delegate is set, the buffered log records are exported to the delegate.
*
* The buffer size is set to 5,000 log entries by default. If the buffer is full, the exporter will drop new log records.
*/
internal class BufferDelegatingLogExporter(
maxBufferedLogs: Int = 5_000,
) : BufferedDelegatingExporter<LogRecordData, LogRecordExporter>(bufferedSignals = maxBufferedLogs),
LogRecordExporter {
override fun exportToDelegate(
delegate: LogRecordExporter,
data: Collection<LogRecordData>,
): CompletableResultCode = delegate.export(data)

override fun shutdownDelegate(delegate: LogRecordExporter): CompletableResultCode = delegate.shutdown()

override fun export(logs: Collection<LogRecordData>): CompletableResultCode = bufferOrDelegate(logs)

override fun flush(): CompletableResultCode =
withDelegateOrNull { delegate ->
delegate?.flush() ?: CompletableResultCode.ofSuccess()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.export

import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.sdk.trace.export.SpanExporter

/**
* An in-memory buffer delegating span exporter that buffers span data in memory until a delegate is set.
* Once a delegate is set, the buffered span data is exported to the delegate.
*
* The buffer size is set to 5,000 spans by default. If the buffer is full, the exporter will drop new span data.
*/
internal class BufferDelegatingSpanExporter(
maxBufferedSpans: Int = 5_000,
) : BufferedDelegatingExporter<SpanData, SpanExporter>(bufferedSignals = maxBufferedSpans),
SpanExporter {
override fun exportToDelegate(
delegate: SpanExporter,
data: Collection<SpanData>,
): CompletableResultCode = delegate.export(data)

override fun shutdownDelegate(delegate: SpanExporter): CompletableResultCode = delegate.shutdown()

override fun export(spans: Collection<SpanData>): CompletableResultCode = bufferOrDelegate(spans)

override fun flush(): CompletableResultCode =
withDelegateOrNull { delegate ->
delegate?.flush() ?: CompletableResultCode.ofSuccess()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.export

import io.opentelemetry.sdk.common.CompletableResultCode
import java.util.concurrent.atomic.AtomicBoolean

/**
* An in-memory buffer delegating signal exporter that buffers signal in memory until a delegate is set.
* Once a delegate is set, the buffered signals are exported to the delegate.
*
* The buffer size is set to 5,000 by default. If the buffer is full, the exporter will drop new signals.
*/
internal abstract class BufferedDelegatingExporter<T, D>(private val bufferedSignals: Int = 5_000) {
@Volatile
private var delegate: D? = null
private val buffer = arrayListOf<T>()
private val lock = Any()
private var isShutDown = AtomicBoolean(false)

/**
* Sets the delegate for this exporter and flushes the buffer to the delegate.
*
* If the delegate has already been set, an [IllegalStateException] will be thrown.
* If this exporter has been shut down, the delegate will be shut down immediately.
*
* @param delegate the delegate to set
*
* @throws IllegalStateException if a delegate has already been set
*/
fun setDelegate(delegate: D) {
synchronized(lock) {
check(this.delegate == null) { "Exporter delegate has already been set." }

flushToDelegate(delegate)

this.delegate = delegate

if (isShutDown.get()) {
shutdownDelegate(delegate)
}
}
}

/**
* Buffers the given data if the delegate has not been set, otherwise exports the data to the delegate.
*
* @param data the data to buffer or export
*/
protected fun bufferOrDelegate(data: Collection<T>): CompletableResultCode =
withDelegateOrNull {
if (it != null) {
exportToDelegate(it, data)
} else {
val amountToTake = bufferedSignals - buffer.size
buffer.addAll(data.take(amountToTake))
CompletableResultCode.ofSuccess()
}
}

/**
* Executes the given block with the delegate if it has been set, otherwise executes the block with a null delegate.
*
* @param block the block to execute
*/
protected fun <R> withDelegateOrNull(block: (D?) -> R): R {
delegate?.let { return block(it) }
return synchronized(lock) { block(delegate) }
}

open fun shutdown(): CompletableResultCode = bufferedShutDown()

protected abstract fun exportToDelegate(
delegate: D,
data: Collection<T>,
): CompletableResultCode

protected abstract fun shutdownDelegate(delegate: D): CompletableResultCode

private fun flushToDelegate(delegate: D) {
exportToDelegate(delegate, buffer)
buffer.clear()
buffer.trimToSize()
}

private fun bufferedShutDown(): CompletableResultCode {
isShutDown.set(true)

return withDelegateOrNull {
if (it != null) {
shutdownDelegate(it)
} else {
CompletableResultCode.ofSuccess()
}
}
}
}
Loading

0 comments on commit 6f1e90f

Please sign in to comment.