Skip to content

Commit

Permalink
[Transform] improve TransformRestTestCase robustness (#55786)
Browse files Browse the repository at this point in the history
handles/retries temporary SearchPhaseExecutionErrors

fixes #54810
  • Loading branch information
Hendrik Muhs committed Apr 27, 2020
1 parent 6f392cf commit 4b93f17
Showing 1 changed file with 32 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ protected static String getTransformEndpoint() {
}

@SuppressWarnings("unchecked")
private void logAudits() throws IOException {
private void logAudits() throws Exception {
logger.info("writing audit messages to the log");
Request searchRequest = new Request("GET", TransformInternalIndexConstants.AUDIT_INDEX + "/_search?ignore_unavailable=true");
searchRequest.setJsonEntity(
Expand All @@ -501,23 +501,36 @@ private void logAudits() throws IOException {
+ " ] }"
);

refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN);

Response searchResponse = client().performRequest(searchRequest);
Map<String, Object> searchResult = entityAsMap(searchResponse);
List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue("hits.hits", searchResult);

for (Map<String, Object> hit : searchHits) {
Map<String, Object> source = (Map<String, Object>) XContentMapValues.extractValue("_source", hit);
String level = (String) source.getOrDefault("level", "info");
logger.log(
Level.getLevel(level.toUpperCase(Locale.ROOT)),
"Transform audit: [{}] [{}] [{}] [{}]",
Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
source.getOrDefault("transform_id", "n/a"),
source.getOrDefault("message", "n/a"),
source.getOrDefault("node_name", "n/a")
);
}
assertBusy(() -> {
try {
refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN);
Response searchResponse = client().performRequest(searchRequest);

Map<String, Object> searchResult = entityAsMap(searchResponse);
List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue(
"hits.hits",
searchResult
);

for (Map<String, Object> hit : searchHits) {
Map<String, Object> source = (Map<String, Object>) XContentMapValues.extractValue("_source", hit);
String level = (String) source.getOrDefault("level", "info");
logger.log(
Level.getLevel(level.toUpperCase(Locale.ROOT)),
"Transform audit: [{}] [{}] [{}] [{}]",
Instant.ofEpochMilli((long) source.getOrDefault("timestamp", 0)),
source.getOrDefault("transform_id", "n/a"),
source.getOrDefault("message", "n/a"),
source.getOrDefault("node_name", "n/a")
);
}
} catch (ResponseException e) {
// see gh#54810, wrap temporary 503's as assertion error for retry
if (e.getResponse().getStatusLine().getStatusCode() != 503) {
throw e;
}
throw new AssertionError("Failed to retrieve audit logs", e);
}
}, 5, TimeUnit.SECONDS);
}
}

0 comments on commit 4b93f17

Please sign in to comment.