diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml index c77e114616c78..7aed1cbe0a636 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml @@ -60,8 +60,10 @@ --- "Testing require_data_stream in bulk requests": - skip: - version: " - 8.12.99" - reason: "require_data_stream was introduced in 8.13.0" + version: "all" + reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/104774" + #version: " - 8.12.99" + #reason: "require_data_stream was introduced in 8.13.0" features: allowed_warnings - do: diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java index dd71fb62c1106..05f9f1bbb19f0 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java @@ -522,9 +522,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler rec public String getResponseContentTypeString() { return "application/octet-stream"; } - - @Override - public void close() {} }; } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index c08d571eaf6bb..1215d54e9ace1 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -614,7 +614,7 @@ public void dispatchRequest(final RestRequest request, final RestChannel channel channel.sendResponse( RestResponse.chunked(OK, ChunkedRestResponseBody.fromXContent(ignored -> Iterators.single((builder, params) -> { throw new AssertionError("should not be called for HEAD REQUEST"); - }), ToXContent.EMPTY_PARAMS, channel, null)) + }), ToXContent.EMPTY_PARAMS, channel), null) ); } catch (IOException e) { throw new AssertionError(e); diff --git a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java index f4dbf8115da33..9719716c57ce4 100644 --- a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java @@ -95,6 +95,7 @@ public void sendResponse(RestResponse restResponse) { toClose.add(() -> CloseableChannel.closeChannel(httpChannel)); } toClose.add(() -> tracer.stopTrace(request)); + toClose.add(restResponse); boolean success = false; String opaque = null; @@ -113,7 +114,6 @@ public void sendResponse(RestResponse restResponse) { final HttpResponse httpResponse; if (isHeadRequest == false && restResponse.isChunked()) { ChunkedRestResponseBody chunkedContent = restResponse.chunkedContent(); - toClose.add(chunkedContent); if (httpLogger != null && httpLogger.isBodyTracerEnabled()) { final var loggerStream = httpLogger.openResponseBodyLoggingStream(request.getRequestId()); toClose.add(() -> { @@ -131,8 +131,6 @@ public void sendResponse(RestResponse restResponse) { final BytesReference content = restResponse.content(); if (content instanceof Releasable releasable) { toClose.add(releasable); - } else if (restResponse.isChunked()) { - toClose.add(restResponse.chunkedContent()); } toClose.add(this::releaseOutputBuffer); diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java index ae267573b4cab..8f96e30c76c45 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java @@ -15,8 +15,6 @@ import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.IOUtils; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.core.Streams; @@ -36,7 +34,7 @@ * The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and * instead serialize only as much of the response as can be flushed to the network right away. */ -public interface ChunkedRestResponseBody extends Releasable { +public interface ChunkedRestResponseBody { Logger logger = LogManager.getLogger(ChunkedRestResponseBody.class); @@ -67,15 +65,10 @@ public interface ChunkedRestResponseBody extends Releasable { * @param chunkedToXContent chunked x-content instance to serialize * @param params parameters to use for serialization * @param channel channel the response will be written to - * @param releasable resource to release when the response is fully sent, or {@code null} if nothing to release * @return chunked rest response body */ - static ChunkedRestResponseBody fromXContent( - ChunkedToXContent chunkedToXContent, - ToXContent.Params params, - RestChannel channel, - @Nullable Releasable releasable - ) throws IOException { + static ChunkedRestResponseBody fromXContent(ChunkedToXContent chunkedToXContent, ToXContent.Params params, RestChannel channel) + throws IOException { return new ChunkedRestResponseBody() { @@ -146,11 +139,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler rec public String getResponseContentTypeString() { return builder.getResponseContentTypeString(); } - - @Override - public void close() { - Releasables.closeExpectNoException(releasable); - } }; } @@ -158,11 +146,7 @@ public void close() { * Create a chunked response body to be written to a specific {@link RestChannel} from a stream of text chunks, each represented as a * consumer of a {@link Writer}. The last chunk that the iterator yields must write at least one byte. */ - static ChunkedRestResponseBody fromTextChunks( - String contentType, - Iterator> chunkIterator, - @Nullable Releasable releasable - ) { + static ChunkedRestResponseBody fromTextChunks(String contentType, Iterator> chunkIterator) { return new ChunkedRestResponseBody() { private RecyclerBytesStreamOutput currentOutput; private final Writer writer = new OutputStreamWriter(new OutputStream() { @@ -235,11 +219,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler rec public String getResponseContentTypeString() { return contentType; } - - @Override - public void close() { - Releasables.closeExpectNoException(releasable); - } }; } } diff --git a/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java index 00b56d0e05051..0508828c70da1 100644 --- a/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java @@ -46,9 +46,4 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler rec public String getResponseContentTypeString() { return inner.getResponseContentTypeString(); } - - @Override - public void close() { - inner.close(); - } } diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 56a70e3bb2ab4..913ce03b1efc1 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -29,6 +29,8 @@ import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.core.Streams; import org.elasticsearch.core.TimeValue; @@ -824,10 +826,8 @@ public void sendResponse(RestResponse response) { if (response.isChunked() == false) { methodHandlers.addResponseStats(response.content().length()); } else { - response = RestResponse.chunked( - response.status(), - new EncodedLengthTrackingChunkedRestResponseBody(response.chunkedContent(), methodHandlers) - ); + final var wrapped = new EncodedLengthTrackingChunkedRestResponseBody(response.chunkedContent(), methodHandlers); + response = RestResponse.chunked(response.status(), wrapped, Releasables.wrap(wrapped, response)); } delegate.sendResponse(response); success = true; @@ -851,7 +851,7 @@ private void close() { } } - private static class EncodedLengthTrackingChunkedRestResponseBody implements ChunkedRestResponseBody { + private static class EncodedLengthTrackingChunkedRestResponseBody implements ChunkedRestResponseBody, Releasable { private final ChunkedRestResponseBody delegate; private final RunOnce onCompletion; @@ -884,7 +884,6 @@ public String getResponseContentTypeString() { @Override public void close() { - delegate.close(); // the client might close the connection before we send the last chunk, in which case we won't have recorded the response in the // stats yet, so we do it now: onCompletion.run(); diff --git a/server/src/main/java/org/elasticsearch/rest/RestResponse.java b/server/src/main/java/org/elasticsearch/rest/RestResponse.java index 49a6a34357a45..a4a44a5a65561 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestResponse.java +++ b/server/src/main/java/org/elasticsearch/rest/RestResponse.java @@ -15,9 +15,10 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -33,7 +34,7 @@ import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE; import static org.elasticsearch.rest.RestController.ELASTIC_PRODUCT_HTTP_HEADER; -public final class RestResponse { +public final class RestResponse implements Releasable { public static final String TEXT_CONTENT_TYPE = "text/plain; charset=UTF-8"; @@ -51,6 +52,9 @@ public final class RestResponse { private final String responseMediaType; private Map> customHeaders; + @Nullable + private final Releasable releasable; + /** * Creates a new response based on {@link XContentBuilder}. */ @@ -73,18 +77,18 @@ public RestResponse(RestStatus status, String responseMediaType, String content) } public RestResponse(RestStatus status, String responseMediaType, BytesReference content) { - this(status, responseMediaType, content, null); + this(status, responseMediaType, content, null, null); + } + + private RestResponse(RestStatus status, String responseMediaType, BytesReference content, @Nullable Releasable releasable) { + this(status, responseMediaType, content, null, releasable); } - public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content) { + public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content, @Nullable Releasable releasable) { if (content.isDone()) { - return new RestResponse( - restStatus, - content.getResponseContentTypeString(), - new ReleasableBytesReference(BytesArray.EMPTY, content) - ); + return new RestResponse(restStatus, content.getResponseContentTypeString(), BytesArray.EMPTY, releasable); } else { - return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content); + return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content, releasable); } } @@ -95,12 +99,14 @@ private RestResponse( RestStatus status, String responseMediaType, @Nullable BytesReference content, - @Nullable ChunkedRestResponseBody chunkedResponseBody + @Nullable ChunkedRestResponseBody chunkedResponseBody, + @Nullable Releasable releasable ) { this.status = status; this.content = content; this.responseMediaType = responseMediaType; this.chunkedResponseBody = chunkedResponseBody; + this.releasable = releasable; assert (content == null) != (chunkedResponseBody == null); } @@ -142,6 +148,7 @@ public RestResponse(RestChannel channel, RestStatus status, Exception e) throws copyHeaders(((ElasticsearchException) e)); } this.chunkedResponseBody = null; + this.releasable = null; } public String contentType() { @@ -224,4 +231,9 @@ public Map> filterHeaders(Map> headers } return headers; } + + @Override + public void close() { + Releasables.closeExpectNoException(releasable); + } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java b/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java index 2edb042ea23e8..3798f2b6b6fb1 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java +++ b/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java @@ -40,7 +40,8 @@ protected void processResponse(Response response) throws IOException { channel.sendResponse( RestResponse.chunked( getRestStatus(response), - ChunkedRestResponseBody.fromXContent(response, params, channel, releasableFromResponse(response)) + ChunkedRestResponseBody.fromXContent(response, params, channel), + releasableFromResponse(response) ) ); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java index 2942e59aa1bfd..bc0750f16e0e7 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java @@ -117,7 +117,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC @Override public RestResponse buildResponse(NodesHotThreadsResponse response) { response.mustIncRef(); - return RestResponse.chunked(RestStatus.OK, fromTextChunks(TEXT_CONTENT_TYPE, response.getTextChunks(), response::decRef)); + return RestResponse.chunked(RestStatus.OK, fromTextChunks(TEXT_CONTENT_TYPE, response.getTextChunks()), response::decRef); } }); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java index 8ce001f7a1a77..6845fec4db6fe 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java @@ -77,9 +77,9 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel Iterators.single((builder, params) -> builder.endArray()) ), ToXContent.EMPTY_PARAMS, - channel, - null - ) + channel + ), + null ); } @@ -127,9 +127,9 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann } writer.append("\n"); }) - ), - null - ) + ) + ), + null ); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java b/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java index 4300293a1336e..8be023bb4a182 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java @@ -130,9 +130,9 @@ public RestResponse buildResponse(NodesStatsResponse response) throws Exception ChunkedToXContentHelper.endObject() ), EMPTY_PARAMS, - channel, - null - ) + channel + ), + null ); } }); diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index a22f17702b157..1629dbd001dc2 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -543,12 +543,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler rec public String getResponseContentTypeString() { return RestResponse.TEXT_CONTENT_TYPE; } - - @Override - public void close() { - assertTrue(isClosed.compareAndSet(false, true)); - } - })); + }, () -> assertTrue(isClosed.compareAndSet(false, true)))); @SuppressWarnings("unchecked") Class> listenerClass = (Class>) (Class) ActionListener.class; ArgumentCaptor> listenerCaptor = ArgumentCaptor.forClass(listenerClass); @@ -750,12 +745,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler rec public String getResponseContentTypeString() { return RestResponse.TEXT_CONTENT_TYPE; } - - @Override - public void close() { - assertTrue(isClosed.compareAndSet(false, true)); - } - })) + }, () -> assertTrue(isClosed.compareAndSet(false, true)))) ) ); diff --git a/server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyTests.java b/server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyTests.java index 485e2a3a3fdd7..cce2a8db25c8e 100644 --- a/server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyTests.java +++ b/server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyTests.java @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; public class ChunkedRestResponseBodyTests extends ESTestCase { @@ -51,59 +50,39 @@ public void testEncodesChunkedXContentCorrectly() throws IOException { } final var bytesDirect = BytesReference.bytes(builderDirect); - final var isClosed = new AtomicBoolean(); - try ( - var chunkedResponse = ChunkedRestResponseBody.fromXContent( - chunkedToXContent, - ToXContent.EMPTY_PARAMS, - new FakeRestChannel( - new FakeRestRequest.Builder(xContentRegistry()).withContent(BytesArray.EMPTY, randomXContent.type()).build(), - randomBoolean(), - 1 - ), - () -> assertTrue(isClosed.compareAndSet(false, true)) + var chunkedResponse = ChunkedRestResponseBody.fromXContent( + chunkedToXContent, + ToXContent.EMPTY_PARAMS, + new FakeRestChannel( + new FakeRestRequest.Builder(xContentRegistry()).withContent(BytesArray.EMPTY, randomXContent.type()).build(), + randomBoolean(), + 1 ) - ) { - - final List refsGenerated = new ArrayList<>(); - while (chunkedResponse.isDone() == false) { - refsGenerated.add(chunkedResponse.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE)); - } + ); - assertEquals(bytesDirect, CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0]))); - assertFalse(isClosed.get()); + final List refsGenerated = new ArrayList<>(); + while (chunkedResponse.isDone() == false) { + refsGenerated.add(chunkedResponse.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE)); } - assertTrue(isClosed.get()); + + assertEquals(bytesDirect, CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0]))); } public void testFromTextChunks() throws IOException { final var chunks = randomList(1000, () -> randomUnicodeOfLengthBetween(1, 100)); - final var isClosed = new AtomicBoolean(); - try ( - var body = ChunkedRestResponseBody.fromTextChunks( - "text/plain", - Iterators.map(chunks.iterator(), s -> w -> w.write(s)), - () -> assertTrue(isClosed.compareAndSet(false, true)) - ) - ) { - final List refsGenerated = new ArrayList<>(); - while (body.isDone() == false) { - refsGenerated.add(body.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE)); - } - final BytesReference chunkedBytes = CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0])); + var body = ChunkedRestResponseBody.fromTextChunks("text/plain", Iterators.map(chunks.iterator(), s -> w -> w.write(s))); + final List refsGenerated = new ArrayList<>(); + while (body.isDone() == false) { + refsGenerated.add(body.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE)); + } + final BytesReference chunkedBytes = CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0])); - try ( - var outputStream = new ByteArrayOutputStream(); - var writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8) - ) { - for (final var chunk : chunks) { - writer.write(chunk); - } - writer.flush(); - assertEquals(new BytesArray(outputStream.toByteArray()), chunkedBytes); + try (var outputStream = new ByteArrayOutputStream(); var writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { + for (final var chunk : chunks) { + writer.write(chunk); } - assertFalse(isClosed.get()); + writer.flush(); + assertEquals(new BytesArray(outputStream.toByteArray()), chunkedBytes); } - assertTrue(isClosed.get()); } } diff --git a/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java b/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java index 4125c9bb66b4f..41a54ac580a55 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java @@ -97,7 +97,8 @@ public void testWithHeaders() throws Exception { public void testEmptyChunkedBody() { RestResponse response = RestResponse.chunked( RestStatus.OK, - ChunkedRestResponseBody.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator(), null) + ChunkedRestResponseBody.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator()), + null ); assertFalse(response.isChunked()); assertNotNull(response.content()); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/date.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/date.csv-spec index 770e500024c34..c32c4e3e2fd2c 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/date.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/date.csv-spec @@ -315,6 +315,17 @@ from employees | where birth_date > now() | sort emp_no asc | keep emp_no, birth emp_no:integer | birth_date:date ; +autoBucketYearInAgg +FROM employees +| WHERE hire_date >= "1999-01-01T00:00:00Z" +| EVAL bucket = AUTO_BUCKET(hire_date, 5, "1999-01-01T00:00:00Z", NOW()) +| STATS COUNT(*) by bucket +| sort bucket; + +COUNT(*):long | bucket:date +1 | 1999-01-01T00:00:00.000Z +; + autoBucketMonthInAgg // tag::auto_bucket_in_agg[] @@ -910,7 +921,7 @@ docsAutoBucketLast24hr //tag::docsAutoBucketLast24hr[] FROM sample_data | WHERE @timestamp >= NOW() - 1 day and @timestamp < NOW() -| EVAL bucket = AUTO_BUCKET(@timestamp, 25, DATE_FORMAT(NOW() - 1 day), DATE_FORMAT(NOW())) +| EVAL bucket = AUTO_BUCKET(@timestamp, 25, NOW() - 1 day, NOW()) | STATS COUNT(*) BY bucket //end::docsAutoBucketLast24hr[] ; @@ -922,7 +933,7 @@ docsGettingStartedAutoBucket // tag::gs-auto_bucket[] FROM sample_data | KEEP @timestamp -| EVAL bucket = AUTO_BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z") +| EVAL bucket = AUTO_BUCKET(@timestamp, 24, "2023-10-23T00:00:00Z", NOW()) // end::gs-auto_bucket[] | LIMIT 0 ; @@ -934,7 +945,7 @@ docsGettingStartedAutoBucketStatsBy // tag::gs-auto_bucket-stats-by[] FROM sample_data | KEEP @timestamp, event_duration -| EVAL bucket = AUTO_BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z") +| EVAL bucket = AUTO_BUCKET(@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z") | STATS COUNT(*) BY bucket // end::gs-auto_bucket-stats-by[] | SORT bucket @@ -949,7 +960,7 @@ docsGettingStartedAutoBucketStatsByMedian // tag::gs-auto_bucket-stats-by-median[] FROM sample_data | KEEP @timestamp, event_duration -| EVAL bucket = AUTO_BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z") +| EVAL bucket = AUTO_BUCKET(@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z") | STATS median_duration = MEDIAN(event_duration) BY bucket // end::gs-auto_bucket-stats-by-median[] | SORT bucket diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/show.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/show.csv-spec index b8d6193887c34..76c83984a13ea 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/show.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/show.csv-spec @@ -15,7 +15,7 @@ acos |"double acos(n:double|integer|long|unsigned_long)" asin |"double asin(n:double|integer|long|unsigned_long)"|n |"double|integer|long|unsigned_long" | "" |double | "Inverse sine trigonometric function." | false | false | false atan |"double atan(n:double|integer|long|unsigned_long)" |n |"double|integer|long|unsigned_long" | "" |double | "Inverse tangent trigonometric function." | false | false | false atan2 |"double atan2(y:double|integer|long|unsigned_long, x:double|integer|long|unsigned_long)" |[y, x] |["double|integer|long|unsigned_long", "double|integer|long|unsigned_long"] |["", ""] |double | "The angle between the positive x-axis and the ray from the origin to the point (x , y) in the Cartesian plane." | [false, false] | false | false -auto_bucket |"double|date auto_bucket(field:integer|long|double|date, buckets:integer, from:integer|long|double|date, to:integer|long|double|date)" |[field, buckets, from, to] |["integer|long|double|date", "integer", "integer|long|double|date", "integer|long|double|date"] |["", "", "", ""] | "double|date" | "Creates human-friendly buckets and returns a datetime value for each row that corresponds to the resulting bucket the row falls into." | [false, false, false, false] | false | false +auto_bucket |"double|date auto_bucket(field:integer|long|double|date, buckets:integer, from:integer|long|double|date|string, to:integer|long|double|date|string)" |[field, buckets, from, to] |["integer|long|double|date", "integer", "integer|long|double|date|string", "integer|long|double|date|string"] |["", "", "", ""] | "double|date" | "Creates human-friendly buckets and returns a datetime value for each row that corresponds to the resulting bucket the row falls into." | [false, false, false, false] | false | false avg |"double avg(field:double|integer|long|unsigned_long)" |field |"double|integer|long|unsigned_long" | "" |double | "The average of a numeric field." | false | false | true case |"boolean|cartesian_point|date|double|geo_point|integer|ip|keyword|long|text|unsigned_long|version case(condition:boolean, rest...:boolean|cartesian_point|date|double|geo_point|integer|ip|keyword|long|text|unsigned_long|version)" |[condition, rest] |["boolean", "boolean|cartesian_point|date|double|geo_point|integer|ip|keyword|long|text|unsigned_long|version"] |["", ""] |"boolean|cartesian_point|date|double|geo_point|integer|ip|keyword|long|text|unsigned_long|version" | "Accepts pairs of conditions and values. The function returns the value that belongs to the first condition that evaluates to true." | [false, false] | true | false ceil |"double|integer|long|unsigned_long ceil(n:double|integer|long|unsigned_long)" |n |"double|integer|long|unsigned_long" | "" | "double|integer|long|unsigned_long" | "Round a number up to the nearest integer." | false | false | false @@ -111,7 +111,7 @@ synopsis:keyword "double asin(n:double|integer|long|unsigned_long)" "double atan(n:double|integer|long|unsigned_long)" "double atan2(y:double|integer|long|unsigned_long, x:double|integer|long|unsigned_long)" -"double|date auto_bucket(field:integer|long|double|date, buckets:integer, from:integer|long|double|date, to:integer|long|double|date)" +"double|date auto_bucket(field:integer|long|double|date, buckets:integer, from:integer|long|double|date|string, to:integer|long|double|date|string)" "double avg(field:double|integer|long|unsigned_long)" "boolean|cartesian_point|date|double|geo_point|integer|ip|keyword|long|text|unsigned_long|version case(condition:boolean, rest...:boolean|cartesian_point|date|double|geo_point|integer|ip|keyword|long|text|unsigned_long|version)" "double|integer|long|unsigned_long ceil(n:double|integer|long|unsigned_long)" diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java index 7b525642009a7..0022866cf1742 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java @@ -132,16 +132,14 @@ private RestResponse buildResponse(EsqlQueryResponse esqlResponse) throws IOExce if (mediaType instanceof TextFormat format) { restResponse = RestResponse.chunked( RestStatus.OK, - ChunkedRestResponseBody.fromTextChunks( - format.contentType(restRequest), - format.format(restRequest, esqlResponse), - releasable - ) + ChunkedRestResponseBody.fromTextChunks(format.contentType(restRequest), format.format(restRequest, esqlResponse)), + releasable ); } else { restResponse = RestResponse.chunked( RestStatus.OK, - ChunkedRestResponseBody.fromXContent(esqlResponse, channel.request(), channel, releasable) + ChunkedRestResponseBody.fromXContent(esqlResponse, channel.request(), channel), + releasable ); } long tookNanos = stopWatch.stop().getNanos(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AutoBucket.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AutoBucket.java index 33e0addf44d2f..6a8b3f41a9c65 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AutoBucket.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AutoBucket.java @@ -7,8 +7,8 @@ package org.elasticsearch.xpack.esql.expression.function.scalar.math; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.Rounding; +import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.mapper.DateFieldMapper; @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Div; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Mul; import org.elasticsearch.xpack.ql.expression.Expression; +import org.elasticsearch.xpack.ql.expression.Foldables; import org.elasticsearch.xpack.ql.expression.Literal; import org.elasticsearch.xpack.ql.expression.TypeResolutions; import org.elasticsearch.xpack.ql.expression.function.scalar.ScalarFunction; @@ -40,7 +41,6 @@ import static org.elasticsearch.xpack.ql.expression.TypeResolutions.isFoldable; import static org.elasticsearch.xpack.ql.expression.TypeResolutions.isInteger; import static org.elasticsearch.xpack.ql.expression.TypeResolutions.isNumeric; -import static org.elasticsearch.xpack.ql.expression.TypeResolutions.isString; import static org.elasticsearch.xpack.ql.expression.TypeResolutions.isType; /** @@ -90,8 +90,8 @@ public AutoBucket( Source source, @Param(name = "field", type = { "integer", "long", "double", "date" }) Expression field, @Param(name = "buckets", type = { "integer" }) Expression buckets, - @Param(name = "from", type = { "integer", "long", "double", "date" }) Expression from, - @Param(name = "to", type = { "integer", "long", "double", "date" }) Expression to + @Param(name = "from", type = { "integer", "long", "double", "date", "string" }) Expression from, + @Param(name = "to", type = { "integer", "long", "double", "date", "string" }) Expression to ) { super(source, List.of(field, buckets, from, to)); this.field = field; @@ -115,8 +115,8 @@ public ExpressionEvaluator.Factory toEvaluator(Function isString(e, sourceText(), o)); + return resolveType((e, o) -> isStringOrDate(e, sourceText(), o)); } if (field.dataType().isNumeric()) { return resolveType((e, o) -> isNumeric(e, sourceText(), o)); @@ -216,6 +216,24 @@ private TypeResolution resolveType(BiFunction DataTypes.isString(exp) || DataTypes.isDateTime(exp), + operationName, + paramOrd, + "datetime", + "string" + ); + } + + private long foldToLong(Expression e) { + Object value = Foldables.valueOf(e); + return DataTypes.isDateTime(e.dataType()) + ? ((Number) value).longValue() + : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis(BytesRefs.toString(value)); + } + @Override public DataType dataType() { if (field.dataType().isNumeric()) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AutoBucketTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AutoBucketTests.java index 043bf083b580a..013753c801c39 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AutoBucketTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AutoBucketTests.java @@ -84,8 +84,8 @@ private Expression build(Source source, Expression arg) { Literal from; Literal to; if (arg.dataType() == DataTypes.DATETIME) { - from = new Literal(Source.EMPTY, new BytesRef("2023-02-01T00:00:00.00Z"), DataTypes.KEYWORD); - to = new Literal(Source.EMPTY, new BytesRef("2023-03-01T00:00:00.00Z"), DataTypes.KEYWORD); + from = stringOrDateTime("2023-02-01T00:00:00.00Z"); + to = stringOrDateTime("2023-03-01T09:00:00.00Z"); } else { from = new Literal(Source.EMPTY, 0, DataTypes.DOUBLE); to = new Literal(Source.EMPTY, 1000, DataTypes.DOUBLE); @@ -93,6 +93,13 @@ private Expression build(Source source, Expression arg) { return new AutoBucket(source, arg, new Literal(Source.EMPTY, 50, DataTypes.INTEGER), from, to); } + private Literal stringOrDateTime(String date) { + if (randomBoolean()) { + return new Literal(Source.EMPTY, new BytesRef(date), randomBoolean() ? DataTypes.KEYWORD : DataTypes.TEXT); + } + return new Literal(Source.EMPTY, DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis(date), DataTypes.DATETIME); + } + @Override protected DataType expectedType(List argTypes) { if (argTypes.get(0).isNumeric()) {