Skip to content

Commit

Permalink
Elasticsearch: Make the span range configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
johanneswuerbach committed Jun 9, 2020
1 parent 5828af5 commit 99f12ce
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![Build Status][ci-img]][ci] [![Docker Build Status](https://img.shields.io/docker/build/jaegertracing/spark-dependencies.svg)](https://hub.docker.com/r/jaegertracing/spark-dependencies/builds/)
[![Build Status][ci-img]][ci] [![Docker Build Status](https://img.shields.io/docker/build/jaegertracing/spark-dependencies.svg)](https://hub.docker.com/r/jaegertracing/spark-dependencies/builds/)
[![Docker Pulls](https://img.shields.io/docker/pulls/jaegertracing/spark-dependencies.svg)](https://hub.docker.com/r/jaegertracing/spark-dependencies/)

# Jaeger Spark dependencies
Expand Down Expand Up @@ -74,10 +74,11 @@ Elasticsearch is used when `STORAGE=elasticsearch`.
* `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch basic authentication. Use when X-Pack security
(formerly Shield) is in place. By default no username or
password is provided to elasticsearch.
* `ES_CLIENT_NODE_ONLY`: Set to true to disable elasticsearch cluster nodes.discovery and enable nodes.client.only.
If your elasticsearch cluster's data nodes only listen on loopback ip, set this to true.
* `ES_CLIENT_NODE_ONLY`: Set to true to disable elasticsearch cluster nodes.discovery and enable nodes.client.only.
If your elasticsearch cluster's data nodes only listen on loopback ip, set this to true.
Defaults to false
* `ES_INDEX_PREFIX`: index prefix of Jaeger indices. By default unset.
* `ES_SPAN_RANGE`: How far into the past should the job look, the maximum and default is 24h.

Example usage:

Expand All @@ -93,11 +94,11 @@ STORAGE=elasticsearch ES_NODES=http://localhost:9200 java -jar jaeger-spark-depe
docker build -t jaegertracing/spark-dependencies:latest .
```

In tests it's possible to specify version of Jaeger images by env variable `JAEGER_VERSION`
In tests it's possible to specify version of Jaeger images by env variable `JAEGER_VERSION`
or system property `jaeger.version`. By default tests are using latest images.

## License

[Apache 2.0 License](./LICENSE).


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public static final class Builder {
Boolean clientNodeOnly = Boolean.parseBoolean(Utils.getEnv("ES_CLIENT_NODE_ONLY", "false"));
Boolean nodesWanOnly = Boolean.parseBoolean(Utils.getEnv("ES_NODES_WAN_ONLY", "false"));
String indexPrefix = Utils.getEnv("ES_INDEX_PREFIX", null);
String spanRange = Utils.getEnv("ES_SPAN_RANGE", "24h");

final Map<String, String> sparkProperties = new LinkedHashMap<>();

Expand Down Expand Up @@ -117,6 +118,12 @@ public Builder indexPrefix(String indexPrefix) {
return this;
}

/** span range for Jaeger indices. By default 24h */
public Builder spanRange(String spanRange) {
this.spanRange = spanRange;
return this;
}

/** Day to process dependencies for. Defaults to today. */
public Builder day(LocalDate day) {
this.day = day.atStartOfDay(ZoneOffset.UTC);
Expand Down Expand Up @@ -158,6 +165,7 @@ private static String getSystemPropertyAsFileResource(String key) {
private final ZonedDateTime day;
private final SparkConf conf;
private final String indexPrefix;
private final String spanRange;

ElasticsearchDependenciesJob(Builder builder) {
this.day = builder.day;
Expand Down Expand Up @@ -186,6 +194,7 @@ private static String getSystemPropertyAsFileResource(String key) {
conf.set(entry.getKey(), entry.getValue());
}
this.indexPrefix = builder.indexPrefix;
this.spanRange = builder.spanRange;
}

/**
Expand Down Expand Up @@ -219,7 +228,9 @@ void run(String[] spanIndices, String[] depIndices,String peerServiceTag) {
String spanIndex = spanIndices[i];
String depIndex = depIndices[i];
log.info("Running Dependencies job for {}, reading from {} index, result storing to {}", day, spanIndex, depIndex);
JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanIndex)
// Send raw query to ES to select only the docs / spans we want to consider for this job
String esQuery = String.format("{\"range\": {\"startTimeMillis\": { \"gte\": \"now-%s\" }}}", spanRange);
JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanIndex, esQuery)
.map(new ElasticTupleToSpan())
.groupBy(Span::getTraceId);
List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces,peerServiceTag);
Expand Down

0 comments on commit 99f12ce

Please sign in to comment.