Skip to content

Commit

Permalink
[Transform] Release test resources
Browse files Browse the repository at this point in the history
Consume the HttpEntity after the API response is parsed, releasing
network and thread resources back to their respective pools.

Leaving them unconsumed does not appear to be causing issues during
tests, but it does log a large amount of hanging threads on test
failure, making it harder to spot what may be the issue when a thread is
hanging during a transform test.

Close elastic#107055
  • Loading branch information
prwhelan committed Apr 3, 2024
1 parent ea9e6a9 commit b95d9b5
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,16 @@ public abstract class ESRestTestCase extends ESTestCase {

/**
* Convert the entity from a {@link Response} into a map of maps.
* Consumes the underlying HttpEntity, releasing any resources it may be holding.
*/
public static Map<String, Object> entityAsMap(Response response) throws IOException {
return entityAsMap(response.getEntity());
}

/**
* Convert the entity from a {@link Response} into a map of maps.
* Consumes the underlying HttpEntity, releasing any resources it may be holding.
*/
public static Map<String, Object> entityAsMap(HttpEntity entity) throws IOException {
XContentType xContentType = XContentType.fromMediaType(entity.getContentType().getValue());
// EMPTY and THROW are fine here because `.map` doesn't use named x content or deprecation
Expand All @@ -174,11 +179,14 @@ public static Map<String, Object> entityAsMap(HttpEntity entity) throws IOExcept
)
) {
return parser.map();
} finally {
EntityUtils.consumeQuietly(entity);
}
}

/**
* Convert the entity from a {@link Response} into a list of maps.
* Consumes the underlying HttpEntity, releasing any resources it may be holding.
*/
public static List<Object> entityAsList(Response response) throws IOException {
XContentType xContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue());
Expand All @@ -192,6 +200,8 @@ public static List<Object> entityAsList(Response response) throws IOException {
)
) {
return parser.list();
} finally {
EntityUtils.consumeQuietly(response.getEntity());
}
}

Expand Down Expand Up @@ -1603,6 +1613,14 @@ public static Response assertOK(Response response) {
return response;
}

public static void assertOKAndConsume(Response response) {
try {
assertOK(response);
} finally {
EntityUtils.consumeQuietly(response.getEntity());
}
}

public static ObjectPath assertOKAndCreateObjectPath(Response response) throws IOException {
assertOK(response);
return ObjectPath.createFromResponse(response);
Expand All @@ -1622,9 +1640,14 @@ public static void assertDocCount(RestClient client, String indexName, long docC
}

public static void assertAcknowledged(Response response) throws IOException {
assertOK(response);
String jsonBody = EntityUtils.toString(response.getEntity());
assertThat(jsonBody, containsString("\"acknowledged\":true"));
try {
assertOK(response);
String jsonBody = EntityUtils.toString(response.getEntity());
assertThat(jsonBody, containsString("\"acknowledged\":true"));
} finally {
// if assertOK throws an exception, still release resources
EntityUtils.consumeQuietly(response.getEntity());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private void testChainedTransforms(final int numTransforms) throws Exception {
assertFalse(aliasExists(destWriteAlias));

String transformConfig = createTransformConfig(sourceIndex, destIndex, destReadAlias, destWriteAlias);
assertAcknowledged(putTransform(transformId, transformConfig, true, RequestOptions.DEFAULT));
putTransform(transformId, transformConfig, true, RequestOptions.DEFAULT);
}

List<String> transformIdsShuffled = new ArrayList<>(transformIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.Level;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
Expand Down Expand Up @@ -94,9 +95,7 @@ private void logAudits() throws Exception {
assertBusy(() -> {
try {
refreshIndex(TransformInternalIndexConstants.AUDIT_INDEX_PATTERN, RequestOptions.DEFAULT);
Response searchResponse = client().performRequest(searchRequest);

Map<String, Object> searchResult = entityAsMap(searchResponse);
Map<String, Object> searchResult = entityAsMap(client().performRequest(searchRequest));
List<Map<String, Object>> searchHits = (List<Map<String, Object>>) XContentMapValues.extractValue(
"hits.hits",
searchResult
Expand Down Expand Up @@ -143,7 +142,7 @@ protected void cleanUpTransforms() throws IOException {
protected void refreshIndex(String index, RequestOptions options) throws IOException {
var r = new Request("POST", index + "/_refresh");
r.setOptions(options);
assertOK(adminClient().performRequest(r));
assertOKAndConsume(adminClient().performRequest(r));
}

protected Map<String, Object> getIndexMapping(String index, RequestOptions options) throws IOException {
Expand Down Expand Up @@ -225,15 +224,15 @@ protected void deleteTransform(String id, boolean force) throws IOException {
if (force) {
request.addParameter(TransformField.FORCE.getPreferredName(), "true");
}
assertOK(adminClient().performRequest(request));
assertAcknowledged(adminClient().performRequest(request));
createdTransformIds.remove(id);
}

protected Response putTransform(String id, String config, RequestOptions options) throws IOException {
return putTransform(id, config, false, options);
protected void putTransform(String id, String config, RequestOptions options) throws IOException {
putTransform(id, config, false, options);
}

protected Response putTransform(String id, String config, boolean deferValidation, RequestOptions options) throws IOException {
protected void putTransform(String id, String config, boolean deferValidation, RequestOptions options) throws IOException {
if (createdTransformIds.contains(id)) {
throw new IllegalArgumentException("transform [" + id + "] is already registered");
}
Expand All @@ -244,9 +243,8 @@ protected Response putTransform(String id, String config, boolean deferValidatio
request.addParameter("defer_validation", "true");
}
request.setOptions(options);
Response response = assertOK(client().performRequest(request));
assertAcknowledged(client().performRequest(request));
createdTransformIds.add(id);
return response;
}

protected Map<String, Object> previewTransform(String transformConfig, RequestOptions options) throws IOException {
Expand All @@ -271,8 +269,7 @@ protected Map<String, Object> getBasicTransformStats(String id) throws IOExcepti
var request = new Request("GET", TRANSFORM_ENDPOINT + id + "/_stats");
request.addParameter(BASIC_STATS.getPreferredName(), "true");
request.setOptions(RequestOptions.DEFAULT);
Response response = client().performRequest(request);
List<Map<String, Object>> stats = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", entityAsMap(response));
var stats = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", entityAsMap(client().performRequest(request)));
assertThat(stats, hasSize(1));
return stats.get(0);
}
Expand All @@ -283,11 +280,10 @@ protected String getTransformState(String id) throws IOException {

@SuppressWarnings("unchecked")
protected Map<String, Object> getTransform(String id) throws IOException {
Request request = new Request("GET", TRANSFORM_ENDPOINT + id);
Response response = client().performRequest(request);
List<Map<String, Object>> transformConfigs = (List<Map<String, Object>>) XContentMapValues.extractValue(
var request = new Request("GET", TRANSFORM_ENDPOINT + id);
var transformConfigs = (List<Map<String, Object>>) XContentMapValues.extractValue(
"transforms",
entityAsMap(response)
entityAsMap(client().performRequest(request))
);
assertThat(transformConfigs, hasSize(1));
return transformConfigs.get(0);
Expand All @@ -314,14 +310,6 @@ protected long getCheckpoint(Map<String, Object> stats) {
return ((Integer) XContentMapValues.extractValue("checkpointing.last.checkpoint", stats)).longValue();
}

protected DateHistogramGroupSource createDateHistogramGroupSourceWithFixedInterval(
String field,
DateHistogramInterval interval,
ZoneId zone
) {
return new DateHistogramGroupSource(field, null, false, new DateHistogramGroupSource.FixedInterval(interval), zone, null);
}

protected DateHistogramGroupSource createDateHistogramGroupSourceWithCalendarInterval(
String field,
DateHistogramInterval interval,
Expand Down Expand Up @@ -414,7 +402,7 @@ protected TransformConfig.Builder createTransformConfigBuilder(
String destinationIndex,
QueryConfig queryConfig,
String... sourceIndices
) throws Exception {
) {
return TransformConfig.builder()
.setId(id)
.setSource(new SourceConfig(sourceIndices, queryConfig, Collections.emptyMap()))
Expand All @@ -434,7 +422,7 @@ protected void updateConfig(String id, String update, boolean deferValidation, R
}
updateRequest.setJsonEntity(update);
updateRequest.setOptions(options);
assertOK(client().performRequest(updateRequest));
assertOKAndConsume(client().performRequest(updateRequest));
}

protected void createReviewsIndex(
Expand Down Expand Up @@ -504,7 +492,7 @@ protected void createReviewsIndex(
Request req = new Request("PUT", indexName);
req.setEntity(indexMappings);
req.setOptions(RequestOptions.DEFAULT);
assertOK(adminClient().performRequest(req));
assertAcknowledged(adminClient().performRequest(req));
}

// create index
Expand Down Expand Up @@ -546,18 +534,24 @@ protected void doBulk(String bulkDocuments, boolean refresh) throws IOException
bulkRequest.setJsonEntity(bulkDocuments);
bulkRequest.setOptions(RequestOptions.DEFAULT);
Response bulkResponse = adminClient().performRequest(bulkRequest);
assertOK(bulkResponse);
var bulkMap = entityAsMap(bulkResponse);
assertThat((boolean) bulkMap.get("errors"), is(equalTo(false)));
try {
var bulkMap = entityAsMap(assertOK(bulkResponse));
assertThat((boolean) bulkMap.get("errors"), is(equalTo(false)));
} finally {
EntityUtils.consumeQuietly(bulkResponse.getEntity());
}
}

protected Map<String, Object> matchAllSearch(String index, int size, RequestOptions options) throws IOException {
Request request = new Request("GET", index + "/_search");
request.addParameter("size", Integer.toString(size));
request.setOptions(options);
Response response = client().performRequest(request);
assertOK(response);
return entityAsMap(response);
try {
return entityAsMap(assertOK(response));
} finally {
EntityUtils.consumeQuietly(response.getEntity());
}
}

private void waitForPendingTasks() {
Expand All @@ -572,7 +566,7 @@ private void waitForPendingTasks() {
);
request.addParameters(parameters);
try {
adminClient().performRequest(request);
EntityUtils.consumeQuietly(adminClient().performRequest(request).getEntity());
} catch (Exception e) {
throw new AssertionError("Failed to wait for pending tasks to complete", e);
}
Expand Down

0 comments on commit b95d9b5

Please sign in to comment.