Skip to content

Commit

Permalink
Merge pull request #107 from anuraaga/elasticsearch
Browse files Browse the repository at this point in the history
Add an elasticsearch span store.
  • Loading branch information
adriancole committed Mar 22, 2016
2 parents 6162686 + 9af0712 commit 36a3e53
Show file tree
Hide file tree
Showing 30 changed files with 1,270 additions and 23 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ before_install:
- git config credential.helper "store --file=.git/credentials"
- echo "https://$GH_TOKEN:@github.com" > .git/credentials

# Manually install elasticsearch until https://github.com/travis-ci/apt-source-whitelist/issues/190
- curl -SL https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.1/elasticsearch-2.2.1.tar.gz | tar xz
- elasticsearch-*/bin/elasticsearch -d > /dev/null

script:
# If we aren't a pull request, we are publishing either a snapshot or a release.
# We don't currently publish the benchmarks or interop modules, as they are test code only.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[![Gitter chat](http://img.shields.io/badge/gitter-join%20chat%20%E2%86%92-brightgreen.svg)](https://gitter.im/openzipkin/zipkin) [![Build Status](https://travis-ci.org/openzipkin/zipkin-java.svg?branch=master)](https://travis-ci.org/openzipkin/zipkin-java) [![Download](https://api.bintray.com/packages/openzipkin/maven/zipkin-java/images/download.svg) ](https://bintray.com/openzipkin/maven/zipkin-java/_latestVersion)

# zipkin-java
This project is a native java port of [zipkin](https://github.com/openzipkin/zipkin), which was historically written in scala+finagle. This includes a dependency-free library and a [spring-boot](http://projects.spring.io/spring-boot/) replacement for zipkin's query server. Storage options include in-memory, JDBC (mysql) and Cassandra.
This project is a native java port of [zipkin](https://github.com/openzipkin/zipkin), which was historically written in scala+finagle. This includes a dependency-free library and a [spring-boot](http://projects.spring.io/spring-boot/) replacement for zipkin's query server. Storage options include in-memory, JDBC (mysql), Cassandra, and Elasticsearch.

## Library
The [core library](https://github.com/openzipkin/zipkin-java/tree/master/zipkin/src/main/java/io/zipkin) requires minimum language level 7. While currently only used by the server, we expect this library to be used in native instrumentation as well.
Expand Down
15 changes: 15 additions & 0 deletions interop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,27 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spanstore-elasticsearch</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spanstore-jdbc</artifactId>
<scope>test</scope>
</dependency>

<!-- Force guava version to 19.0 for elasticsearch because zipkin-common uses 16.0 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
<scope>test</scope>
</dependency>


<!-- Dependencies for SpanStoreSpec -->
<dependency>
<groupId>io.zipkin</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.elasticsearch;

import com.twitter.zipkin.common.Span;
import com.twitter.zipkin.storage.DependencyStore;
import com.twitter.zipkin.storage.DependencyStoreSpec;
import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import scala.collection.immutable.List;
import zipkin.DependencyLink;
import zipkin.InMemorySpanStore;
import zipkin.SpanStore;
import zipkin.interop.ScalaDependencyStoreAdapter;
import zipkin.interop.ScalaSpanStoreAdapter;

import static zipkin.internal.Util.midnightUTC;

public class ElasticsearchScalaDependencyStoreTest extends DependencyStoreSpec {
private static ElasticsearchSpanStore spanStore;

@BeforeClass
public static void setupDB() {
spanStore = ElasticsearchTestGraph.INSTANCE.spanStore();
}

public DependencyStore store() {
return new ScalaDependencyStoreAdapter(spanStore);
}

@Override
public void processDependencies(List<Span> input) {
SpanStore mem = new InMemorySpanStore();
new ScalaSpanStoreAdapter(mem).apply(input);
java.util.List<DependencyLink>
links = mem.getDependencies(today() + TimeUnit.DAYS.toMillis(1), null);

long midnight = midnightUTC(((long) input.apply(0).timestamp().get()) / 1000);
ElasticsearchTestGraph.INSTANCE.spanStore().writeDependencyLinks(links, midnight);
}

public void clear() {
spanStore.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.elasticsearch;

import com.twitter.zipkin.storage.SpanStore;
import com.twitter.zipkin.storage.SpanStoreSpec;
import org.junit.BeforeClass;
import zipkin.interop.ScalaSpanStoreAdapter;

public class ElasticsearchScalaSpanStoreTest extends SpanStoreSpec {
private static ElasticsearchSpanStore spanStore;

@BeforeClass
public static void setupDB() {
spanStore = ElasticsearchTestGraph.INSTANCE.spanStore();
}

public SpanStore store() {
return new ScalaSpanStoreAdapter(spanStore);
}

public void clear() {
spanStore.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.elasticsearch;

import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.junit.AssumptionViolatedException;

enum ElasticsearchTestGraph {
INSTANCE;

static final ElasticsearchConfig CONFIG = new ElasticsearchConfig.Builder().build();

static {
// Avoid race-conditions in travis by forcing read-your-writes consistency.
ElasticsearchSpanConsumer.BLOCK_ON_FUTURES = true;
}

private AssumptionViolatedException ex;
private ElasticsearchSpanStore spanStore;

/** A lot of tech debt here because the spanstore constructor performs I/O. */
synchronized ElasticsearchSpanStore spanStore() {
if (ex != null) throw ex;
if (this.spanStore == null) {
try {
this.spanStore = new ElasticsearchSpanStore(CONFIG);
} catch (NoNodeAvailableException e) {
throw ex = new AssumptionViolatedException(e.getMessage());
}
}
return spanStore;
}
}
9 changes: 9 additions & 0 deletions pom.xml
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
MariaDB has a friendlier license, LGPL, which is less scary in audits.
-->
<mariadb-java-client.version>1.3.6</mariadb-java-client.version>
<elasticsearch.version>2.2.1</elasticsearch.version>
<slf4j.version>1.7.19</slf4j.version>
<junit.version>4.12</junit.version>
<assertj.version>3.3.0</assertj.version>

Expand Down Expand Up @@ -146,6 +148,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spanstore-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>transport-kafka</artifactId>
Expand Down Expand Up @@ -295,6 +303,7 @@
<exclude>LICENSE</exclude>
<exclude>**/*.md</exclude>
<exclude>dsc-cassandra-*/**</exclude>
<exclude>elasticsearch-*/**</exclude>
<exclude>src/test/resources/**</exclude>
<exclude>src/main/resources/**</exclude>
</excludes>
Expand Down
18 changes: 18 additions & 0 deletions zipkin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,24 @@ Example usage:
$ STORAGE_TYPE=mysql MYSQL_USER=root ./mvnw -pl zipkin-server spring-boot:run
```


### Elasticsearch
The following apply when `STORAGE_TYPE` is set to `elasticsearch`:

* `ES_CLUSTER`: The name of the elasticsearch cluster to connect to. Defaults to "elasticsearch".
* `ES_HOSTS`: A comma separated list of elasticsearch hostnodes to connect to, in host:port
format. The port should be the transport port, not the http port. Defaults to
"localhost:9300". Only one of these hosts needs to be available to fetch the
remaining nodes in the cluster. It is recommended to set this to all the master
nodes of the cluster.
* `ES_INDEX`: The index prefix to use when generating daily index names. Defaults to zipkin.

Example usage:

```bash
$ STORAGE_TYPE=elasticsearch ES_CLUSTER=monitoring ES_HOSTS=host1:9300,host2:9300 ./mvnw -pl zipkin-server spring-boot:run
```

### Kafka
The following apply when `KAFKA_ZOOKEEPER` is set:

Expand Down
15 changes: 15 additions & 0 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@
<optional>true</optional>
</dependency>

<!-- Elasticsearch backend -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spanstore-elasticsearch</artifactId>
<optional>true</optional>
</dependency>

<!-- Explicitly marked for Intellij -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
<optional>true</optional>
</dependency>

<!-- JDBC backend -->
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.server;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("elasticsearch")
public class ZipkinElasticsearchProperties {

/**
* The elasticsearch cluster to connect to, defaults to "elasticsearch".
*/
private String cluster = "elasticsearch";

/**
* A comma separated list of elasticsearch hostnodes to connect to, in host:port
* format. The port should be the transport port, not the http port. Defaults to
* "localhost:9300".
*/
private String hosts = "localhost:9300";

/**
* The index prefix to use when generating daily index names. Defaults to zipkin.
*/
private String index = "zipkin";

public String getCluster() {
return cluster;
}

public ZipkinElasticsearchProperties setCluster(String cluster) {
this.cluster = cluster;
return this;
}

public String getHosts() {
return hosts;
}

public ZipkinElasticsearchProperties setHosts(String hosts) {
this.hosts = hosts;
return this;
}

public String getIndex() {
return index;
}

public ZipkinElasticsearchProperties setIndex(String index) {
this.index = index;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@
import zipkin.SpanStore;
import zipkin.cassandra.CassandraConfig;
import zipkin.cassandra.CassandraSpanStore;
import zipkin.elasticsearch.ElasticsearchConfig;
import zipkin.elasticsearch.ElasticsearchSpanStore;
import zipkin.jdbc.JDBCSpanStore;
import zipkin.kafka.KafkaConfig;
import zipkin.kafka.KafkaTransport;
import zipkin.server.brave.TraceWritesSpanStore;

@Configuration
@EnableConfigurationProperties({ZipkinServerProperties.class, ZipkinCassandraProperties.class})
@EnableConfigurationProperties(ZipkinServerProperties.class)
@EnableAsync(proxyTargetClass = true)
public class ZipkinServerConfiguration {

Expand All @@ -71,6 +73,11 @@ Sampler traceIdSampler(@Value("${zipkin.collector.sample-rate:1.0}") float rate)
@Bean
@ConditionalOnMissingBean(SpanStore.class)
SpanStore spanStore() {
if (server.getStore().getType() != ZipkinServerProperties.Store.Type.mem) {
throw new IllegalStateException("Attempted to set storage type to "
+ server.getStore().getType() + " but could not initialize the spanstore for "
+ "that storage type. Did you include it on the classpath?");
}
return new InMemorySpanStore();
}

Expand Down Expand Up @@ -132,6 +139,21 @@ static class CassandraConfiguration {
}
}

@Configuration
@EnableConfigurationProperties(ZipkinElasticsearchProperties.class)
@ConditionalOnProperty(name = "zipkin.store.type", havingValue = "elasticsearch")
@ConditionalOnClass(name = "zipkin.elasticsearch.ElasticsearchSpanStore")
static class ElasticsearchConfiguration {
@Bean SpanStore elasticsearchSpanStore(ZipkinElasticsearchProperties elasticsearch) {
ElasticsearchConfig config = new ElasticsearchConfig.Builder()
.cluster(elasticsearch.getCluster())
.hosts(elasticsearch.getHosts())
.index(elasticsearch.getIndex())
.build();
return new ElasticsearchSpanStore(config);
}
}

/**
* This transport consumes a topic, decodes spans from thrift messages and stores them subject to
* sampling policy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public Ui getUi() {

static class Store {
enum Type {
cassandra, mysql, mem
cassandra, mysql, elasticsearch, mem
}

private Type type = Type.mem;
Expand Down
4 changes: 4 additions & 0 deletions zipkin-server/src/main/resources/zipkin-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ mysql:
db: ${MYSQL_DB:zipkin}
max-active: ${MYSQL_MAX_CONNECTIONS:10}
use-ssl: ${MYSQL_USE_SSL:false}
elasticsearch:
cluster: ${ES_CLUSTER:elasticsearch}
hosts: ${ES_HOSTS:localhost:9300}
index: ${ES_INDEX:zipkin}
kafka:
# ZooKeeper host string, comma-separated host:port value.
zookeeper: ${KAFKA_ZOOKEEPER:}
Expand Down
Loading

0 comments on commit 36a3e53

Please sign in to comment.