Skip to content

Commit

Permalink
Add zoneId support to ES metrics queries
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanlb committed Mar 21, 2024
1 parent 59dc465 commit 0672c70
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,28 +93,21 @@ public HttpResponse clusterMetadata() {
@Blocking
@Path("/_msearch")
public HttpResponse multiSearch(String postBody) throws Exception {
LOG.info("Search request: {}", postBody);
LOG.debug("Search request: {}", postBody);

CurrentTraceContext currentTraceContext = Tracing.current().currentTraceContext();
try (var scope = new StructuredTaskScope<EsSearchResponse>()) {
List<StructuredTaskScope.Subtask<EsSearchResponse>> requestSubtasks = new ArrayList<>();
try {
requestSubtasks =
openSearchRequest.parseHttpPostBody(postBody).stream()
.map((request) -> scope.fork(currentTraceContext.wrap(() -> doSearch(request))))
.toList();
} catch (Exception e) {
LOG.error("Error parsing request", e);
}
List<StructuredTaskScope.Subtask<EsSearchResponse>> requestSubtasks =
openSearchRequest.parseHttpPostBody(postBody).stream()
.map((request) -> scope.fork(currentTraceContext.wrap(() -> doSearch(request))))
.toList();

scope.join();
SearchResponseMetadata responseMetadata =
new SearchResponseMetadata(
0,
requestSubtasks.stream().map(StructuredTaskScope.Subtask::get).toList(),
Map.of("traceId", getTraceId()));

LOG.info("search results - {}", JsonUtil.writeAsString(responseMetadata));
return HttpResponse.of(
HttpStatus.OK, MediaType.JSON_UTF_8, JsonUtil.writeAsString(responseMetadata));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ private static List<KaldbSearch.SearchRequest.SearchAggregation> getRecursive(Js
getDateHistogramMinDocCount(dateHistogram))
.setInterval(
getDateHistogramInterval(dateHistogram))
.setZoneId(getDateHistogramZoneId(dateHistogram))
.putAllExtendedBounds(
getDateHistogramExtendedBounds(dateHistogram))
.setFormat(getDateHistogramFormat(dateHistogram))
Expand Down Expand Up @@ -484,6 +485,13 @@ private static String getDateHistogramInterval(JsonNode dateHistogram) {
return "auto";
}

private static String getDateHistogramZoneId(JsonNode dateHistogram) {
if (dateHistogram.has("time_zone")) {
return dateHistogram.get("time_zone").asText();
}
return null;
}

private static String getHistogramInterval(JsonNode dateHistogram) {
if (dateHistogram.has("interval")) {
return dateHistogram.get("interval").asText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.slack.kaldb.metadata.schema.LuceneFieldDef;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -953,8 +954,8 @@ protected static DateHistogramAggregationBuilder getDateHistogramAggregationBuil
DateHistogramAggregationBuilder dateHistogramAggregationBuilder =
new DateHistogramAggregationBuilder(builder.getName())
.field(builder.getField())
.fixedInterval(new DateHistogramInterval(builder.getInterval()))
.minDocCount(builder.getMinDocCount());
.minDocCount(builder.getMinDocCount())
.fixedInterval(new DateHistogramInterval(builder.getInterval()));

if (builder.getOffset() != null && !builder.getOffset().isEmpty()) {
dateHistogramAggregationBuilder.offset(builder.getOffset());
Expand All @@ -965,6 +966,10 @@ protected static DateHistogramAggregationBuilder getDateHistogramAggregationBuil
// dateHistogramAggregationBuilder.format(builder.getFormat());
}

if (builder.getZoneId() != null && !builder.getZoneId().isEmpty()) {
dateHistogramAggregationBuilder.timeZone(ZoneId.of(builder.getZoneId()));
}

if (builder.getMinDocCount() == 0) {
if (builder.getExtendedBounds() != null
&& builder.getExtendedBounds().containsKey("min")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,6 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException {

addField(
doc, LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timestamp.toEpochMilli(), "", 0);

// todo - this should be removed once we simplify the time handling
// this will be overridden below if a user provided value exists
jsonMap.put(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timestamp.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public static AggBuilder fromSearchAggregations(
searchAggregation.getValueSource().getField(),
searchAggregation.getValueSource().getDateHistogram().getInterval(),
searchAggregation.getValueSource().getDateHistogram().getOffset(),
searchAggregation.getValueSource().getDateHistogram().getZoneId(),
searchAggregation.getValueSource().getDateHistogram().getMinDocCount(),
searchAggregation.getValueSource().getDateHistogram().getFormat(),
searchAggregation.getValueSource().getDateHistogram().getExtendedBoundsMap(),
Expand Down Expand Up @@ -517,6 +518,11 @@ public static KaldbSearch.SearchRequest.SearchAggregation toSearchAggregationPro
dateHistogramAggregationBuilder.setFormat(dateHistogramAggBuilder.getFormat());
}

if (dateHistogramAggBuilder.getZoneId() != null
&& !dateHistogramAggBuilder.getZoneId().isEmpty()) {
dateHistogramAggregationBuilder.setZoneId(dateHistogramAggBuilder.getZoneId());
}

return KaldbSearch.SearchRequest.SearchAggregation.newBuilder()
.setType(DateHistogramAggBuilder.TYPE)
.setName(dateHistogramAggBuilder.getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class DateHistogramAggBuilder extends ValueSourceAggBuilder {
public static final String TYPE = "date_histogram";
private final String interval;
private final String offset;
private final String zoneId;
private final long minDocCount;

private final String format;
Expand All @@ -20,6 +21,7 @@ public DateHistogramAggBuilder(String name, String fieldName, String interval) {

this.interval = interval;
this.offset = "";
this.zoneId = null;
this.minDocCount = 1;
this.format = null;
this.extendedBounds = Map.of();
Expand All @@ -30,6 +32,7 @@ public DateHistogramAggBuilder(
String fieldName,
String interval,
String offset,
String zoneId,
long minDocCount,
String format,
Map<String, Long> extendedBounds,
Expand All @@ -39,6 +42,7 @@ public DateHistogramAggBuilder(

this.interval = interval;
this.offset = offset;
this.zoneId = zoneId;
this.minDocCount = minDocCount;
this.format = format;
this.extendedBounds = extendedBounds;
Expand All @@ -48,6 +52,10 @@ public String getInterval() {
return interval;
}

public String getZoneId() {
return zoneId;
}

public String getOffset() {
return offset;
}
Expand All @@ -72,14 +80,13 @@ public String getType() {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof DateHistogramAggBuilder)) return false;
if (!(o instanceof DateHistogramAggBuilder that)) return false;
if (!super.equals(o)) return false;

DateHistogramAggBuilder that = (DateHistogramAggBuilder) o;

if (minDocCount != that.minDocCount) return false;
if (!interval.equals(that.interval)) return false;
if (!Objects.equals(offset, that.offset)) return false;
if (!Objects.equals(zoneId, that.zoneId)) return false;
if (!Objects.equals(format, that.format)) return false;
return Objects.equals(extendedBounds, that.extendedBounds);
}
Expand All @@ -89,6 +96,7 @@ public int hashCode() {
int result = super.hashCode();
result = 31 * result + interval.hashCode();
result = 31 * result + (offset != null ? offset.hashCode() : 0);
result = 31 * result + (zoneId != null ? zoneId.hashCode() : 0);
result = 31 * result + (int) (minDocCount ^ (minDocCount >>> 32));
result = 31 * result + (format != null ? format.hashCode() : 0);
result = 31 * result + (extendedBounds != null ? extendedBounds.hashCode() : 0);
Expand All @@ -104,6 +112,9 @@ public String toString() {
+ ", offset='"
+ offset
+ '\''
+ ", zoneId='"
+ zoneId
+ '\''
+ ", minDocCount="
+ minDocCount
+ ", format='"
Expand All @@ -114,6 +125,11 @@ public String toString() {
+ ", field='"
+ field
+ '\''
+ ", missing="
+ missing
+ ", script='"
+ script
+ '\''
+ ", name='"
+ name
+ '\''
Expand Down
2 changes: 2 additions & 0 deletions kaldb/src/main/proto/kaldb_search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ message SearchRequest {
map<string, int64> extended_bounds = 4;
// Format for the resulting buckets timestamps
string format = 5;
// Date zoneId if requesting with timezone option
string zoneId = 6;
}

// Unique fields specific to the auto date histogram aggregation request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ public void canBuildValidDateHistogram() throws IOException {
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
"5s",
"2s",
null,
100,
"epoch_ms",
Map.of(),
Expand Down Expand Up @@ -265,6 +266,7 @@ public void canBuildValidCumulativeSumPipelineAggregator() {
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
"5s",
"2s",
null,
100,
"epoch_ms",
Map.of(),
Expand Down Expand Up @@ -292,6 +294,7 @@ public void canBuildValidMovingFunctionPipelineAggregator() {
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
"5s",
"2s",
null,
100,
"epoch_ms",
Map.of(),
Expand Down Expand Up @@ -319,6 +322,7 @@ public void canBuildValidMovingAveragePipelineAggregator() {
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
"5s",
"2s",
null,
100,
"epoch_ms",
Map.of(),
Expand Down Expand Up @@ -346,6 +350,7 @@ public void canBuildValidDerivativePipelineAggregator() {
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
"5s",
"2s",
null,
100,
"epoch_ms",
Map.of(),
Expand Down Expand Up @@ -392,6 +397,7 @@ public void handlesDateHistogramExtendedBoundsMinDocEdgeCases() throws IOExcepti
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
"5s",
"2s",
null,
0,
"epoch_ms",
Map.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void canSerializeDeserializeInternalDateHistogramAggregation() throws IOE
new AvgAggBuilder("foo", LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, "3", null);
DateHistogramAggBuilder dateHistogramAggBuilder =
new DateHistogramAggBuilder(
"foo", "epoch_ms", "10s", "5s", 10, "epoch_ms", Map.of(), List.of(avgAggBuilder));
"foo", "epoch_ms", "10s", "5s", null, 10, "epoch_ms", Map.of(), List.of(avgAggBuilder));
CollectorManager<Aggregator, InternalAggregation> collectorManager =
openSearchAdapter.getCollectorManager(
dateHistogramAggBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,7 @@ public void testPipelineAggregation() {
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
"1s",
null,
null,
0,
"epoch_ms",
Map.of("min", 1593365471000L, "max", 1593365471000L + 5000L),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ private InternalAggregation makeHistogram(
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
interval,
"0",
null,
0,
"",
Map.of("min", histogramStartMs, "max", histogramEndMs),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void shouldConvertDateHistogramAggToFromProto() {
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
"5s",
"2s",
null,
10000,
"epoch_ms",
Map.of(
Expand Down Expand Up @@ -281,6 +282,7 @@ public void shouldConvertNestedAggregations() {
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
"5s",
"2s",
null,
10000,
"epoch_ms",
Map.of(
Expand All @@ -294,6 +296,7 @@ public void shouldConvertNestedAggregations() {
"duration_ms",
"10s",
"7s",
null,
1000,
"epoch_ms",
Map.of(
Expand Down
Loading

0 comments on commit 0672c70

Please sign in to comment.