Skip to content

Commit

Permalink
Add support for reading spans from an Elasticsearch alias
Browse files Browse the repository at this point in the history
 * Add ES_USE_ALIASES parameter to read spans from the 'jaeger-span-read' index alias instead of the date-suffixed index
 * Add range query to fetch only the spans from the last 24 hours, based on the startTimeMillis field

Signed-off-by: Christian Rohmann <[email protected]>
  • Loading branch information
frittentheke committed Mar 26, 2020
1 parent 5828af5 commit 6448c18
Showing 1 changed file with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public static final class Builder {
Boolean clientNodeOnly = Boolean.parseBoolean(Utils.getEnv("ES_CLIENT_NODE_ONLY", "false"));
Boolean nodesWanOnly = Boolean.parseBoolean(Utils.getEnv("ES_NODES_WAN_ONLY", "false"));
String indexPrefix = Utils.getEnv("ES_INDEX_PREFIX", null);
Boolean useAliases = Boolean.parseBoolean(Utils.getEnv("ES_USE_ALIASES", "false"));

final Map<String, String> sparkProperties = new LinkedHashMap<>();

Expand Down Expand Up @@ -158,6 +159,7 @@ private static String getSystemPropertyAsFileResource(String key) {
private final ZonedDateTime day;
private final SparkConf conf;
private final String indexPrefix;
private final Boolean useAliases;

ElasticsearchDependenciesJob(Builder builder) {
this.day = builder.day;
Expand Down Expand Up @@ -186,6 +188,7 @@ private static String getSystemPropertyAsFileResource(String key) {
conf.set(entry.getKey(), entry.getValue());
}
this.indexPrefix = builder.indexPrefix;
this.useAliases = builder.useAliases;
}

/**
Expand All @@ -200,7 +203,12 @@ private static String prefix(String prefix) {
}

/**
 * Entry point that chooses which span index (or alias) to read from and then
 * delegates to {@code run(String[], String[], String)}.
 *
 * <p>When {@code useAliases} is set, spans are read from the single
 * {@code "jaeger-span-read"} name; otherwise they are read from the names
 * produced by {@code indexDate("jaeger-span")}. In both cases the dependency
 * target names come from {@code indexDate("jaeger-dependencies")}.
 *
 * @param peerServiceTag forwarded unchanged to the three-argument {@code run}
 */
public void run(String peerServiceTag) {
// NOTE(review): this call is unconditional and precedes the branch below, so as
// written the date-suffixed run happens in addition to whichever branch is taken.
// It looks like the pre-change line of the commit shown next to its replacement
// -- confirm against the committed file before relying on this text.
run(indexDate("jaeger-span"), indexDate("jaeger-dependencies") ,peerServiceTag);
if (this.useAliases) {
// Alias mode: one fixed span source name, paired with the date-based dependency names.
// NOTE(review): the span array has exactly one element here; verify that
// indexDate("jaeger-dependencies") yields a matching number of entries.
run(new String[]{"jaeger-span-read"}, indexDate("jaeger-dependencies") ,peerServiceTag);
}
else {
// Default mode: both span and dependency names are derived via indexDate.
run(indexDate("jaeger-span"), indexDate("jaeger-dependencies") ,peerServiceTag);
}
}

String[] indexDate(String index) {
Expand All @@ -219,7 +227,13 @@ void run(String[] spanIndices, String[] depIndices,String peerServiceTag) {
String spanIndex = spanIndices[i];
String depIndex = depIndices[i];
log.info("Running Dependencies job for {}, reading from {} index, result storing to {}", day, spanIndex, depIndex);
JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanIndex)

// Send raw query to ES to select only the docs / spans we want to consider for this job
// When using rollover indices there might be none or even multiple per day - thus Jaeger expects an alias to be available
// pointing to all potential indices.
// Always applying this "filter" does not do any harm even for the indexDated ones
String esQuery = "{\"range\": {\"startTimeMillis\": { \"gte\": \"now-24h\" }}}";
JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanIndex, esQuery)
.map(new ElasticTupleToSpan())
.groupBy(Span::getTraceId);
List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces,peerServiceTag);
Expand Down

0 comments on commit 6448c18

Please sign in to comment.