Skip to content

Commit

Permalink
Extract network monitoring instrumentation (#456)
Browse files Browse the repository at this point in the history
* Extract network monitoring instrumentation

* Apply suggestions from code review

Co-authored-by: jason plumb <[email protected]>

* spotless

---------

Co-authored-by: jason plumb <[email protected]>
  • Loading branch information
Mateusz Rzeszutek and breedx-splk authored Jan 30, 2023
1 parent 984f05f commit f90c8f2
Show file tree
Hide file tree
Showing 33 changed files with 563 additions and 311 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import android.util.Log;
import androidx.annotation.Nullable;
import io.opentelemetry.rum.internal.instrumentation.network.CurrentNetworkProvider;
import java.io.File;
import java.util.Comparator;
import java.util.List;
Expand All @@ -39,7 +40,7 @@ class DiskToZipkinExporter {
static final double DEFAULT_MAX_UNCOMPRESSED_BANDWIDTH = 15.0 * 1024;

private final ScheduledExecutorService threadPool;
private final ConnectionUtil connectionUtil;
private final CurrentNetworkProvider currentNetworkProvider;
private final FileSender fileSender;
private final File spanFilesPath;
private final FileUtils fileUtils;
Expand All @@ -48,7 +49,7 @@ class DiskToZipkinExporter {

DiskToZipkinExporter(Builder builder) {
this.threadPool = builder.threadPool;
this.connectionUtil = requireNonNull(builder.connectionUtil);
this.currentNetworkProvider = requireNonNull(builder.currentNetworkProvider);
this.fileSender = requireNonNull(builder.fileSender);
this.spanFilesPath = requireNonNull(builder.spanFilesPath);
this.fileUtils = builder.fileUtils;
Expand All @@ -72,7 +73,7 @@ void doExportCycle() {
}

private void exportPendingFiles() {
if (!connectionUtil.refreshNetworkStatus().isOnline()) {
if (!currentNetworkProvider.refreshNetworkStatus().isOnline()) {
Log.i(
SplunkRum.LOG_TAG,
"Network offline, leaving spans on disk for for eventual export.");
Expand Down Expand Up @@ -123,7 +124,7 @@ static class Builder {
@Nullable private FileSender fileSender;
@Nullable private BandwidthTracker bandwidthTracker;
private ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
@Nullable private ConnectionUtil connectionUtil;
@Nullable private CurrentNetworkProvider currentNetworkProvider;
@Nullable private File spanFilesPath;
private FileUtils fileUtils = new FileUtils();
private double bandwidthLimit = DEFAULT_MAX_UNCOMPRESSED_BANDWIDTH;
Expand All @@ -133,8 +134,8 @@ Builder threadPool(ScheduledExecutorService threadPool) {
return this;
}

Builder connectionUtil(ConnectionUtil connectionUtil) {
this.connectionUtil = connectionUtil;
Builder connectionUtil(CurrentNetworkProvider currentNetworkProvider) {
this.currentNetworkProvider = currentNetworkProvider;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import android.util.Log;
import androidx.annotation.NonNull;
import io.opentelemetry.rum.internal.instrumentation.network.CurrentNetworkProvider;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
Expand All @@ -30,21 +31,21 @@
class MemoryBufferingExporter implements SpanExporter {
private static final int MAX_BACKLOG_SIZE = 100;

private final ConnectionUtil connectionUtil;
private final CurrentNetworkProvider currentNetworkProvider;
private final SpanExporter delegate;
// note: no need to make this queue thread-safe since it will only ever be called from the
// BatchSpanProcessor worker thread.
private final Queue<SpanData> backlog = new ArrayDeque<>(MAX_BACKLOG_SIZE);

MemoryBufferingExporter(ConnectionUtil connectionUtil, SpanExporter delegate) {
this.connectionUtil = connectionUtil;
MemoryBufferingExporter(CurrentNetworkProvider currentNetworkProvider, SpanExporter delegate) {
this.currentNetworkProvider = currentNetworkProvider;
this.delegate = delegate;
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
backlog.addAll(spans);
if (!connectionUtil.refreshNetworkStatus().isOnline()) {
if (!currentNetworkProvider.refreshNetworkStatus().isOnline()) {
Log.i(
SplunkRum.LOG_TAG,
"Network offline, buffering " + spans.size() + " spans for eventual export.");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@
import io.opentelemetry.rum.internal.OpenTelemetryRumBuilder;
import io.opentelemetry.rum.internal.instrumentation.anr.AnrDetector;
import io.opentelemetry.rum.internal.instrumentation.crash.CrashReporter;
import io.opentelemetry.rum.internal.instrumentation.network.CurrentNetworkProvider;
import io.opentelemetry.rum.internal.instrumentation.network.NetworkAttributesSpanAppender;
import io.opentelemetry.rum.internal.instrumentation.network.NetworkChangeMonitor;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.resources.ResourceBuilder;
import io.opentelemetry.sdk.trace.SpanLimits;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
Expand All @@ -62,6 +66,7 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Level;
import zipkin2.reporter.Sender;
Expand All @@ -87,7 +92,9 @@ class RumInitializer {
this.timingClock = startupTimer.startupClock;
}

SplunkRum initialize(ConnectionUtil.Factory connectionUtilFactory, Looper mainLooper) {
SplunkRum initialize(
Function<Application, CurrentNetworkProvider> currentNetworkProviderFactory,
Looper mainLooper) {
VisibleScreenTracker visibleScreenTracker = new VisibleScreenTracker();

long startTimeNanos = timingClock.now();
Expand All @@ -97,23 +104,24 @@ SplunkRum initialize(ConnectionUtil.Factory connectionUtilFactory, Looper mainLo
initializationEvents.add(
new RumInitializer.InitializationEvent("resourceInitialized", timingClock.now()));

ConnectionUtil connectionUtil = connectionUtilFactory.createAndStart(application);
CurrentNetworkProvider currentNetworkProvider =
currentNetworkProviderFactory.apply(application);
initializationEvents.add(
new InitializationEvent("connectionUtilInitialized", timingClock.now()));

GlobalAttributesSpanAppender globalAttributesSpanAppender =
GlobalAttributesSpanAppender.create(builder.globalAttributes);
otelRumBuilder.addTracerProviderCustomizer(
(tracerProviderBuilder, app) -> {
NetworkAttributesAppender networkAttributesAppender =
new NetworkAttributesAppender(connectionUtil);
SpanProcessor networkAttributesSpanAppender =
NetworkAttributesSpanAppender.create(currentNetworkProvider);
ScreenAttributesAppender screenAttributesAppender =
new ScreenAttributesAppender(visibleScreenTracker);
initializationEvents.add(
new RumInitializer.InitializationEvent(
"attributeAppenderInitialized", timingClock.now()));

SpanExporter zipkinExporter = buildFilteringExporter(connectionUtil);
SpanExporter zipkinExporter = buildFilteringExporter(currentNetworkProvider);
initializationEvents.add(
new RumInitializer.InitializationEvent(
"exporterInitialized", timingClock.now()));
Expand All @@ -126,7 +134,7 @@ SplunkRum initialize(ConnectionUtil.Factory connectionUtilFactory, Looper mainLo

tracerProviderBuilder
.addSpanProcessor(globalAttributesSpanAppender)
.addSpanProcessor(networkAttributesAppender)
.addSpanProcessor(networkAttributesSpanAppender)
.addSpanProcessor(screenAttributesAppender)
.addSpanProcessor(batchSpanProcessor)
.setSpanLimits(
Expand Down Expand Up @@ -175,12 +183,8 @@ SplunkRum initialize(ConnectionUtil.Factory connectionUtilFactory, Looper mainLo
if (builder.networkMonitorEnabled) {
otelRumBuilder.addInstrumentation(
instrumentedApplication -> {
NetworkMonitor networkMonitor = new NetworkMonitor(connectionUtil);
networkMonitor.addConnectivityListener(
instrumentedApplication
.getOpenTelemetrySdk()
.getTracer(SplunkRum.RUM_TRACER_NAME));
instrumentedApplication.registerApplicationStateListener(networkMonitor);
NetworkChangeMonitor.create(currentNetworkProvider)
.installOn(instrumentedApplication);
initializationEvents.add(
new RumInitializer.InitializationEvent(
"networkMonitorInitialized", timingClock.now()));
Expand Down Expand Up @@ -336,8 +340,8 @@ private Resource buildResource(String applicationName, String rumVersion) {
}

// visible for testing
SpanExporter buildFilteringExporter(ConnectionUtil connectionUtil) {
SpanExporter exporter = buildExporter(connectionUtil);
SpanExporter buildFilteringExporter(CurrentNetworkProvider currentNetworkProvider) {
SpanExporter exporter = buildExporter(currentNetworkProvider);
SpanExporter splunkTranslatedExporter =
new SplunkSpanDataModifier(exporter, builder.reactNativeSupportEnabled);
SpanExporter filteredExporter = builder.decorateWithSpanFilter(splunkTranslatedExporter);
Expand All @@ -346,7 +350,7 @@ SpanExporter buildFilteringExporter(ConnectionUtil connectionUtil) {
return filteredExporter;
}

private SpanExporter buildExporter(ConnectionUtil connectionUtil) {
private SpanExporter buildExporter(CurrentNetworkProvider currentNetworkProvider) {
if (builder.debugEnabled) {
// tell the Zipkin exporter to shut up already. We're on mobile, network stuff happens.
// we'll do our best to hang on to the spans with the wrapping BufferingExporter.
Expand All @@ -356,13 +360,14 @@ private SpanExporter buildExporter(ConnectionUtil connectionUtil) {
}

if (builder.diskBufferingEnabled) {
return buildStorageBufferingExporter(connectionUtil);
return buildStorageBufferingExporter(currentNetworkProvider);
}

return buildMemoryBufferingThrottledExporter(connectionUtil);
return buildMemoryBufferingThrottledExporter(currentNetworkProvider);
}

private SpanExporter buildStorageBufferingExporter(ConnectionUtil connectionUtil) {
private SpanExporter buildStorageBufferingExporter(
CurrentNetworkProvider currentNetworkProvider) {
Sender sender = OkHttpSender.newBuilder().endpoint(getEndpoint()).build();
File spanFilesPath = FileUtils.getSpansDirectory(application);
BandwidthTracker bandwidthTracker = new BandwidthTracker();
Expand All @@ -371,7 +376,7 @@ private SpanExporter buildStorageBufferingExporter(ConnectionUtil connectionUtil
FileSender.builder().sender(sender).bandwidthTracker(bandwidthTracker).build();
DiskToZipkinExporter diskToZipkinExporter =
DiskToZipkinExporter.builder()
.connectionUtil(connectionUtil)
.connectionUtil(currentNetworkProvider)
.fileSender(fileSender)
.bandwidthTracker(bandwidthTracker)
.spanFilesPath(spanFilesPath)
Expand All @@ -386,11 +391,12 @@ private String getEndpoint() {
return builder.beaconEndpoint + "?auth=" + builder.rumAccessToken;
}

private SpanExporter buildMemoryBufferingThrottledExporter(ConnectionUtil connectionUtil) {
private SpanExporter buildMemoryBufferingThrottledExporter(
CurrentNetworkProvider currentNetworkProvider) {
String endpoint = getEndpoint();
SpanExporter zipkinSpanExporter = getCoreSpanExporter(endpoint);
return ThrottlingExporter.newBuilder(
new MemoryBufferingExporter(connectionUtil, zipkinSpanExporter))
new MemoryBufferingExporter(currentNetworkProvider, zipkinSpanExporter))
.categorizeByAttribute(SplunkRum.COMPONENT_KEY)
.maxSpansInWindow(100)
.windowSize(Duration.ofSeconds(30))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import io.opentelemetry.instrumentation.okhttp.v3_0.OkHttpTelemetry;
import io.opentelemetry.rum.internal.GlobalAttributesSpanAppender;
import io.opentelemetry.rum.internal.OpenTelemetryRum;
import io.opentelemetry.rum.internal.instrumentation.network.CurrentNetworkProvider;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import okhttp3.Call;
import okhttp3.OkHttpClient;

Expand Down Expand Up @@ -100,15 +102,15 @@ public static SplunkRumBuilder builder() {
static SplunkRum initialize(
SplunkRumBuilder builder,
Application application,
ConnectionUtil.Factory connectionUtilFactory) {
Function<Application, CurrentNetworkProvider> currentNetworkProviderFactory) {
if (INSTANCE != null) {
Log.w(LOG_TAG, "Singleton SplunkRum instance has already been initialized.");
return INSTANCE;
}

INSTANCE =
new RumInitializer(builder, application, startupTimer)
.initialize(connectionUtilFactory, Looper.getMainLooper());
.initialize(currentNetworkProviderFactory, Looper.getMainLooper());

if (builder.debugEnabled) {
Log.i(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import android.util.Log;
import androidx.annotation.Nullable;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.rum.internal.instrumentation.network.CurrentNetworkProvider;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.time.Duration;
import java.util.function.Consumer;
Expand Down Expand Up @@ -314,7 +315,7 @@ public SplunkRum build(Application application) {
throw new IllegalStateException(
"You must provide a rumAccessToken, a realm (or full beaconEndpoint), and an applicationName to create a valid Config instance.");
}
return SplunkRum.initialize(this, application, new ConnectionUtil.Factory());
return SplunkRum.initialize(this, application, CurrentNetworkProvider::createAndStart);
}

SpanExporter decorateWithSpanFilter(SpanExporter exporter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.splunk.rum;
package io.opentelemetry.rum.internal.instrumentation.network;

import android.os.Build;
import android.telephony.TelephonyManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.splunk.rum;
package io.opentelemetry.rum.internal.instrumentation.network;

import android.os.Build;
import android.telephony.TelephonyManager;
Expand Down
Loading

0 comments on commit f90c8f2

Please sign in to comment.