diff --git a/pom.xml b/pom.xml index da095303e18..f95460eee93 100755 --- a/pom.xml +++ b/pom.xml @@ -213,6 +213,12 @@ ${project.version} + + io.zipkin.zipkin2 + zipkin-collector-activemq + ${project.version} + + io.zipkin.zipkin2 zipkin-collector-scribe diff --git a/zipkin-autoconfigure/collector-activemq/pom.xml b/zipkin-autoconfigure/collector-activemq/pom.xml new file mode 100644 index 00000000000..90accadf1fc --- /dev/null +++ b/zipkin-autoconfigure/collector-activemq/pom.xml @@ -0,0 +1,43 @@ + + + + + zipkin-autoconfigure + io.zipkin.java + 2.12.7-SNAPSHOT + + 4.0.0 + + zipkin-autoconfigure-collector-activemq + + Auto Configuration: ActiveMQ Collector + + + ${project.basedir}/../.. + + + + + io.zipkin.zipkin2 + zipkin-collector-activemq + + + + + diff --git a/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java new file mode 100644 index 00000000000..e4ab98b3e36 --- /dev/null +++ b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java @@ -0,0 +1,65 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.autoconfigure.collector.activemq; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.*; +import org.springframework.core.type.AnnotatedTypeMetadata; +import zipkin2.collector.CollectorMetrics; +import zipkin2.collector.CollectorSampler; +import zipkin2.collector.activemq.ActiveMQCollector; +import zipkin2.storage.StorageComponent; + +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +/** Auto-configuration for {@link ActiveMQCollector}. */ +@Configuration +@Conditional(ZipkinActiveMQCollectorAutoConfiguration.ActiveMQAddressesOrUriSet.class) +@EnableConfigurationProperties(ZipkinActiveMQCollectorProperties.class) +class ZipkinActiveMQCollectorAutoConfiguration { + + @Bean(initMethod = "start") + ActiveMQCollector activemq( + ZipkinActiveMQCollectorProperties properties, + CollectorSampler sampler, + CollectorMetrics metrics, + StorageComponent storage) + throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { + return properties.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build(); + } + + /** + * This condition passes when {@link ZipkinActiveMQCollectorProperties#getAddresses()} is set to a non-empty value. + * + *

This is here because the yaml defaults this property to empty like this, and Spring Boot + * doesn't have an option to treat empty properties as unset. + * + *

{@code
+   * addresses: ${RABBIT_ADDRESSES:}
+   * uri: ${RABBIT_URI:}
+   * }
+ */ + static final class ActiveMQAddressesOrUriSet implements Condition { + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata a) { + return !isEmpty(context.getEnvironment().getProperty("zipkin.collector.activemq.addresses")); + } + + private static boolean isEmpty(String s) { + return s == null || s.isEmpty(); + } + } +} diff --git a/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java new file mode 100644 index 00000000000..79513f9134c --- /dev/null +++ b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java @@ -0,0 +1,106 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.autoconfigure.collector.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import zipkin2.collector.activemq.ActiveMQCollector; + +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +/** Properties for configuring and building a {@link zipkin2.collector.activemq.ActiveMQCollector}. */ +@ConfigurationProperties("zipkin.collector.activemq") +class ZipkinActiveMQCollectorProperties { + + /** RabbitMQ server addresses in the form of a (comma-separated) list of host:port pairs */ + private String addresses; + + /** TCP connection timeout in milliseconds */ + private Integer connectionTimeout; + /** RabbitMQ user password */ + private String password; + /** RabbitMQ queue from which to collect the Zipkin spans */ + private String queue; + /** RabbitMQ username */ + private String username; + + + public String getAddresses() { + return addresses; + } + + public void setAddresses(String addresses) { + this.addresses = addresses; + } + + + public Integer getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(Integer connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + + public ActiveMQCollector.Builder toBuilder() + throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException { + final ActiveMQCollector.Builder result = ActiveMQCollector.builder(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + if (connectionTimeout != null) { + connectionFactory.setConnectResponseTimeout(connectionTimeout); + } + if (queue != null) { + result.queue(queue); + } + if (addresses != null) { + result.addresses(addresses); + } + if (password != null) { + connectionFactory.setPassword(password); + } + if (username != null) { + connectionFactory.setUserName(username); + } + + result.connectionFactory(connectionFactory); + return result; + } +} diff --git a/zipkin-autoconfigure/collector-activemq/src/main/resources/META-INF/spring.factories b/zipkin-autoconfigure/collector-activemq/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000000..8063e6fc561 --- /dev/null +++ b/zipkin-autoconfigure/collector-activemq/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + zipkin2.autoconfigure.collector.activemq.ZipkinActiveMQCollectorAutoConfiguration diff --git a/zipkin-autoconfigure/pom.xml b/zipkin-autoconfigure/pom.xml index 726835e09d8..d6dfda62105 100644 --- a/zipkin-autoconfigure/pom.xml +++ b/zipkin-autoconfigure/pom.xml @@ -46,6 +46,7 @@ storage-mysql storage-cassandra metrics-prometheus + collector-activemq diff --git a/zipkin-collector/activemq/README.md b/zipkin-collector/activemq/README.md new file mode 100644 index 00000000000..2c5cb718a0a --- /dev/null +++ b/zipkin-collector/activemq/README.md @@ -0,0 +1,56 @@ +# collector-activemq + +## ActiveMQCollector +This collector consumes a ActiveMQ queue for messages that contain a list of spans. +Its only dependencies besides Zipkin core are the `slf4j-api` and the [ActiveMQ Java Client](https://github.com/apache/activemq). + +### Configuration + +The following configuration can be set for the ActiveMQ Collector. + +Property | Environment Variable | Description +--- | --- | --- +`zipkin.collector.activemq.connection-timeout` | `ACTIVEMQ_CONNECTION_TIMEOUT` | Milliseconds to wait establishing a connection. Defaults to `60000` (1 minute) +`zipkin.collector.activemq.queue` | `ACTIVEMQ_QUEUE` | Queue from which to collect span messages. Defaults to `zipkin` + +If the URI is set, the following properties will be ignored. + +Property | Environment Variable | Description +--- | --- | --- +`zipkin.collector.activemq.addresses` | `ACTIVEMQ_ADDRESSES` | Comma-separated list of ActiveMQ addresses, ex. `tcp://localhost:61616,tcp://localhost2:61616` +`zipkin.collector.activemq.password` | `ACTIVEMQ_PASSWORD`| Password to use when connecting to ActiveMQ. Defaults to `system` +`zipkin.collector.activemq.username` | `ACTIVEMQ_USER` | Username to use when connecting to ActiveMQ. Defaults to `manager` + +### Caveats + +The configured queue will be idempotently declared as a durable queue. + + +Consumption is done with `autoAck` on, so messages that fail to process successfully are not retried. + +## Encoding spans into ActiveMQ messages +The message's body should be the bytes of an encoded list of spans. + +### JSON +A list of Spans in JSON. The first character must be '[' (decimal 91). + +`SpanBytesEncoder.JSON_V2.encodeList(spans)` performs the correct JSON encoding. + +## Local testing + +The following assumes you are running an instance of ActiveMQ locally on the default port (61616). +You can download and install ActiveMQ following [instructions available here](http://activemq.apache.org/download.html). +With the [ActiveMQ Management Admin](http://localhost:8161/admin/) you can easily publish +one-off spans to ActiveMQ to be collected by this collector. + +1. Start ActiveMQ server +2. Start Zipkin server +```bash +$ ACTIVEMQ_ADDRESSES=tcp://localhost:61616 java -jar zipkin.jar +``` +3. Save an array of spans to a file like `sample-spans.json` +```json +[{"traceId":"9032b04972e475c5","id":"9032b04972e475c5","kind":"SERVER","name":"get","timestamp":1505990621526000,"duration":612898,"localEndpoint":{"serviceName":"brave-webmvc-example","ipv4":"192.168.1.113"},"remoteEndpoint":{"serviceName":"","ipv4":"127.0.0.1","port":60149},"tags":{"error":"500 Internal Server Error","http.path":"/a"}}] +``` +4. Publish them using the admin + diff --git a/zipkin-collector/activemq/pom.xml b/zipkin-collector/activemq/pom.xml new file mode 100644 index 00000000000..9178707eea6 --- /dev/null +++ b/zipkin-collector/activemq/pom.xml @@ -0,0 +1,79 @@ + + + + + zipkin-collector-parent + io.zipkin.zipkin2 + 2.12.7-SNAPSHOT + + 4.0.0 + + zipkin-collector-activemq + + Collector: ActiveMQ + Zipkin span collector for ActiveMQ transport + + + ${project.basedir}/../.. + + 5.15.0 + + + + + ${project.groupId} + zipkin-collector + + + + org.apache.activemq + activemq-client + ${activemq-client.version} + + + + org.apache.activemq + activemq-all + ${activemq-client.version} + test + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + test + + + + org.testcontainers + testcontainers + test + + + + ${project.groupId} + zipkin + test-jar + test + + + + diff --git a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java new file mode 100644 index 00000000000..dad964f3863 --- /dev/null +++ b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java @@ -0,0 +1,234 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import zipkin2.Callback; +import zipkin2.CheckResult; +import zipkin2.collector.Collector; +import zipkin2.collector.CollectorComponent; +import zipkin2.collector.CollectorMetrics; +import zipkin2.collector.CollectorSampler; +import zipkin2.storage.StorageComponent; + +import javax.jms.*; +import java.io.IOException; +import java.lang.IllegalStateException; +import java.util.concurrent.atomic.AtomicReference; + + +/** This collector consumes encoded binary messages from a ActiveMQ queue. */ +public final class ActiveMQCollector extends CollectorComponent { + + static final Callback NOOP = + new Callback() { + @Override + public void onSuccess(Void value) {} + + @Override + public void onError(Throwable t) {} + }; + + public static Builder builder() { + return new Builder(); + } + + /** Configuration including defaults needed to consume spans from a ActiveMQ queue. */ + public static final class Builder extends CollectorComponent.Builder { + Collector.Builder delegate = Collector.newBuilder(ActiveMQCollector.class); + CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; + String queue = "zipkin"; + String addresses; + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + + + @Override + public Builder storage(StorageComponent storage) { + this.delegate.storage(storage); + return this; + } + + @Override + public Builder sampler(CollectorSampler sampler) { + this.delegate.sampler(sampler); + return this; + } + + @Override + public Builder metrics(CollectorMetrics metrics) { + if (metrics == null) { + throw new NullPointerException("metrics == null"); + } + this.metrics = metrics.forTransport("activemq"); + this.delegate.metrics(this.metrics); + return this; + } + + public Builder addresses(String addresses) { + this.addresses = addresses; + return this; + } + + public Builder connectionFactory(ActiveMQConnectionFactory connectionFactory) { + if (connectionFactory == null) { + throw new NullPointerException("connectionFactory == null"); + } + return this; + } + + /** + * Queue zipkin spans will be consumed from. Defaults to "zipkin". + */ + public Builder queue(String queue) { + if (queue == null) { + throw new NullPointerException("queue == null"); + } + return this; + } + + public Builder username(String username) { + connectionFactory.setUserName(username); + return this; + } + + /** The password to use when connecting to the broker. Defaults to "guest" */ + public Builder password(String password) { + connectionFactory.setPassword(password); + return this; + } + + @Override + public ActiveMQCollector build() { + return new ActiveMQCollector(this); + } + } + + final String queue; + final LazyInit connection; + + ActiveMQCollector(Builder builder) { + this.queue = builder.queue; + this.connection = new LazyInit(builder); + } + + @Override + public ActiveMQCollector start() { + connection.get(); + return this; + } + + @Override + public CheckResult check() { + try { + CheckResult failure = connection.failure.get(); + if (failure != null) return failure; + return CheckResult.OK; + } catch (RuntimeException e) { + return CheckResult.failed(e); + } + } + + @Override + public void close() throws IOException { + connection.close(); + } + + /** Lazy creates a connection and a queue before starting consumers */ + static final class LazyInit { + final Builder builder; + final AtomicReference failure = new AtomicReference<>(); + Connection connection; + + LazyInit(Builder builder) { + this.builder = builder; + } + + protected Connection compute() { + try { + builder.connectionFactory.setBrokerURL("failover:("+builder.addresses+")?initialReconnectDelay=100&maxReconnectAttempts=10"); + connection = builder.connectionFactory.createConnection(); + connection.start(); + Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(builder.queue); + MessageConsumer messageConsumer = session.createConsumer(destination); + Collector collector = builder.delegate.build(); + CollectorMetrics metrics = builder.metrics; + messageConsumer.setMessageListener(new ActiveMQSpanConsumerMessageListener(collector,metrics)); + }catch (JMSException e){ + throw new IllegalStateException("Unable to establish connection to ActiveMQ server", e); + } + return connection; + } + + void close() throws IOException { + Connection maybeConnection = connection; + if (maybeConnection != null) { + try { + maybeConnection.close(); + }catch (Exception e){ + throw new IOException(e); + } + + } + } + + Connection get() { + if (connection == null) { + synchronized (this) { + if (connection == null) { + connection = compute(); + } + } + } + return connection; + } + + + } + + + /** + * Consumes spans from messages on a ActiveMQ queue. Malformed messages will be discarded. Errors + * in the storage component will similarly be ignored, with no retry of the message. + */ + static class ActiveMQSpanConsumerMessageListener implements MessageListener { + final Collector collector; + final CollectorMetrics metrics; + + ActiveMQSpanConsumerMessageListener(Collector collector, CollectorMetrics metrics) { + this.collector = collector; + this.metrics = metrics; + } + + @Override + public void onMessage(Message message) { + metrics.incrementMessages(); + try { + if(message instanceof BytesMessage) { + byte [] data = new byte[(int)((BytesMessage)message).getBodyLength()]; + ((BytesMessage)message).readBytes(data); + this.collector.acceptSpans(data, NOOP); + }else if(message instanceof TextMessage){ + String text = ((TextMessage)message).getText(); + byte [] data = text.getBytes(); + this.collector.acceptSpans(data, NOOP); + } + }catch (Exception e){ + } + } + + + } + +} diff --git a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQCollectorRule.java b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQCollectorRule.java new file mode 100644 index 00000000000..c66838e115e --- /dev/null +++ b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQCollectorRule.java @@ -0,0 +1,141 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.activemq; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.AssumptionViolatedException; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import zipkin2.CheckResult; +import zipkin2.collector.InMemoryCollectorMetrics; +import zipkin2.storage.InMemoryStorage; + +import javax.jms.*; +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +class ActiveMQCollectorRule extends ExternalResource { + static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQCollectorRule.class); + + final InMemoryStorage storage = InMemoryStorage.newBuilder().build(); + final InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics(); + + final InMemoryCollectorMetrics activemqMetrics = metrics.forTransport("activemq"); + + + GenericContainer container; + ActiveMQCollector collector; + + BrokerService broker; + + + + @Override + protected void before() throws Throwable { + + BrokerService broker; + broker = new BrokerService(); + broker.setUseJmx(false); + broker.setBrokerName("MyBroker"); + broker.setPersistent(false); + broker.addConnector("tcp://localhost:61616"); + broker.start(); + + Thread.sleep(10000); + + + try { + this.collector = tryToInitializeCollector(); + } catch (RuntimeException| Error e) { + if (container == null) throw e; + container.stop(); + container = null; // try with local connection instead + this.collector = tryToInitializeCollector(); + } + } + + ActiveMQCollector tryToInitializeCollector() { + ActiveMQCollector result = computeCollectorBuilder().build(); + result.start(); + + CheckResult check = result.check(); + if (!check.ok()) { + throw new AssumptionViolatedException(check.error().getMessage(), check.error()); + } + return result; + } + + ActiveMQCollector.Builder computeCollectorBuilder() { + return ActiveMQCollector.builder() + .storage(storage) + .metrics(metrics) + .queue("zipkin") + .addresses("tcp://localhost:61616"); + } + + + void publish(byte[] message) throws IOException, TimeoutException { + ConnectionFactory connectionFactory; //创建链接工厂 + Connection connection = null;//链接 + Session session;//创建会话 + Destination destination;//消息目的地 消息队列 + MessageProducer messageProducer;//消息生产者 + //实例化链接工厂 参数为 用户,密码,url + connectionFactory = new ActiveMQConnectionFactory("", "", ActiveMQConnection.DEFAULT_BROKER_URL); + try { + connection=connectionFactory.createConnection();//通过链接工厂创建链接 + connection.start();//启动链接 + //创建会话 Session.AUTO_ACKNOWLEDGE。receive 或MessageListener.onMessage()成功返回的时候,自动确认收到消息。 + session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + //创建一个消息队列名称为hello ActiveMQ 消息队列中可包含需要发布消息 + destination = session.createQueue("zipkin"); + //将创建的消息队列hello ActiveMQ交给消息发布者messageProdecer + messageProducer=session.createProducer(destination); + + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(message); + messageProducer.send(bytesMessage); + + + } catch (JMSException e) { + e.printStackTrace(); + }finally{ + try { + //关闭连接 + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + + } + + + @Override + protected void after() { + try { + if (collector != null) collector.close(); + } catch (IOException e) { + LOGGER.warn("error closing collector " + e.getMessage(), e); + } finally { + if (container != null) { + container.stop(); + } + } + } +} diff --git a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java new file mode 100644 index 00000000000..703a3be2da7 --- /dev/null +++ b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java @@ -0,0 +1,127 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.activemq; + +import org.junit.After; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import zipkin2.Span; +import zipkin2.TestObjects; +import zipkin2.codec.SpanBytesEncoder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Java6Assertions.assertThat; + +public class ITActiveMQCollector { + List spans = Arrays.asList(TestObjects.LOTS_OF_SPANS[0], TestObjects.LOTS_OF_SPANS[1]); + + @ClassRule + public static ActiveMQCollectorRule activemq = new ActiveMQCollectorRule(); + + @After public void clear() { + activemq.metrics.clear(); + activemq.storage.clear(); + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test public void checkPasses() { + assertThat(activemq.collector.check().ok()).isTrue(); + } + + @Test + public void startFailsWithInvalidActiveMqServer() throws Exception { + // we can be pretty certain ActiveMQ isn't running on localhost port 61614 + String notActiveMqAddress = "localhost:61614"; + try (ActiveMQCollector collector = ActiveMQCollector.builder().addresses(notActiveMqAddress).build()) { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Unable to establish connection to ActiveMQ server"); + collector.start(); + } + } + + + /** Ensures list encoding works: a json encoded list of spans */ + @Test + public void messageWithMultipleSpans_json() throws Exception { + byte[] message = SpanBytesEncoder.JSON_V1.encodeList(spans); + activemq.publish(message); + + Thread.sleep(1000); + assertThat(activemq.storage.acceptedSpanCount()).isEqualTo(spans.size()); + + assertThat(activemq.activemqMetrics.messages()).isEqualTo(1); + assertThat(activemq.activemqMetrics.bytes()).isEqualTo(message.length); + assertThat(activemq.activemqMetrics.spans()).isEqualTo(spans.size()); + } + + /** Ensures list encoding works: a version 2 json list of spans */ + @Test + public void messageWithMultipleSpans_json2() throws Exception { + messageWithMultipleSpans(SpanBytesEncoder.JSON_V2); + } + + /** Ensures list encoding works: proto3 ListOfSpans */ + @Test + public void messageWithMultipleSpans_proto3() throws Exception { + messageWithMultipleSpans(SpanBytesEncoder.PROTO3); + } + + void messageWithMultipleSpans(SpanBytesEncoder encoder) + throws IOException, TimeoutException, InterruptedException { + + byte[] message = encoder.encodeList(spans); + activemq.publish(message); + + Thread.sleep(10000); + assertThat(activemq.storage.acceptedSpanCount()).isEqualTo(spans.size()); + + assertThat(activemq.activemqMetrics.messages()).isEqualTo(1); + assertThat(activemq.activemqMetrics.bytes()).isEqualTo(message.length); + assertThat(activemq.activemqMetrics.spans()).isEqualTo(spans.size()); + } + + /** Ensures malformed spans don't hang the collector */ + @Test + public void skipsMalformedData() throws Exception { + activemq.publish(SpanBytesEncoder.JSON_V2.encodeList(spans)); + activemq.publish(new byte[0]); + activemq.publish("[\"='".getBytes()); // screwed up json + activemq.publish("malformed".getBytes()); + activemq.publish(SpanBytesEncoder.JSON_V2.encodeList(spans)); + + Thread.sleep(1000); + assertThat(activemq.activemqMetrics.messages()).isEqualTo(5); + assertThat(activemq.activemqMetrics.messagesDropped()).isEqualTo(3); + } + + /** Guards against errors that leak from storage, such as InvalidQueryException */ + @Test + public void skipsOnSpanConsumerException() { + // TODO: reimplement + } + + @Test + public void messagesDistributedAcrossMultipleThreadsSuccessfully() { + // TODO: reimplement + } + + +} diff --git a/zipkin-collector/activemq/src/test/resource/log4j2.properties b/zipkin-collector/activemq/src/test/resource/log4j2.properties new file mode 100644 index 00000000000..551eb198208 --- /dev/null +++ b/zipkin-collector/activemq/src/test/resource/log4j2.properties @@ -0,0 +1,29 @@ +# +# Copyright 2015-2019 The OpenZipkin Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# + +appenders=console +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n +rootLogger.level=warn +rootLogger.appenderRefs=stdout +rootLogger.appenderRef.stdout.ref=STDOUT + +logger.activemq.name=zipkin.collector.activemq +logger.activemq.level=debug + +# stop huge spam +logger.dockerclient.name=org.testcontainers.dockerclient +logger.dockerclient.level=off diff --git a/zipkin-collector/pom.xml b/zipkin-collector/pom.xml index edc3c2b878b..e32e20bc382 100644 --- a/zipkin-collector/pom.xml +++ b/zipkin-collector/pom.xml @@ -39,6 +39,7 @@ rabbitmq scribe kafka08 + activemq diff --git a/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java b/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java index 9b7f764d856..37139da277d 100644 --- a/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java +++ b/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java @@ -57,8 +57,7 @@ public void checkPasses() { public void startFailsWithInvalidRabbitMqServer() throws Exception { // we can be pretty certain RabbitMQ isn't running on localhost port 80 String notRabbitMqAddress = "localhost:80"; - try (RabbitMQCollector collector = - builder().addresses(Collections.singletonList(notRabbitMqAddress)).build()) { + try (RabbitMQCollector collector = builder().addresses(Collections.singletonList(notRabbitMqAddress)).build()) { thrown.expect(IllegalStateException.class); thrown.expectMessage("Unable to establish connection to RabbitMQ server"); collector.start(); diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index e9ff4a58b36..1b11cad7737 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -208,6 +208,14 @@ true
+ + + io.zipkin.java + zipkin-autoconfigure-collector-activemq + ${project.version} + true + + io.zipkin.java diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 3ef7272fc19..64b89bba824 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -34,6 +34,14 @@ zipkin: virtual-host: ${RABBIT_VIRTUAL_HOST:/} useSsl: ${RABBIT_USE_SSL:false} uri: ${RABBIT_URI:} + activemq: + # RabbitMQ server address list (comma-separated list of tcp://host:port) + addresses: ${ACTIVEMQ_ADDRESSES:} + # TCP connection timeout in milliseconds + connection-timeout: ${ACTIVEMQ_CONNECTION_TIMEOUT:60000} + password: ${ACTIVEMQ_PASSWORD:system} + queue: ${ACTIVEMQ_QUEUE:zipkin} + username: ${ACTIVEMQ_USER:manager} query: enabled: ${QUERY_ENABLED:true} # 1 day in millis