Skip to content

Commit

Permalink
Adds ActiveMQ 5.x transport
Browse files Browse the repository at this point in the history
Due to popular demand, this adds support for ActiveMQ 5.x.
This is enabled when the env variable `ACTIVEMQ_URL` is set to a valid
broker. Thanks very much to @IAMTJW for early work towards this change.

To try this change, you can use jitpack https://jitpack.io/#openzipkin/zipkin

Ex.
```bash
TAG=activemq-SNAPSHOT
curl -sSL https://jitpack.io/com/github/openzipkin/zipkin/zipkin-server/${TAG}/zipkin-server-${TAG}-exec.jar > zipkin.jar
ACTIVEMQ_URL=tcp://localhost:61616 java -jar zipkin.jar
```

Supercedes #2466
Fixes #1990
  • Loading branch information
Adrian Cole committed Jun 23, 2019
1 parent da7482f commit 7b361bb
Show file tree
Hide file tree
Showing 20 changed files with 1,133 additions and 2 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ Features include both the collection and lookup of this data.

This repository includes a dependency-free Java library and a [spring-boot](http://projects.spring.io/spring-boot/) server.
Storage options include in-memory, MySQL, Apache Cassandra and Elasticsearch.
Transport options include HTTP, gRPC, Scribe (Apache Thrift), RabbitMQ and
Apache Kafka.
Transport options include HTTP, Apache ActiveMQ, Apache Kafka, gRPC, RabbitMQ
and Scribe (Apache Thrift).

## Quick-start

Expand Down
32 changes: 32 additions & 0 deletions zipkin-collector/activemq/RATIONALE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Rational for collector-activemq

## Diverse need
ActiveMQ was formerly requested in April, 2018 through issue #1990 which had two other thumbs-up. An
early draft of this implementation was developed by @IAMTJW and resulting in another user asking for
it. In June of 2019 there were a couple more requests for this on gitter, notably about Amazon MQ.

## On ActiveMQ 5.x
All users who expressed interest were interestd in ActiveMQ 5.x (aka Classic), not Artemis.
Moreover, at the time of creation Amazon MQ only supported ActiveMQ 5.x.

Artemis has higher throughput potential, but has more conflicting dependencies and would add 8MiB to
the server. Moreover, no-one has asked for it.

## On part of the default server
ActiveMQ's client is 2MiB, which will increase the jar size, something that we've been tracking
recently. To be fair, this is not a large module. In comparison, one dependency of Kafka, `zstd-jni`
alone is almost 4MiB. There are no dependencies likely to conflict at runtime, and only one dodgy
dependency, [hawtbuf](https://github.com/fusesource/hawtbuf), on account of it being abandoned since
2014.

Apart from size, ActiveMQ is a stable integration, included in Spring Boot, and could be useful for
other integrations as an in-memory queue. Moreover, bundling makes integration with zipkin-aws far
easier in the same way as bundling elasticsearch does.

## On a potential single-transport client

This package is using the normal activemq-jms client. During a [mail thread](http://activemq.2283324.n4.nabble.com/Interest-in-using-ActiveMQ-as-a-trace-data-transport-for-Zipkin-td4749755.html), we learned the
the STOMP and AMQP 1.0 protocol are the more portable options for a portable integration as
ActiveMQ, Artemis and RabbitMQ all support these. On the other hand Kafka does not support these
protocols. Any future portability work could be limited by this. Meanwhile, using the standard JMS
client will make troubleshooting most natural to end users.
21 changes: 21 additions & 0 deletions zipkin-collector/activemq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# collector-activemq

## ActiveMQCollector
This collector consumes an ActiveMQ 5.x queue for messages that contain a list of spans. Underneath
this uses the ActiveMQ 5.x JMS client, which has two notable dependencies `slf4j-api` and `hawtbuf`.

The message's binary data includes a list of spans. Supported encodings
are the same as the http [POST /spans](https://zipkin.io/zipkin-api/#/paths/%252Fspans) body.

### Json
The message's binary data is a list of spans in json. The first character must be '[' (decimal 91).

`Codec.JSON.writeSpans(spans)` performs the correct json encoding.

Here's an example, sending a list of a single span to the zipkin queue:

```bash
$ curl -u admin:admin -X POST -s localhost:8161/api/message/zipkin?type=queue \
-H "Content-Type: application/json" \
-d '[{"traceId":"1","name":"bang","id":"2","timestamp":1470150004071068,"duration":1,"localEndpoint":{"serviceName":"flintstones"},"tags":{"lc":"bamm-bamm"}}]'
```
69 changes: 69 additions & 0 deletions zipkin-collector/activemq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2015-2019 The OpenZipkin Authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin-collector-parent</artifactId>
<version>2.14.3-SNAPSHOT</version>
</parent>

<artifactId>zipkin-collector-activemq</artifactId>
<name>Collector: ActiveMQ</name>
<description>Zipkin span collector for ActiveMQ transport</description>

<properties>
<main.basedir>${project.basedir}/../..</main.basedir>
<amqp-client.version>5.15.9</amqp-client.version>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-collector</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>${amqp-client.version}</version>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>${amqp-client.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
<version>${amqp-client.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.collector.activemq;

import java.io.IOException;
import org.apache.activemq.ActiveMQConnectionFactory;
import zipkin2.CheckResult;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorComponent;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.CollectorSampler;
import zipkin2.storage.StorageComponent;

/** This collector consumes encoded binary messages from a ActiveMQ queue. */
public final class ActiveMQCollector extends CollectorComponent {
public static Builder builder() {
return new Builder();
}

/** Configuration including defaults needed to consume spans from a ActiveMQ queue. */
public static final class Builder extends CollectorComponent.Builder {
Collector.Builder delegate = Collector.newBuilder(ActiveMQCollector.class);
CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;
ActiveMQConnectionFactory connectionFactory;
String queue = "zipkin";
int concurrency = 1;

@Override public Builder storage(StorageComponent storage) {
this.delegate.storage(storage);
return this;
}

@Override public Builder sampler(CollectorSampler sampler) {
this.delegate.sampler(sampler);
return this;
}

@Override public Builder metrics(CollectorMetrics metrics) {
if (metrics == null) throw new NullPointerException("metrics == null");
this.metrics = metrics.forTransport("activemq");
this.delegate.metrics(this.metrics);
return this;
}

public Builder connectionFactory(ActiveMQConnectionFactory connectionFactory) {
if (connectionFactory == null) throw new NullPointerException("connectionFactory == null");
this.connectionFactory = connectionFactory;
return this;
}

/** Queue zipkin spans will be consumed from. Defaults to "zipkin". */
public Builder queue(String queue) {
if (queue == null) throw new NullPointerException("queue == null");
this.queue = queue;
return this;
}

/** Count of concurrent message listeners on the queue. Defaults to 1 */
public Builder concurrency(int concurrency) {
if (concurrency < 1) throw new IllegalArgumentException("concurrency < 1");
this.concurrency = concurrency;
return this;
}

@Override public ActiveMQCollector build() {
if (connectionFactory == null) throw new NullPointerException("connectionFactory == null");
return new ActiveMQCollector(this);
}
}

final String queue;
final LazyInit lazyInit;

ActiveMQCollector(Builder builder) {
this.queue = builder.queue;
this.lazyInit = new LazyInit(builder);
}

@Override public ActiveMQCollector start() {
lazyInit.init();
return this;
}

@Override public CheckResult check() {
if (lazyInit.result == null) {
return CheckResult.failed(new IllegalStateException("Collector not yet started"));
}
return lazyInit.result.checkResult;
}

@Override public void close() throws IOException {
lazyInit.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.collector.activemq;

import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.transport.TransportListener;
import zipkin2.Callback;
import zipkin2.CheckResult;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorMetrics;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Consumes spans from messages on a ActiveMQ queue. Malformed messages will be discarded. Errors in
* the storage component will similarly be ignored, with no retry of the message.
*/
final class ActiveMQSpanConsumer implements TransportListener, MessageListener, Closeable {
static final Callback<Void> NOOP = new Callback<Void>() {
@Override public void onSuccess(Void value) {
}

@Override public void onError(Throwable t) {
}
};

static final CheckResult
CLOSED = CheckResult.failed(new IllegalStateException("Collector intentionally closed")),
INTERRUPTION = CheckResult.failed(new IOException("Recoverable error on ActiveMQ connection"));

final Collector collector;
final CollectorMetrics metrics;

final ActiveMQConnection connection;
final Map<QueueSession, QueueReceiver> sessionToReceiver = new LinkedHashMap<>();

volatile CheckResult checkResult = CheckResult.OK;

ActiveMQSpanConsumer(Collector collector, CollectorMetrics metrics, ActiveMQConnection conn) {
this.collector = collector;
this.metrics = metrics;
this.connection = conn;
connection.addTransportListener(this);
}

/** JMS contract is one session per thread: we need a new session up to our concurrency level. */
void registerInNewSession(ActiveMQConnection connection, String queue) throws JMSException {
// Pass redundant info as we can't use default method in activeMQ
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
// No need to do anything on ActiveMQ side as physical queues are created on demand
Queue destination = session.createQueue(queue);
QueueReceiver receiver = session.createReceiver(destination);
receiver.setMessageListener(this);
sessionToReceiver.put(session, receiver);
}

@Override public void onCommand(Object o) {
}

@Override public void onException(IOException error) {
checkResult = CheckResult.failed(error);
}

@Override public void transportInterupted() {
checkResult = INTERRUPTION;
}

@Override public void transportResumed() {
checkResult = CheckResult.OK;
}

@Override public void onMessage(Message message) {
metrics.incrementMessages();
byte[] serialized; // TODO: consider how to reuse buffers here
try {
if (message instanceof BytesMessage) {
BytesMessage bytesMessage = (BytesMessage) message;
serialized = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(serialized);
} else if (message instanceof TextMessage) {
String text = ((TextMessage) message).getText();
serialized = text.getBytes(UTF_8);
} else {
metrics.incrementMessagesDropped();
return;
}
} catch (Exception e) {
metrics.incrementMessagesDropped();
return;
}

metrics.incrementBytes(serialized.length);
if (serialized.length == 0) return; // lenient on empty messages
collector.acceptSpans(serialized, NOOP);
}

@Override public void close() throws IOException {
if (checkResult == CLOSED) return;
checkResult = CLOSED;
connection.removeTransportListener(this);
try {
for (Map.Entry<QueueSession, QueueReceiver> sessionReceiver : sessionToReceiver.entrySet()) {
sessionReceiver.getValue().setMessageListener(null); // deregister this
sessionReceiver.getKey().close();
}
connection.close();
} catch (JMSException e) {
if (e.getLinkedException() instanceof IOException) {
throw (IOException) e.getLinkedException();
}
throw new IOException(e);
}
}
}
Loading

0 comments on commit 7b361bb

Please sign in to comment.