Skip to content

Commit

Permalink
Defer remote components to avoid OkHttp class-loading side-effects (#8131)
Browse files Browse the repository at this point in the history

* Additional logging to help triage custom log-manager / jmx-builder test failures.
* Move SharedCommunicationObjects creation to InstallDatadogTracerCallback constructor
* Add ability to pause remote components until SharedCommunicationObjects is ready
* Delay starting trace-writer and data-stream-monitoring until remote I/O is allowed
* Delay starting logs-intake backend until remote I/O is allowed
* Install tracer and logs-intake as soon as necessary, but defer any remote components when use of OkHttp should be delayed
* Remove the exemption where we didn't defer if the custom logging manager
or JMX builder was on the system classpath (because the main thread
would find it there if OkHttp triggered initialization of JUL).
  We now make OkHttp calls from our own background threads rather than
from the main thread, and those threads are isolated from the system
classloader - so this exemption no longer makes sense.
* Adjust minimumBranchCoverage to account for new double-checked lock in SharedCommunicationObjects
  • Loading branch information
mcculls authored Dec 30, 2024
1 parent 5b1d1e4 commit d6e2c2e
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 138 deletions.
2 changes: 1 addition & 1 deletion communication/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies {
}

ext {
minimumBranchCoverage = 0.6
minimumBranchCoverage = 0.5
minimumInstructionCoverage = 0.8
excludedClassesCoverage = [
'datadog.communication.ddagent.ExternalAgentLauncher',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import datadog.remoteconfig.DefaultConfigurationPoller;
import datadog.trace.api.Config;
import datadog.trace.util.AgentTaskScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import okhttp3.HttpUrl;
Expand All @@ -21,12 +23,23 @@
public class SharedCommunicationObjects {
private static final Logger log = LoggerFactory.getLogger(SharedCommunicationObjects.class);

private final List<Runnable> pausedComponents = new ArrayList<>();
private volatile boolean paused;

public OkHttpClient okHttpClient;
public HttpUrl agentUrl;
public Monitoring monitoring;
private DDAgentFeaturesDiscovery featuresDiscovery;
private ConfigurationPoller configurationPoller;

/** Creates an instance that is not paused; equivalent to passing {@code false} for the flag. */
public SharedCommunicationObjects() {
this(false);
}

/**
 * Creates an instance with the given initial paused state.
 *
 * @param paused initial value of the paused flag; while it is {@code true},
 *     {@link #whenReady(Runnable)} queues callbacks instead of running them, until
 *     {@link #resume()} clears the flag
 */
public SharedCommunicationObjects(boolean paused) {
this.paused = paused;
}

public void createRemaining(Config config) {
if (monitoring == null) {
monitoring = Monitoring.DISABLED;
Expand All @@ -46,6 +59,32 @@ public void createRemaining(Config config) {
}
}

/**
 * Runs {@code callback} right away, or queues it for {@link #resume()} while this instance is
 * paused.
 *
 * <p>The paused flag is read once without the lock as a fast path and once more while holding
 * the {@code pausedComponents} monitor; the callback is only queued when the flag is still set
 * at the moment it is added to the list.
 *
 * @param callback action to perform; when not queued it runs on the calling thread
 */
public void whenReady(Runnable callback) {
  boolean deferred = false;
  if (paused) {
    synchronized (pausedComponents) {
      // re-check under the lock: resume() clears the flag before it drains the list
      if (paused) {
        pausedComponents.add(callback);
        deferred = true;
      }
    }
  }
  if (!deferred) {
    callback.run(); // not paused, run immediately
  }
}

/**
 * Clears the paused flag and runs every callback that {@link #whenReady(Runnable)} queued while
 * this instance was paused.
 *
 * <p>The queue is copied and emptied while holding the {@code pausedComponents} monitor, and the
 * callbacks are then run after the monitor has been released. Running them outside the lock
 * keeps a slow callback from blocking threads that are waiting in {@code whenReady}, and keeps a
 * callback that re-enters this class from modifying the list while it is being iterated.
 *
 * <p>A callback that throws is logged and does not stop the remaining callbacks from running.
 */
public void resume() {
  // flip the flag first so that new whenReady() callers stop queuing
  paused = false;

  final List<Runnable> toResume;
  synchronized (pausedComponents) {
    toResume = new ArrayList<>(pausedComponents);
    pausedComponents.clear();
  }

  for (Runnable callback : toResume) {
    try {
      callback.run();
    } catch (Throwable e) {
      log.warn("Problem resuming remote component {}", callback, e);
    }
  }
}

private static HttpUrl parseAgentUrl(Config config) {
String agentUrl = config.getAgentUrl();
if (agentUrl.startsWith("unix:")) {
Expand Down Expand Up @@ -100,11 +139,16 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
agentUrl,
config.isTraceAgentV05Enabled(),
config.isTracerMetricsEnabled());
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread

if (paused) {
// defer remote discovery until remote I/O is allowed
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);
}
}
}
return featuresDiscovery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static datadog.trace.util.AgentThreadFactory.AgentThread.PROFILER_STARTUP;
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_STARTUP;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static datadog.trace.util.Strings.getResourceName;
import static datadog.trace.util.Strings.propertyNameToSystemPropertyName;
import static datadog.trace.util.Strings.toEnvVar;

Expand Down Expand Up @@ -348,7 +347,7 @@ public void run() {
* logging facility. Likewise on IBM JDKs OkHttp may indirectly load 'IBMSASL' which in turn loads LogManager.
*/
InstallDatadogTracerCallback installDatadogTracerCallback =
new InstallDatadogTracerCallback(initTelemetry, inst);
new InstallDatadogTracerCallback(initTelemetry, inst, delayOkHttp);
if (delayOkHttp) {
log.debug("Custom logger detected. Delaying Datadog Tracer initialization.");
registerLogManagerCallback(installDatadogTracerCallback);
Expand Down Expand Up @@ -497,28 +496,21 @@ public void execute() {
}

protected static class InstallDatadogTracerCallback extends ClassLoadCallBack {
private final InitializationTelemetry initTelemetry;
private final Instrumentation instrumentation;
private final Object sco;
private final Class<?> scoClass;
private final boolean delayOkHttp;

public InstallDatadogTracerCallback(
InitializationTelemetry initTelemetry, Instrumentation instrumentation) {
this.initTelemetry = initTelemetry;
InitializationTelemetry initTelemetry,
Instrumentation instrumentation,
boolean delayOkHttp) {
this.delayOkHttp = delayOkHttp;
this.instrumentation = instrumentation;
}

@Override
public AgentThread agentThread() {
return TRACE_STARTUP;
}

@Override
public void execute() {
Object sco;
Class<?> scoClass;
try {
scoClass =
AGENT_CLASSLOADER.loadClass("datadog.communication.ddagent.SharedCommunicationObjects");
sco = scoClass.getConstructor().newInstance();
sco = scoClass.getConstructor(boolean.class).newInstance(delayOkHttp);
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
Expand All @@ -528,10 +520,23 @@ public void execute() {
}

installDatadogTracer(initTelemetry, scoClass, sco);
maybeInstallLogsIntake(scoClass, sco);
}

@Override
public AgentThread agentThread() {
return TRACE_STARTUP;
}

@Override
public void execute() {
if (delayOkHttp) {
resumeRemoteComponents();
}

maybeStartAppSec(scoClass, sco);
maybeStartIast(instrumentation, scoClass, sco);
maybeStartCiVisibility(instrumentation, scoClass, sco);
maybeStartLogsIntake(scoClass, sco);
// start debugger before remote config to subscribe to it before starting to poll
maybeStartDebugger(instrumentation, scoClass, sco);
maybeStartRemoteConfig(scoClass, sco);
Expand All @@ -540,6 +545,18 @@ public void execute() {
startTelemetry(instrumentation, scoClass, sco);
}
}

/**
 * Waits briefly and then reflectively invokes {@code resume()} on the shared communication
 * objects so that components deferred while paused are started.
 *
 * <p>If the wait is interrupted, only the delay is cut short: {@code resume()} is still invoked,
 * and the thread's interrupt status is restored afterwards so callers can observe it. Any
 * failure while invoking {@code resume()} is logged and not propagated.
 */
private void resumeRemoteComponents() {
  boolean interrupted = false;
  try {
    // remote components were paused for custom log-manager/jmx-builder
    // add small delay before resuming remote I/O to help stabilization
    Thread.sleep(1_000);
  } catch (InterruptedException e) {
    // the delay is only a stabilization aid; skipping it must not skip the resume itself
    interrupted = true;
  }

  try {
    scoClass.getMethod("resume").invoke(sco);
  } catch (Throwable e) {
    log.error("Error resuming remote components", e);
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
  }
}
}

protected static class StartProfilingAgentCallback extends ClassLoadCallBack {
Expand Down Expand Up @@ -866,17 +883,18 @@ private static void maybeStartCiVisibility(Instrumentation inst, Class<?> scoCla
}
}

private static void maybeStartLogsIntake(Class<?> scoClass, Object sco) {
private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
if (agentlessLogSubmissionEnabled) {
StaticEventLogger.begin("Logs Intake");

try {
final Class<?> logsIntakeSystemClass =
AGENT_CLASSLOADER.loadClass("datadog.trace.logging.intake.LogsIntakeSystem");
final Method logsIntakeInstallerMethod = logsIntakeSystemClass.getMethod("start", scoClass);
final Method logsIntakeInstallerMethod =
logsIntakeSystemClass.getMethod("install", scoClass);
logsIntakeInstallerMethod.invoke(null, sco);
} catch (final Throwable e) {
log.warn("Not starting Logs Intake subsystem", e);
log.warn("Not installing Logs Intake subsystem", e);
}

StaticEventLogger.end("Logs Intake");
Expand Down Expand Up @@ -1267,14 +1285,8 @@ private static boolean isAppUsingCustomLogManager(final EnumSet<Library> librari

final String logManagerProp = System.getProperty("java.util.logging.manager");
if (logManagerProp != null) {
final boolean onSysClasspath =
ClassLoader.getSystemResource(getResourceName(logManagerProp)) != null;
log.debug("Prop - logging.manager: {}", logManagerProp);
log.debug("logging.manager on system classpath: {}", onSysClasspath);
// Some applications set java.util.logging.manager but never actually initialize the logger.
// Check to see if the configured manager is on the system classpath.
// If so, it should be safe to initialize jmxfetch which will setup the log manager.
return !onSysClasspath;
return true;
}

return false;
Expand Down Expand Up @@ -1305,14 +1317,8 @@ private static boolean isAppUsingCustomJMXBuilder(final EnumSet<Library> librari

final String jmxBuilderProp = System.getProperty("javax.management.builder.initial");
if (jmxBuilderProp != null) {
final boolean onSysClasspath =
ClassLoader.getSystemResource(getResourceName(jmxBuilderProp)) != null;
log.debug("Prop - javax.management.builder.initial: {}", jmxBuilderProp);
log.debug("javax.management.builder.initial on system classpath: {}", onSysClasspath);
// Some applications set javax.management.builder.initial but never actually initialize JMX.
// Check to see if the configured JMX builder is on the system classpath.
// If so, it should be safe to initialize jmxfetch which will setup JMX.
return !onSysClasspath;
return true;
}

return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package datadog.trace.logging.intake;

import datadog.communication.BackendApi;
import datadog.communication.BackendApiFactory;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
Expand All @@ -12,18 +11,16 @@ public class LogsIntakeSystem {

private static final Logger LOGGER = LoggerFactory.getLogger(LogsIntakeSystem.class);

public static void start(SharedCommunicationObjects sco) {
public static void install(SharedCommunicationObjects sco) {
Config config = Config.get();
if (!config.isAgentlessLogSubmissionEnabled()) {
LOGGER.debug("Agentless logs intake is disabled");
return;
}

BackendApiFactory apiFactory = new BackendApiFactory(config, sco);
BackendApi backendApi = apiFactory.createBackendApi(BackendApiFactory.Intake.LOGS);
LogsDispatcher dispatcher = new LogsDispatcher(backendApi);
LogsWriterImpl writer = new LogsWriterImpl(config, dispatcher);
writer.start();
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory);
sco.whenReady(writer::start);

LogsIntake.registerWriter(writer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import static datadog.trace.util.AgentThreadFactory.AGENT_THREAD_GROUP;

import datadog.communication.BackendApi;
import datadog.communication.BackendApiFactory;
import datadog.communication.BackendApiFactory.Intake;
import datadog.trace.api.Config;
import datadog.trace.api.logging.intake.LogsWriter;
import datadog.trace.util.AgentThreadFactory;
Expand All @@ -23,12 +26,12 @@ public class LogsWriterImpl implements LogsWriter {
private static final int ENQUEUE_LOG_TIMEOUT_MILLIS = 1_000;

private final Map<String, Object> commonTags;
private final LogsDispatcher logsDispatcher;
private final BackendApiFactory apiFactory;
private final BlockingQueue<Map<String, Object>> messageQueue;
private final Thread messagePollingThread;

public LogsWriterImpl(Config config, LogsDispatcher logsDispatcher) {
this.logsDispatcher = logsDispatcher;
public LogsWriterImpl(Config config, BackendApiFactory apiFactory) {
this.apiFactory = apiFactory;

commonTags = new HashMap<>();
commonTags.put("ddsource", "java");
Expand Down Expand Up @@ -84,6 +87,9 @@ public void log(Map<String, Object> message) {
}

private void logPollingLoop() {
BackendApi backendApi = apiFactory.createBackendApi(Intake.LOGS);
LogsDispatcher logsDispatcher = new LogsDispatcher(backendApi);

while (!Thread.currentThread().isInterrupted()) {
try {
List<Map<String, Object>> batch = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CustomLogManagerTest extends Specification {
, true) == 0
}

def "agent services starts up in premain if configured log manager on system classpath"() {
def "agent services startup is delayed even if configured log manager on system classpath"() {
expect:
IntegrationTestUtils.runOnSeparateJvm(LogManagerSetter.getName()
, [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class CustomMBeanServerBuilderTest extends Specification {
, true) == 0
}

def "JMXFetch starts up in premain if configured MBeanServerBuilder on system classpath"() {
def "JMXFetch startup is delayed even if configured MBeanServerBuilder on system classpath"() {
expect:
IntegrationTestUtils.runOnSeparateJvm(MBeanServerBuilderSetter.getName()
, [
Expand Down
Loading

0 comments on commit d6e2c2e

Please sign in to comment.