Skip to content

Commit

Permalink
Merge pull request #17980 from cescoffier/discovery-amqp-dev-service
Browse files Browse the repository at this point in the history
Implement shared  broker for the AMQP Dev Service
  • Loading branch information
Ladicek authored Jun 17, 2021
2 parents 232ceda + aa743fe commit 4252dda
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 28 deletions.
14 changes: 14 additions & 0 deletions docs/src/main/asciidoc/amqp-dev-services.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ Dev Services for AMQP relies on Docker to start the broker.
If your environment does not support Docker, you will need to start the broker manually, or connect to an already running broker.
You can configure the broker access using the `amqp-host`, `amqp-port`, `amqp-user` and `amqp-password` properties.

== Shared broker

Most of the time you need to share the broker between applications.
Dev Services for AMQP implements a _service discovery_ mechanism for your multiple Quarkus applications running in _dev_ mode to share a single broker.

NOTE: Dev Services for AMQP starts the container with the `quarkus-dev-service-amqp` label which is used to identify the container.

If you need multiple (shared) brokers, you can configure the `quarkus.amqp.devservices.service-name` attribute and indicate the broker name.
It looks for a container with the same value, or starts a new one if none can be found.
The default service name is `amqp`.

Sharing is enabled by default in dev mode, but disabled in test mode.
You can disable the sharing with `quarkus.amqp.devservices.shared=false`.

== Setting the port

By default, Dev Services for AMQP picks a random port and configures the application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,31 @@ public class AmqpDevServicesBuildTimeConfig {
@ConfigItem(defaultValue = "--no-autotune --mapped --no-fsync")
public String extraArgs;

/**
* Indicates if the AMQP broker managed by Quarkus Dev Services is shared.
* When shared, Quarkus looks for running containers using label-based service discovery.
* If a matching container is found, it is used, and so a second one is not started.
* Otherwise, Dev Services for AMQP starts a new container.
* <p>
* The discovery uses the {@code quarkus-dev-service-amqp} label.
* The value is configured using the {@code service-name} property.
* <p>
* Container sharing is only used in dev mode.
*/
@ConfigItem(defaultValue = "true")
public boolean shared;

/**
* The value of the {@code quarkus-dev-service-aqmp} label attached to the started container.
* This property is used when {@code shared} is set to {@code true}.
* In this case, before starting a container, Dev Services for AMQP looks for a container with the
* {@code quarkus-dev-service-amqp} label
* set to the configured value. If found, it will use this container instead of starting a new one. Otherwise it
* starts a new container with the {@code quarkus-dev-service-amqp} label set to the specified value.
* <p>
* This property is used when you need multiple shared AMQP brokers.
*/
@ConfigItem(defaultValue = "amqp")
public String serviceName;

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package io.quarkus.smallrye.reactivemessaging.amqp.deployment;

import java.io.Closeable;
import java.util.List;
import java.util.Objects;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.api.model.ContainerPort;

import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.IsDockerWorking;
import io.quarkus.deployment.IsNormal;
Expand All @@ -31,6 +36,13 @@
public class DevServicesAmqpProcessor {

private static final Logger log = Logger.getLogger(DevServicesAmqpProcessor.class);

/**
* Label to add to shared Dev Service for AMQP running in containers.
* This allows other applications to discover the running service and use it instead of starting a new instance.
*/
private static final String DEV_SERVICE_LABEL = "quarkus-dev-service-amqp";

private static final int AMQP_PORT = 5672;
private static final String AMQP_HOST_PROP = "amqp-host";
private static final String AMQP_PORT_PROP = "amqp-port";
Expand Down Expand Up @@ -68,7 +80,7 @@ public DevServicesAmqpBrokerBuildItem startAmqpDevService(
cfg = null;
}

AmqpBroker broker = startAmqpBroker(configuration);
AmqpBroker broker = startAmqpBroker(configuration, launchMode);
DevServicesAmqpBrokerBuildItem artemis = null;
if (broker != null) {
closeable = broker.getCloseable();
Expand Down Expand Up @@ -108,19 +120,23 @@ public void run() {
}
cfg = configuration;

if (artemis != null) {
log.infof(
"Dev Services for AMQP started. Start applications that need to use the same AMQP broker "
+ "using -Damqp.host=%s -Damqp.port=%d -Damqp.user=%s -Damqp.password=%s",
broker.host, broker.port, broker.user, broker.password);
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_HOST_PROP, broker.host));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_PORT_PROP, Integer.toString(broker.port)));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_USER_PROP, broker.user));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_PASSWORD_PROP, broker.password));
if (broker != null) {
if (broker.isOwner()) {
log.infof(
"Dev Services for AMQP started. Other Quarkus applications in dev mode will find the "
+ "broker automatically. For Quarkus applications in production mode, you can connect to"
+ " this by starting your application with -Damqp.host=%s -Damqp.port=%d -Damqp.user=%s -Damqp.password=%s",
broker.host, broker.port, broker.user, broker.password);
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_HOST_PROP, broker.host));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_PORT_PROP,
Integer.toString(broker.port)));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_USER_PROP, broker.user));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_PASSWORD_PROP, broker.password));
}
}

return artemis;
Expand All @@ -138,7 +154,29 @@ private void shutdownBroker() {
}
}

private AmqpBroker startAmqpBroker(AmqpDevServiceCfg config) {
private static Container lookup(String expectedLabelValue) {
List<Container> containers = DockerClientFactory.lazyClient().listContainersCmd().exec();
for (Container container : containers) {
String s = container.getLabels().get(DEV_SERVICE_LABEL);
if (expectedLabelValue.equalsIgnoreCase(s)) {
return container;
}
}
return null;
}

private static ContainerPort getMappedPort(Container container, int port) {
for (ContainerPort p : container.getPorts()) {
Integer mapped = p.getPrivatePort();
Integer publicPort = p.getPublicPort();
if (mapped != null && mapped == port && publicPort != null) {
return p;
}
}
return null;
}

private AmqpBroker startAmqpBroker(AmqpDevServiceCfg config, LaunchModeBuildItem launchMode) {
if (!config.devServicesEnabled) {
// explicitly disabled
log.debug("Not starting dev services for AMQP, as it has been disabled in the config.");
Expand All @@ -162,11 +200,29 @@ private AmqpBroker startAmqpBroker(AmqpDevServiceCfg config) {
return null;
}

if (config.shared && launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT) {
// Detect if there is a broker already started using container labels.
Container container = lookup(config.serviceName);
if (container != null) {
ContainerPort port = getMappedPort(container, AMQP_PORT);
if (port != null) {
Integer p = port.getPublicPort();
String url = port.getIp() + ":" + p;
log.infof("Dev Services for AMQP container found: %s (%s). "
+ "Connecting to: %s.",
container.getId(),
container.getImage(), url);
return new AmqpBroker(port.getIp(), p, "admin", "admin", null);
}
}
}

// Starting the broker
ArtemisContainer container = new ArtemisContainer(
DockerImageName.parse(config.imageName),
config.extra,
config.fixedExposedPort);
config.fixedExposedPort,
launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null);
container.start();

return new AmqpBroker(
Expand Down Expand Up @@ -206,11 +262,7 @@ private boolean hasAmqpChannelWithoutHostAndPort() {

private AmqpDevServiceCfg getConfiguration(AmqpBuildTimeConfig cfg) {
AmqpDevServicesBuildTimeConfig devServicesConfig = cfg.devservices;
boolean devServicesEnabled = devServicesConfig.enabled.orElse(true);
return new AmqpDevServiceCfg(devServicesEnabled,
devServicesConfig.imageName,
devServicesConfig.port.orElse(0),
devServicesConfig.extraArgs);
return new AmqpDevServiceCfg(devServicesConfig);
}

private static class AmqpBroker {
Expand All @@ -228,6 +280,10 @@ public AmqpBroker(String host, int port, String user, String password, Closeable
this.closeable = closeable;
}

public boolean isOwner() {
return closeable != null;
}

public Closeable getCloseable() {
return closeable;
}
Expand All @@ -238,12 +294,17 @@ private static final class AmqpDevServiceCfg {
private final String imageName;
private final Integer fixedExposedPort;
private final String extra;

public AmqpDevServiceCfg(boolean devServicesEnabled, String imageName, Integer fixedExposedPort, String extra) {
this.devServicesEnabled = devServicesEnabled;
this.imageName = imageName;
this.fixedExposedPort = fixedExposedPort;
this.extra = extra;
private final boolean shared;
private final String serviceName;

public AmqpDevServiceCfg(AmqpDevServicesBuildTimeConfig devServicesConfig) {
this.devServicesEnabled = devServicesConfig.enabled.orElse(true);
this.imageName = devServicesConfig.imageName;
this.fixedExposedPort = devServicesConfig.port.orElse(0);
this.extra = devServicesConfig.extraArgs;
this.shared = devServicesConfig.shared;
this.serviceName = devServicesConfig.serviceName;
;
}

@Override
Expand Down Expand Up @@ -272,14 +333,17 @@ private static final class ArtemisContainer extends GenericContainer<ArtemisCont

private final int port;

private ArtemisContainer(DockerImageName dockerImageName, String extra, int fixedExposedPort) {
private ArtemisContainer(DockerImageName dockerImageName, String extra, int fixedExposedPort, String serviceName) {
super(dockerImageName);
this.port = fixedExposedPort;
withNetwork(Network.SHARED);
withExposedPorts(AMQP_PORT);
withEnv("AMQ_USER", DEFAULT_USER);
withEnv("AMQ_PASSWORD", DEFAULT_PASSWORD);
withEnv("AMQ_EXTRA_ARGS", extra);
if (serviceName != null) { // Only adds the label in dev mode.
withLabel(DEV_SERVICE_LABEL, serviceName);
}
if (dockerImageName.getRepository().equals("artemiscloud/activemq-artemis-broker")) {
waitingFor(Wait.forLogMessage(".*AMQ241004.*", 1)); // Artemis console available.
} else {
Expand Down

0 comments on commit 4252dda

Please sign in to comment.