diff --git a/README.md b/README.md index 2fa1787..bc4c151 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![Build Status][ci-img]][ci] [![Docker Build Status](https://img.shields.io/docker/build/jaegertracing/spark-dependencies.svg)](https://hub.docker.com/r/jaegertracing/spark-dependencies/builds/) +[![Build Status][ci-img]][ci] [![Docker Build Status](https://img.shields.io/docker/build/jaegertracing/spark-dependencies.svg)](https://hub.docker.com/r/jaegertracing/spark-dependencies/builds/) [![Docker Pulls](https://img.shields.io/docker/pulls/jaegertracing/spark-dependencies.svg)](https://hub.docker.com/r/jaegertracing/spark-dependencies/) # Jaeger Spark dependencies @@ -74,10 +74,12 @@ Elasticsearch is used when `STORAGE=elasticsearch`. * `ES_USERNAME` and `ES_PASSWORD`: Elasticsearch basic authentication. Use when X-Pack security (formerly Shield) is in place. By default no username or password is provided to elasticsearch. - * `ES_CLIENT_NODE_ONLY`: Set to true to disable elasticsearch cluster nodes.discovery and enable nodes.client.only. - If your elasticsearch cluster's data nodes only listen on loopback ip, set this to true. + * `ES_CLIENT_NODE_ONLY`: Set to true to disable elasticsearch cluster nodes.discovery and enable nodes.client.only. + If your elasticsearch cluster's data nodes only listen on loopback ip, set this to true. Defaults to false. * `ES_INDEX_PREFIX`: index prefix of Jaeger indices. By default unset. + * `ES_TIME_RANGE`: How far in the past the job should look for spans; the maximum and default is `24h`. + Any value accepted by [date-math](https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#date-math) can be used here, but the anchor is always `now`. Example usage: @@ -93,11 +95,11 @@ STORAGE=elasticsearch ES_NODES=http://localhost:9200 java -jar jaeger-spark-depe docker build -t jaegertracing/spark-dependencies:latest . 
``` -In tests it's possible to specify version of Jaeger images by env variable `JAEGER_VERSION` +In tests it's possible to specify the version of Jaeger images by env variable `JAEGER_VERSION` or system property `jaeger.version`. By default tests are using latest images. ## License - + [Apache 2.0 License](./LICENSE). diff --git a/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java b/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java index 36d9160..42426c0 100644 --- a/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java +++ b/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java @@ -59,6 +59,7 @@ public static final class Builder { Boolean clientNodeOnly = Boolean.parseBoolean(Utils.getEnv("ES_CLIENT_NODE_ONLY", "false")); Boolean nodesWanOnly = Boolean.parseBoolean(Utils.getEnv("ES_NODES_WAN_ONLY", "false")); String indexPrefix = Utils.getEnv("ES_INDEX_PREFIX", null); + String spanRange = Utils.getEnv("ES_TIME_RANGE", "24h"); final Map sparkProperties = new LinkedHashMap<>(); @@ -117,6 +118,12 @@ public Builder indexPrefix(String indexPrefix) { return this; } + /** Time range of spans to process, counted back from now (e.g. 12h). By default 24h */ + public Builder spanRange(String spanRange) { + this.spanRange = spanRange; + return this; + } + /** Day to process dependencies for. Defaults to today. 
*/ public Builder day(LocalDate day) { this.day = day.atStartOfDay(ZoneOffset.UTC); @@ -158,6 +165,7 @@ private static String getSystemPropertyAsFileResource(String key) { private final ZonedDateTime day; private final SparkConf conf; private final String indexPrefix; + private final String spanRange; ElasticsearchDependenciesJob(Builder builder) { this.day = builder.day; @@ -186,6 +194,7 @@ private static String getSystemPropertyAsFileResource(String key) { conf.set(entry.getKey(), entry.getValue()); } this.indexPrefix = builder.indexPrefix; + this.spanRange = builder.spanRange; } /** @@ -219,7 +228,10 @@ void run(String[] spanIndices, String[] depIndices,String peerServiceTag) { String spanIndex = spanIndices[i]; String depIndex = depIndices[i]; log.info("Running Dependencies job for {}, reading from {} index, result storing to {}", day, spanIndex, depIndex); - JavaPairRDD> traces = JavaEsSpark.esJsonRDD(sc, spanIndex) + // Send raw query to ES to select only the docs / spans we want to consider for this job + // This doesn't change the default behavior as the daily indexes only contain up to 24h of data + String esQuery = String.format("{\"range\": {\"startTimeMillis\": { \"gte\": \"now-%s\" }}}", spanRange); + JavaPairRDD> traces = JavaEsSpark.esJsonRDD(sc, spanIndex, esQuery) .map(new ElasticTupleToSpan()) .groupBy(Span::getTraceId); List dependencyLinks = DependenciesSparkHelper.derive(traces,peerServiceTag);