diff --git a/.travis.yml b/.travis.yml old mode 100644 new mode 100755 index 656f184b607..59cfeb909c8 --- a/.travis.yml +++ b/.travis.yml @@ -42,6 +42,10 @@ before_install: - git config credential.helper "store --file=.git/credentials" - echo "https://$GH_TOKEN:@github.com" > .git/credentials + # Manually install elasticsearch until https://github.com/travis-ci/apt-source-whitelist/issues/190 + - curl -SL https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.1/elasticsearch-2.2.1.tar.gz | tar xz + - elasticsearch-*/bin/elasticsearch -d > /dev/null + script: # If we aren't a pull request, we are publishing either a snapshot or a release. # We don't currently publish the benchmarks or interop modules, as they are test code only. diff --git a/README.md b/README.md index c9e9e4df681..80e2ed453ed 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ [![Gitter chat](http://img.shields.io/badge/gitter-join%20chat%20%E2%86%92-brightgreen.svg)](https://gitter.im/openzipkin/zipkin) [![Build Status](https://travis-ci.org/openzipkin/zipkin-java.svg?branch=master)](https://travis-ci.org/openzipkin/zipkin-java) [![Download](https://api.bintray.com/packages/openzipkin/maven/zipkin-java/images/download.svg) ](https://bintray.com/openzipkin/maven/zipkin-java/_latestVersion) # zipkin-java -This project is a native java port of [zipkin](https://github.com/openzipkin/zipkin), which was historically written in scala+finagle. This includes a dependency-free library and a [spring-boot](http://projects.spring.io/spring-boot/) replacement for zipkin's query server. Storage options include in-memory, JDBC (mysql) and Cassandra. +This project is a native java port of [zipkin](https://github.com/openzipkin/zipkin), which was historically written in scala+finagle. This includes a dependency-free library and a [spring-boot](http://projects.spring.io/spring-boot/) replacement for zipkin's query server. 
Storage options include in-memory, JDBC (mysql), Cassandra, and Elasticsearch. ## Library The [core library](https://github.com/openzipkin/zipkin-java/tree/master/zipkin/src/main/java/io/zipkin) requires minimum language level 7. While currently only used by the server, we expect this library to be used in native instrumentation as well. diff --git a/interop/pom.xml b/interop/pom.xml index df70d4a9822..5f73e9673c6 100644 --- a/interop/pom.xml +++ b/interop/pom.xml @@ -67,12 +67,27 @@ test + + ${project.groupId} + spanstore-elasticsearch + test + + ${project.groupId} spanstore-jdbc test + + + com.google.guava + guava + 19.0 + test + + + io.zipkin diff --git a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java new file mode 100755 index 00000000000..e79c9d79ac5 --- /dev/null +++ b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaDependencyStoreTest.java @@ -0,0 +1,56 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin.elasticsearch; + +import com.twitter.zipkin.common.Span; +import com.twitter.zipkin.storage.DependencyStore; +import com.twitter.zipkin.storage.DependencyStoreSpec; +import java.util.concurrent.TimeUnit; +import org.junit.BeforeClass; +import scala.collection.immutable.List; +import zipkin.DependencyLink; +import zipkin.InMemorySpanStore; +import zipkin.SpanStore; +import zipkin.interop.ScalaDependencyStoreAdapter; +import zipkin.interop.ScalaSpanStoreAdapter; + +import static zipkin.internal.Util.midnightUTC; + +public class ElasticsearchScalaDependencyStoreTest extends DependencyStoreSpec { + private static ElasticsearchSpanStore spanStore; + + @BeforeClass + public static void setupDB() { + spanStore = ElasticsearchTestGraph.INSTANCE.spanStore(); + } + + public DependencyStore store() { + return new ScalaDependencyStoreAdapter(spanStore); + } + + @Override + public void processDependencies(List input) { + SpanStore mem = new InMemorySpanStore(); + new ScalaSpanStoreAdapter(mem).apply(input); + java.util.List + links = mem.getDependencies(today() + TimeUnit.DAYS.toMillis(1), null); + + long midnight = midnightUTC(((long) input.apply(0).timestamp().get()) / 1000); + ElasticsearchTestGraph.INSTANCE.spanStore().writeDependencyLinks(links, midnight); + } + + public void clear() { + spanStore.clear(); + } +} diff --git a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java new file mode 100644 index 00000000000..f4a42ee77ce --- /dev/null +++ b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchScalaSpanStoreTest.java @@ -0,0 +1,36 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.elasticsearch; + +import com.twitter.zipkin.storage.SpanStore; +import com.twitter.zipkin.storage.SpanStoreSpec; +import org.junit.BeforeClass; +import zipkin.interop.ScalaSpanStoreAdapter; + +public class ElasticsearchScalaSpanStoreTest extends SpanStoreSpec { + private static ElasticsearchSpanStore spanStore; + + @BeforeClass + public static void setupDB() { + spanStore = ElasticsearchTestGraph.INSTANCE.spanStore(); + } + + public SpanStore store() { + return new ScalaSpanStoreAdapter(spanStore); + } + + public void clear() { + spanStore.clear(); + } +} diff --git a/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java new file mode 100644 index 00000000000..c331f15fef4 --- /dev/null +++ b/interop/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java @@ -0,0 +1,44 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin.elasticsearch; + +import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.junit.AssumptionViolatedException; + +enum ElasticsearchTestGraph { + INSTANCE; + + static final ElasticsearchConfig CONFIG = new ElasticsearchConfig.Builder().build(); + + static { + // Avoid race-conditions in travis by forcing read-your-writes consistency. + ElasticsearchSpanConsumer.BLOCK_ON_FUTURES = true; + } + + private AssumptionViolatedException ex; + private ElasticsearchSpanStore spanStore; + + /** A lot of tech debt here because the spanstore constructor performs I/O. */ + synchronized ElasticsearchSpanStore spanStore() { + if (ex != null) throw ex; + if (this.spanStore == null) { + try { + this.spanStore = new ElasticsearchSpanStore(CONFIG); + } catch (NoNodeAvailableException e) { + throw ex = new AssumptionViolatedException(e.getMessage()); + } + } + return spanStore; + } +} diff --git a/pom.xml b/pom.xml old mode 100644 new mode 100755 index 84b9d6ba32a..a8e9a5aa50e --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,8 @@ MariaDB has a friendlier license, LGPL, which is less scary in audits. --> 1.3.6 + 2.2.1 + 1.7.19 4.12 3.3.0 @@ -146,6 +148,12 @@ ${project.version} + + ${project.groupId} + spanstore-elasticsearch + ${project.version} + + ${project.groupId} transport-kafka @@ -295,6 +303,7 @@ LICENSE **/*.md dsc-cassandra-*/** + elasticsearch-*/** src/test/resources/** src/main/resources/** diff --git a/zipkin-server/README.md b/zipkin-server/README.md index b4cd8dbc594..73e8fc8282f 100644 --- a/zipkin-server/README.md +++ b/zipkin-server/README.md @@ -58,6 +58,24 @@ Example usage: $ STORAGE_TYPE=mysql MYSQL_USER=root ./mvnw -pl zipkin-server spring-boot:run ``` + +### Elasticsearch +The following apply when `STORAGE_TYPE` is set to `elasticsearch`: + + * `ES_CLUSTER`: The name of the elasticsearch cluster to connect to. Defaults to "elasticsearch". 
+ * `ES_HOSTS`: A comma-separated list of elasticsearch host nodes to connect to, in host:port + format. The port should be the transport port, not the http port. Defaults to + "localhost:9300". Only one of these hosts needs to be available to fetch the + remaining nodes in the cluster. It is recommended to set this to all the master + nodes of the cluster. + * `ES_INDEX`: The index prefix to use when generating daily index names. Defaults to "zipkin". + +Example usage: + +```bash +$ STORAGE_TYPE=elasticsearch ES_CLUSTER=monitoring ES_HOSTS=host1:9300,host2:9300 ./mvnw -pl zipkin-server spring-boot:run +``` + ### Kafka The following apply when `KAFKA_ZOOKEEPER` is set: diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index 4f8825b6a43..4e3f91a61ef 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -91,6 +91,21 @@ true + + + ${project.groupId} + spanstore-elasticsearch + true + + + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + true + + ${project.groupId} diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinElasticsearchProperties.java b/zipkin-server/src/main/java/zipkin/server/ZipkinElasticsearchProperties.java new file mode 100644 index 00000000000..7576bfd2aa4 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin/server/ZipkinElasticsearchProperties.java @@ -0,0 +1,64 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin.server; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("elasticsearch") +public class ZipkinElasticsearchProperties { + + /** + * The elasticsearch cluster to connect to, defaults to "elasticsearch". + */ + private String cluster = "elasticsearch"; + + /** + * A comma separated list of elasticsearch hostnodes to connect to, in host:port + * format. The port should be the transport port, not the http port. Defaults to + * "localhost:9300". + */ + private String hosts = "localhost:9300"; + + /** + * The index prefix to use when generating daily index names. Defaults to zipkin. + */ + private String index = "zipkin"; + + public String getCluster() { + return cluster; + } + + public ZipkinElasticsearchProperties setCluster(String cluster) { + this.cluster = cluster; + return this; + } + + public String getHosts() { + return hosts; + } + + public ZipkinElasticsearchProperties setHosts(String hosts) { + this.hosts = hosts; + return this; + } + + public String getIndex() { + return index; + } + + public ZipkinElasticsearchProperties setIndex(String index) { + this.index = index; + return this; + } +} diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java index 3f7f9f016aa..8ebf847c0ed 100644 --- a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java +++ b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java @@ -43,13 +43,15 @@ import zipkin.SpanStore; import zipkin.cassandra.CassandraConfig; import zipkin.cassandra.CassandraSpanStore; +import zipkin.elasticsearch.ElasticsearchConfig; +import zipkin.elasticsearch.ElasticsearchSpanStore; import zipkin.jdbc.JDBCSpanStore; import zipkin.kafka.KafkaConfig; import zipkin.kafka.KafkaTransport; import zipkin.server.brave.TraceWritesSpanStore; @Configuration 
-@EnableConfigurationProperties({ZipkinServerProperties.class, ZipkinCassandraProperties.class}) +@EnableConfigurationProperties(ZipkinServerProperties.class) @EnableAsync(proxyTargetClass = true) public class ZipkinServerConfiguration { @@ -71,6 +73,11 @@ Sampler traceIdSampler(@Value("${zipkin.collector.sample-rate:1.0}") float rate) @Bean @ConditionalOnMissingBean(SpanStore.class) SpanStore spanStore() { + if (server.getStore().getType() != ZipkinServerProperties.Store.Type.mem) { + throw new IllegalStateException("Attempted to set storage type to " + + server.getStore().getType() + " but could not initialize the spanstore for " + + "that storage type. Did you include it on the classpath?"); + } return new InMemorySpanStore(); } @@ -132,6 +139,21 @@ static class CassandraConfiguration { } } + @Configuration + @EnableConfigurationProperties(ZipkinElasticsearchProperties.class) + @ConditionalOnProperty(name = "zipkin.store.type", havingValue = "elasticsearch") + @ConditionalOnClass(name = "zipkin.elasticsearch.ElasticsearchSpanStore") + static class ElasticsearchConfiguration { + @Bean SpanStore elasticsearchSpanStore(ZipkinElasticsearchProperties elasticsearch) { + ElasticsearchConfig config = new ElasticsearchConfig.Builder() + .cluster(elasticsearch.getCluster()) + .hosts(elasticsearch.getHosts()) + .index(elasticsearch.getIndex()) + .build(); + return new ElasticsearchSpanStore(config); + } + } + /** * This transport consumes a topic, decodes spans from thrift messages and stores them subject to * sampling policy. 
diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinServerProperties.java b/zipkin-server/src/main/java/zipkin/server/ZipkinServerProperties.java index c2cf8d91908..45f9a14e0cb 100644 --- a/zipkin-server/src/main/java/zipkin/server/ZipkinServerProperties.java +++ b/zipkin-server/src/main/java/zipkin/server/ZipkinServerProperties.java @@ -31,7 +31,7 @@ public Ui getUi() { static class Store { enum Type { - cassandra, mysql, mem + cassandra, mysql, elasticsearch, mem } private Type type = Type.mem; diff --git a/zipkin-server/src/main/resources/zipkin-server.yml b/zipkin-server/src/main/resources/zipkin-server.yml index 49b6db480ea..b2d41dfdafe 100644 --- a/zipkin-server/src/main/resources/zipkin-server.yml +++ b/zipkin-server/src/main/resources/zipkin-server.yml @@ -23,6 +23,10 @@ mysql: db: ${MYSQL_DB:zipkin} max-active: ${MYSQL_MAX_CONNECTIONS:10} use-ssl: ${MYSQL_USE_SSL:false} +elasticsearch: + cluster: ${ES_CLUSTER:elasticsearch} + hosts: ${ES_HOSTS:localhost:9300} + index: ${ES_INDEX:zipkin} kafka: # ZooKeeper host string, comma-separated host:port value. zookeeper: ${KAFKA_ZOOKEEPER:} diff --git a/zipkin-spanstores/elasticsearch/README.md b/zipkin-spanstores/elasticsearch/README.md new file mode 100755 index 00000000000..fd896f89c0d --- /dev/null +++ b/zipkin-spanstores/elasticsearch/README.md @@ -0,0 +1,13 @@ +# spanstore-elasticsearch + +This is an Elasticsearch 2 SpanStore. The SpanStore utilizes the Elasticsearch Java client +library with a node client for optimal performance. + +Spans are stored into daily indices; for example, spans with a timestamp falling on 2016/03/19 +will be stored in an index like zipkin-2016-03-19. There is no support for TTL through this SpanStore. +It is recommended instead to use [Elastic Curator](https://www.elastic.co/guide/en/elasticsearch/client/curator/current/about.html) +to remove indices older than the point you are interested in. 
+ +`zipkin.elasticsearch.ElasticsearchConfig` includes defaults that will operate +against a local Elasticsearch installation. + diff --git a/zipkin-spanstores/elasticsearch/pom.xml b/zipkin-spanstores/elasticsearch/pom.xml new file mode 100755 index 00000000000..d9cec33993d --- /dev/null +++ b/zipkin-spanstores/elasticsearch/pom.xml @@ -0,0 +1,68 @@ + + + + + io.zipkin.java + zipkin-spanstores + 0.9.4-SNAPSHOT + + 4.0.0 + + spanstore-elasticsearch + SpanStore: Elasticsearch + + + 1.7 + 1.7 + ${project.basedir}/.. + + + + + ${project.groupId} + zipkin + + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + ch.qos.logback + logback-classic + 1.1.6 + test + + + + ${project.groupId} + zipkin + test-jar + test + + + diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchConfig.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchConfig.java new file mode 100644 index 00000000000..a7c7a874554 --- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchConfig.java @@ -0,0 +1,79 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin.elasticsearch; + +import com.google.common.io.Resources; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static zipkin.internal.Util.checkNotNull; + +public class ElasticsearchConfig { + + public static final class Builder { + + private String cluster = "elasticsearch"; + private String hosts = "localhost:9300"; + private String index = "zipkin"; + + /** + * The elasticsearch cluster to connect to, defaults to "elasticsearch". + */ + public Builder cluster(String cluster) { + this.cluster = cluster; + return this; + } + + /** + * A comma separated list of elasticsearch hostnodes to connect to, in host:port format. The + * port should be the transport port, not the http port. Defaults to "localhost:9300". + */ + public Builder hosts(String hosts) { + this.hosts = hosts; + return this; + } + + /** + * The index prefix to use when generating daily index names. Defaults to zipkin. + */ + public Builder index(String index) { + this.index = index; + return this; + } + + public ElasticsearchConfig build() { + return new ElasticsearchConfig(this); + } + } + + final String clusterName; + final String hosts; + final String index; + final String indexTemplate; + + ElasticsearchConfig(Builder builder) { + clusterName = checkNotNull(builder.cluster, "builder.cluster"); + hosts = checkNotNull(builder.hosts, "builder.hosts"); + index = checkNotNull(builder.index, "builder.index"); + + try { + indexTemplate = Resources.toString( + Resources.getResource("zipkin/elasticsearch/zipkin_template.json"), + StandardCharsets.UTF_8) + .replace("${__INDEX__}", index); + } catch (IOException e) { + throw new AssertionError("Error reading jar resource, shouldn't happen.", e); + } + } +} diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchConstants.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchConstants.java new file mode 100644 index 00000000000..de586c365d4 
--- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchConstants.java @@ -0,0 +1,22 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.elasticsearch; + +final class ElasticsearchConstants { + + static final String SPAN = "span"; + static final String DEPENDENCY_LINK = "dependencylink"; + + private ElasticsearchConstants() {} +} diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java new file mode 100755 index 00000000000..8595be90faf --- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanConsumer.java @@ -0,0 +1,83 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin.elasticsearch; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import zipkin.Codec; +import zipkin.Span; +import zipkin.SpanConsumer; +import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.internal.JsonCodec; + +// Extracted for readability +final class ElasticsearchSpanConsumer implements SpanConsumer { + + static final JsonCodec JSON_CODEC = new JsonCodec(); + /** + * Internal flag that allows you read-your-writes consistency during tests. + * + *

This is internal as collection endpoints are usually in different threads or not in the same + * process as query ones. Special-casing this allows tests to pass without changing {@link + * SpanConsumer#accept}. + * + *

Why not just change {@link SpanConsumer#accept} now? {@link SpanConsumer#accept} may indeed + * need to change, but when that occurs, we'd want to choose something that is widely supportable, + * and serving a specific use case. That api might not be a future, for example. Future is + * difficult, for example, properly supporting and testing cancel. Further, there are other async + * models such as callbacks that could be more supportable. Regardless, this work is best delayed + * until there's a worthwhile use-case vs up-fronting only due to tests, and prematurely choosing + * Future results. + */ + static boolean BLOCK_ON_FUTURES; + + private final Client client; + private final IndexNameFormatter indexNameFormatter; + + ElasticsearchSpanConsumer(Client client, IndexNameFormatter indexNameFormatter) { + this.client = client; + this.indexNameFormatter = indexNameFormatter; + } + + @Override + public void accept(List spans) { + BulkRequestBuilder request = client.prepareBulk(); + for (Span span : spans) { + request.add(createSpanIndexRequest(ApplyTimestampAndDuration.apply(span))); + } + ListenableActionFuture future = request.execute(); + if (BLOCK_ON_FUTURES) { + future.actionGet(); + client.admin().indices().flush(new FlushRequest()).actionGet(); + } + } + + private IndexRequestBuilder createSpanIndexRequest(Span span) { + long indexTimestampMillis; + if (span.timestamp != null) { + indexTimestampMillis = TimeUnit.MICROSECONDS.toMillis(span.timestamp); + } else { + indexTimestampMillis = System.currentTimeMillis(); + } + String spanIndex = indexNameFormatter.indexNameForTimestamp(indexTimestampMillis); + return client.prepareIndex(spanIndex, ElasticsearchConstants.SPAN) + .setSource(Codec.JSON.writeSpan(span)); + } +} diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java new file mode 100755 index 
00000000000..c4d6e2a26cf --- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/ElasticsearchSpanStore.java @@ -0,0 +1,416 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.elasticsearch; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.net.HostAndPort; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest; +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import 
org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptService.ScriptType; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.nested.Nested; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.metrics.tophits.TopHits; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; +import zipkin.Codec; +import zipkin.DependencyLink; +import zipkin.QueryRequest; +import zipkin.Span; +import zipkin.SpanStore; +import zipkin.internal.CorrectForClockSkew; +import zipkin.internal.MergeById; +import zipkin.internal.Nullable; +import zipkin.internal.Util; + +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; +import static org.elasticsearch.index.query.QueryBuilders.rangeQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.index.query.QueryBuilders.termsQuery; + +public class ElasticsearchSpanStore implements SpanStore { + + private static 
final long ONE_DAY_IN_MILLIS = TimeUnit.DAYS.toMillis(1); + + private final Client client; + private final IndexNameFormatter indexNameFormatter; + private final ElasticsearchSpanConsumer spanConsumer; + private final String indexTemplate; + + public ElasticsearchSpanStore(ElasticsearchConfig config) { + this.client = createClient(config.hosts, config.clusterName); + this.indexNameFormatter = new IndexNameFormatter(config.index); + this.spanConsumer = new ElasticsearchSpanConsumer(client, indexNameFormatter); + this.indexTemplate = config.indexTemplate; + + checkForIndexTemplate(); + } + + @Override public void accept(List spans) { + spanConsumer.accept(spans); + } + + @Override public List> getTraces(QueryRequest request) { + long endMillis = request.endTs; + long beginMillis = endMillis - request.lookback; + + String serviceName = request.serviceName.toLowerCase(); + + BoolQueryBuilder filter = boolQuery() + .must(boolQuery() + .should(termQuery("annotations.endpoint.serviceName", serviceName)) + .should(nestedQuery( + "binaryAnnotations", + termQuery("binaryAnnotations.endpoint.serviceName", serviceName)))) + .must(rangeQuery("timestamp") + .gte(TimeUnit.MILLISECONDS.toMicros(beginMillis)) + .lte(TimeUnit.MILLISECONDS.toMicros(endMillis))); + if (request.spanName != null) { + filter.must(termQuery("name", request.spanName)); + } + for (String annotation : request.annotations) { + filter.must(termQuery("annotations.value", annotation)); + } + for (Map.Entry annotation : request.binaryAnnotations.entrySet()) { + // In our index template, we make sure the binaryAnnotation value is indexed as string, + // meaning non-string values won't even be indexed at all. This means that we can only + // match string values here, which happens to be exactly what we want. 
+ filter.must(nestedQuery("binaryAnnotations", + boolQuery() + .must(termQuery("binaryAnnotations.key", annotation.getKey())) + .must(termQuery("binaryAnnotations.value", + annotation.getValue())))); + } + + if (request.minDuration != null) { + RangeQueryBuilder durationQuery = rangeQuery("duration").gte(request.minDuration); + if (request.maxDuration != null) { + durationQuery.lte(request.maxDuration); + } + filter.must(durationQuery); + } + + + List strings = computeIndices(beginMillis, endMillis); + String[] indices = strings.toArray(new String[strings.size()]); + // We need to filter to traces that contain at least one span that matches the request, + // but we need to order by timestamp of the first span, regardless of if it matched the + // filter or not. Normal queries usually will apply a filter first, meaning we wouldn't be + // able to "backtrack" to state with non-filtered spans to do the ordering properly, which + // is important to respect request.limit. Luckily, pipeline aggregations can help - we + // aggregate unfiltered trace ids, ordered by their min timestamp. We then apply a pipeline + // aggregation which applies the filter, and then removes parent buckets based on whether any + // documents matched the filter, effectively "backtracking". 
+ SearchRequestBuilder elasticRequest = + client.prepareSearch(indices) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setTypes(ElasticsearchConstants.SPAN) + .setQuery(matchAllQuery()) + .setSize(0) + .addAggregation( + AggregationBuilders.terms("traceId_agg") + .field("traceId") + .subAggregation(AggregationBuilders.min("timestamps_agg").field("timestamp")) + .subAggregation(AggregationBuilders.filter("filtered_agg").filter(filter)) + .subAggregation(PipelineAggregatorBuilders.having("bucket_filter") + .setBucketsPathsMap(ImmutableMap.of("_count", "filtered_agg._count")) + .script(new Script("_count > 0", ScriptType.INLINE, "expression", null))) + .order(Order.aggregation("timestamps_agg", false)) + .size(request.limit)); + SearchResponse response = elasticRequest.execute().actionGet(); + + if (response.getAggregations() == null) { + return Collections.emptyList(); + } + Terms traceIdsAgg = response.getAggregations().get("traceId_agg"); + if (traceIdsAgg == null) { + return Collections.emptyList(); + } + List traceIds = new ArrayList<>(); + for (Terms.Bucket bucket : traceIdsAgg.getBuckets()) { + traceIds.add(Util.lowerHexToUnsignedLong(bucket.getKeyAsString())); + } + return getTracesByIds(traceIds, indices); + } + + @Override public List getTrace(long id) { + return Iterables.getFirst(getTracesByIds(ImmutableList.of(id), indexNameFormatter.catchAll()), + null); + } + + @Override public List getRawTrace(long traceId) { + SearchRequestBuilder elasticRequest = client.prepareSearch(indexNameFormatter.catchAll()) + .setTypes(ElasticsearchConstants.SPAN) + // Without an explicit size, elasticsearch returns only the first 10 hits, which would + // silently truncate larger traces. 10000 matches the limit used by getTracesByIds. + .setSize(10000) + .setQuery(termQuery("traceId", String.format("%016x", traceId))); + SearchResponse response = elasticRequest.execute().actionGet(); + ImmutableList.Builder trace = ImmutableList.builder(); + for (SearchHit hit : response.getHits()) { + trace.add(Codec.JSON.readSpan(hit.getSourceRef().toBytes())); + } + return trace.build(); + } + + private List> getTracesByIds(Collection traceIds, String...
indices) { + List traceIdsStr = new ArrayList<>(traceIds.size()); + for (long traceId : traceIds) { + traceIdsStr.add(String.format("%016x", traceId)); + } + SearchRequestBuilder elasticRequest = client.prepareSearch(indices) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setTypes(ElasticsearchConstants.SPAN) + // TODO: This is the default maximum size of an elasticsearch result set. + // Need to determine whether this is enough by zipkin standards or should + // increase it in the index template. + .setSize(10000) + .setQuery(termsQuery("traceId", traceIdsStr)) + .addSort(SortBuilders.fieldSort("timestamp") + .order(SortOrder.ASC) + .unmappedType("long")); + SearchResponse response = elasticRequest.execute().actionGet(); + ArrayListMultimap groupedSpans = ArrayListMultimap.create(); + for (SearchHit hit : response.getHits()) { + Span span = Codec.JSON.readSpan(hit.getSourceRef().toBytes()); + groupedSpans.put(span.traceId, span); + } + ImmutableList.Builder> traces = ImmutableList.builder(); + // We want to return traces in the same order as the parameter. + for (Long traceId : traceIds) { + if (!groupedSpans.containsKey(traceId)) { + // Couldn't find the trace, this will usually only happen when called from getTrace, not + // getTraces. 
+ continue; + } + traces.add( + ImmutableList.copyOf( + CorrectForClockSkew.apply(MergeById.apply(groupedSpans.get(traceId))))); + } + return traces.build(); + } + + @Override + public List getServiceNames() { + SearchRequestBuilder elasticRequest = + client.prepareSearch(indexNameFormatter.catchAll()) + .setTypes(ElasticsearchConstants.SPAN) + .setQuery(matchAllQuery()) + .setSize(0) + .addAggregation(AggregationBuilders.terms("annotationServiceName_agg") + .field("annotations.endpoint.serviceName")) + .addAggregation(AggregationBuilders.nested("binaryAnnotations_agg") + .path("binaryAnnotations") + .subAggregation(AggregationBuilders.terms("binaryAnnotationsServiceName_agg") + .field("binaryAnnotations.endpoint.serviceName"))); + SearchResponse response = elasticRequest.execute().actionGet(); + if (response.getAggregations() == null) { + return Collections.emptyList(); + } + SortedSet serviceNames = new TreeSet<>(); + Terms annotationServiceNamesAgg = response.getAggregations().get("annotationServiceName_agg"); + if (annotationServiceNamesAgg != null) { + for (Terms.Bucket bucket : annotationServiceNamesAgg.getBuckets()) { + if (!bucket.getKeyAsString().isEmpty()) { + serviceNames.add(bucket.getKeyAsString()); + } + } + } + Nested binaryAnnotationsAgg = response.getAggregations().get("binaryAnnotations_agg"); + if (binaryAnnotationsAgg != null && binaryAnnotationsAgg.getAggregations() != null) { + Terms binaryAnnotationServiceNamesAgg = binaryAnnotationsAgg.getAggregations() + .get("binaryAnnotationsServiceName_agg"); + if (binaryAnnotationServiceNamesAgg != null) { + for (Terms.Bucket bucket : binaryAnnotationServiceNamesAgg.getBuckets()) { + if (!bucket.getKeyAsString().isEmpty()) { + serviceNames.add(bucket.getKeyAsString()); + } + } + } + } + return ImmutableList.copyOf(serviceNames); + } + + @Override + public List getSpanNames(String serviceName) { + if (Strings.isNullOrEmpty(serviceName)) { + return Collections.emptyList(); + } + serviceName = 
serviceName.toLowerCase(); + // binaryAnnotations is mapped as a nested type, so its fields only match inside a nested + // query (the same way getTraces filters on them). + QueryBuilder filter = boolQuery() + .should(termQuery("annotations.endpoint.serviceName", serviceName)) + .should(nestedQuery( + "binaryAnnotations", + termQuery("binaryAnnotations.endpoint.serviceName", serviceName))); + SearchRequestBuilder elasticRequest = client.prepareSearch(indexNameFormatter.catchAll()) + .setTypes(ElasticsearchConstants.SPAN) + .setQuery(boolQuery().must(matchAllQuery()).filter(filter)) + .setSize(0) + .addAggregation(AggregationBuilders.terms("name_agg") + .order(Order.term(true)) + .field("name")); + SearchResponse response = elasticRequest.execute().actionGet(); + // The response carries no aggregations when no index matched; treat that as "no span names". + if (response.getAggregations() == null) { + return Collections.emptyList(); + } + Terms namesAgg = response.getAggregations().get("name_agg"); + if (namesAgg == null) { + return Collections.emptyList(); + } + ImmutableList.Builder spanNames = ImmutableList.builder(); + for (Terms.Bucket bucket : namesAgg.getBuckets()) { + spanNames.add(bucket.getKeyAsString()); + } + return spanNames.build(); + } + + @Override + public List getDependencies(long endMillis, @Nullable Long lookback) { + long beginMillis = lookback != null ? endMillis - lookback : 0; + // We just return all dependencies in the days that fall within endTs and lookback as + // dependency links themselves don't have timestamps.
+ List strings = computeIndices(beginMillis, endMillis); + SearchRequestBuilder elasticRequest = client.prepareSearch( + strings.toArray(new String[strings.size()])) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setTypes(ElasticsearchConstants.DEPENDENCY_LINK) + .addAggregation(AggregationBuilders.terms("parent_child_agg") + .field("parent_child") + .subAggregation(AggregationBuilders.topHits("hits_agg") + .setSize(1)) + .subAggregation(AggregationBuilders.sum("callCount_agg") + .field("callCount"))) + .setQuery(matchAllQuery()); + SearchResponse response = elasticRequest.execute().actionGet(); + if (response.getAggregations() == null) { + return Collections.emptyList(); + } + Terms parentChildAgg = response.getAggregations().get("parent_child_agg"); + if (parentChildAgg == null) { + return Collections.emptyList(); + } + ImmutableList.Builder links = ImmutableList.builder(); + for (Terms.Bucket bucket : parentChildAgg.getBuckets()) { + TopHits hitsAgg = bucket.getAggregations().get("hits_agg"); + Sum callCountAgg = bucket.getAggregations().get("callCount_agg"); + // We would have no bucket if there wasn't a hit, so this should always be non-empty. 
+ SearchHit hit = hitsAgg.getHits().getAt(0); + DependencyLink link = Codec.JSON.readDependencyLink(hit.getSourceRef().toBytes()); + link = new DependencyLink.Builder(link).callCount((long) callCountAgg.getValue()).build(); + links.add(link); + } + return links.build(); + } + + @VisibleForTesting void clear() { + client.admin().indices().delete(new DeleteIndexRequest(indexNameFormatter.catchAll())) + .actionGet(); + client.admin().indices().flush(new FlushRequest()).actionGet(); + } + + @VisibleForTesting void writeDependencyLinks(List links, long timestampMillis) { + timestampMillis = Util.midnightUTC(timestampMillis); + BulkRequestBuilder request = client.prepareBulk(); + for (DependencyLink link : links) { + request.add(client.prepareIndex( + indexNameFormatter.indexNameForTimestamp(timestampMillis), + ElasticsearchConstants.DEPENDENCY_LINK) + .setSource( + "parent", link.parent, + "child", link.child, + "parent_child", link.parent + "|" + link.child, // For aggregating callCount + "callCount", link.callCount)); + } + request.execute().actionGet(); + client.admin().indices().flush(new FlushRequest()).actionGet(); + } + + private List computeIndices(long beginMillis, long endMillis) { + beginMillis = Util.midnightUTC(beginMillis); + endMillis = Util.midnightUTC(endMillis); + + List indices = new ArrayList<>(); + // If a leap second is involved, the same index will be specified twice. + // It shouldn't be a big deal. 
+ for (long currentMillis = beginMillis; currentMillis <= endMillis; + currentMillis += ONE_DAY_IN_MILLIS) { + indices.add(indexNameFormatter.indexNameForTimestamp(currentMillis)); + } + return indices; + } + + private void checkForIndexTemplate() { + GetIndexTemplatesResponse existingTemplates = + client.admin().indices().getTemplates(new GetIndexTemplatesRequest("zipkin_template")) + .actionGet(); + if (!existingTemplates.getIndexTemplates().isEmpty()) { + return; + } + client.admin().indices().putTemplate( + new PutIndexTemplateRequest("zipkin_template").source(indexTemplate)) + .actionGet(); + } + + private static Client createClient(String hosts, String clusterName) { + Settings settings = Settings.builder() + .put("cluster.name", clusterName) + .put("client.transport.sniff", true) + .build(); + + TransportClient client = TransportClient.builder() + .settings(settings) + .build(); + for (String host : hosts.split(",")) { + HostAndPort hostAndPort = HostAndPort.fromString(host); + try { + client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName( + hostAndPort.getHostText()), hostAndPort.getPort())); + } catch (UnknownHostException e) { + // Hosts may be down transiently, we should still try to connect. If all of them happen + // to be down we will fail later when trying to use the client when checking the index + // template. + continue; + } + } + return client; + } +} diff --git a/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/IndexNameFormatter.java b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/IndexNameFormatter.java new file mode 100755 index 00000000000..d62ffb3fc70 --- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/main/java/zipkin/elasticsearch/IndexNameFormatter.java @@ -0,0 +1,39 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.elasticsearch; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; + +/** + * Formats index names as the configured prefix followed by a {@code yyyy-MM-dd} day suffix, where + * the day is always evaluated in UTC. + */ +final class IndexNameFormatter { + + private static final String DAILY_INDEX_FORMAT = "yyyy-MM-dd"; + + private final String index; + // SimpleDateFormat is not thread-safe, so each thread lazily gets its own instance. + private final ThreadLocal<DateFormat> dateFormat = new ThreadLocal<DateFormat>() { + @Override protected DateFormat initialValue() { + DateFormat result = new SimpleDateFormat(DAILY_INDEX_FORMAT); + // Callers truncate timestamps to UTC midnight; formatting in the JVM's default zone would + // name the previous day's index in zones behind UTC. + result.setTimeZone(TimeZone.getTimeZone("UTC")); + return result; + } + }; + + IndexNameFormatter(String index) { + this.index = index; + } + + /** Returns the name of the daily index that contains the given epoch-millisecond timestamp. */ + String indexNameForTimestamp(long timestampMillis) { + return index + "-" + dateFormat.get().format(new Date(timestampMillis)); + } + + /** Returns a wildcard pattern that matches every daily index with this prefix. */ + String catchAll() { + return index + "-*"; + } +} diff --git a/zipkin-spanstores/elasticsearch/src/main/resources/zipkin/elasticsearch/zipkin_template.json b/zipkin-spanstores/elasticsearch/src/main/resources/zipkin/elasticsearch/zipkin_template.json new file mode 100755 index 00000000000..7734f38b84d --- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/main/resources/zipkin/elasticsearch/zipkin_template.json @@ -0,0 +1,78 @@ +{ + "template": "${__INDEX__}-*", + "settings": { + "index.requests.cache.enable": true, + "analysis": { + "analyzer": { + "lowercase": { + "type": "custom", + "tokenizer": "keyword", + "filter": [ + "lowercase" + ] + } + } + } + }, + "mappings": { + "_default_": { + "dynamic_templates": [ + { + "strings": { + "mapping": { + "index": "not_analyzed", + "type": "string" + }, + "match_mapping_type": "string", + "match": "*" + } + }, + { + "value": { + "match": "value", + "mapping": { + "index": "not_analyzed", + "ignore_malformed": true, + "type": "string" + } + } + }, + { +
"timestamps": { + "match": "timestamp", + "mapping": { + "type": "long" + } + } + }, + { + "serviceName": { + "match": "serviceName", + "mapping": { + "type": "string", + "analyzer": "lowercase" + } + } + }, + { + "binaryAnnotations": { + "match": "binaryAnnotations", + "mapping": { + "type": "nested" + } + } + } + ], + "_all": { + "enabled": false + } + }, + "span": { + "properties": { + "binaryAnnotations": { + "type": "nested" + } + } + } + } +} diff --git a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java new file mode 100755 index 00000000000..c4bae6b1f76 --- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchDependenciesTest.java @@ -0,0 +1,57 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin.elasticsearch; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import zipkin.DependenciesTest; +import zipkin.DependencyLink; +import zipkin.InMemorySpanStore; +import zipkin.Span; +import zipkin.SpanStore; + +import static zipkin.internal.Util.midnightUTC; + +public class ElasticsearchDependenciesTest extends DependenciesTest { + + public ElasticsearchDependenciesTest() { + this.store = ElasticsearchTestGraph.INSTANCE.spanStore(); + } + + @Override + public void clear() { + store.clear(); + } + + /** + * The current implementation does not include dependency aggregation. It includes retrieval of + * pre-aggregated links. + * + *

This uses {@link InMemorySpanStore} to prepare links and {@link + * ElasticsearchSpanStore#writeDependencyLinks(List, long)} to store them. + * + *

Note: The zipkin-dependencies-spark doesn't yet support writing dependency links to + * elasticsearch, until it does this span store cannot be used for dependency links. + */ + @Override + public void processDependencies(List spans) { + SpanStore mem = new InMemorySpanStore(); + mem.accept(spans); + List links = mem.getDependencies(today + TimeUnit.DAYS.toMillis(1), null); + + long midnight = midnightUTC(spans.get(0).timestamp / 1000); + ElasticsearchTestGraph.INSTANCE.spanStore().writeDependencyLinks( + links, midnight); + } +} diff --git a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java new file mode 100644 index 00000000000..c1a13e2bdaa --- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchSpanStoreTest.java @@ -0,0 +1,28 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin.elasticsearch; + +import zipkin.SpanStoreTest; + +public class ElasticsearchSpanStoreTest extends SpanStoreTest { + + public ElasticsearchSpanStoreTest() { + this.store = ElasticsearchTestGraph.INSTANCE.spanStore(); + } + + @Override + public void clear() { + store.clear(); + } +} diff --git a/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java new file mode 100644 index 00000000000..25c8928edf3 --- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/test/java/zipkin/elasticsearch/ElasticsearchTestGraph.java @@ -0,0 +1,42 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package zipkin.elasticsearch; + +import org.elasticsearch.client.transport.NoNodeAvailableException; +import org.junit.AssumptionViolatedException; + +enum ElasticsearchTestGraph { + INSTANCE; + + static final ElasticsearchConfig CONFIG = new ElasticsearchConfig.Builder().build(); + + static { + ElasticsearchSpanConsumer.BLOCK_ON_FUTURES = true; + } + + private AssumptionViolatedException ex; + private ElasticsearchSpanStore spanStore; + + synchronized ElasticsearchSpanStore spanStore() { + if (ex != null) throw ex; + if (this.spanStore == null) { + try { + this.spanStore = new ElasticsearchSpanStore(CONFIG); + } catch (NoNodeAvailableException e) { + throw ex = new AssumptionViolatedException(e.getMessage()); + } + } + return spanStore; + } +} diff --git a/zipkin-spanstores/elasticsearch/src/test/resources/logback.xml b/zipkin-spanstores/elasticsearch/src/test/resources/logback.xml new file mode 100755 index 00000000000..d92aba939c5 --- /dev/null +++ b/zipkin-spanstores/elasticsearch/src/test/resources/logback.xml @@ -0,0 +1,14 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/zipkin-spanstores/pom.xml b/zipkin-spanstores/pom.xml index 9e0900b8e34..5be14495239 100644 --- a/zipkin-spanstores/pom.xml +++ b/zipkin-spanstores/pom.xml @@ -30,5 +30,6 @@ cassandra jdbc + elasticsearch diff --git a/zipkin/src/main/java/zipkin/Codec.java b/zipkin/src/main/java/zipkin/Codec.java index 58364ba2bff..4703aac9f95 100644 --- a/zipkin/src/main/java/zipkin/Codec.java +++ b/zipkin/src/main/java/zipkin/Codec.java @@ -29,8 +29,8 @@ interface Factory { Codec get(String mediaType); } - Codec JSON = new JsonCodec(); - Codec THRIFT = new ThriftCodec(); + JsonCodec JSON = new JsonCodec(); + ThriftCodec THRIFT = new ThriftCodec(); Factory FACTORY = new Factory() { diff --git a/zipkin/src/main/java/zipkin/internal/JsonCodec.java b/zipkin/src/main/java/zipkin/internal/JsonCodec.java index 
9f679d1104e..0fccda8352d 100644 --- a/zipkin/src/main/java/zipkin/internal/JsonCodec.java +++ b/zipkin/src/main/java/zipkin/internal/JsonCodec.java @@ -430,6 +430,26 @@ public String toString() { } }; + // Added since JSON-based storage usually works better with single documents rather than + // a large encoded list. + /** throws {@linkplain IllegalArgumentException} if the dependency link couldn't be decoded */ + public DependencyLink readDependencyLink(byte[] bytes) { + checkArgument(bytes.length > 0, "Empty input reading DependencyLink"); + try { + return DEPENDENCY_LINK_ADAPTER.fromJson(new Buffer().write(bytes)); + } catch (IOException | RuntimeException e) { + throw exceptionReading("DependencyLink", bytes, e); + } + } + + // Added since JSON-based storage usually works better with single documents rather than + // a large encoded list. + public byte[] writeDependencyLink(DependencyLink value) { + Buffer buffer = new Buffer(); + write(DEPENDENCY_LINK_ADAPTER, value, buffer); + return buffer.readByteArray(); + } + @Override public List readDependencyLinks(byte[] bytes) { checkArgument(bytes.length > 0, "Empty input reading List"); diff --git a/zipkin/src/test/java/zipkin/DependenciesTest.java b/zipkin/src/test/java/zipkin/DependenciesTest.java index 6739c85befe..f58673e4d1d 100644 --- a/zipkin/src/test/java/zipkin/DependenciesTest.java +++ b/zipkin/src/test/java/zipkin/DependenciesTest.java @@ -98,7 +98,7 @@ public void getDependencies() { processDependencies(trace); assertThat(store.getDependencies(today + 1000L, null)) - .isEqualTo(dep.links); + .containsOnlyElementsOf(dep.links); } /** Edge-case when there are no spans, or instrumentation isn't logging annotations properly.
*/ @@ -121,7 +121,7 @@ public void traceIdIsOpaque() { processDependencies(differentTraceId); assertThat(store.getDependencies(today + 1000L, null)) - .isEqualTo(dep.links); + .containsOnlyElementsOf(dep.links); } /** @@ -163,7 +163,7 @@ public void getDependenciesAllInstrumented() { assertThat( store.getDependencies((trace.get(0).timestamp + traceDuration) / 1000, traceDuration / 1000) - ).containsExactly( + ).containsOnly( new DependencyLink("trace-producer-one", "trace-producer-two", 1), new DependencyLink("trace-producer-two", "trace-producer-three", 1) ); @@ -177,7 +177,7 @@ public void getDependenciesMultiLevel() { processDependencies(trace); assertThat(store.getDependencies(today + 1000L, null)) - .isEqualTo(dep.links); + .containsOnlyElementsOf(dep.links); } @Test @@ -193,7 +193,7 @@ public void dependencies_loopback() { processDependencies(traceWithLoopback); assertThat(store.getDependencies(today + 1000L, null)) - .containsExactly(new DependencyLink("zipkin-web", "zipkin-web", 1)); + .containsOnly(new DependencyLink("zipkin-web", "zipkin-web", 1)); } /** @@ -205,7 +205,7 @@ public void dependencies_headlessTrace() { processDependencies(asList(trace.get(1), trace.get(2))); assertThat(store.getDependencies(today + 1000L, null)) - .isEqualTo(dep.links); + .containsOnlyElementsOf(dep.links); } @Test @@ -213,7 +213,7 @@ public void looksBackIndefinitely() { processDependencies(trace); assertThat(store.getDependencies(today + 1000L, null)) - .isEqualTo(dep.links); + .containsOnlyElementsOf(dep.links); } @Test @@ -221,7 +221,7 @@ public void insideTheInterval() { processDependencies(trace); assertThat(store.getDependencies(dep.endTs, dep.endTs - dep.startTs)) - .isEqualTo(dep.links); + .containsOnlyElementsOf(dep.links); } @Test @@ -278,7 +278,7 @@ public void notInstrumentedClientAndServer() { processDependencies(trace); - assertThat(store.getDependencies(today + 1000L, null)).containsExactly( + assertThat(store.getDependencies(today + 1000L, 
null)).containsOnly( new DependencyLink("some-client", "zipkin-web", 1), new DependencyLink("zipkin-web", "zipkin-query", 1), new DependencyLink("zipkin-query", "zipkin-jdbc", 1) @@ -324,7 +324,7 @@ public void noClientSendAddrAnnotations() { processDependencies(trace); assertThat(store.getDependencies(today + 1000L, null)) - .containsExactly(new DependencyLink("zipkin-web", "zipkin-query", 1)); + .containsOnly(new DependencyLink("zipkin-web", "zipkin-query", 1)); } /** @@ -341,14 +341,14 @@ public void canSearchForIntervalsBesidesToday() { // A user looks at today's links. // - Note: Using the smallest lookback avoids bumping into implementation around windowing. assertThat(store.getDependencies(dep.endTs, dep.endTs - dep.startTs)) - .isEqualTo(dep.links); + .containsOnlyElementsOf(dep.links); // A user compares the links from those a day ago. assertThat(store.getDependencies(dep.endTs - day, dep.endTs - dep.startTs)) - .isEqualTo(dep.links); + .containsOnlyElementsOf(dep.links); // A user looks at all links since data started - assertThat(store.getDependencies(dep.endTs, null)).containsExactly( + assertThat(store.getDependencies(dep.endTs, null)).containsOnly( new DependencyLink("zipkin-web", "zipkin-query", 2), new DependencyLink("zipkin-query", "zipkin-jdbc", 2) ); diff --git a/zipkin/src/test/java/zipkin/SpanStoreTest.java b/zipkin/src/test/java/zipkin/SpanStoreTest.java old mode 100644 new mode 100755 index c4f002ac2cf..22d12a9474c --- a/zipkin/src/test/java/zipkin/SpanStoreTest.java +++ b/zipkin/src/test/java/zipkin/SpanStoreTest.java @@ -264,22 +264,22 @@ public void getTraces_duration() { // Min duration is inclusive and is applied by service. 
assertThat(store.getTraces(q.serviceName("service1").minDuration(targz.duration).build())) - .containsOnly(trace1); + .containsExactly(trace1); assertThat(store.getTraces(q.serviceName("service3").minDuration(targz.duration).build())) - .containsOnly(trace2); + .containsExactly(trace2); // Duration bounds aren't limited to root spans: they apply to all spans by service in a trace assertThat(store.getTraces(q.serviceName("service2").minDuration(zip.duration).maxDuration(tar.duration).build())) - .containsOnly(trace3, trace2, trace1); // service2 is in the middle of trace1 and 2, but root of trace3 + .containsExactly(trace3, trace2, trace1); // service2 is in the middle of trace1 and 2, but root of trace3 // Span name should apply to the duration filter assertThat(store.getTraces(q.serviceName("service2").spanName("zip").maxDuration(zip.duration).build())) - .containsOnly(trace3); + .containsExactly(trace3); // Max duration should filter our longer spans from the same service assertThat(store.getTraces(q.serviceName("service2").minDuration(gz.duration).maxDuration(zip.duration).build())) - .containsOnly(trace3); + .containsExactly(trace3); } /**