From 9e248af93f4cc6b5fdda7359b5912dfc1d5b8fe4 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Thu, 21 Mar 2019 16:26:38 +0800 Subject: [PATCH 01/16] add support activemq --- zipkin-collector/activemq/pom.xml | 56 ++++ .../collector/activemq/ActiveMQCollector.java | 249 ++++++++++++++++++ .../activemq/ActiveMQCollectorRule.java | 84 ++++++ .../activemq/ITActiveMQCollector.java | 48 ++++ .../src/test/resource/log4j2.properties | 15 ++ zipkin-collector/pom.xml | 1 + 6 files changed, 453 insertions(+) create mode 100644 zipkin-collector/activemq/pom.xml create mode 100644 zipkin-collector/activemq/src/main/java/zipkin/collector/activemq/ActiveMQCollector.java create mode 100644 zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ActiveMQCollectorRule.java create mode 100644 zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ITActiveMQCollector.java create mode 100644 zipkin-collector/activemq/src/test/resource/log4j2.properties diff --git a/zipkin-collector/activemq/pom.xml b/zipkin-collector/activemq/pom.xml new file mode 100644 index 00000000000..80c18e6fdcd --- /dev/null +++ b/zipkin-collector/activemq/pom.xml @@ -0,0 +1,56 @@ + + + + zipkin-collector-parent + io.zipkin.zipkin2 + 2.12.7-SNAPSHOT + + 4.0.0 + + zipkin-collector-activemq + + Collector: ActiveMQ + Zipkin span collector for ActiveMQ transport + + + ${project.basedir}/../.. + + 5.15.0 + + + + + ${project.groupId} + zipkin-collector + + + + org.apache.activemq + activemq-client + ${activemq-client.version} + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + test + + + + org.testcontainers + testcontainers + test + + + + ${project.groupId} + zipkin + test-jar + test + + + + diff --git a/zipkin-collector/activemq/src/main/java/zipkin/collector/activemq/ActiveMQCollector.java b/zipkin-collector/activemq/src/main/java/zipkin/collector/activemq/ActiveMQCollector.java new file mode 100644 index 00000000000..71417e2360a --- /dev/null +++ b/zipkin-collector/activemq/src/main/java/zipkin/collector/activemq/ActiveMQCollector.java @@ -0,0 +1,249 @@ +/** + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.collector.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import zipkin2.Callback; +import zipkin2.CheckResult; +import zipkin2.collector.Collector; +import zipkin2.collector.CollectorComponent; +import zipkin2.collector.CollectorMetrics; +import zipkin2.collector.CollectorSampler; +import zipkin2.storage.StorageComponent; + +import javax.jms.*; +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + + +/** This collector consumes encoded binary messages from a RabbitMQ queue. */ +public final class ActiveMQCollector extends CollectorComponent { + + static final Callback NOOP = + new Callback() { + @Override + public void onSuccess(Void value) {} + + @Override + public void onError(Throwable t) {} + }; + + public static Builder builder() { + return new Builder(); + } + + /** Configuration including defaults needed to consume spans from a RabbitMQ queue. */ + public static final class Builder extends CollectorComponent.Builder { + Collector.Builder delegate = Collector.newBuilder(ActiveMQCollector.class); + CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; + String queue = "zipkin"; + String addresses; + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + + + @Override + public Builder storage(StorageComponent storage) { + this.delegate.storage(storage); + return this; + } + + @Override + public Builder sampler(CollectorSampler sampler) { + this.delegate.sampler(sampler); + return this; + } + + @Override + public Builder metrics(CollectorMetrics metrics) { + if (metrics == null) { + throw new NullPointerException("metrics == null"); + } + this.delegate.metrics(this.metrics); + return this; + } + + public Builder addresses(String addresses) { + this.addresses = addresses; + return this; + } + + public Builder connectionFactory(ActiveMQConnectionFactory connectionFactory) { + if (connectionFactory == null) { + throw new NullPointerException("connectionFactory == null"); + } + return this; + } + + /** + * Queue zipkin spans will be consumed from. Defaults to "zipkin-spans". + */ + public Builder queue(String queue) { + if (queue == null) { + throw new NullPointerException("queue == null"); + } + return this; + } + + public Builder username(String username) { + ((ActiveMQConnectionFactory)connectionFactory).setUserName(username); + return this; + } + + /** The password to use when connecting to the broker. Defaults to "guest" */ + public Builder password(String password) { + ((ActiveMQConnectionFactory)connectionFactory).setPassword(password); + return this; + } + + @Override + public ActiveMQCollector build() { + return new ActiveMQCollector(this); + } + } + + final String queue; + final LazyInit connection; + + ActiveMQCollector(Builder builder) { + this.queue = builder.queue; + this.connection = new LazyInit(builder); + } + + @Override + public ActiveMQCollector start() { + connection.get(); + return this; + } + + @Override + public CheckResult check() { + try { + CheckResult failure = connection.failure.get(); + if (failure != null) return failure; + return CheckResult.OK; + } catch (RuntimeException e) { + return CheckResult.failed(e); + } + } + + @Override + public void close() throws IOException { + connection.close(); + } + + /** Lazy creates a connection and a queue before starting consumers */ + static final class LazyInit { + final Builder builder; + final AtomicReference failure = new AtomicReference<>(); + ActivemqConnection activemqConnection; + + LazyInit(Builder builder) { + this.builder = builder; + } + + protected ActivemqConnection compute() { + try { + builder.connectionFactory.setBrokerURL(builder.addresses); + Connection connection = builder.connectionFactory.createConnection(); + connection.start(); + activemqConnection = new ActivemqConnection(connection); + Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(builder.queue); + MessageConsumer messageConsumer = session.createConsumer(destination); + + Collector collector = builder.delegate.build(); + CollectorMetrics metrics = builder.metrics; + + messageConsumer.setMessageListener(new ActiveMQSpanConsumer(collector,metrics)); + }catch (Exception e){ + + } + return activemqConnection; + } + + void close() throws IOException { + ActivemqConnection maybeConnection = activemqConnection; + if (maybeConnection != null) { + maybeConnection.close(); + } + } + + ActivemqConnection get() { + if (activemqConnection == null) { + synchronized (this) { + if (activemqConnection == null) { + activemqConnection = compute(); + } + } + } + return activemqConnection; + } + + + } + + static class ActivemqConnection implements Closeable { + Connection connection = null; + + public ActivemqConnection(Connection connection ){ + this.connection = connection; + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public void close() throws IOException { + try { + connection.close(); + }catch (Exception e){ + throw new IOException(e); + } + } + } + + + /** + * Consumes spans from messages on a ActiveMQ queue. Malformed messages will be discarded. Errors + * in the storage component will similarly be ignored, with no retry of the message. + */ + static class ActiveMQSpanConsumer implements MessageListener { + final Collector collector; + final CollectorMetrics metrics; + + ActiveMQSpanConsumer(Collector collector, CollectorMetrics metrics) { + this.collector = collector; + this.metrics = metrics; + } + + @Override + public void onMessage(Message message) { + metrics.incrementMessages(); + if(message instanceof BytesMessage) { + try { + byte [] data = new byte[(int)((BytesMessage)message).getBodyLength()]; + ((BytesMessage)message).readBytes(data); + this.collector.acceptSpans(data, NOOP); + }catch (Exception e){ + } + } + } + + + } + +} diff --git a/zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ActiveMQCollectorRule.java b/zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ActiveMQCollectorRule.java new file mode 100644 index 00000000000..31c48f22580 --- /dev/null +++ b/zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ActiveMQCollectorRule.java @@ -0,0 +1,84 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.collector.activemq; + +import org.junit.AssumptionViolatedException; +import org.junit.rules.ExternalResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import zipkin2.CheckResult; +import zipkin2.collector.InMemoryCollectorMetrics; +import zipkin2.storage.InMemoryStorage; + +import java.io.IOException; + +class ActiveMQCollectorRule extends ExternalResource { + static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQCollectorRule.class); + + final InMemoryStorage storage = InMemoryStorage.newBuilder().build(); + final InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics(); + + GenericContainer container; + ActiveMQCollector collector; + + + + @Override + protected void before() throws Throwable { + + try { + this.collector = tryToInitializeCollector(); + } catch (RuntimeException| Error e) { + if (container == null) throw e; + container.stop(); + container = null; // try with local connection instead + this.collector = tryToInitializeCollector(); + } + } + + ActiveMQCollector tryToInitializeCollector() { + ActiveMQCollector result = computeCollectorBuilder().build(); + result.start(); + + CheckResult check = result.check(); + if (!check.ok()) { + throw new AssumptionViolatedException(check.error().getMessage(), check.error()); + } + return result; + } + + ActiveMQCollector.Builder computeCollectorBuilder() { + return ActiveMQCollector.builder() + .storage(storage) + .metrics(metrics) + .queue("zipkin") + .addresses("tcp://localhost:61616"); + } + + + + @Override + protected void after() { + try { + if (collector != null) collector.close(); + } catch (IOException e) { + LOGGER.warn("error closing collector " + e.getMessage(), e); + } finally { + if (container != null) { + container.stop(); + } + } + } +} diff --git a/zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ITActiveMQCollector.java b/zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ITActiveMQCollector.java new file mode 100644 index 00000000000..1f79209b02f --- /dev/null +++ b/zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ITActiveMQCollector.java @@ -0,0 +1,48 @@ +/** + * Copyright 2015-2018 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.collector.activemq; + +import org.junit.After; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import zipkin2.Span; +import zipkin2.TestObjects; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Java6Assertions.assertThat; + +public class ITActiveMQCollector { + List spans = Arrays.asList(TestObjects.LOTS_OF_SPANS[0], TestObjects.LOTS_OF_SPANS[1]); + + @ClassRule + public static ActiveMQCollectorRule activemq = new ActiveMQCollectorRule(); + + @After public void clear() { + activemq.metrics.clear(); + activemq.storage.clear(); + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test public void checkPasses() { + assertThat(activemq.collector.check().ok()).isTrue(); + } + + + +} diff --git a/zipkin-collector/activemq/src/test/resource/log4j2.properties b/zipkin-collector/activemq/src/test/resource/log4j2.properties new file mode 100644 index 00000000000..6145bd7528d --- /dev/null +++ b/zipkin-collector/activemq/src/test/resource/log4j2.properties @@ -0,0 +1,15 @@ +appenders=console +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n +rootLogger.level=warn +rootLogger.appenderRefs=stdout +rootLogger.appenderRef.stdout.ref=STDOUT + +logger.activemq.name=zipkin.collector.activemq +logger.activemq.level=debug + +# stop huge spam +logger.dockerclient.name=org.testcontainers.dockerclient +logger.dockerclient.level=off diff --git a/zipkin-collector/pom.xml b/zipkin-collector/pom.xml index edc3c2b878b..e32e20bc382 100644 --- a/zipkin-collector/pom.xml +++ b/zipkin-collector/pom.xml @@ -39,6 +39,7 @@ rabbitmq scribe kafka08 + activemq From 15207a9eb3c4909fd1ac7fae76e0e4b99a456580 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Thu, 21 Mar 2019 17:12:28 +0800 Subject: [PATCH 02/16] add support activemq --- .../collector-activemq/pom.xml | 43 ++++ ...kinActiveMQCollectorAutoConfiguration.java | 65 ++++++ .../ZipkinActiveMQCollectorProperties.java | 108 +++++++++ .../main/resources/META-INF/spring.factories | 2 + .../main/resources/zipkin-server-shared.yml | 221 ++++++++++++++++++ 5 files changed, 439 insertions(+) create mode 100644 zipkin-autoconfigure/collector-activemq/pom.xml create mode 100644 zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java create mode 100644 zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java create mode 100644 zipkin-autoconfigure/collector-activemq/src/main/resources/META-INF/spring.factories create mode 100644 zipkin-autoconfigure/collector-activemq/src/main/resources/zipkin-server-shared.yml diff --git a/zipkin-autoconfigure/collector-activemq/pom.xml b/zipkin-autoconfigure/collector-activemq/pom.xml new file mode 100644 index 00000000000..90accadf1fc --- /dev/null +++ b/zipkin-autoconfigure/collector-activemq/pom.xml @@ -0,0 +1,43 @@ + + + + + zipkin-autoconfigure + io.zipkin.java + 2.12.7-SNAPSHOT + + 4.0.0 + + zipkin-autoconfigure-collector-activemq + + Auto Configuration: ActiveMQ Collector + + + ${project.basedir}/../.. + + + + + io.zipkin.zipkin2 + zipkin-collector-activemq + + + + + diff --git a/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java new file mode 100644 index 00000000000..22b0db8cd2d --- /dev/null +++ b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java @@ -0,0 +1,65 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.autoconfigure.collector.activemq; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.*; +import org.springframework.core.type.AnnotatedTypeMetadata; +import zipkin2.collector.CollectorMetrics; +import zipkin2.collector.CollectorSampler; +import zipkin2.collector.activemq.ActiveMQCollector; +import zipkin2.storage.StorageComponent; + +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +/** Auto-configuration for {@link ActiveMQCollector}. */ +@Configuration +@Conditional(ZipkinActiveMQCollectorAutoConfiguration.ActiveMQAddressesOrUriSet.class) +@EnableConfigurationProperties(ZipkinActiveMQCollectorProperties.class) +class ZipkinActiveMQCollectorAutoConfiguration { + + @Bean(initMethod = "start") + ActiveMQCollector activemq( + ZipkinActiveMQCollectorProperties properties, + CollectorSampler sampler, + CollectorMetrics metrics, + StorageComponent storage) + throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException { + return properties.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build(); + } + + /** + * This condition passes when {@link ZipkinActiveMQCollectorProperties#getAddresses()} is set to a non-empty value. + * + *

This is here because the yaml defaults this property to empty like this, and Spring Boot + * doesn't have an option to treat empty properties as unset. + * + *

{@code
+   * addresses: ${RABBIT_ADDRESSES:}
+   * uri: ${RABBIT_URI:}
+   * }
+ */ + static final class ActiveMQAddressesOrUriSet implements Condition { + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata a) { + return !isEmpty(context.getEnvironment().getProperty("zipkin.collector.rabbitmq.addresses")); + } + + private static boolean isEmpty(String s) { + return s == null || s.isEmpty(); + } + } +} diff --git a/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java new file mode 100644 index 00000000000..74d0b249f60 --- /dev/null +++ b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java @@ -0,0 +1,108 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.autoconfigure.collector.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import zipkin2.collector.activemq.ActiveMQCollector; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; + +/** Properties for configuring and building a {@link zipkin2.collector.activemq.ActiveMQCollector}. */ +@ConfigurationProperties("zipkin.collector.rabbitmq") +class ZipkinActiveMQCollectorProperties { + static final URI EMPTY_URI = URI.create(""); + + /** RabbitMQ server addresses in the form of a (comma-separated) list of host:port pairs */ + private String addresses; + + /** TCP connection timeout in milliseconds */ + private Integer connectionTimeout; + /** RabbitMQ user password */ + private String password; + /** RabbitMQ queue from which to collect the Zipkin spans */ + private String queue; + /** RabbitMQ username */ + private String username; + + + public String getAddresses() { + return addresses; + } + + public void setAddresses(String addresses) { + this.addresses = addresses; + } + + + public Integer getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(Integer connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + + public ActiveMQCollector.Builder toBuilder() + throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException { + final ActiveMQCollector.Builder result = ActiveMQCollector.builder(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + if (connectionTimeout != null) { + connectionFactory.setConnectResponseTimeout(connectionTimeout); + } + if (queue != null) { + result.queue(queue); + } + if (addresses != null) { + result.addresses(addresses); + } + if (password != null) { + connectionFactory.setPassword(password); + } + if (username != null) { + connectionFactory.setUserName(username); + } + + result.connectionFactory(connectionFactory); + return result; + } +} diff --git a/zipkin-autoconfigure/collector-activemq/src/main/resources/META-INF/spring.factories b/zipkin-autoconfigure/collector-activemq/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000000..8063e6fc561 --- /dev/null +++ b/zipkin-autoconfigure/collector-activemq/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + zipkin2.autoconfigure.collector.activemq.ZipkinActiveMQCollectorAutoConfiguration diff --git a/zipkin-autoconfigure/collector-activemq/src/main/resources/zipkin-server-shared.yml b/zipkin-autoconfigure/collector-activemq/src/main/resources/zipkin-server-shared.yml new file mode 100644 index 00000000000..3eea7c435d7 --- /dev/null +++ b/zipkin-autoconfigure/collector-activemq/src/main/resources/zipkin-server-shared.yml @@ -0,0 +1,221 @@ +zipkin: + self-tracing: + # Set to true to enable self-tracing. + enabled: ${SELF_TRACING_ENABLED:false} + # percentage to self-traces to retain + sample-rate: ${SELF_TRACING_SAMPLE_RATE:1.0} + # Timeout in seconds to flush self-tracing data to storage. + message-timeout: ${SELF_TRACING_FLUSH_INTERVAL:1} + collector: + # percentage to traces to retain + sample-rate: ${COLLECTOR_SAMPLE_RATE:1.0} + http: + # Set to false to disable creation of spans via HTTP collector API + enabled: ${HTTP_COLLECTOR_ENABLED:true} + kafka: + # Kafka bootstrap broker list, comma-separated host:port values. Setting this activates the + # Kafka 0.10+ collector. + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:} + # Name of topic to poll for spans + topic: ${KAFKA_TOPIC:zipkin} + # Consumer group this process is consuming on behalf of. + group-id: ${KAFKA_GROUP_ID:zipkin} + # Count of consumer threads consuming the topic + streams: ${KAFKA_STREAMS:1} + rabbitmq: + # RabbitMQ server address list (comma-separated list of host:port) + addresses: ${RABBIT_ADDRESSES:} + concurrency: ${RABBIT_CONCURRENCY:1} + # TCP connection timeout in milliseconds + connection-timeout: ${RABBIT_CONNECTION_TIMEOUT:60000} + password: ${RABBIT_PASSWORD:guest} + queue: ${RABBIT_QUEUE:zipkin} + username: ${RABBIT_USER:guest} + virtual-host: ${RABBIT_VIRTUAL_HOST:/} + useSsl: ${RABBIT_USE_SSL:false} + uri: ${RABBIT_URI:} + activemq: + # RabbitMQ server address list (comma-separated list of tcp://host:port) + addresses: ${ACTIVE_ADDRESSES:} + # TCP connection timeout in milliseconds + connection-timeout: ${ACTIVE_CONNECTION_TIMEOUT:60000} + password: ${ACTIVE_PASSWORD:guest} + queue: ${ACTIVE_QUEUE:zipkin} + username: ${ACTIVE_USER:guest} + query: + enabled: ${QUERY_ENABLED:true} + # 1 day in millis + lookback: ${QUERY_LOOKBACK:86400000} + # The Cache-Control max-age (seconds) for /api/v2/services and /api/v2/spans + names-max-age: 300 + # CORS allowed-origins. + allowed-origins: "*" + + storage: + strict-trace-id: ${STRICT_TRACE_ID:true} + search-enabled: ${SEARCH_ENABLED:true} + autocomplete-keys: ${AUTOCOMPLETE_KEYS:} + autocomplete-ttl: ${AUTOCOMPLETE_TTL:3600000} + autocomplete-cardinality: 20000 + type: ${STORAGE_TYPE:mem} + mem: + # Maximum number of spans to keep in memory. When exceeded, oldest traces (and their spans) will be purged. + # A safe estimate is 1K of memory per span (each span with 2 annotations + 1 binary annotation), plus + # 100 MB for a safety buffer. You'll need to verify in your own environment. + # Experimentally, it works with: max-spans of 500000 with JRE argument -Xmx600m. + max-spans: 500000 + cassandra: + # Comma separated list of host addresses part of Cassandra cluster. Ports default to 9042 but you can also specify a custom port with 'host:port'. + contact-points: ${CASSANDRA_CONTACT_POINTS:localhost} + # Name of the datacenter that will be considered "local" for latency load balancing. When unset, load-balancing is round-robin. + local-dc: ${CASSANDRA_LOCAL_DC:} + # Will throw an exception on startup if authentication fails. + username: ${CASSANDRA_USERNAME:} + password: ${CASSANDRA_PASSWORD:} + keyspace: ${CASSANDRA_KEYSPACE:zipkin} + # Max pooled connections per datacenter-local host. + max-connections: ${CASSANDRA_MAX_CONNECTIONS:8} + # Ensuring that schema exists, if enabled tries to execute script /zipkin-cassandra-core/resources/cassandra-schema-cql3.txt. + ensure-schema: ${CASSANDRA_ENSURE_SCHEMA:true} + # 7 days in seconds + span-ttl: ${CASSANDRA_SPAN_TTL:604800} + # 3 days in seconds + index-ttl: ${CASSANDRA_INDEX_TTL:259200} + # the maximum trace index metadata entries to cache + index-cache-max: ${CASSANDRA_INDEX_CACHE_MAX:100000} + # how long to cache index metadata about a trace. 1 minute in seconds + index-cache-ttl: ${CASSANDRA_INDEX_CACHE_TTL:60} + # how many more index rows to fetch than the user-supplied query limit + index-fetch-multiplier: ${CASSANDRA_INDEX_FETCH_MULTIPLIER:3} + # Using ssl for connection, rely on Keystore + use-ssl: ${CASSANDRA_USE_SSL:false} + cassandra3: + # Comma separated list of host addresses part of Cassandra cluster. Ports default to 9042 but you can also specify a custom port with 'host:port'. + contact-points: ${CASSANDRA_CONTACT_POINTS:localhost} + # Name of the datacenter that will be considered "local" for latency load balancing. When unset, load-balancing is round-robin. + local-dc: ${CASSANDRA_LOCAL_DC:} + # Will throw an exception on startup if authentication fails. + username: ${CASSANDRA_USERNAME:} + password: ${CASSANDRA_PASSWORD:} + keyspace: ${CASSANDRA_KEYSPACE:zipkin2} + # Max pooled connections per datacenter-local host. + max-connections: ${CASSANDRA_MAX_CONNECTIONS:8} + # Ensuring that schema exists, if enabled tries to execute script /zipkin2-schema.cql + ensure-schema: ${CASSANDRA_ENSURE_SCHEMA:true} + # how many more index rows to fetch than the user-supplied query limit + index-fetch-multiplier: ${CASSANDRA_INDEX_FETCH_MULTIPLIER:3} + # Using ssl for connection, rely on Keystore + use-ssl: ${CASSANDRA_USE_SSL:false} + elasticsearch: + # host is left unset intentionally, to defer the decision + hosts: ${ES_HOSTS:} + pipeline: ${ES_PIPELINE:} + max-requests: ${ES_MAX_REQUESTS:64} + timeout: ${ES_TIMEOUT:10000} + index: ${ES_INDEX:zipkin} + date-separator: ${ES_DATE_SEPARATOR:-} + index-shards: ${ES_INDEX_SHARDS:5} + index-replicas: ${ES_INDEX_REPLICAS:1} + username: ${ES_USERNAME:} + password: ${ES_PASSWORD:} + http-logging: ${ES_HTTP_LOGGING:} + legacy-reads-enabled: ${ES_LEGACY_READS_ENABLED:true} + mysql: + jdbc-url: ${MYSQL_JDBC_URL:} + host: ${MYSQL_HOST:localhost} + port: ${MYSQL_TCP_PORT:3306} + username: ${MYSQL_USER:} + password: ${MYSQL_PASS:} + db: ${MYSQL_DB:zipkin} + max-active: ${MYSQL_MAX_CONNECTIONS:10} + use-ssl: ${MYSQL_USE_SSL:false} + ui: + enabled: ${QUERY_ENABLED:true} + ## Values below here are mapped to ZipkinUiProperties, served as /config.json + # Default limit for Find Traces + query-limit: 10 + # The value here becomes a label in the top-right corner + environment: + # Default duration to look back when finding traces. + # Affects the "Start time" element in the UI. 1 hour in millis + default-lookback: 3600000 + # When false, disables the "find a trace" screen + search-enabled: ${SEARCH_ENABLED:true} + # Which sites this Zipkin UI covers. Regex syntax. (e.g. http:\/\/example.com\/.*) + # Multiple sites can be specified, e.g. + # - .*example1.com + # - .*example2.com + # Default is "match all websites" + instrumented: .* + # URL placed into the tag in the HTML + base-path: /zipkin + # When false, disables the "Try Lens UI" button in the navigation page + suggest-lens: true + +server: + port: ${QUERY_PORT:9411} + use-forward-headers: true + compression: + enabled: true + # compresses any response over min-response-size (default is 2KiB) + # Includes dynamic json content and large static assets from zipkin-ui + mime-types: application/json,application/javascript,text/css,image/svg + +spring.main.web-application-type: none + +# We are using Armeria instead of Tomcat. Have it inherit the default configuration from Spring +armeria: + ports: + - port: ${server.port} + protocols: + - http + +spring: + jmx: + # reduce startup time by excluding unexposed JMX service + enabled: false + mvc: + favicon: + # zipkin has its own favicon + enabled: false + autoconfigure: + exclude: + # otherwise we might initialize even when not needed (ex when storage type is cassandra) + - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration +info: + zipkin: + version: "@project.version@" + +logging: + pattern: + level: "%clr(%5p) %clr([%X{traceId}/%X{spanId}]){yellow}" + level: + # Silence Invalid method name: '__can__finagle__trace__v3__' + com.facebook.swift.service.ThriftServiceProcessor: 'OFF' +# # investigate /api/v2/dependencies +# zipkin2.internal.DependencyLinker: 'DEBUG' +# # log cassandra queries (DEBUG is without values) +# com.datastax.driver.core.QueryLogger: 'TRACE' +# # log cassandra trace propagation +# com.datastax.driver.core.Message: 'TRACE' +# # log reason behind http collector dropped messages +# zipkin2.server.ZipkinHttpCollector: 'DEBUG' +# zipkin2.collector.kafka.KafkaCollector: 'DEBUG' +# zipkin2.collector.kafka08.KafkaCollector: 'DEBUG' +# zipkin2.collector.rabbitmq.RabbitMQCollector: 'DEBUG' +# zipkin2.collector.scribe.ScribeCollector: 'DEBUG' + +management: + endpoints: + web: + exposure: + include: '*' + endpoint: + health: + show-details: always +# Disabling auto time http requests since it is added in Undertow HttpHandler in Zipkin autoconfigure +# Prometheus module. In Zipkin we use different naming for the http requests duration + metrics: + web: + server: + auto-time-requests: false From ce55956cd7a152de1c83e53b79abfcdbba0274f9 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Thu, 21 Mar 2019 17:12:51 +0800 Subject: [PATCH 03/16] add support activemq --- zipkin-autoconfigure/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/zipkin-autoconfigure/pom.xml b/zipkin-autoconfigure/pom.xml index 726835e09d8..d6dfda62105 100644 --- a/zipkin-autoconfigure/pom.xml +++ b/zipkin-autoconfigure/pom.xml @@ -46,6 +46,7 @@ storage-mysql storage-cassandra metrics-prometheus + collector-activemq From 0271a85677adc6d87a02b35a7eb2f37915440b8a Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Thu, 21 Mar 2019 17:13:06 +0800 Subject: [PATCH 04/16] add support activemq --- pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pom.xml b/pom.xml index da095303e18..f95460eee93 100755 --- a/pom.xml +++ b/pom.xml @@ -213,6 +213,12 @@ ${project.version} + + io.zipkin.zipkin2 + zipkin-collector-activemq + ${project.version} + + io.zipkin.zipkin2 zipkin-collector-scribe From 2984d9d343ab53adfea0ab73466879c51c06b837 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Thu, 21 Mar 2019 17:15:12 +0800 Subject: [PATCH 05/16] add support activemq --- zipkin-collector/activemq/pom.xml | 15 +++++++++++++++ .../collector/activemq/ActiveMQCollectorRule.java | 6 +++--- .../collector/activemq/ITActiveMQCollector.java | 6 +++--- .../activemq/src/test/resource/log4j2.properties | 14 ++++++++++++++ 4 files changed, 35 insertions(+), 6 deletions(-) rename zipkin-collector/activemq/src/test/java/{zipkin => zipkin2}/collector/activemq/ActiveMQCollectorRule.java (96%) rename zipkin-collector/activemq/src/test/java/{zipkin => zipkin2}/collector/activemq/ITActiveMQCollector.java (94%) diff --git a/zipkin-collector/activemq/pom.xml b/zipkin-collector/activemq/pom.xml index 80c18e6fdcd..6cd1d4c59c4 100644 --- a/zipkin-collector/activemq/pom.xml +++ b/zipkin-collector/activemq/pom.xml @@ -1,4 +1,19 @@ + diff --git a/zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ActiveMQCollectorRule.java b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQCollectorRule.java similarity index 96% rename from zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ActiveMQCollectorRule.java rename to zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQCollectorRule.java index 31c48f22580..fae79aa1f3a 100644 --- a/zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ActiveMQCollectorRule.java +++ b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQCollectorRule.java @@ -1,5 +1,5 @@ -/** - * Copyright 2015-2017 The OpenZipkin Authors +/* + * Copyright 2015-2019 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -11,7 +11,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin.collector.activemq; +package zipkin2.collector.activemq; import org.junit.AssumptionViolatedException; import org.junit.rules.ExternalResource; diff --git a/zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ITActiveMQCollector.java b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java similarity index 94% rename from zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ITActiveMQCollector.java rename to zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java index 1f79209b02f..e09337bb3fc 100644 --- a/zipkin-collector/activemq/src/test/java/zipkin/collector/activemq/ITActiveMQCollector.java +++ b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java @@ -1,5 +1,5 @@ -/** - * Copyright 2015-2018 The OpenZipkin Authors +/* + * Copyright 2015-2019 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -11,7 +11,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin.collector.activemq; +package zipkin2.collector.activemq; import org.junit.After; import org.junit.ClassRule; diff --git a/zipkin-collector/activemq/src/test/resource/log4j2.properties b/zipkin-collector/activemq/src/test/resource/log4j2.properties index 6145bd7528d..551eb198208 100644 --- a/zipkin-collector/activemq/src/test/resource/log4j2.properties +++ b/zipkin-collector/activemq/src/test/resource/log4j2.properties @@ -1,3 +1,17 @@ +# +# Copyright 2015-2019 The OpenZipkin Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# + appenders=console appender.console.type=Console appender.console.name=STDOUT From cd43ad7bb69316c330bad671f907f9d45a0d4780 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Thu, 21 Mar 2019 17:16:11 +0800 Subject: [PATCH 06/16] add support activemq --- .../collector/activemq/ActiveMQCollector.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename zipkin-collector/activemq/src/main/java/{zipkin => zipkin2}/collector/activemq/ActiveMQCollector.java (98%) diff --git a/zipkin-collector/activemq/src/main/java/zipkin/collector/activemq/ActiveMQCollector.java b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java similarity index 98% rename from zipkin-collector/activemq/src/main/java/zipkin/collector/activemq/ActiveMQCollector.java rename to zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java index 71417e2360a..cbafd4a71d9 100644 --- a/zipkin-collector/activemq/src/main/java/zipkin/collector/activemq/ActiveMQCollector.java +++ b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java @@ -1,5 +1,5 @@ -/** - * Copyright 2015-2018 The OpenZipkin Authors +/* + * Copyright 2015-2019 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -11,7 +11,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin.collector.activemq; +package zipkin2.collector.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import zipkin2.Callback; From 6525e42e5b567f23e00608c22508a6e150b8dfe8 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Thu, 21 Mar 2019 17:27:36 +0800 Subject: [PATCH 07/16] add support activemq --- .../activemq/ZipkinActiveMQCollectorAutoConfiguration.java | 2 +- .../collector/activemq/ZipkinActiveMQCollectorProperties.java | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java index 22b0db8cd2d..e4ab98b3e36 100644 --- a/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java +++ b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorAutoConfiguration.java @@ -55,7 +55,7 @@ ActiveMQCollector activemq( static final class ActiveMQAddressesOrUriSet implements Condition { @Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata a) { - return !isEmpty(context.getEnvironment().getProperty("zipkin.collector.rabbitmq.addresses")); + return !isEmpty(context.getEnvironment().getProperty("zipkin.collector.activemq.addresses")); } private static boolean isEmpty(String s) { diff --git a/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java index 74d0b249f60..79513f9134c 100644 --- a/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java +++ b/zipkin-autoconfigure/collector-activemq/src/main/java/zipkin2/autoconfigure/collector/activemq/ZipkinActiveMQCollectorProperties.java @@ -17,15 +17,13 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import zipkin2.collector.activemq.ActiveMQCollector; -import java.net.URI; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; /** Properties for configuring and building a {@link zipkin2.collector.activemq.ActiveMQCollector}. */ -@ConfigurationProperties("zipkin.collector.rabbitmq") +@ConfigurationProperties("zipkin.collector.activemq") class ZipkinActiveMQCollectorProperties { - static final URI EMPTY_URI = URI.create(""); /** RabbitMQ server addresses in the form of a (comma-separated) list of host:port pairs */ private String addresses; From d6b013395cb4887f296c054babfc73836b46a406 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Thu, 21 Mar 2019 17:51:40 +0800 Subject: [PATCH 08/16] add support activemq --- zipkin-server/pom.xml | 8 ++++++++ zipkin-server/src/main/resources/zipkin-server-shared.yml | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index e9ff4a58b36..1b11cad7737 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -208,6 +208,14 @@ true + + + io.zipkin.java + zipkin-autoconfigure-collector-activemq + ${project.version} + true + + io.zipkin.java diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 3ef7272fc19..3eea7c435d7 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -34,6 +34,14 @@ zipkin: virtual-host: ${RABBIT_VIRTUAL_HOST:/} useSsl: ${RABBIT_USE_SSL:false} uri: ${RABBIT_URI:} + activemq: + # RabbitMQ server address list (comma-separated list of tcp://host:port) + addresses: ${ACTIVE_ADDRESSES:} + # TCP connection timeout in milliseconds + connection-timeout: ${ACTIVE_CONNECTION_TIMEOUT:60000} + password: ${ACTIVE_PASSWORD:guest} + queue: ${ACTIVE_QUEUE:zipkin} + username: ${ACTIVE_USER:guest} query: enabled: ${QUERY_ENABLED:true} # 1 day in millis From 9d0f40eaacd7ef7c75fc60fdd8e30d154903f2fc Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Thu, 21 Mar 2019 19:59:56 +0800 Subject: [PATCH 09/16] add support activemq --- .../collector/activemq/ActiveMQCollector.java | 65 +++++++------------ 1 file changed, 23 insertions(+), 42 deletions(-) diff --git a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java index cbafd4a71d9..03d6c555efb 100644 --- a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java +++ b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java @@ -25,6 +25,7 @@ import javax.jms.*; import java.io.Closeable; import java.io.IOException; +import java.lang.IllegalStateException; import java.util.concurrent.atomic.AtomicReference; @@ -147,73 +148,53 @@ public void close() throws IOException { static final class LazyInit { final Builder builder; final AtomicReference failure = new AtomicReference<>(); - ActivemqConnection activemqConnection; + Connection connection; LazyInit(Builder builder) { this.builder = builder; } - protected ActivemqConnection compute() { + protected Connection compute() { try { - builder.connectionFactory.setBrokerURL(builder.addresses); - Connection connection = builder.connectionFactory.createConnection(); - connection.start(); - activemqConnection = new ActivemqConnection(connection); + builder.connectionFactory.setBrokerURL("failover:("+builder.addresses+")?initialReconnectDelay=100"); + connection = builder.connectionFactory.createConnection(); Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); Destination destination = session.createQueue(builder.queue); MessageConsumer messageConsumer = session.createConsumer(destination); - Collector collector = builder.delegate.build(); CollectorMetrics metrics = builder.metrics; - - messageConsumer.setMessageListener(new ActiveMQSpanConsumer(collector,metrics)); + messageConsumer.setMessageListener(new ActiveMQSpanConsumerMessageListener(collector,metrics)); }catch (Exception e){ - + throw new IllegalStateException("Unable to establish connection to ActiveMQ server", e); } - return activemqConnection; + return connection; } void close() throws IOException { - ActivemqConnection maybeConnection = activemqConnection; + Connection maybeConnection = connection; if (maybeConnection != null) { - maybeConnection.close(); + try { + maybeConnection.close(); + }catch (Exception e){ + throw new IOException(e); + } + } } - ActivemqConnection get() { - if (activemqConnection == null) { + Connection get() { + if (connection == null) { synchronized (this) { - if (activemqConnection == null) { - activemqConnection = compute(); + if (connection == null) { + connection = compute(); } } } - return activemqConnection; - } - - - } - - static class ActivemqConnection implements Closeable { - Connection connection = null; - - public ActivemqConnection(Connection connection ){ - this.connection = connection; + return connection; } - @Override - public int hashCode() { - return super.hashCode(); - } - @Override - public void close() throws IOException { - try { - connection.close(); - }catch (Exception e){ - throw new IOException(e); - } - } } @@ -221,11 +202,11 @@ public void close() throws IOException { * Consumes spans from messages on a ActiveMQ queue. Malformed messages will be discarded. Errors * in the storage component will similarly be ignored, with no retry of the message. */ - static class ActiveMQSpanConsumer implements MessageListener { + static class ActiveMQSpanConsumerMessageListener implements MessageListener { final Collector collector; final CollectorMetrics metrics; - ActiveMQSpanConsumer(Collector collector, CollectorMetrics metrics) { + ActiveMQSpanConsumerMessageListener(Collector collector, CollectorMetrics metrics) { this.collector = collector; this.metrics = metrics; } From 2b44eb3b1d1252658e9756098b8cab6776264e38 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Mon, 25 Mar 2019 11:15:30 +0800 Subject: [PATCH 10/16] add support activemq --- .../zipkin2/collector/activemq/ActiveMQCollector.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java index 03d6c555efb..981580ece57 100644 --- a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java +++ b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java @@ -23,13 +23,12 @@ import zipkin2.storage.StorageComponent; import javax.jms.*; -import java.io.Closeable; import java.io.IOException; import java.lang.IllegalStateException; import java.util.concurrent.atomic.AtomicReference; -/** This collector consumes encoded binary messages from a RabbitMQ queue. */ +/** This collector consumes encoded binary messages from a ActiveMQ queue. */ public final class ActiveMQCollector extends CollectorComponent { static final Callback NOOP = @@ -45,7 +44,7 @@ public static Builder builder() { return new Builder(); } - /** Configuration including defaults needed to consume spans from a RabbitMQ queue. */ + /** Configuration including defaults needed to consume spans from a ActiveMQ queue. */ public static final class Builder extends CollectorComponent.Builder { Collector.Builder delegate = Collector.newBuilder(ActiveMQCollector.class); CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; @@ -88,7 +87,7 @@ public Builder connectionFactory(ActiveMQConnectionFactory connectionFactory) { } /** - * Queue zipkin spans will be consumed from. Defaults to "zipkin-spans". + * Queue zipkin spans will be consumed from. Defaults to "zipkin". */ public Builder queue(String queue) { if (queue == null) { @@ -98,13 +97,13 @@ public Builder queue(String queue) { } public Builder username(String username) { - ((ActiveMQConnectionFactory)connectionFactory).setUserName(username); + connectionFactory.setUserName(username); return this; } /** The password to use when connecting to the broker. Defaults to "guest" */ public Builder password(String password) { - ((ActiveMQConnectionFactory)connectionFactory).setPassword(password); + connectionFactory.setPassword(password); return this; } From 0c62ac7d0a613fdf257eb7c8c78792b7f796afe8 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Mon, 25 Mar 2019 11:29:45 +0800 Subject: [PATCH 11/16] add support activemq --- zipkin-collector/activemq/README.md | 56 +++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 zipkin-collector/activemq/README.md diff --git a/zipkin-collector/activemq/README.md b/zipkin-collector/activemq/README.md new file mode 100644 index 00000000000..a5954f8c8bf --- /dev/null +++ b/zipkin-collector/activemq/README.md @@ -0,0 +1,56 @@ +# collector-activemq + +## ActiveMQCollector +This collector consumes a ActiveMQ queue for messages that contain a list of spans. +Its only dependencies besides Zipkin core are the `slf4j-api` and the [ActiveMQ Java Client](https://github.com/apache/activemq). + +### Configuration + +The following configuration can be set for the ActiveMQ Collector. + +Property | Environment Variable | Description +--- | --- | --- +`zipkin.collector.activemq.connection-timeout` | `ACTIVE_CONNECTION_TIMEOUT` | Milliseconds to wait establishing a connection. Defaults to `60000` (1 minute) +`zipkin.collector.activemq.queue` | `ACTIVE_QUEUE` | Queue from which to collect span messages. Defaults to `zipkin` + +If the URI is set, the following properties will be ignored. + +Property | Environment Variable | Description +--- | --- | --- +`zipkin.collector.activemq.addresses` | `ACTIVE_ADDRESSES` | Comma-separated list of RabbitMQ addresses, ex. `localhost:5672,localhost:5673` +`zipkin.collector.activemq.password` | `ACTIVE_PASSWORD`| Password to use when connecting to RabbitMQ. Defaults to `guest` +`zipkin.collector.activemq.username` | `ACTIVE_USER` | Username to use when connecting to RabbitMQ. Defaults to `guest` + +### Caveats + +The configured queue will be idempotently declared as a durable queue. + + +Consumption is done with `autoAck` on, so messages that fail to process successfully are not retried. + +## Encoding spans into ActiveMQ messages +The message's body should be the bytes of an encoded list of spans. + +### JSON +A list of Spans in JSON. The first character must be '[' (decimal 91). + +`SpanBytesEncoder.JSON_V2.encodeList(spans)` performs the correct JSON encoding. + +## Local testing + +The following assumes you are running an instance of RabbitMQ locally on the default port (61616). +You can download and install ActiveMQ following [instructions available here](http://activemq.apache.org/download.html). +With the [ActiveMQ Management Admin](http://localhost:8161/admin/) you can easily publish +one-off spans to ActiveMQ to be collected by this collector. + +1. Start ActiveMQ server +2. Start Zipkin server +```bash +$ ACTIVE_ADDRESSES=tcp://localhost:61616 java -jar zipkin.jar +``` +3. Save an array of spans to a file like `sample-spans.json` +```json +[{"traceId":"9032b04972e475c5","id":"9032b04972e475c5","kind":"SERVER","name":"get","timestamp":1505990621526000,"duration":612898,"localEndpoint":{"serviceName":"brave-webmvc-example","ipv4":"192.168.1.113"},"remoteEndpoint":{"serviceName":"","ipv4":"127.0.0.1","port":60149},"tags":{"error":"500 Internal Server Error","http.path":"/a"}}] +``` +4. Publish them using the admin + From 0afdf7898b468d1c62b431bfab65423a72eea454 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Mon, 25 Mar 2019 14:15:03 +0800 Subject: [PATCH 12/16] add support activemq --- .../collector/activemq/ActiveMQCollector.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java index 981580ece57..3e590e4407c 100644 --- a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java +++ b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java @@ -213,13 +213,17 @@ static class ActiveMQSpanConsumerMessageListener implements MessageListener { @Override public void onMessage(Message message) { metrics.incrementMessages(); - if(message instanceof BytesMessage) { - try { - byte [] data = new byte[(int)((BytesMessage)message).getBodyLength()]; - ((BytesMessage)message).readBytes(data); + try { + if(message instanceof BytesMessage) { + byte [] data = new byte[(int)((BytesMessage)message).getBodyLength()]; + ((BytesMessage)message).readBytes(data); + this.collector.acceptSpans(data, NOOP); + }else if(message instanceof TextMessage){ + String text = ((TextMessage)message).getText(); + byte [] data = text.getBytes(); this.collector.acceptSpans(data, NOOP); - }catch (Exception e){ } + }catch (Exception e){ } } From e6afc57a4fb8514e02a1a3d98b0f7ef773242cab Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Mon, 25 Mar 2019 14:25:46 +0800 Subject: [PATCH 13/16] add support activemq --- zipkin-collector/activemq/README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/zipkin-collector/activemq/README.md b/zipkin-collector/activemq/README.md index a5954f8c8bf..9ca5f8eb269 100644 --- a/zipkin-collector/activemq/README.md +++ b/zipkin-collector/activemq/README.md @@ -17,9 +17,9 @@ If the URI is set, the following properties will be ignored. Property | Environment Variable | Description --- | --- | --- -`zipkin.collector.activemq.addresses` | `ACTIVE_ADDRESSES` | Comma-separated list of RabbitMQ addresses, ex. `localhost:5672,localhost:5673` -`zipkin.collector.activemq.password` | `ACTIVE_PASSWORD`| Password to use when connecting to RabbitMQ. Defaults to `guest` -`zipkin.collector.activemq.username` | `ACTIVE_USER` | Username to use when connecting to RabbitMQ. Defaults to `guest` +`zipkin.collector.activemq.addresses` | `ACTIVE_ADDRESSES` | Comma-separated list of ActiveMQ addresses, ex. `localhost:5672,localhost:5673` +`zipkin.collector.activemq.password` | `ACTIVE_PASSWORD`| Password to use when connecting to ActiveMQ. Defaults to `guest` +`zipkin.collector.activemq.username` | `ACTIVE_USER` | Username to use when connecting to ActiveMQ. Defaults to `guest` ### Caveats @@ -38,7 +38,7 @@ A list of Spans in JSON. The first character must be '[' (decimal 91). ## Local testing -The following assumes you are running an instance of RabbitMQ locally on the default port (61616). +The following assumes you are running an instance of ActiveMQ locally on the default port (61616). You can download and install ActiveMQ following [instructions available here](http://activemq.apache.org/download.html). With the [ActiveMQ Management Admin](http://localhost:8161/admin/) you can easily publish one-off spans to ActiveMQ to be collected by this collector. From 3a1b0b1f36993e34d577afeb67439998801267df Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Mon, 25 Mar 2019 14:29:14 +0800 Subject: [PATCH 14/16] add support activemq --- zipkin-collector/activemq/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zipkin-collector/activemq/README.md b/zipkin-collector/activemq/README.md index 9ca5f8eb269..0da53bb8f4f 100644 --- a/zipkin-collector/activemq/README.md +++ b/zipkin-collector/activemq/README.md @@ -17,9 +17,9 @@ If the URI is set, the following properties will be ignored. Property | Environment Variable | Description --- | --- | --- -`zipkin.collector.activemq.addresses` | `ACTIVE_ADDRESSES` | Comma-separated list of ActiveMQ addresses, ex. `localhost:5672,localhost:5673` -`zipkin.collector.activemq.password` | `ACTIVE_PASSWORD`| Password to use when connecting to ActiveMQ. Defaults to `guest` -`zipkin.collector.activemq.username` | `ACTIVE_USER` | Username to use when connecting to ActiveMQ. Defaults to `guest` +`zipkin.collector.activemq.addresses` | `ACTIVE_ADDRESSES` | Comma-separated list of ActiveMQ addresses, ex. `tcp://localhost:61616,tcp://localhost2:61616` +`zipkin.collector.activemq.password` | `ACTIVE_PASSWORD`| Password to use when connecting to ActiveMQ. Defaults to `system` +`zipkin.collector.activemq.username` | `ACTIVE_USER` | Username to use when connecting to ActiveMQ. Defaults to `manager` ### Caveats From 1f031e16aed843787d6111fef615228e34400647 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Mon, 25 Mar 2019 16:17:09 +0800 Subject: [PATCH 15/16] add support activemq --- .../main/resources/zipkin-server-shared.yml | 221 ------------------ zipkin-collector/activemq/README.md | 12 +- .../collector/activemq/ActiveMQCollector.java | 4 +- .../activemq/ITActiveMQCollector.java | 11 + .../rabbitmq/ITRabbitMQCollector.java | 3 +- .../main/resources/zipkin-server-shared.yml | 10 +- 6 files changed, 25 insertions(+), 236 deletions(-) delete mode 100644 zipkin-autoconfigure/collector-activemq/src/main/resources/zipkin-server-shared.yml diff --git a/zipkin-autoconfigure/collector-activemq/src/main/resources/zipkin-server-shared.yml b/zipkin-autoconfigure/collector-activemq/src/main/resources/zipkin-server-shared.yml deleted file mode 100644 index 3eea7c435d7..00000000000 --- a/zipkin-autoconfigure/collector-activemq/src/main/resources/zipkin-server-shared.yml +++ /dev/null @@ -1,221 +0,0 @@ -zipkin: - self-tracing: - # Set to true to enable self-tracing. - enabled: ${SELF_TRACING_ENABLED:false} - # percentage to self-traces to retain - sample-rate: ${SELF_TRACING_SAMPLE_RATE:1.0} - # Timeout in seconds to flush self-tracing data to storage. - message-timeout: ${SELF_TRACING_FLUSH_INTERVAL:1} - collector: - # percentage to traces to retain - sample-rate: ${COLLECTOR_SAMPLE_RATE:1.0} - http: - # Set to false to disable creation of spans via HTTP collector API - enabled: ${HTTP_COLLECTOR_ENABLED:true} - kafka: - # Kafka bootstrap broker list, comma-separated host:port values. Setting this activates the - # Kafka 0.10+ collector. - bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:} - # Name of topic to poll for spans - topic: ${KAFKA_TOPIC:zipkin} - # Consumer group this process is consuming on behalf of. - group-id: ${KAFKA_GROUP_ID:zipkin} - # Count of consumer threads consuming the topic - streams: ${KAFKA_STREAMS:1} - rabbitmq: - # RabbitMQ server address list (comma-separated list of host:port) - addresses: ${RABBIT_ADDRESSES:} - concurrency: ${RABBIT_CONCURRENCY:1} - # TCP connection timeout in milliseconds - connection-timeout: ${RABBIT_CONNECTION_TIMEOUT:60000} - password: ${RABBIT_PASSWORD:guest} - queue: ${RABBIT_QUEUE:zipkin} - username: ${RABBIT_USER:guest} - virtual-host: ${RABBIT_VIRTUAL_HOST:/} - useSsl: ${RABBIT_USE_SSL:false} - uri: ${RABBIT_URI:} - activemq: - # RabbitMQ server address list (comma-separated list of tcp://host:port) - addresses: ${ACTIVE_ADDRESSES:} - # TCP connection timeout in milliseconds - connection-timeout: ${ACTIVE_CONNECTION_TIMEOUT:60000} - password: ${ACTIVE_PASSWORD:guest} - queue: ${ACTIVE_QUEUE:zipkin} - username: ${ACTIVE_USER:guest} - query: - enabled: ${QUERY_ENABLED:true} - # 1 day in millis - lookback: ${QUERY_LOOKBACK:86400000} - # The Cache-Control max-age (seconds) for /api/v2/services and /api/v2/spans - names-max-age: 300 - # CORS allowed-origins. - allowed-origins: "*" - - storage: - strict-trace-id: ${STRICT_TRACE_ID:true} - search-enabled: ${SEARCH_ENABLED:true} - autocomplete-keys: ${AUTOCOMPLETE_KEYS:} - autocomplete-ttl: ${AUTOCOMPLETE_TTL:3600000} - autocomplete-cardinality: 20000 - type: ${STORAGE_TYPE:mem} - mem: - # Maximum number of spans to keep in memory. When exceeded, oldest traces (and their spans) will be purged. - # A safe estimate is 1K of memory per span (each span with 2 annotations + 1 binary annotation), plus - # 100 MB for a safety buffer. You'll need to verify in your own environment. - # Experimentally, it works with: max-spans of 500000 with JRE argument -Xmx600m. - max-spans: 500000 - cassandra: - # Comma separated list of host addresses part of Cassandra cluster. Ports default to 9042 but you can also specify a custom port with 'host:port'. - contact-points: ${CASSANDRA_CONTACT_POINTS:localhost} - # Name of the datacenter that will be considered "local" for latency load balancing. When unset, load-balancing is round-robin. - local-dc: ${CASSANDRA_LOCAL_DC:} - # Will throw an exception on startup if authentication fails. - username: ${CASSANDRA_USERNAME:} - password: ${CASSANDRA_PASSWORD:} - keyspace: ${CASSANDRA_KEYSPACE:zipkin} - # Max pooled connections per datacenter-local host. - max-connections: ${CASSANDRA_MAX_CONNECTIONS:8} - # Ensuring that schema exists, if enabled tries to execute script /zipkin-cassandra-core/resources/cassandra-schema-cql3.txt. - ensure-schema: ${CASSANDRA_ENSURE_SCHEMA:true} - # 7 days in seconds - span-ttl: ${CASSANDRA_SPAN_TTL:604800} - # 3 days in seconds - index-ttl: ${CASSANDRA_INDEX_TTL:259200} - # the maximum trace index metadata entries to cache - index-cache-max: ${CASSANDRA_INDEX_CACHE_MAX:100000} - # how long to cache index metadata about a trace. 1 minute in seconds - index-cache-ttl: ${CASSANDRA_INDEX_CACHE_TTL:60} - # how many more index rows to fetch than the user-supplied query limit - index-fetch-multiplier: ${CASSANDRA_INDEX_FETCH_MULTIPLIER:3} - # Using ssl for connection, rely on Keystore - use-ssl: ${CASSANDRA_USE_SSL:false} - cassandra3: - # Comma separated list of host addresses part of Cassandra cluster. Ports default to 9042 but you can also specify a custom port with 'host:port'. - contact-points: ${CASSANDRA_CONTACT_POINTS:localhost} - # Name of the datacenter that will be considered "local" for latency load balancing. When unset, load-balancing is round-robin. - local-dc: ${CASSANDRA_LOCAL_DC:} - # Will throw an exception on startup if authentication fails. - username: ${CASSANDRA_USERNAME:} - password: ${CASSANDRA_PASSWORD:} - keyspace: ${CASSANDRA_KEYSPACE:zipkin2} - # Max pooled connections per datacenter-local host. - max-connections: ${CASSANDRA_MAX_CONNECTIONS:8} - # Ensuring that schema exists, if enabled tries to execute script /zipkin2-schema.cql - ensure-schema: ${CASSANDRA_ENSURE_SCHEMA:true} - # how many more index rows to fetch than the user-supplied query limit - index-fetch-multiplier: ${CASSANDRA_INDEX_FETCH_MULTIPLIER:3} - # Using ssl for connection, rely on Keystore - use-ssl: ${CASSANDRA_USE_SSL:false} - elasticsearch: - # host is left unset intentionally, to defer the decision - hosts: ${ES_HOSTS:} - pipeline: ${ES_PIPELINE:} - max-requests: ${ES_MAX_REQUESTS:64} - timeout: ${ES_TIMEOUT:10000} - index: ${ES_INDEX:zipkin} - date-separator: ${ES_DATE_SEPARATOR:-} - index-shards: ${ES_INDEX_SHARDS:5} - index-replicas: ${ES_INDEX_REPLICAS:1} - username: ${ES_USERNAME:} - password: ${ES_PASSWORD:} - http-logging: ${ES_HTTP_LOGGING:} - legacy-reads-enabled: ${ES_LEGACY_READS_ENABLED:true} - mysql: - jdbc-url: ${MYSQL_JDBC_URL:} - host: ${MYSQL_HOST:localhost} - port: ${MYSQL_TCP_PORT:3306} - username: ${MYSQL_USER:} - password: ${MYSQL_PASS:} - db: ${MYSQL_DB:zipkin} - max-active: ${MYSQL_MAX_CONNECTIONS:10} - use-ssl: ${MYSQL_USE_SSL:false} - ui: - enabled: ${QUERY_ENABLED:true} - ## Values below here are mapped to ZipkinUiProperties, served as /config.json - # Default limit for Find Traces - query-limit: 10 - # The value here becomes a label in the top-right corner - environment: - # Default duration to look back when finding traces. - # Affects the "Start time" element in the UI. 1 hour in millis - default-lookback: 3600000 - # When false, disables the "find a trace" screen - search-enabled: ${SEARCH_ENABLED:true} - # Which sites this Zipkin UI covers. Regex syntax. (e.g. http:\/\/example.com\/.*) - # Multiple sites can be specified, e.g. - # - .*example1.com - # - .*example2.com - # Default is "match all websites" - instrumented: .* - # URL placed into the tag in the HTML - base-path: /zipkin - # When false, disables the "Try Lens UI" button in the navigation page - suggest-lens: true - -server: - port: ${QUERY_PORT:9411} - use-forward-headers: true - compression: - enabled: true - # compresses any response over min-response-size (default is 2KiB) - # Includes dynamic json content and large static assets from zipkin-ui - mime-types: application/json,application/javascript,text/css,image/svg - -spring.main.web-application-type: none - -# We are using Armeria instead of Tomcat. Have it inherit the default configuration from Spring -armeria: - ports: - - port: ${server.port} - protocols: - - http - -spring: - jmx: - # reduce startup time by excluding unexposed JMX service - enabled: false - mvc: - favicon: - # zipkin has its own favicon - enabled: false - autoconfigure: - exclude: - # otherwise we might initialize even when not needed (ex when storage type is cassandra) - - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration -info: - zipkin: - version: "@project.version@" - -logging: - pattern: - level: "%clr(%5p) %clr([%X{traceId}/%X{spanId}]){yellow}" - level: - # Silence Invalid method name: '__can__finagle__trace__v3__' - com.facebook.swift.service.ThriftServiceProcessor: 'OFF' -# # investigate /api/v2/dependencies -# zipkin2.internal.DependencyLinker: 'DEBUG' -# # log cassandra queries (DEBUG is without values) -# com.datastax.driver.core.QueryLogger: 'TRACE' -# # log cassandra trace propagation -# com.datastax.driver.core.Message: 'TRACE' -# # log reason behind http collector dropped messages -# zipkin2.server.ZipkinHttpCollector: 'DEBUG' -# zipkin2.collector.kafka.KafkaCollector: 'DEBUG' -# zipkin2.collector.kafka08.KafkaCollector: 'DEBUG' -# zipkin2.collector.rabbitmq.RabbitMQCollector: 'DEBUG' -# zipkin2.collector.scribe.ScribeCollector: 'DEBUG' - -management: - endpoints: - web: - exposure: - include: '*' - endpoint: - health: - show-details: always -# Disabling auto time http requests since it is added in Undertow HttpHandler in Zipkin autoconfigure -# Prometheus module. In Zipkin we use different naming for the http requests duration - metrics: - web: - server: - auto-time-requests: false diff --git a/zipkin-collector/activemq/README.md b/zipkin-collector/activemq/README.md index 0da53bb8f4f..2c5cb718a0a 100644 --- a/zipkin-collector/activemq/README.md +++ b/zipkin-collector/activemq/README.md @@ -10,16 +10,16 @@ The following configuration can be set for the ActiveMQ Collector. Property | Environment Variable | Description --- | --- | --- -`zipkin.collector.activemq.connection-timeout` | `ACTIVE_CONNECTION_TIMEOUT` | Milliseconds to wait establishing a connection. Defaults to `60000` (1 minute) -`zipkin.collector.activemq.queue` | `ACTIVE_QUEUE` | Queue from which to collect span messages. Defaults to `zipkin` +`zipkin.collector.activemq.connection-timeout` | `ACTIVEMQ_CONNECTION_TIMEOUT` | Milliseconds to wait establishing a connection. Defaults to `60000` (1 minute) +`zipkin.collector.activemq.queue` | `ACTIVEMQ_QUEUE` | Queue from which to collect span messages. Defaults to `zipkin` If the URI is set, the following properties will be ignored. Property | Environment Variable | Description --- | --- | --- -`zipkin.collector.activemq.addresses` | `ACTIVE_ADDRESSES` | Comma-separated list of ActiveMQ addresses, ex. `tcp://localhost:61616,tcp://localhost2:61616` -`zipkin.collector.activemq.password` | `ACTIVE_PASSWORD`| Password to use when connecting to ActiveMQ. Defaults to `system` -`zipkin.collector.activemq.username` | `ACTIVE_USER` | Username to use when connecting to ActiveMQ. Defaults to `manager` +`zipkin.collector.activemq.addresses` | `ACTIVEMQ_ADDRESSES` | Comma-separated list of ActiveMQ addresses, ex. `tcp://localhost:61616,tcp://localhost2:61616` +`zipkin.collector.activemq.password` | `ACTIVEMQ_PASSWORD`| Password to use when connecting to ActiveMQ. Defaults to `system` +`zipkin.collector.activemq.username` | `ACTIVEMQ_USER` | Username to use when connecting to ActiveMQ. Defaults to `manager` ### Caveats @@ -46,7 +46,7 @@ one-off spans to ActiveMQ to be collected by this collector. 1. Start ActiveMQ server 2. Start Zipkin server ```bash -$ ACTIVE_ADDRESSES=tcp://localhost:61616 java -jar zipkin.jar +$ ACTIVEMQ_ADDRESSES=tcp://localhost:61616 java -jar zipkin.jar ``` 3. Save an array of spans to a file like `sample-spans.json` ```json diff --git a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java index 3e590e4407c..db8d1b09df1 100644 --- a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java +++ b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java @@ -157,14 +157,14 @@ protected Connection compute() { try { builder.connectionFactory.setBrokerURL("failover:("+builder.addresses+")?initialReconnectDelay=100"); connection = builder.connectionFactory.createConnection(); - Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); + Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(builder.queue); MessageConsumer messageConsumer = session.createConsumer(destination); Collector collector = builder.delegate.build(); CollectorMetrics metrics = builder.metrics; messageConsumer.setMessageListener(new ActiveMQSpanConsumerMessageListener(collector,metrics)); - }catch (Exception e){ + }catch (JMSException e){ throw new IllegalStateException("Unable to establish connection to ActiveMQ server", e); } return connection; diff --git a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java index e09337bb3fc..09fc1650715 100644 --- a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java +++ b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java @@ -43,6 +43,17 @@ public class ITActiveMQCollector { assertThat(activemq.collector.check().ok()).isTrue(); } + @Test + public void startFailsWithInvalidActiveMqServer() throws Exception { + // we can be pretty certain ActiveMQ isn't running on localhost port 61614 + String notActiveMqAddress = "localhost:61614"; + try (ActiveMQCollector collector = ActiveMQCollector.builder().addresses(notActiveMqAddress).build()) { + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Unable to establish connection to ActiveMQ server"); + collector.start(); + } + } + } diff --git a/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java b/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java index 9b7f764d856..37139da277d 100644 --- a/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java +++ b/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java @@ -57,8 +57,7 @@ public void checkPasses() { public void startFailsWithInvalidRabbitMqServer() throws Exception { // we can be pretty certain RabbitMQ isn't running on localhost port 80 String notRabbitMqAddress = "localhost:80"; - try (RabbitMQCollector collector = - builder().addresses(Collections.singletonList(notRabbitMqAddress)).build()) { + try (RabbitMQCollector collector = builder().addresses(Collections.singletonList(notRabbitMqAddress)).build()) { thrown.expect(IllegalStateException.class); thrown.expectMessage("Unable to establish connection to RabbitMQ server"); collector.start(); diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 3eea7c435d7..64b89bba824 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -36,12 +36,12 @@ zipkin: uri: ${RABBIT_URI:} activemq: # RabbitMQ server address list (comma-separated list of tcp://host:port) - addresses: ${ACTIVE_ADDRESSES:} + addresses: ${ACTIVEMQ_ADDRESSES:} # TCP connection timeout in milliseconds - connection-timeout: ${ACTIVE_CONNECTION_TIMEOUT:60000} - password: ${ACTIVE_PASSWORD:guest} - queue: ${ACTIVE_QUEUE:zipkin} - username: ${ACTIVE_USER:guest} + connection-timeout: ${ACTIVEMQ_CONNECTION_TIMEOUT:60000} + password: ${ACTIVEMQ_PASSWORD:system} + queue: ${ACTIVEMQ_QUEUE:zipkin} + username: ${ACTIVEMQ_USER:manager} query: enabled: ${QUERY_ENABLED:true} # 1 day in millis From 63d24aa0776ebdb6ba739a02ffd64640d5d720d2 Mon Sep 17 00:00:00 2001 From: IAMTJW <924862077@qq.com> Date: Tue, 26 Mar 2019 16:45:09 +0800 Subject: [PATCH 16/16] add activemq test --- zipkin-collector/activemq/pom.xml | 8 +++ .../collector/activemq/ActiveMQCollector.java | 3 +- .../activemq/ActiveMQCollectorRule.java | 57 ++++++++++++++++ .../activemq/ITActiveMQCollector.java | 68 +++++++++++++++++++ 4 files changed, 135 insertions(+), 1 deletion(-) diff --git a/zipkin-collector/activemq/pom.xml b/zipkin-collector/activemq/pom.xml index 6cd1d4c59c4..9178707eea6 100644 --- a/zipkin-collector/activemq/pom.xml +++ b/zipkin-collector/activemq/pom.xml @@ -47,6 +47,14 @@ ${activemq-client.version} + + org.apache.activemq + activemq-all + ${activemq-client.version} + test + + + org.apache.logging.log4j log4j-slf4j-impl diff --git a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java index db8d1b09df1..dad964f3863 100644 --- a/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java +++ b/zipkin-collector/activemq/src/main/java/zipkin2/collector/activemq/ActiveMQCollector.java @@ -70,6 +70,7 @@ public Builder metrics(CollectorMetrics metrics) { if (metrics == null) { throw new NullPointerException("metrics == null"); } + this.metrics = metrics.forTransport("activemq"); this.delegate.metrics(this.metrics); return this; } @@ -155,7 +156,7 @@ static final class LazyInit { protected Connection compute() { try { - builder.connectionFactory.setBrokerURL("failover:("+builder.addresses+")?initialReconnectDelay=100"); + builder.connectionFactory.setBrokerURL("failover:("+builder.addresses+")?initialReconnectDelay=100&maxReconnectAttempts=10"); connection = builder.connectionFactory.createConnection(); connection.start(); Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQCollectorRule.java b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQCollectorRule.java index fae79aa1f3a..c66838e115e 100644 --- a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQCollectorRule.java +++ b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQCollectorRule.java @@ -13,6 +13,9 @@ */ package zipkin2.collector.activemq; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; import org.junit.AssumptionViolatedException; import org.junit.rules.ExternalResource; import org.slf4j.Logger; @@ -22,7 +25,9 @@ import zipkin2.collector.InMemoryCollectorMetrics; import zipkin2.storage.InMemoryStorage; +import javax.jms.*; import java.io.IOException; +import java.util.concurrent.TimeoutException; class ActiveMQCollectorRule extends ExternalResource { static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQCollectorRule.class); @@ -30,14 +35,30 @@ class ActiveMQCollectorRule extends ExternalResource { final InMemoryStorage storage = InMemoryStorage.newBuilder().build(); final InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics(); + final InMemoryCollectorMetrics activemqMetrics = metrics.forTransport("activemq"); + + GenericContainer container; ActiveMQCollector collector; + BrokerService broker; + @Override protected void before() throws Throwable { + BrokerService broker; + broker = new BrokerService(); + broker.setUseJmx(false); + broker.setBrokerName("MyBroker"); + broker.setPersistent(false); + broker.addConnector("tcp://localhost:61616"); + broker.start(); + + Thread.sleep(10000); + + try { this.collector = tryToInitializeCollector(); } catch (RuntimeException| Error e) { @@ -68,6 +89,42 @@ ActiveMQCollector.Builder computeCollectorBuilder() { } + void publish(byte[] message) throws IOException, TimeoutException { + ConnectionFactory connectionFactory; //创建链接工厂 + Connection connection = null;//链接 + Session session;//创建会话 + Destination destination;//消息目的地 消息队列 + MessageProducer messageProducer;//消息生产者 + //实例化链接工厂 参数为 用户,密码,url + connectionFactory = new ActiveMQConnectionFactory("", "", ActiveMQConnection.DEFAULT_BROKER_URL); + try { + connection=connectionFactory.createConnection();//通过链接工厂创建链接 + connection.start();//启动链接 + //创建会话 Session.AUTO_ACKNOWLEDGE。receive 或MessageListener.onMessage()成功返回的时候,自动确认收到消息。 + session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + //创建一个消息队列名称为hello ActiveMQ 消息队列中可包含需要发布消息 + destination = session.createQueue("zipkin"); + //将创建的消息队列hello ActiveMQ交给消息发布者messageProdecer + messageProducer=session.createProducer(destination); + + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(message); + messageProducer.send(bytesMessage); + + + } catch (JMSException e) { + e.printStackTrace(); + }finally{ + try { + //关闭连接 + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + + } + @Override protected void after() { diff --git a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java index 09fc1650715..703a3be2da7 100644 --- a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java +++ b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java @@ -20,9 +20,12 @@ import org.junit.rules.ExpectedException; import zipkin2.Span; import zipkin2.TestObjects; +import zipkin2.codec.SpanBytesEncoder; +import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Java6Assertions.assertThat; @@ -55,5 +58,70 @@ public void startFailsWithInvalidActiveMqServer() throws Exception { } + /** Ensures list encoding works: a json encoded list of spans */ + @Test + public void messageWithMultipleSpans_json() throws Exception { + byte[] message = SpanBytesEncoder.JSON_V1.encodeList(spans); + activemq.publish(message); + + Thread.sleep(1000); + assertThat(activemq.storage.acceptedSpanCount()).isEqualTo(spans.size()); + + assertThat(activemq.activemqMetrics.messages()).isEqualTo(1); + assertThat(activemq.activemqMetrics.bytes()).isEqualTo(message.length); + assertThat(activemq.activemqMetrics.spans()).isEqualTo(spans.size()); + } + + /** Ensures list encoding works: a version 2 json list of spans */ + @Test + public void messageWithMultipleSpans_json2() throws Exception { + messageWithMultipleSpans(SpanBytesEncoder.JSON_V2); + } + + /** Ensures list encoding works: proto3 ListOfSpans */ + @Test + public void messageWithMultipleSpans_proto3() throws Exception { + messageWithMultipleSpans(SpanBytesEncoder.PROTO3); + } + + void messageWithMultipleSpans(SpanBytesEncoder encoder) + throws IOException, TimeoutException, InterruptedException { + + byte[] message = encoder.encodeList(spans); + activemq.publish(message); + + Thread.sleep(10000); + assertThat(activemq.storage.acceptedSpanCount()).isEqualTo(spans.size()); + + assertThat(activemq.activemqMetrics.messages()).isEqualTo(1); + assertThat(activemq.activemqMetrics.bytes()).isEqualTo(message.length); + assertThat(activemq.activemqMetrics.spans()).isEqualTo(spans.size()); + } + + /** Ensures malformed spans don't hang the collector */ + @Test + public void skipsMalformedData() throws Exception { + activemq.publish(SpanBytesEncoder.JSON_V2.encodeList(spans)); + activemq.publish(new byte[0]); + activemq.publish("[\"='".getBytes()); // screwed up json + activemq.publish("malformed".getBytes()); + activemq.publish(SpanBytesEncoder.JSON_V2.encodeList(spans)); + + Thread.sleep(1000); + assertThat(activemq.activemqMetrics.messages()).isEqualTo(5); + assertThat(activemq.activemqMetrics.messagesDropped()).isEqualTo(3); + } + + /** Guards against errors that leak from storage, such as InvalidQueryException */ + @Test + public void skipsOnSpanConsumerException() { + // TODO: reimplement + } + + @Test + public void messagesDistributedAcrossMultipleThreadsSuccessfully() { + // TODO: reimplement + } + }