Skip to content

Commit

Permalink
ESQL: Enable async get to support formatting (#111104)
Browse files Browse the repository at this point in the history
I've updated the listener for GET /_query/async/{id} to EsqlResponseListener, so it now accepts parameters (delimiter, drop_null_columns and format) like the POST /_query API. Additionally, I have added tests to verify the correctness of the code.

You can now set the format in the request parameters to specify the return style.

Closes #110926
  • Loading branch information
kanoshiou authored Dec 9, 2024
1 parent b4e852a commit 67ee034
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 74 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/111104.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 111104
summary: "ESQL: Enable async get to support formatting"
area: ES|QL
type: feature
issues:
- 110926
4 changes: 4 additions & 0 deletions docs/reference/esql/esql-async-query-get-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ parameter is `true`.
[[esql-async-query-get-api-query-params]]
==== {api-query-parms-title}

The API accepts the same parameters as the synchronous
<<esql-query-api-query-params,query API>>, along with the following
parameters:

`wait_for_completion_timeout`::
(Optional, <<time-units,time value>>)
Timeout duration to wait for the request to finish. Defaults to no timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private ActionListener<Response> wrapStoringListener(
ActionListener<Response> listener
) {
AtomicReference<ActionListener<Response>> exclusiveListener = new AtomicReference<>(listener);
// This is will performed in case of timeout
// This will be performed in case of timeout
Scheduler.ScheduledCancellable timeoutHandler = threadPool.schedule(() -> {
ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);
if (acquiredListener != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,21 +350,21 @@ public void testTextMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null));
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null, mode));
}

public void testCSVMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|'));
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|', mode));
}

public void testTSVMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null));
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null, mode));
}

public void testCSVNoHeaderMode() throws IOException {
Expand Down Expand Up @@ -1003,53 +1003,35 @@ public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject
}

public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject) throws IOException {
return runEsqlAsync(requestObject, new AssertWarnings.NoWarnings());
return runEsqlAsync(requestObject, randomBoolean(), new AssertWarnings.NoWarnings());
}

static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) throws IOException {
if (mode == ASYNC) {
return runEsqlAsync(requestObject, assertWarnings);
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
} else {
return runEsqlSync(requestObject, assertWarnings);
}
}

public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
requestObject.build();
Request request = prepareRequest(SYNC);
String mediaType = attachBody(requestObject, request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
options.addHeader("Content-Type", mediaType);

if (randomBoolean()) {
options.addHeader("Accept", mediaType);
} else {
request.addParameter("format", requestObject.contentType().queryParameter());
}
request.setOptions(options);
Request request = prepareRequestWithOptions(requestObject, SYNC);

HttpEntity entity = performRequest(request, assertWarnings);
return entityToMap(entity, requestObject.contentType());
}

public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
addAsyncParameters(requestObject);
requestObject.build();
Request request = prepareRequest(ASYNC);
String mediaType = attachBody(requestObject, request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
options.addHeader("Content-Type", mediaType);
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
}

if (randomBoolean()) {
options.addHeader("Accept", mediaType);
} else {
request.addParameter("format", requestObject.contentType().queryParameter());
}
request.setOptions(options);
public static Map<String, Object> runEsqlAsync(
RequestObjectBuilder requestObject,
boolean keepOnCompletion,
AssertWarnings assertWarnings
) throws IOException {
addAsyncParameters(requestObject, keepOnCompletion);
Request request = prepareRequestWithOptions(requestObject, ASYNC);

if (shouldLog()) {
LOGGER.info("REQUEST={}", request);
Expand All @@ -1061,7 +1043,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
Object initialColumns = null;
Object initialValues = null;
var json = entityToMap(entity, requestObject.contentType());
checkKeepOnCompletion(requestObject, json);
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
String id = (String) json.get("id");

var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
Expand Down Expand Up @@ -1101,7 +1083,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec

// issue a second request to "async get" the results
Request getRequest = prepareAsyncGetRequest(id);
getRequest.setOptions(options);
getRequest.setOptions(request.getOptions());
response = performRequest(getRequest);
entity = response.getEntity();
}
Expand All @@ -1119,6 +1101,66 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
return removeAsyncProperties(result);
}

public void testAsyncGetWithoutContentType() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var requestObject = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");

addAsyncParameters(requestObject, true);
Request request = prepareRequestWithOptions(requestObject, ASYNC);

if (shouldLog()) {
LOGGER.info("REQUEST={}", request);
}

Response response = performRequest(request);
HttpEntity entity = response.getEntity();

var json = entityToMap(entity, requestObject.contentType());
checkKeepOnCompletion(requestObject, json, true);
String id = (String) json.get("id");
// results won't be returned since keepOnCompletion is true
assertThat(id, is(not(emptyOrNullString())));

// issue an "async get" request with no Content-Type
Request getRequest = prepareAsyncGetRequest(id);
response = performRequest(getRequest);
entity = response.getEntity();
var result = entityToMap(entity, XContentType.JSON);

ListMatcher values = matchesList();
for (int i = 0; i < count; i++) {
values = values.item(matchesList().item("keyword" + i).item(i));
}
assertMap(
result,
matchesMap().entry(
"columns",
matchesList().item(matchesMap().entry("name", "keyword").entry("type", "keyword"))
.item(matchesMap().entry("name", "integer").entry("type", "integer"))
).entry("values", values).entry("took", greaterThanOrEqualTo(0)).entry("id", id).entry("is_running", false)
);

}

static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException {
requestObject.build();
Request request = prepareRequest(mode);
String mediaType = attachBody(requestObject, request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
options.addHeader("Content-Type", mediaType);

if (randomBoolean()) {
options.addHeader("Accept", mediaType);
} else {
request.addParameter("format", requestObject.contentType().queryParameter());
}
request.setOptions(options);
return request;
}

// Removes async properties, otherwise consuming assertions would need to handle sync and async differences
static Map<String, Object> removeAsyncProperties(Map<String, Object> map) {
Map<String, Object> copy = new HashMap<>(map);
Expand All @@ -1139,17 +1181,20 @@ protected static Map<String, Object> entityToMap(HttpEntity entity, XContentType
}
}

static void addAsyncParameters(RequestObjectBuilder requestObject) throws IOException {
static void addAsyncParameters(RequestObjectBuilder requestObject, boolean keepOnCompletion) throws IOException {
// deliberately short in order to frequently trigger return without results
requestObject.waitForCompletion(TimeValue.timeValueNanos(randomIntBetween(1, 100)));
requestObject.keepOnCompletion(randomBoolean());
requestObject.keepOnCompletion(keepOnCompletion);
requestObject.keepAlive(TimeValue.timeValueDays(randomIntBetween(1, 10)));
}

// If keep_on_completion is set then an id must always be present, regardless of the value of any other property.
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json) {
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json, boolean keepOnCompletion) {
if (requestObject.keepOnCompletion()) {
assertTrue(keepOnCompletion);
assertThat((String) json.get("id"), not(emptyOrNullString()));
} else {
assertFalse(keepOnCompletion);
}
}

Expand All @@ -1167,14 +1212,19 @@ static void deleteNonExistent(Request request) throws IOException {
assertEquals(404, response.getStatusLine().getStatusCode());
}

static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter) throws IOException {
Request request = prepareRequest(SYNC);
static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter, Mode mode)
throws IOException {
Request request = prepareRequest(mode);
if (mode == ASYNC) {
addAsyncParameters(builder, randomBoolean());
}
String mediaType = attachBody(builder.build(), request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("Content-Type", mediaType);

if (randomBoolean()) {
boolean addParam = randomBoolean();
if (addParam) {
request.addParameter("format", format);
} else {
switch (format) {
Expand All @@ -1188,8 +1238,75 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
}
request.setOptions(options);

HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
return Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
if (shouldLog()) {
LOGGER.info("REQUEST={}", request);
}

Response response = performRequest(request);
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());

// get the content, it could be empty because the request might have not completed
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
String id = response.getHeader("X-Elasticsearch-Async-Id");

if (mode == SYNC) {
assertThat(id, is(emptyOrNullString()));
return initialValue;
}

if (id == null) {
// no id returned from an async call, must have completed immediately and without keep_on_completion
assertThat(builder.keepOnCompletion(), either(nullValue()).or(is(false)));
assertNull(response.getHeader("is_running"));
// the content cant be empty
assertThat(initialValue, not(emptyOrNullString()));
return initialValue;
} else {
// async may not return results immediately, so may need an async get
assertThat(id, is(not(emptyOrNullString())));
String isRunning = response.getHeader("X-Elasticsearch-Async-Is-Running");
if ("?0".equals(isRunning)) {
// must have completed immediately so keep_on_completion must be true
assertThat(builder.keepOnCompletion(), is(true));
} else {
// did not return results immediately, so we will need an async get
// Also, different format modes return different results.
switch (format) {
case "txt" -> assertThat(initialValue, emptyOrNullString());
case "csv" -> {
assertEquals(initialValue, "\r\n");
initialValue = "";
}
case "tsv" -> {
assertEquals(initialValue, "\n");
initialValue = "";
}
}
}
// issue a second request to "async get" the results
Request getRequest = prepareAsyncGetRequest(id);
if (delimiter != null) {
getRequest.addParameter("delimiter", String.valueOf(delimiter));
}
// If the `format` parameter is not added, the GET request will return a response
// with the `Content-Type` type due to the lack of an `Accept` header.
if (addParam) {
getRequest.addParameter("format", format);
}
// if `addParam` is false, `options` will already have an `Accept` header
getRequest.setOptions(options);
response = performRequest(getRequest);
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
}
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));

// assert initial contents, if any, are the same as async get contents
if (initialValue != null && initialValue.isEmpty() == false) {
assertEquals(initialValue, newValue);
}

assertDeletable(id);
return newValue;
}

private static Request prepareRequest(Mode mode) {
Expand Down
Loading

0 comments on commit 67ee034

Please sign in to comment.