From 7b361bb8a70b373bba9bf4331832f891b3ee74b1 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Sun, 23 Jun 2019 17:01:53 +0800 Subject: [PATCH] Adds ActiveMQ 5.x transport Due to popular demand, this adds support for ActiveMQ 5.x. This is enabled when the env variable `ACTIVEMQ_URL` is set to a valid broker. Thanks very much to @IAMTJW for early work towards this change. To try this change, you can use jitpack https://jitpack.io/#openzipkin/zipkin Ex. ```bash TAG=activemq-SNAPSHOT curl -sSL https://jitpack.io/com/github/openzipkin/zipkin/zipkin-server/${TAG}/zipkin-server-${TAG}-exec.jar > zipkin.jar ACTIVEMQ_URL=tcp://localhost:61616 java -jar zipkin.jar ``` Supercedes #2466 Fixes #1990 --- README.md | 4 +- zipkin-collector/activemq/RATIONALE.md | 32 +++ zipkin-collector/activemq/README.md | 21 ++ zipkin-collector/activemq/pom.xml | 69 +++++ .../collector/activemq/ActiveMQCollector.java | 105 ++++++++ .../activemq/ActiveMQSpanConsumer.java | 138 ++++++++++ .../zipkin2/collector/activemq/LazyInit.java | 95 +++++++ .../activemq/ITActiveMQCollector.java | 240 ++++++++++++++++++ .../src/test/resources/log4j2.properties | 10 + zipkin-collector/pom.xml | 1 + zipkin-server/README.md | 20 ++ zipkin-server/pom.xml | 8 + .../internal/InternalZipkinConfiguration.java | 2 + .../ZipkinActiveMQCollectorConfiguration.java | 64 +++++ .../ZipkinActiveMQCollectorProperties.java | 100 ++++++++ .../main/resources/zipkin-server-shared.yml | 12 + ...ActiveMQCollectorPropertiesOverrideTest.kt | 81 ++++++ .../server/internal/activemq/Access.kt | 35 +++ ...ipkinActiveMQCollectorConfigurationTest.kt | 70 +++++ .../ZipkinActiveMQCollectorPropertiesTest.kt | 28 ++ 20 files changed, 1133 insertions(+), 2 deletions(-) create mode 100644 zipkin-collector/activemq/RATIONALE.md create mode 100644 zipkin-collector/activemq/README.md create mode 100644 zipkin-collector/activemq/pom.xml create mode 100644 zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java create mode 100644 zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQSpanConsumer.java create mode 100644 zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/LazyInit.java create mode 100644 zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java create mode 100755 zipkin-collector/activemq/src/test/resources/log4j2.properties create mode 100644 zipkin-server/src/main/java/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorConfiguration.java create mode 100644 zipkin-server/src/main/java/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorProperties.java create mode 100644 zipkin-server/src/test/kotlin/zipkin2/collector/activemq/ZipkinActiveMQCollectorPropertiesOverrideTest.kt create mode 100644 zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/Access.kt create mode 100644 zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorConfigurationTest.kt create mode 100644 zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorPropertiesTest.kt diff --git a/README.md b/README.md index cb6f4689a1f..914bd6a1aba 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,8 @@ Features include both the collection and lookup of this data. This repository includes a dependency-free Java library and a [spring-boot](http://projects.spring.io/spring-boot/) server. Storage options include in-memory, MySQL, Apache Cassandra and Elasticsearch. -Transport options include HTTP, gRPC, Scribe (Apache Thrift), RabbitMQ and -Apache Kafka. +Transport options include HTTP, Apache ActiveMQ, Apache Kafka, gRPC, RabbitMQ +and Scribe (Apache Thrift). ## Quick-start diff --git a/zipkin-collector/activemq/RATIONALE.md b/zipkin-collector/activemq/RATIONALE.md new file mode 100644 index 00000000000..b4c589a6d65 --- /dev/null +++ b/zipkin-collector/activemq/RATIONALE.md @@ -0,0 +1,32 @@ +# Rational for collector-activemq + +## Diverse need +ActiveMQ was formerly requested in April, 2018 through issue #1990 which had two other thumbs-up. An +early draft of this implementation was developed by @IAMTJW and resulting in another user asking for +it. In June of 2019 there were a couple more requests for this on gitter, notably about Amazon MQ. + +## On ActiveMQ 5.x +All users who expressed interest were interestd in ActiveMQ 5.x (aka Classic), not Artemis. +Moreover, at the time of creation Amazon MQ only supported ActiveMQ 5.x. + +Artemis has higher throughput potential, but has more conflicting dependencies and would add 8MiB to +the server. Moreover, no-one has asked for it. + +## On part of the default server +ActiveMQ's client is 2MiB, which will increase the jar size, something that we've been tracking +recently. To be fair, this is not a large module. In comparison, one dependency of Kafka, `zstd-jni` +alone is almost 4MiB. There are no dependencies likely to conflict at runtime, and only one dodgy +dependency, [hawtbuf](https://github.com/fusesource/hawtbuf), on account of it being abandoned since +2014. + +Apart from size, ActiveMQ is a stable integration, included in Spring Boot, and could be useful for +other integrations as an in-memory queue. Moreover, bundling makes integration with zipkin-aws far +easier in the same way as bundling elasticsearch does. + +## On a potential single-transport client + +This package is using the normal activemq-jms client. During a [mail thread](http://activemq.2283324.n4.nabble.com/Interest-in-using-ActiveMQ-as-a-trace-data-transport-for-Zipkin-td4749755.html), we learned the +the STOMP and AMQP 1.0 protocol are the more portable options for a portable integration as +ActiveMQ, Artemis and RabbitMQ all support these. On the other hand Kafka does not support these +protocols. Any future portability work could be limited by this. Meanwhile, using the standard JMS +client will make troubleshooting most natural to end users. diff --git a/zipkin-collector/activemq/README.md b/zipkin-collector/activemq/README.md new file mode 100644 index 00000000000..f5e6982eb19 --- /dev/null +++ b/zipkin-collector/activemq/README.md @@ -0,0 +1,21 @@ +# collector-activemq + +## ActiveMQCollector +This collector consumes an ActiveMQ 5.x queue for messages that contain a list of spans. Underneath +this uses the ActiveMQ 5.x JMS client, which has two notable dependencies `slf4j-api` and `hawtbuf`. + +The message's binary data includes a list of spans. Supported encodings +are the same as the http [POST /spans](https://zipkin.io/zipkin-api/#/paths/%252Fspans) body. + +### Json +The message's binary data is a list of spans in json. The first character must be '[' (decimal 91). + +`Codec.JSON.writeSpans(spans)` performs the correct json encoding. + +Here's an example, sending a list of a single span to the zipkin queue: + +```bash +$ curl -u admin:admin -X POST -s localhost:8161/api/message/zipkin?type=queue \ + -H "Content-Type: application/json" \ + -d '[{"traceId":"1","name":"bang","id":"2","timestamp":1470150004071068,"duration":1,"localEndpoint":{"serviceName":"flintstones"},"tags":{"lc":"bamm-bamm"}}]' +``` diff --git a/zipkin-collector/activemq/pom.xml b/zipkin-collector/activemq/pom.xml new file mode 100644 index 00000000000..0aa1f8ff361 --- /dev/null +++ b/zipkin-collector/activemq/pom.xml @@ -0,0 +1,69 @@ + + + + 4.0.0 + + + io.zipkin.zipkin2 + zipkin-collector-parent + 2.14.3-SNAPSHOT + + + zipkin-collector-activemq + Collector: ActiveMQ + Zipkin span collector for ActiveMQ transport + + + ${project.basedir}/../.. + 5.15.9 + + + + + ${project.groupId} + zipkin-collector + ${project.version} + + + + org.apache.activemq + activemq-client + ${amqp-client.version} + + + + org.apache.activemq + activemq-broker + ${amqp-client.version} + test + + + + org.apache.activemq.tooling + activemq-junit + ${amqp-client.version} + test + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + test + + + diff --git a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java new file mode 100644 index 00000000000..4c0e52051fb --- /dev/null +++ b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java @@ -0,0 +1,105 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.activemq; + +import java.io.IOException; +import org.apache.activemq.ActiveMQConnectionFactory; +import zipkin2.CheckResult; +import zipkin2.collector.Collector; +import zipkin2.collector.CollectorComponent; +import zipkin2.collector.CollectorMetrics; +import zipkin2.collector.CollectorSampler; +import zipkin2.storage.StorageComponent; + +/** This collector consumes encoded binary messages from a ActiveMQ queue. */ +public final class ActiveMQCollector extends CollectorComponent { + public static Builder builder() { + return new Builder(); + } + + /** Configuration including defaults needed to consume spans from a ActiveMQ queue. */ + public static final class Builder extends CollectorComponent.Builder { + Collector.Builder delegate = Collector.newBuilder(ActiveMQCollector.class); + CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; + ActiveMQConnectionFactory connectionFactory; + String queue = "zipkin"; + int concurrency = 1; + + @Override public Builder storage(StorageComponent storage) { + this.delegate.storage(storage); + return this; + } + + @Override public Builder sampler(CollectorSampler sampler) { + this.delegate.sampler(sampler); + return this; + } + + @Override public Builder metrics(CollectorMetrics metrics) { + if (metrics == null) throw new NullPointerException("metrics == null"); + this.metrics = metrics.forTransport("activemq"); + this.delegate.metrics(this.metrics); + return this; + } + + public Builder connectionFactory(ActiveMQConnectionFactory connectionFactory) { + if (connectionFactory == null) throw new NullPointerException("connectionFactory == null"); + this.connectionFactory = connectionFactory; + return this; + } + + /** Queue zipkin spans will be consumed from. Defaults to "zipkin". */ + public Builder queue(String queue) { + if (queue == null) throw new NullPointerException("queue == null"); + this.queue = queue; + return this; + } + + /** Count of concurrent message listeners on the queue. Defaults to 1 */ + public Builder concurrency(int concurrency) { + if (concurrency < 1) throw new IllegalArgumentException("concurrency < 1"); + this.concurrency = concurrency; + return this; + } + + @Override public ActiveMQCollector build() { + if (connectionFactory == null) throw new NullPointerException("connectionFactory == null"); + return new ActiveMQCollector(this); + } + } + + final String queue; + final LazyInit lazyInit; + + ActiveMQCollector(Builder builder) { + this.queue = builder.queue; + this.lazyInit = new LazyInit(builder); + } + + @Override public ActiveMQCollector start() { + lazyInit.init(); + return this; + } + + @Override public CheckResult check() { + if (lazyInit.result == null) { + return CheckResult.failed(new IllegalStateException("Collector not yet started")); + } + return lazyInit.result.checkResult; + } + + @Override public void close() throws IOException { + lazyInit.close(); + } +} diff --git a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQSpanConsumer.java b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQSpanConsumer.java new file mode 100644 index 00000000000..33f596d36a0 --- /dev/null +++ b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQSpanConsumer.java @@ -0,0 +1,138 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.activemq; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.QueueReceiver; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.transport.TransportListener; +import zipkin2.Callback; +import zipkin2.CheckResult; +import zipkin2.collector.Collector; +import zipkin2.collector.CollectorMetrics; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Consumes spans from messages on a ActiveMQ queue. Malformed messages will be discarded. Errors in + * the storage component will similarly be ignored, with no retry of the message. + */ +final class ActiveMQSpanConsumer implements TransportListener, MessageListener, Closeable { + static final Callback NOOP = new Callback() { + @Override public void onSuccess(Void value) { + } + + @Override public void onError(Throwable t) { + } + }; + + static final CheckResult + CLOSED = CheckResult.failed(new IllegalStateException("Collector intentionally closed")), + INTERRUPTION = CheckResult.failed(new IOException("Recoverable error on ActiveMQ connection")); + + final Collector collector; + final CollectorMetrics metrics; + + final ActiveMQConnection connection; + final Map sessionToReceiver = new LinkedHashMap<>(); + + volatile CheckResult checkResult = CheckResult.OK; + + ActiveMQSpanConsumer(Collector collector, CollectorMetrics metrics, ActiveMQConnection conn) { + this.collector = collector; + this.metrics = metrics; + this.connection = conn; + connection.addTransportListener(this); + } + + /** JMS contract is one session per thread: we need a new session up to our concurrency level. */ + void registerInNewSession(ActiveMQConnection connection, String queue) throws JMSException { + // Pass redundant info as we can't use default method in activeMQ + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + // No need to do anything on ActiveMQ side as physical queues are created on demand + Queue destination = session.createQueue(queue); + QueueReceiver receiver = session.createReceiver(destination); + receiver.setMessageListener(this); + sessionToReceiver.put(session, receiver); + } + + @Override public void onCommand(Object o) { + } + + @Override public void onException(IOException error) { + checkResult = CheckResult.failed(error); + } + + @Override public void transportInterupted() { + checkResult = INTERRUPTION; + } + + @Override public void transportResumed() { + checkResult = CheckResult.OK; + } + + @Override public void onMessage(Message message) { + metrics.incrementMessages(); + byte[] serialized; // TODO: consider how to reuse buffers here + try { + if (message instanceof BytesMessage) { + BytesMessage bytesMessage = (BytesMessage) message; + serialized = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.readBytes(serialized); + } else if (message instanceof TextMessage) { + String text = ((TextMessage) message).getText(); + serialized = text.getBytes(UTF_8); + } else { + metrics.incrementMessagesDropped(); + return; + } + } catch (Exception e) { + metrics.incrementMessagesDropped(); + return; + } + + metrics.incrementBytes(serialized.length); + if (serialized.length == 0) return; // lenient on empty messages + collector.acceptSpans(serialized, NOOP); + } + + @Override public void close() throws IOException { + if (checkResult == CLOSED) return; + checkResult = CLOSED; + connection.removeTransportListener(this); + try { + for (Map.Entry sessionReceiver : sessionToReceiver.entrySet()) { + sessionReceiver.getValue().setMessageListener(null); // deregister this + sessionReceiver.getKey().close(); + } + connection.close(); + } catch (JMSException e) { + if (e.getLinkedException() instanceof IOException) { + throw (IOException) e.getLinkedException(); + } + throw new IOException(e); + } + } +} diff --git a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/LazyInit.java b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/LazyInit.java new file mode 100644 index 00000000000..e8e6adf4a97 --- /dev/null +++ b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/LazyInit.java @@ -0,0 +1,95 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.activemq; + +import java.io.IOException; +import java.io.UncheckedIOException; +import javax.jms.JMSException; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import zipkin2.collector.Collector; +import zipkin2.collector.CollectorMetrics; + +/** + * Lazy creates a connection and registers a message listener up to the specified concurrency level. + * This listener will also receive health notifications. + */ +final class LazyInit { + final Collector collector; + final CollectorMetrics metrics; + final ActiveMQConnectionFactory connectionFactory; + final String queue; + final int concurrency; + + volatile ActiveMQSpanConsumer result; + + LazyInit(ActiveMQCollector.Builder builder) { + collector = builder.delegate.build(); + metrics = builder.metrics; + connectionFactory = builder.connectionFactory; + queue = builder.queue; + concurrency = builder.concurrency; + } + + ActiveMQSpanConsumer init() { + if (result == null) { + synchronized (this) { + if (result == null) { + result = doInit(); + } + } + } + return result; + } + + void close() throws IOException { + ActiveMQSpanConsumer maybe = result; + if (maybe != null) result.close(); + } + + ActiveMQSpanConsumer doInit() { + final ActiveMQConnection connection; + try { + connection = (ActiveMQConnection) connectionFactory.createQueueConnection(); + connection.start(); + } catch (JMSException e) { + String prefix = "Unable to establish connection to ActiveMQ broker: "; + Exception cause = e.getLinkedException(); + if (cause instanceof IOException) { + throw new UncheckedIOException(prefix + message(cause), (IOException) cause); + } + throw new RuntimeException(prefix + message(e), e); + } + + try { + ActiveMQSpanConsumer result = new ActiveMQSpanConsumer(collector, metrics, connection); + + for (int i = 0; i < concurrency; i++) { + result.registerInNewSession(connection, queue); + } + + return result; + } catch (JMSException e) { + try { + connection.close(); + } catch (JMSException ignored) { + } + throw new RuntimeException("Unable to create receiver for queue: " + queue, e); + } + } + + String message(Exception cause) { + return cause.getMessage() != null ? cause.getMessage() : cause.getClass().getSimpleName(); + } +} diff --git a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java new file mode 100644 index 00000000000..3b6242b6588 --- /dev/null +++ b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java @@ -0,0 +1,240 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.activemq; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.junit.EmbeddedActiveMQBroker; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TestName; +import zipkin2.Call; +import zipkin2.Callback; +import zipkin2.CheckResult; +import zipkin2.Span; +import zipkin2.codec.SpanBytesEncoder; +import zipkin2.collector.InMemoryCollectorMetrics; +import zipkin2.storage.SpanConsumer; +import zipkin2.storage.SpanStore; +import zipkin2.storage.StorageComponent; + +import static org.assertj.core.api.Assertions.assertThat; +import static zipkin2.TestObjects.LOTS_OF_SPANS; +import static zipkin2.TestObjects.UTF_8; +import static zipkin2.codec.SpanBytesEncoder.PROTO3; +import static zipkin2.codec.SpanBytesEncoder.THRIFT; + +public class ITActiveMQCollector { + List spans = Arrays.asList(LOTS_OF_SPANS[0], LOTS_OF_SPANS[1]); + + @ClassRule public static EmbeddedActiveMQBroker active = new EmbeddedActiveMQBroker(); + @Rule public TestName testName = new TestName(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics(); + InMemoryCollectorMetrics activemqMetrics = metrics.forTransport("activemq"); + + CopyOnWriteArraySet threadsProvidingSpans = new CopyOnWriteArraySet<>(); + LinkedBlockingQueue> receivedSpans = new LinkedBlockingQueue<>(); + SpanConsumer consumer = (spans) -> { + threadsProvidingSpans.add(Thread.currentThread()); + receivedSpans.add(spans); + return Call.create(null); + }; + + ActiveMQCollector collector; + + @Before public void start() { + collector = builder().build().start(); + } + + @After public void stop() throws IOException { + collector.close(); + } + + @Test public void checkPasses() { + assertThat(collector.check().ok()).isTrue(); + } + + @Test public void startFailsWithInvalidActiveMqServer() throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + // we can be pretty certain ActiveMQ isn't running on localhost port 80 + connectionFactory.setBrokerURL("tcp://localhost:80"); + try (ActiveMQCollector collector = builder().connectionFactory(connectionFactory).build()) { + thrown.expect(UncheckedIOException.class); + thrown.expectMessage("Unable to establish connection to ActiveMQ broker: Connection refused"); + collector.start(); + } + } + + /** Ensures list encoding works: a json encoded list of spans */ + @Test public void messageWithMultipleSpans_json() throws Exception { + messageWithMultipleSpans(SpanBytesEncoder.JSON_V1); + } + + /** Ensures list encoding works: a version 2 json list of spans */ + @Test public void messageWithMultipleSpans_json2() throws Exception { + messageWithMultipleSpans(SpanBytesEncoder.JSON_V2); + } + + /** Ensures list encoding works: proto3 ListOfSpans */ + @Test public void messageWithMultipleSpans_proto3() throws Exception { + messageWithMultipleSpans(SpanBytesEncoder.PROTO3); + } + + void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception { + byte[] message = encoder.encodeList(spans); + active.pushMessage(collector.queue, message); + + assertThat(receivedSpans.take()).isEqualTo(spans); + + assertThat(activemqMetrics.messages()).isEqualTo(1); + assertThat(activemqMetrics.messagesDropped()).isZero(); + assertThat(activemqMetrics.bytes()).isEqualTo(message.length); + assertThat(activemqMetrics.spans()).isEqualTo(spans.size()); + assertThat(activemqMetrics.spansDropped()).isZero(); + } + + /** Ensures malformed spans don't hang the collector */ + @Test public void skipsMalformedData() throws Exception { + byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json + byte[] malformed2 = "malformed".getBytes(UTF_8); + active.pushMessage(collector.queue, THRIFT.encodeList(spans)); + active.pushMessage(collector.queue, new byte[0]); + active.pushMessage(collector.queue, malformed1); + active.pushMessage(collector.queue, malformed2); + active.pushMessage(collector.queue, THRIFT.encodeList(spans)); + + Thread.sleep(1000); + + assertThat(activemqMetrics.messages()).isEqualTo(5); + assertThat(activemqMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty + assertThat(activemqMetrics.bytes()) + .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length); + assertThat(activemqMetrics.spans()).isEqualTo(spans.size() * 2); + assertThat(activemqMetrics.spansDropped()).isZero(); + } + + /** Guards against errors that leak from storage, such as InvalidQueryException */ + @Test public void skipsOnSpanStorageException() throws Exception { + collector.close(); + + AtomicInteger counter = new AtomicInteger(); + consumer = (input) -> new Call.Base() { + @Override protected Void doExecute() { + throw new AssertionError(); + } + + @Override protected void doEnqueue(Callback callback) { + if (counter.getAndIncrement() == 1) { + callback.onError(new RuntimeException("storage fell over")); + } else { + receivedSpans.add(spans); + callback.onSuccess(null); + } + } + + @Override public Call clone() { + throw new AssertionError(); + } + }; + + active.pushMessage(collector.queue, PROTO3.encodeList(spans)); + active.pushMessage(collector.queue, PROTO3.encodeList(spans)); // tossed on error + active.pushMessage(collector.queue, PROTO3.encodeList(spans)); + + collector = builder().storage(buildStorage(consumer)).build().start(); + + assertThat(receivedSpans.take()).containsExactlyElementsOf(spans); + // the only way we could read this, is if the malformed span was skipped. + assertThat(receivedSpans.take()).containsExactlyElementsOf(spans); + + assertThat(activemqMetrics.messages()).isEqualTo(3); + assertThat(activemqMetrics.messagesDropped()).isZero(); // storage failure not message failure + assertThat(activemqMetrics.bytes()).isEqualTo(PROTO3.encodeList(spans).length * 3); + assertThat(activemqMetrics.spans()).isEqualTo(spans.size() * 3); + assertThat(activemqMetrics.spansDropped()).isEqualTo(spans.size()); // only one dropped + } + + @Test public void messagesDistributedAcrossMultipleThreadsSuccessfully() throws Exception { + collector.close(); + + CountDownLatch latch = new CountDownLatch(2); + collector = builder().concurrency(2).storage(buildStorage((spans) -> { + latch.countDown(); + try { + latch.await(); // await the other one as this proves 2 threads are in use + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return consumer.accept(spans); + })).build().start(); + + active.pushMessage(collector.queue, ""); // empty bodies don't go to storage + active.pushMessage(collector.queue, PROTO3.encodeList(spans)); + active.pushMessage(collector.queue, PROTO3.encodeList(spans)); + + assertThat(receivedSpans.take()).containsExactlyElementsOf(spans); + latch.countDown(); + assertThat(receivedSpans.take()).containsExactlyElementsOf(spans); + + assertThat(threadsProvidingSpans.size()).isEqualTo(2); + + assertThat(activemqMetrics.messages()).isEqualTo(3); // 2 + empty body for warmup + assertThat(activemqMetrics.messagesDropped()).isZero(); + assertThat(activemqMetrics.bytes()).isEqualTo(PROTO3.encodeList(spans).length * 2); + assertThat(activemqMetrics.spans()).isEqualTo(spans.size() * 2); + assertThat(activemqMetrics.spansDropped()).isZero(); + } + + ActiveMQCollector.Builder builder() { + return ActiveMQCollector.builder() + .connectionFactory(active.createConnectionFactory()) + .storage(buildStorage(consumer)) + .metrics(metrics) + // prevent test flakes by having each run in an individual queue + .queue(testName.getMethodName()); + } + + static StorageComponent buildStorage(final SpanConsumer spanConsumer) { + return new StorageComponent() { + @Override public SpanStore spanStore() { + throw new AssertionError(); + } + + @Override public SpanConsumer spanConsumer() { + return spanConsumer; + } + + @Override public CheckResult check() { + return CheckResult.OK; + } + + @Override public void close() { + throw new AssertionError(); + } + }; + } +} diff --git a/zipkin-collector/activemq/src/test/resources/log4j2.properties b/zipkin-collector/activemq/src/test/resources/log4j2.properties new file mode 100755 index 00000000000..5fef102cd6a --- /dev/null +++ b/zipkin-collector/activemq/src/test/resources/log4j2.properties @@ -0,0 +1,10 @@ +appenders=console +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n +rootLogger.level=warn +rootLogger.appenderRefs=stdout +rootLogger.appenderRef.stdout.ref=STDOUT +logger.activemq.name=zipkin2.collector.activemq +logger.activemq.level=debug diff --git a/zipkin-collector/pom.xml b/zipkin-collector/pom.xml index 5a63a8e9a9b..024c51778d5 100644 --- a/zipkin-collector/pom.xml +++ b/zipkin-collector/pom.xml @@ -36,6 +36,7 @@ core + activemq kafka rabbitmq scribe diff --git a/zipkin-server/README.md b/zipkin-server/README.md index f1d4894cadd..0ecc83facaa 100644 --- a/zipkin-server/README.md +++ b/zipkin-server/README.md @@ -302,6 +302,26 @@ Environment Variable | Property | Description `COLLECTOR_PORT` | `zipkin.collector.scribe.port` | The port to listen for thrift RPC scribe requests. Defaults to 9410 `SCRIBE_CATEGORY` | `zipkin.collector.scribe.category` | Category zipkin spans will be consumed from. Defaults to `zipkin` + +### ActiveMQ Collector +The [ActiveMQ Collector](../zipkin-collector/activemq) is enabled when `ACTIVEMQ_URL` is set to a v5.x broker. The following +settings apply in this case. + +Environment Variable | Property | Description +--- | --- | --- +`ACTIVEMQ_URL` | `zipkin.collector.activemq.url` | [Connection URL](https://activemq.apache.org/uri-protocols) to the ActiveMQ broker, ex. `tcp://localhost:61616` or `failover:(tcp://localhost:61616,tcp://remotehost:61616)` +`ACTIVEMQ_QUEUE` | `zipkin.collector.activemq.queue` | Queue from which to collect span messages. Defaults to `zipkin` +`ACTIVEMQ_CLIENT_ID_PREFIX` | `zipkin.collector.activemq.client-id-prefix` | Client ID prefix for queue consumers. Defaults to `zipkin` +`ACTIVEMQ_CONCURRENCY` | `zipkin.collector.activemq.concurrency` | Number of concurrent span consumers. Defaults to `1` +`ACTIVEMQ_USER` | `zipkin.collector.activemq.user` | Optional username to connect to the broker +`ACTIVEMQ_PASSWORD`| `zipkin.collector.activemq.password` | Optional password to connect to the broker + +Example usage: + +```bash +$ ACTIVEMQ_URL=tcp://localhost:61616 java -jar zipkin.jar +``` + ### Kafka Collector The Kafka collector is enabled when `KAFKA_BOOTSTRAP_SERVERS` is set to a v0.10+ server. The following settings apply in this case. Some settings diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index da52b65cafe..0611ac699d8 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -220,6 +220,14 @@ true + + + ${project.groupId}.zipkin2 + zipkin-collector-activemq + ${project.version} + true + + ${project.groupId}.zipkin2 diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java index 6d1e781c9d1..b375821e8ea 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java @@ -15,6 +15,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import zipkin2.server.internal.activemq.ZipkinActiveMQCollectorConfiguration; import zipkin2.server.internal.brave.TracingConfiguration; import zipkin2.server.internal.cassandra.ZipkinCassandraStorageConfiguration; import zipkin2.server.internal.cassandra3.ZipkinCassandra3StorageConfiguration; @@ -39,6 +40,7 @@ ZipkinQueryApiV2.class, ZipkinHttpCollector.class, ZipkinGrpcCollector.class, + ZipkinActiveMQCollectorConfiguration.class, ZipkinKafkaCollectorConfiguration.class, ZipkinRabbitMQCollectorConfiguration.class, MetricsHealthController.class, diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorConfiguration.java new file mode 100644 index 00000000000..a3ebc33d532 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorConfiguration.java @@ -0,0 +1,64 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.server.internal.activemq; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.type.AnnotatedTypeMetadata; +import zipkin2.collector.CollectorMetrics; +import zipkin2.collector.CollectorSampler; +import zipkin2.collector.activemq.ActiveMQCollector; +import zipkin2.storage.StorageComponent; + +/** Auto-configuration for {@link ActiveMQCollector}. */ +@Configuration +@Conditional(ZipkinActiveMQCollectorConfiguration.ActiveMQUrlSet.class) +@EnableConfigurationProperties(ZipkinActiveMQCollectorProperties.class) +public class ZipkinActiveMQCollectorConfiguration { + + @Bean(initMethod = "start") + ActiveMQCollector activeMq( + ZipkinActiveMQCollectorProperties properties, + CollectorSampler sampler, + CollectorMetrics metrics, + StorageComponent storage) { + return properties.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build(); + } + + /** + * This condition passes when {@link ZipkinActiveMQCollectorProperties#getUrl()}} is set to + * non-empty. + * + *

This is here because the yaml defaults this property to empty like this, and spring-boot + * doesn't have an option to treat empty properties as unset. + * + *

{@code
+   * url: ${ACTIVEMQ_URL:}
+   * }
+ */ + static final class ActiveMQUrlSet implements Condition { + @Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata a) { + return !isEmpty( + context.getEnvironment().getProperty("zipkin.collector.activemq.url")); + } + + private static boolean isEmpty(String s) { + return s == null || s.isEmpty(); + } + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorProperties.java new file mode 100644 index 00000000000..6dbe6fb9359 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorProperties.java @@ -0,0 +1,100 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.server.internal.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import zipkin2.collector.activemq.ActiveMQCollector; + +/** Properties for configuring and building a {@link ActiveMQCollector}. */ +@ConfigurationProperties("zipkin.collector.activemq") +class ZipkinActiveMQCollectorProperties { + /** URL of the ActiveMQ broker. */ + private String url; + + /** ActiveMQ queue from which to collect the Zipkin spans */ + private String queue; + + /** Client ID prefix for queue consumers */ + private String clientIdPrefix = "zipkin"; + + /** Number of concurrent span consumers */ + private Integer concurrency; + + /** Login user of the broker. */ + private String username; + + /** Login password of the broker. */ + private String password; + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = emptyToNull(url); + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = emptyToNull(queue); + } + + public Integer getConcurrency() { + return concurrency; + } + + public void setConcurrency(Integer concurrency) { + this.concurrency = concurrency; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = emptyToNull(username); + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = emptyToNull(password); + } + + public ActiveMQCollector.Builder toBuilder() { + final ActiveMQCollector.Builder result = ActiveMQCollector.builder(); + if (concurrency != null) result.concurrency(concurrency); + if (queue != null) result.queue(queue); + + ActiveMQConnectionFactory connectionFactory; + if (username != null) { + connectionFactory = new ActiveMQConnectionFactory(username, password, url); + } else { + connectionFactory = new ActiveMQConnectionFactory(url); + } + connectionFactory.setClientIDPrefix(clientIdPrefix); + result.connectionFactory(connectionFactory); + return result; + } + + private static String emptyToNull(String s) { + return "".equals(s) ? null : s; + } +} diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 8e88800339c..67907f6dd6f 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -9,6 +9,17 @@ zipkin: collector: # percentage to traces to retain sample-rate: ${COLLECTOR_SAMPLE_RATE:1.0} + activemq: + # ActiveMQ broker url. Ex. tcp://localhost:61616 or failover:(tcp://localhost:61616,tcp://remotehost:61616) + url: ${ACTIVEMQ_URL:} + # Queue from which to collect span messages. + queue: ${ACTIVEMQ_QUEUE:zipkin} + # Number of concurrent span consumers. + concurrency: ${ACTIVEMQ_CONCURRENCY:1} + # Optional username to connect to the broker + username: ${ACTIVEMQ_USERNAME:} + # Optional password to connect to the broker + password: ${ACTIVEMQ_PASSWORD:} http: # Set to false to disable creation of spans via HTTP collector API enabled: ${HTTP_COLLECTOR_ENABLED:true} @@ -189,6 +200,7 @@ spring: # otherwise we might initialize even when not needed (ex when storage type is cassandra) - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration - org.springframework.boot.autoconfigure.jooq.JooqAutoConfiguration + - org.springframework.boot.autoconfigure.jms.activemq.ActiveMQAutoConfiguration info: zipkin: version: "@project.version@" diff --git a/zipkin-server/src/test/kotlin/zipkin2/collector/activemq/ZipkinActiveMQCollectorPropertiesOverrideTest.kt b/zipkin-server/src/test/kotlin/zipkin2/collector/activemq/ZipkinActiveMQCollectorPropertiesOverrideTest.kt new file mode 100644 index 00000000000..770d123bf3e --- /dev/null +++ b/zipkin-server/src/test/kotlin/zipkin2/collector/activemq/ZipkinActiveMQCollectorPropertiesOverrideTest.kt @@ -0,0 +1,81 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.activemq + +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.springframework.boot.test.util.TestPropertyValues +import org.springframework.context.annotation.AnnotationConfigApplicationContext +import zipkin2.server.internal.activemq.Access + +@RunWith(Parameterized::class) +class ZipkinActiveMQPropertiesOverrideTest( + val property: String, + val value: Any, + val builderExtractor: (ActiveMQCollector.Builder) -> Any +) { + + companion object { + @JvmStatic @Parameterized.Parameters fun data(): List> { + return listOf( + parameters("url", "failover:(tcp://localhost:61616,tcp://remotehost:61616)", + { builder -> builder.connectionFactory.brokerURL.toString() }), + parameters("concurrency", 2, + { builder -> builder.concurrency }), + parameters("queue", "zapkin", + { builder -> builder.queue }), + parameters("client-id-prefix", "zipkin-prod", + { builder -> builder.connectionFactory.clientIDPrefix }), + parameters("username", "u", + { builder -> builder.connectionFactory.userName }), + parameters("password", "p", + { builder -> builder.connectionFactory.password }) + ) + } + + /** to allow us to define with a lambda */ + internal fun parameters( + propertySuffix: String, value: T, builderExtractor: (ActiveMQCollector.Builder) -> T + ): Array = arrayOf(propertySuffix, value, builderExtractor) + } + + val context = AnnotationConfigApplicationContext() + @After fun closeContext() = context.close() + + @Test fun propertyTransferredToCollectorBuilder() { + if (property != "url") { + TestPropertyValues.of("zipkin.collector.activemq.url:tcp://localhost:61616").applyTo(context) + } + + TestPropertyValues.of("zipkin.collector.activemq.$property:$value").applyTo(context) + + if (property == "username") { + TestPropertyValues.of("zipkin.collector.activemq.password:p").applyTo(context) + } + + if (property == "password") { + TestPropertyValues.of("zipkin.collector.activemq.username:u").applyTo(context) + } + + Access.registerActiveMQProperties(context) + context.refresh() + + assertThat(Access.collectorBuilder(context)) + .extracting(builderExtractor) + .isEqualTo(value) + } +} diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/Access.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/Access.kt new file mode 100644 index 00000000000..46ec42f58e6 --- /dev/null +++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/Access.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.server.internal.activemq + +import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.AnnotationConfigApplicationContext +import org.springframework.context.annotation.Configuration +import zipkin2.collector.activemq.ActiveMQCollector + +/** opens package access for testing */ +object Access { + /** Just registering properties to avoid automatically connecting to a Active MQ server */ + fun registerActiveMQProperties(context: AnnotationConfigApplicationContext) = context.register( + PropertyPlaceholderAutoConfiguration::class.java, + EnableActiveMQCollectorProperties::class.java) + + @Configuration + @EnableConfigurationProperties(ZipkinActiveMQCollectorProperties::class) + open class EnableActiveMQCollectorProperties + + fun collectorBuilder(context: AnnotationConfigApplicationContext): ActiveMQCollector.Builder = + context.getBean(ZipkinActiveMQCollectorProperties::class.java).toBuilder() +} diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorConfigurationTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorConfigurationTest.kt new file mode 100644 index 00000000000..662acdfb456 --- /dev/null +++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorConfigurationTest.kt @@ -0,0 +1,70 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.server.internal.activemq + +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.fail +import org.junit.After +import org.junit.Test +import org.springframework.beans.factory.BeanCreationException +import org.springframework.beans.factory.NoSuchBeanDefinitionException +import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration +import org.springframework.boot.test.util.TestPropertyValues +import org.springframework.context.annotation.AnnotationConfigApplicationContext +import zipkin2.collector.activemq.ActiveMQCollector +import zipkin2.server.internal.InMemoryCollectorConfiguration + +class ZipkinActiveMQCollectorConfigurationTest { + val context = AnnotationConfigApplicationContext() + @After fun closeContext() = context.close() + + @Test(expected = NoSuchBeanDefinitionException::class) + fun doesNotProvideCollectorComponent_whenAddressAndUriNotSet() { + context.register( + PropertyPlaceholderAutoConfiguration::class.java, + ZipkinActiveMQCollectorConfiguration::class.java, + InMemoryCollectorConfiguration::class.java) + context.refresh() + + context.getBean(ActiveMQCollector::class.java) + } + + @Test(expected = NoSuchBeanDefinitionException::class) + fun doesNotProvideCollectorComponent_whenUrlIsEmptyString() { + TestPropertyValues.of("zipkin.collector.activemq.uri:").applyTo(context) + context.register( + PropertyPlaceholderAutoConfiguration::class.java, + ZipkinActiveMQCollectorConfiguration::class.java, + InMemoryCollectorConfiguration::class.java) + context.refresh() + + context.getBean(ActiveMQCollector::class.java) + } + + @Test fun providesCollectorComponent_whenUrlSet() { + TestPropertyValues.of("zipkin.collector.activemq.url=vm://localhost").applyTo(context) + context.register( + PropertyPlaceholderAutoConfiguration::class.java, + ZipkinActiveMQCollectorConfiguration::class.java, + InMemoryCollectorConfiguration::class.java) + + try { + context.refresh() + fail("should have failed") + } catch (e: BeanCreationException) { + assertThat(e.cause).hasMessage( + "Unable to establish connection to ActiveMQ broker: Transport scheme NOT recognized: [vm]") + } + } +} diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorPropertiesTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorPropertiesTest.kt new file mode 100644 index 00000000000..c5c72378490 --- /dev/null +++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/activemq/ZipkinActiveMQCollectorPropertiesTest.kt @@ -0,0 +1,28 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.server.internal.activemq + +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test + +class ZipkinActiveMQCollectorPropertiesTest { + + /** This prevents an empty ACTIVEMQ_URL variable from being mistaken as a real one */ + @Test fun ignoresEmptyURL() { + val properties = ZipkinActiveMQCollectorProperties() + properties.url = "" + + assertThat(properties.url).isNull() + } +}