Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer remote components to avoid OkHttp class-loading side-effects #8131

Merged
merged 8 commits into from
Dec 30, 2024
2 changes: 1 addition & 1 deletion communication/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies {
}

ext {
minimumBranchCoverage = 0.6
minimumBranchCoverage = 0.5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the whole module not testable or it the cost too high?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the double-checked lock that tips things over - attempting to cover all the cases for this piece of code would be very intensive (you'd end up testing the narrow volatile visibility edge.) There are other branches in SharedCommunicationObjects which are not currently tested which is why we're at this boundary to begin with, but adding tests for those unrelated pieces of code in this PR is IMHO confusing for future reviewers.

I could just exclude SharedCommunicationObjects from branch coverage completely, but that feels worse - it's already excluded from instrumentation coverage - so reducing the coverage requirement here by a small amount is the least worst option.

The proper solution is to add tests to increase coverage in a separate PR, as a separate work item.

minimumInstructionCoverage = 0.8
excludedClassesCoverage = [
'datadog.communication.ddagent.ExternalAgentLauncher',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import datadog.remoteconfig.DefaultConfigurationPoller;
import datadog.trace.api.Config;
import datadog.trace.util.AgentTaskScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import okhttp3.HttpUrl;
Expand All @@ -21,12 +23,23 @@
public class SharedCommunicationObjects {
private static final Logger log = LoggerFactory.getLogger(SharedCommunicationObjects.class);

private final List<Runnable> pausedComponents = new ArrayList<>();
private volatile boolean paused;

public OkHttpClient okHttpClient;
public HttpUrl agentUrl;
public Monitoring monitoring;
private DDAgentFeaturesDiscovery featuresDiscovery;
private ConfigurationPoller configurationPoller;

public SharedCommunicationObjects() {
this(false);
}

public SharedCommunicationObjects(boolean paused) {
this.paused = paused;
}

public void createRemaining(Config config) {
if (monitoring == null) {
monitoring = Monitoring.DISABLED;
Expand All @@ -46,6 +59,32 @@ public void createRemaining(Config config) {
}
}

public void whenReady(Runnable callback) {
if (paused) {
synchronized (pausedComponents) {
if (paused) {
pausedComponents.add(callback);
return;
}
}
}
callback.run(); // not paused, run immediately
}

public void resume() {
paused = false;
synchronized (pausedComponents) {
for (Runnable callback : pausedComponents) {
try {
callback.run();
} catch (Throwable e) {
log.warn("Problem resuming remote component {}", callback, e);
}
}
pausedComponents.clear();
}
}

private static HttpUrl parseAgentUrl(Config config) {
String agentUrl = config.getAgentUrl();
if (agentUrl.startsWith("unix:")) {
Expand Down Expand Up @@ -100,11 +139,16 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
agentUrl,
config.isTraceAgentV05Enabled(),
config.isTracerMetricsEnabled());
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread

if (paused) {
// defer remote discovery until remote I/O is allowed
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);
}
}
}
return featuresDiscovery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static datadog.trace.util.AgentThreadFactory.AgentThread.PROFILER_STARTUP;
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_STARTUP;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static datadog.trace.util.Strings.getResourceName;
import static datadog.trace.util.Strings.propertyNameToSystemPropertyName;
import static datadog.trace.util.Strings.toEnvVar;

Expand Down Expand Up @@ -348,7 +347,7 @@ public void run() {
* logging facility. Likewise on IBM JDKs OkHttp may indirectly load 'IBMSASL' which in turn loads LogManager.
*/
InstallDatadogTracerCallback installDatadogTracerCallback =
new InstallDatadogTracerCallback(initTelemetry, inst);
new InstallDatadogTracerCallback(initTelemetry, inst, delayOkHttp);
if (delayOkHttp) {
log.debug("Custom logger detected. Delaying Datadog Tracer initialization.");
registerLogManagerCallback(installDatadogTracerCallback);
Expand Down Expand Up @@ -497,28 +496,21 @@ public void execute() {
}

protected static class InstallDatadogTracerCallback extends ClassLoadCallBack {
private final InitializationTelemetry initTelemetry;
private final Instrumentation instrumentation;
private final Object sco;
private final Class<?> scoClass;
private final boolean delayOkHttp;

public InstallDatadogTracerCallback(
InitializationTelemetry initTelemetry, Instrumentation instrumentation) {
this.initTelemetry = initTelemetry;
InitializationTelemetry initTelemetry,
Instrumentation instrumentation,
boolean delayOkHttp) {
this.delayOkHttp = delayOkHttp;
this.instrumentation = instrumentation;
}

@Override
public AgentThread agentThread() {
return TRACE_STARTUP;
}

@Override
public void execute() {
Object sco;
Class<?> scoClass;
try {
scoClass =
AGENT_CLASSLOADER.loadClass("datadog.communication.ddagent.SharedCommunicationObjects");
sco = scoClass.getConstructor().newInstance();
sco = scoClass.getConstructor(boolean.class).newInstance(delayOkHttp);
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
Expand All @@ -528,10 +520,23 @@ public void execute() {
}

installDatadogTracer(initTelemetry, scoClass, sco);
maybeInstallLogsIntake(scoClass, sco);
}

@Override
public AgentThread agentThread() {
return TRACE_STARTUP;
}

@Override
public void execute() {
if (delayOkHttp) {
resumeRemoteComponents();
}

maybeStartAppSec(scoClass, sco);
maybeStartIast(instrumentation, scoClass, sco);
maybeStartCiVisibility(instrumentation, scoClass, sco);
maybeStartLogsIntake(scoClass, sco);
// start debugger before remote config to subscribe to it before starting to poll
maybeStartDebugger(instrumentation, scoClass, sco);
maybeStartRemoteConfig(scoClass, sco);
Expand All @@ -540,6 +545,18 @@ public void execute() {
startTelemetry(instrumentation, scoClass, sco);
}
}

private void resumeRemoteComponents() {
try {
// remote components were paused for custom log-manager/jmx-builder
// add small delay before resuming remote I/O to help stabilization
Thread.sleep(1_000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dumb question: is this an empirical delay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - having no delay or a delay of a few 100ms caused issues because the custom log manager might not have finished its setup by the time the delay expires. The resulting JUL class-loading triggered from this thread via OkHttp could then lead to the logging setup not being complete, which can in turn lead to missing logging.

There's no good class-loader signal to tell us the custom log manager is ready, only when JUL has been touched (and if we try to be too smart then we might miss the signal, which would lead to us never resuming remote I/O)

So this is an empirical practical solution which stabilizes the test locally and on CI. That's also why it's important to install the tracer (and logs-intake collector) early so they can capture data while holding back the remote I/O. That way we avoid triggering JUL issues while not losing trace or log data.

Also note this code path is only used for very specific setups, specifically Java 8 with JFR or IBM JDKs.

scoClass.getMethod("resume").invoke(sco);
} catch (InterruptedException ignore) {
} catch (Throwable e) {
log.error("Error resuming remote components", e);
}
}
}

protected static class StartProfilingAgentCallback extends ClassLoadCallBack {
Expand Down Expand Up @@ -866,17 +883,18 @@ private static void maybeStartCiVisibility(Instrumentation inst, Class<?> scoCla
}
}

private static void maybeStartLogsIntake(Class<?> scoClass, Object sco) {
private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
if (agentlessLogSubmissionEnabled) {
StaticEventLogger.begin("Logs Intake");

try {
final Class<?> logsIntakeSystemClass =
AGENT_CLASSLOADER.loadClass("datadog.trace.logging.intake.LogsIntakeSystem");
final Method logsIntakeInstallerMethod = logsIntakeSystemClass.getMethod("start", scoClass);
final Method logsIntakeInstallerMethod =
logsIntakeSystemClass.getMethod("install", scoClass);
logsIntakeInstallerMethod.invoke(null, sco);
} catch (final Throwable e) {
log.warn("Not starting Logs Intake subsystem", e);
log.warn("Not installing Logs Intake subsystem", e);
}

StaticEventLogger.end("Logs Intake");
Expand Down Expand Up @@ -1267,14 +1285,8 @@ private static boolean isAppUsingCustomLogManager(final EnumSet<Library> librari

final String logManagerProp = System.getProperty("java.util.logging.manager");
if (logManagerProp != null) {
final boolean onSysClasspath =
ClassLoader.getSystemResource(getResourceName(logManagerProp)) != null;
log.debug("Prop - logging.manager: {}", logManagerProp);
log.debug("logging.manager on system classpath: {}", onSysClasspath);
// Some applications set java.util.logging.manager but never actually initialize the logger.
// Check to see if the configured manager is on the system classpath.
// If so, it should be safe to initialize jmxfetch which will setup the log manager.
return !onSysClasspath;
return true;
}

return false;
Expand Down Expand Up @@ -1305,14 +1317,8 @@ private static boolean isAppUsingCustomJMXBuilder(final EnumSet<Library> librari

final String jmxBuilderProp = System.getProperty("javax.management.builder.initial");
if (jmxBuilderProp != null) {
final boolean onSysClasspath =
ClassLoader.getSystemResource(getResourceName(jmxBuilderProp)) != null;
log.debug("Prop - javax.management.builder.initial: {}", jmxBuilderProp);
log.debug("javax.management.builder.initial on system classpath: {}", onSysClasspath);
// Some applications set javax.management.builder.initial but never actually initialize JMX.
// Check to see if the configured JMX builder is on the system classpath.
// If so, it should be safe to initialize jmxfetch which will setup JMX.
return !onSysClasspath;
return true;
}

return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package datadog.trace.logging.intake;

import datadog.communication.BackendApi;
import datadog.communication.BackendApiFactory;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
Expand All @@ -12,18 +11,16 @@ public class LogsIntakeSystem {

private static final Logger LOGGER = LoggerFactory.getLogger(LogsIntakeSystem.class);

public static void start(SharedCommunicationObjects sco) {
public static void install(SharedCommunicationObjects sco) {
Config config = Config.get();
if (!config.isAgentlessLogSubmissionEnabled()) {
LOGGER.debug("Agentless logs intake is disabled");
return;
}

BackendApiFactory apiFactory = new BackendApiFactory(config, sco);
BackendApi backendApi = apiFactory.createBackendApi(BackendApiFactory.Intake.LOGS);
LogsDispatcher dispatcher = new LogsDispatcher(backendApi);
LogsWriterImpl writer = new LogsWriterImpl(config, dispatcher);
writer.start();
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory);
sco.whenReady(writer::start);

LogsIntake.registerWriter(writer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import static datadog.trace.util.AgentThreadFactory.AGENT_THREAD_GROUP;

import datadog.communication.BackendApi;
import datadog.communication.BackendApiFactory;
import datadog.communication.BackendApiFactory.Intake;
import datadog.trace.api.Config;
import datadog.trace.api.logging.intake.LogsWriter;
import datadog.trace.util.AgentThreadFactory;
Expand All @@ -23,12 +26,12 @@ public class LogsWriterImpl implements LogsWriter {
private static final int ENQUEUE_LOG_TIMEOUT_MILLIS = 1_000;

private final Map<String, Object> commonTags;
private final LogsDispatcher logsDispatcher;
private final BackendApiFactory apiFactory;
private final BlockingQueue<Map<String, Object>> messageQueue;
private final Thread messagePollingThread;

public LogsWriterImpl(Config config, LogsDispatcher logsDispatcher) {
this.logsDispatcher = logsDispatcher;
public LogsWriterImpl(Config config, BackendApiFactory apiFactory) {
this.apiFactory = apiFactory;

commonTags = new HashMap<>();
commonTags.put("ddsource", "java");
Expand Down Expand Up @@ -84,6 +87,9 @@ public void log(Map<String, Object> message) {
}

private void logPollingLoop() {
BackendApi backendApi = apiFactory.createBackendApi(Intake.LOGS);
LogsDispatcher logsDispatcher = new LogsDispatcher(backendApi);

while (!Thread.currentThread().isInterrupted()) {
try {
List<Map<String, Object>> batch = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CustomLogManagerTest extends Specification {
, true) == 0
}

def "agent services starts up in premain if configured log manager on system classpath"() {
def "agent services startup is delayed even if configured log manager on system classpath"() {
expect:
IntegrationTestUtils.runOnSeparateJvm(LogManagerSetter.getName()
, [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class CustomMBeanServerBuilderTest extends Specification {
, true) == 0
}

def "JMXFetch starts up in premain if configured MBeanServerBuilder on system classpath"() {
def "JMXFetch startup is delayed even if configured MBeanServerBuilder on system classpath"() {
expect:
IntegrationTestUtils.runOnSeparateJvm(MBeanServerBuilderSetter.getName()
, [
Expand Down
Loading
Loading