Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement shared broker for the AMQP Dev Service #17980

Merged
merged 1 commit into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/src/main/asciidoc/amqp-dev-services.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ Dev Services for AMQP relies on Docker to start the broker.
If your environment does not support Docker, you will need to start the broker manually, or connect to an already running broker.
You can configure the broker access using the `amqp-host`, `amqp-port`, `amqp-user` and `amqp-password` properties.

== Shared broker

Most of the time you need to share the broker between applications.
Dev Services for AMQP implements a _service discovery_ mechanism for your multiple Quarkus applications running in _dev_ mode to share a single broker.

NOTE: Dev Services for AMQP starts the container with the `quarkus-dev-service-amqp` label which is used to identify the container.

If you need multiple (shared) brokers, you can configure the `quarkus.amqp.devservices.service-name` attribute and indicate the broker name.
It looks for a container with the same value, or starts a new one if none can be found.
The default service name is `amqp`.

Sharing is enabled by default in dev mode, but disabled in test mode.
You can disable the sharing with `quarkus.amqp.devservices.shared=false`.

== Setting the port

By default, Dev Services for AMQP picks a random port and configures the application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,31 @@ public class AmqpDevServicesBuildTimeConfig {
@ConfigItem(defaultValue = "--no-autotune --mapped --no-fsync")
public String extraArgs;

/**
* Indicates if the AMQP broker managed by Quarkus Dev Services is shared.
* When shared, Quarkus looks for running containers using label-based service discovery.
* If a matching container is found, it is used, and so a second one is not started.
* Otherwise, Dev Services for AMQP starts a new container.
* <p>
* The discovery uses the {@code quarkus-dev-service-amqp} label.
* The value is configured using the {@code service-name} property.
* <p>
* Container sharing is only used in dev mode.
*/
@ConfigItem(defaultValue = "true")
public boolean shared;

/**
* The value of the {@code quarkus-dev-service-aqmp} label attached to the started container.
* This property is used when {@code shared} is set to {@code true}.
* In this case, before starting a container, Dev Services for AMQP looks for a container with the
* {@code quarkus-dev-service-amqp} label
* set to the configured value. If found, it will use this container instead of starting a new one. Otherwise it
* starts a new container with the {@code quarkus-dev-service-amqp} label set to the specified value.
* <p>
* This property is used when you need multiple shared AMQP brokers.
*/
@ConfigItem(defaultValue = "amqp")
public String serviceName;

}
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package io.quarkus.smallrye.reactivemessaging.amqp.deployment;

import java.io.Closeable;
import java.util.List;
import java.util.Objects;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.api.model.ContainerPort;

import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.IsDockerWorking;
import io.quarkus.deployment.IsNormal;
Expand All @@ -31,6 +36,13 @@
public class DevServicesAmqpProcessor {

private static final Logger log = Logger.getLogger(DevServicesAmqpProcessor.class);

/**
* Label to add to shared Dev Service for AMQP running in containers.
* This allows other applications to discover the running service and use it instead of starting a new instance.
*/
private static final String DEV_SERVICE_LABEL = "quarkus-dev-service-amqp";

private static final int AMQP_PORT = 5672;
private static final String AMQP_HOST_PROP = "amqp-host";
private static final String AMQP_PORT_PROP = "amqp-port";
Expand Down Expand Up @@ -68,7 +80,7 @@ public DevServicesAmqpBrokerBuildItem startAmqpDevService(
cfg = null;
}

AmqpBroker broker = startAmqpBroker(configuration);
AmqpBroker broker = startAmqpBroker(configuration, launchMode);
DevServicesAmqpBrokerBuildItem artemis = null;
if (broker != null) {
closeable = broker.getCloseable();
Expand Down Expand Up @@ -108,19 +120,23 @@ public void run() {
}
cfg = configuration;

if (artemis != null) {
log.infof(
"Dev Services for AMQP started. Start applications that need to use the same AMQP broker "
+ "using -Damqp.host=%s -Damqp.port=%d -Damqp.user=%s -Damqp.password=%s",
broker.host, broker.port, broker.user, broker.password);
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_HOST_PROP, broker.host));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_PORT_PROP, Integer.toString(broker.port)));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_USER_PROP, broker.user));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_PASSWORD_PROP, broker.password));
if (broker != null) {
if (broker.isOwner()) {
log.infof(
"Dev Services for AMQP started. Other Quarkus applications in dev mode will find the "
+ "broker automatically. For Quarkus applications in production mode, you can connect to"
+ " this by starting your application with -Damqp.host=%s -Damqp.port=%d -Damqp.user=%s -Damqp.password=%s",
broker.host, broker.port, broker.user, broker.password);
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_HOST_PROP, broker.host));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_PORT_PROP,
Integer.toString(broker.port)));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_USER_PROP, broker.user));
devServicePropertiesProducer
.produce(new DevServicesNativeConfigResultBuildItem(AMQP_PASSWORD_PROP, broker.password));
}
}

return artemis;
Expand All @@ -138,7 +154,29 @@ private void shutdownBroker() {
}
}

private AmqpBroker startAmqpBroker(AmqpDevServiceCfg config) {
private static Container lookup(String expectedLabelValue) {
List<Container> containers = DockerClientFactory.lazyClient().listContainersCmd().exec();
for (Container container : containers) {
String s = container.getLabels().get(DEV_SERVICE_LABEL);
if (expectedLabelValue.equalsIgnoreCase(s)) {
return container;
}
}
return null;
}

private static ContainerPort getMappedPort(Container container, int port) {
for (ContainerPort p : container.getPorts()) {
Integer mapped = p.getPrivatePort();
Integer publicPort = p.getPublicPort();
if (mapped != null && mapped == port && publicPort != null) {
return p;
}
}
return null;
}

private AmqpBroker startAmqpBroker(AmqpDevServiceCfg config, LaunchModeBuildItem launchMode) {
if (!config.devServicesEnabled) {
// explicitly disabled
log.debug("Not starting dev services for AMQP, as it has been disabled in the config.");
Expand All @@ -162,11 +200,29 @@ private AmqpBroker startAmqpBroker(AmqpDevServiceCfg config) {
return null;
}

if (config.shared && launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT) {
// Detect if there is a broker already started using container labels.
Container container = lookup(config.serviceName);
if (container != null) {
ContainerPort port = getMappedPort(container, AMQP_PORT);
if (port != null) {
Integer p = port.getPublicPort();
String url = port.getIp() + ":" + p;
log.infof("Dev Services for AMQP container found: %s (%s). "
+ "Connecting to: %s.",
container.getId(),
container.getImage(), url);
return new AmqpBroker(port.getIp(), p, "admin", "admin", null);
}
}
}

// Starting the broker
ArtemisContainer container = new ArtemisContainer(
DockerImageName.parse(config.imageName),
config.extra,
config.fixedExposedPort);
config.fixedExposedPort,
launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT ? config.serviceName : null);
container.start();

return new AmqpBroker(
Expand Down Expand Up @@ -206,11 +262,7 @@ private boolean hasAmqpChannelWithoutHostAndPort() {

private AmqpDevServiceCfg getConfiguration(AmqpBuildTimeConfig cfg) {
AmqpDevServicesBuildTimeConfig devServicesConfig = cfg.devservices;
boolean devServicesEnabled = devServicesConfig.enabled.orElse(true);
return new AmqpDevServiceCfg(devServicesEnabled,
devServicesConfig.imageName,
devServicesConfig.port.orElse(0),
devServicesConfig.extraArgs);
return new AmqpDevServiceCfg(devServicesConfig);
}

private static class AmqpBroker {
Expand All @@ -228,6 +280,10 @@ public AmqpBroker(String host, int port, String user, String password, Closeable
this.closeable = closeable;
}

public boolean isOwner() {
return closeable != null;
}

public Closeable getCloseable() {
return closeable;
}
Expand All @@ -238,12 +294,17 @@ private static final class AmqpDevServiceCfg {
private final String imageName;
private final Integer fixedExposedPort;
private final String extra;

public AmqpDevServiceCfg(boolean devServicesEnabled, String imageName, Integer fixedExposedPort, String extra) {
this.devServicesEnabled = devServicesEnabled;
this.imageName = imageName;
this.fixedExposedPort = fixedExposedPort;
this.extra = extra;
private final boolean shared;
private final String serviceName;

public AmqpDevServiceCfg(AmqpDevServicesBuildTimeConfig devServicesConfig) {
this.devServicesEnabled = devServicesConfig.enabled.orElse(true);
this.imageName = devServicesConfig.imageName;
this.fixedExposedPort = devServicesConfig.port.orElse(0);
this.extra = devServicesConfig.extraArgs;
this.shared = devServicesConfig.shared;
this.serviceName = devServicesConfig.serviceName;
;
}

@Override
Expand Down Expand Up @@ -272,14 +333,17 @@ private static final class ArtemisContainer extends GenericContainer<ArtemisCont

private final int port;

private ArtemisContainer(DockerImageName dockerImageName, String extra, int fixedExposedPort) {
private ArtemisContainer(DockerImageName dockerImageName, String extra, int fixedExposedPort, String serviceName) {
super(dockerImageName);
this.port = fixedExposedPort;
withNetwork(Network.SHARED);
withExposedPorts(AMQP_PORT);
withEnv("AMQ_USER", DEFAULT_USER);
withEnv("AMQ_PASSWORD", DEFAULT_PASSWORD);
withEnv("AMQ_EXTRA_ARGS", extra);
if (serviceName != null) { // Only adds the label in dev mode.
withLabel(DEV_SERVICE_LABEL, serviceName);
}
if (dockerImageName.getRepository().equals("artemiscloud/activemq-artemis-broker")) {
waitingFor(Wait.forLogMessage(".*AMQ241004.*", 1)); // Artemis console available.
} else {
Expand Down