Skip to content

Commit

Permalink
Merge pull request quarkusio#27896 from geoand/quarkusio#27627
Browse files Browse the repository at this point in the history
Ensure that Mongo DNS lookup does not happen on the event loop
geoand authored Sep 13, 2022
2 parents c04644b + e04b11f commit 20d9c4f
Showing 4 changed files with 66 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -69,6 +69,7 @@
import io.quarkus.mongodb.runtime.dns.MongoDnsClientProvider;
import io.quarkus.runtime.metrics.MetricsFactory;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
import io.quarkus.vertx.deployment.VertxBuildItem;

public class MongoClientProcessor {
private static final String MONGODB_TRACING_COMMANDLISTENER_CLASSNAME = "io.quarkus.mongodb.tracing.MongoTracingCommandListener";
@@ -281,7 +282,8 @@ void generateClientBeans(MongoClientRecorder recorder,
MongoClientBuildTimeConfig mongoClientBuildTimeConfig,
MongodbConfig mongodbConfig,
List<MongoUnremovableClientsBuildItem> mongoUnremovableClientsBuildItem,
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer) {
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer,
VertxBuildItem vertxBuildItem) {

boolean makeUnremovable = !mongoUnremovableClientsBuildItem.isEmpty();

@@ -328,6 +330,8 @@ void generateClientBeans(MongoClientRecorder recorder,
.produce(createReactiveSyntheticBean(recorder, mongodbConfig, makeUnremovable, mongoClientName.getName(),
mongoClientName.isAddQualifier()));
}

recorder.performInitialization(mongodbConfig, vertxBuildItem.getVertx());
}

private SyntheticBeanBuildItem createBlockingSyntheticBean(MongoClientRecorder recorder, MongodbConfig mongodbConfig,
Original file line number Diff line number Diff line change
@@ -9,15 +9,18 @@
import javax.enterprise.inject.literal.NamedLiteral;
import javax.enterprise.util.AnnotationLiteral;

import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClient;
import com.mongodb.event.ConnectionPoolListener;

import io.quarkus.arc.Arc;
import io.quarkus.mongodb.metrics.MicrometerConnectionPoolListener;
import io.quarkus.mongodb.metrics.MongoMetricsConnectionPoolListener;
import io.quarkus.mongodb.reactive.ReactiveMongoClient;
import io.quarkus.mongodb.runtime.dns.MongoDnsClientProvider;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import io.vertx.core.Vertx;

@Recorder
public class MongoClientRecorder {
@@ -104,4 +107,25 @@ public ConnectionPoolListener get() {
}
};
}

/**
* We need to perform some initialization work on the main thread to ensure that reactive operations (such as DNS
* resolution)
* don't end up being performed on the event loop
*/
public void performInitialization(MongodbConfig config, RuntimeValue<Vertx> vertx) {
MongoDnsClientProvider.vertx = vertx.getValue();
initializeDNSLookup(config.defaultMongoClientConfig);
for (MongoClientConfig mongoClientConfig : config.mongoClientConfigs.values()) {
initializeDNSLookup(mongoClientConfig);
}
}

private void initializeDNSLookup(MongoClientConfig mongoClientConfig) {
if (mongoClientConfig.connectionString.isEmpty()) {
return;
}
// this ensures that DNS resolution will take place if necessary
new ConnectionString(mongoClientConfig.connectionString.get());
}
}
Original file line number Diff line number Diff line change
@@ -9,6 +9,9 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -20,7 +23,6 @@
import com.mongodb.spi.dns.DnsClient;
import com.mongodb.spi.dns.DnsException;

import io.quarkus.arc.Arc;
import io.quarkus.mongodb.runtime.MongodbConfig;
import io.quarkus.runtime.annotations.RegisterForReflection;
import io.vertx.core.dns.DnsClientOptions;
@@ -46,8 +48,14 @@ public class MongoDnsClient implements DnsClient {

private final io.vertx.mutiny.core.dns.DnsClient dnsClient;

MongoDnsClient() {
Vertx vertx = Arc.container().instance(Vertx.class).get();
// the static fields are used in order to hold DNS resolution result that has been performed on the main thread
// at application startup
// the reason we need this is to ensure that no blocking of event loop threads will occur due to DNS resolution
private static final Map<String, List<SrvRecord>> SRV_CACHE = new ConcurrentHashMap<>();
private static final Map<String, List<String>> TXT_CACHE = new ConcurrentHashMap<>();

MongoDnsClient(io.vertx.core.Vertx vertx) {
Vertx mutinyVertx = new io.vertx.mutiny.core.Vertx(vertx);

boolean activity = config.getOptionalValue(DNS_LOG_ACTIVITY, Boolean.class).orElse(false);

@@ -69,7 +77,7 @@ public class MongoDnsClient implements DnsClient {
.setHost(server)
.setPort(port);
}
dnsClient = vertx.createDnsClient(dnsClientOptions);
dnsClient = mutinyVertx.createDnsClient(dnsClientOptions);
}

private static List<String> nameServers() {
@@ -118,7 +126,17 @@ private List<String> resolveSrvRequest(final String srvHost) {
.orElse(Duration.ofSeconds(5));

try {
List<SrvRecord> srvRecords = dnsClient.resolveSRV(srvHost).await().atMost(timeout);
List<SrvRecord> srvRecords;
if (SRV_CACHE.containsKey(srvHost)) {
srvRecords = SRV_CACHE.get(srvHost);
} else {
srvRecords = dnsClient.resolveSRV(srvHost).invoke(new Consumer<>() {
@Override
public void accept(List<SrvRecord> srvRecords) {
SRV_CACHE.put(srvHost, srvRecords);
}
}).await().atMost(timeout);
}

if (srvRecords.isEmpty()) {
throw new MongoConfigurationException("No SRV records available for host " + srvHost);
@@ -143,11 +161,18 @@ private List<String> resolveSrvRequest(final String srvHost) {
* Here we concatenate TXT records together with a '&' separator as required by connection strings
*/
public List<String> resolveTxtRequest(final String host) {
if (TXT_CACHE.containsKey(host)) {
return TXT_CACHE.get(host);
}
try {
Duration timeout = config.getOptionalValue(DNS_LOOKUP_TIMEOUT, Duration.class)
.orElse(Duration.ofSeconds(5));

return dnsClient.resolveTXT(host).await().atMost(timeout);
return dnsClient.resolveTXT(host).invoke(new Consumer<>() {
@Override
public void accept(List<String> strings) {
TXT_CACHE.put(host, strings);
}
}).await().atMost(timeout);
} catch (Throwable e) {
throw new MongoConfigurationException("Unable to look up TXT record for host " + host, e);
}
Original file line number Diff line number Diff line change
@@ -4,11 +4,15 @@
import com.mongodb.spi.dns.DnsClientProvider;

import io.quarkus.runtime.annotations.RegisterForReflection;
import io.vertx.core.Vertx;

@RegisterForReflection
public class MongoDnsClientProvider implements DnsClientProvider {

public static volatile Vertx vertx;

@Override
public DnsClient create() {
return new MongoDnsClient();
return new MongoDnsClient(vertx);
}
}

0 comments on commit 20d9c4f

Please sign in to comment.