diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java index 61cd6e93c1831..4d1d55c1b3771 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java @@ -16,7 +16,12 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Properties; +import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_TIMEZONE; import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearchContexts; /** @@ -102,6 +107,51 @@ public void testIncompleteScroll() throws Exception { assertNoSearchContexts(); } + public void testScrollWithDatetimeAndTimezoneParam() throws IOException, SQLException { + Request request = new Request("PUT", "/test_date_timezone"); + XContentBuilder createIndex = JsonXContent.contentBuilder().startObject(); + createIndex.startObject("mappings"); + { + createIndex.startObject("properties"); + { + createIndex.startObject("date").field("type", "date").field("format", "epoch_millis"); + createIndex.endObject(); + } + createIndex.endObject(); + } + createIndex.endObject().endObject(); + request.setJsonEntity(Strings.toString(createIndex)); + client().performRequest(request); + + request = new Request("PUT", "/test_date_timezone/_bulk"); + request.addParameter("refresh", "true"); + StringBuilder bulk = new StringBuilder(); + long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; + for (long datetime : datetimes) { + bulk.append("{\"index\":{}}\n"); + bulk.append("{\"date\":").append(datetime).append("}\n"); + } + request.setJsonEntity(bulk.toString()); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + + ZoneId zoneId = randomZone(); + Properties connectionProperties = connectionProperties(); + connectionProperties.put(JDBC_TIMEZONE, zoneId.toString()); + try (Connection c = esJdbc(connectionProperties); + Statement s = c.createStatement()) { + s.setFetchSize(2); + try (ResultSet rs = + s.executeQuery("SELECT DATE_PART('TZOFFSET', date) FROM test_date_timezone ORDER BY date")) { + for (int i = 0; i < datetimes.length; i++) { + assertEquals(2, rs.getFetchSize()); + assertTrue("No more entries left at " + i, rs.next()); + assertEquals(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i]), zoneId).getOffset() + .getTotalSeconds()/ 60, rs.getInt(1)); + } + assertFalse(rs.next()); + } + } + } /** * Test for {@code SELECT} that is implemented as an aggregation. @@ -237,4 +287,4 @@ private void addPivotData() throws Exception { request.setJsonEntity(bulk.toString()); assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index 46c5bd9d406c8..c8c38ee1adcae 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.sql.qa.rest; import com.fasterxml.jackson.core.io.JsonStringEncoder; - import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; @@ -15,8 +14,10 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.NotEqualMessageBuilder; @@ -31,6 +32,9 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.sql.JDBCType; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -151,6 +155,70 @@ public void testNextPage() throws IOException { ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode)); } + public void testNextPageWithDatetimeAndTimezoneParam() throws IOException { + Request request = new Request("PUT", "/test_date_timezone"); + XContentBuilder createIndex = JsonXContent.contentBuilder().startObject(); + createIndex.startObject("mappings"); + { + createIndex.startObject("properties"); + { + createIndex.startObject("date").field("type", "date").field("format", "epoch_millis"); + createIndex.endObject(); + } + createIndex.endObject(); + } + createIndex.endObject().endObject(); + request.setJsonEntity(Strings.toString(createIndex)); + client().performRequest(request); + + request = new Request("PUT", "/test_date_timezone/_bulk"); + request.addParameter("refresh", "true"); + StringBuilder bulk = new StringBuilder(); + long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; + for (long datetime : datetimes) { + bulk.append("{\"index\":{}}\n"); + bulk.append("{\"date\":").append(datetime).append("}\n"); + } + request.setJsonEntity(bulk.toString()); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + + ZoneId zoneId = randomZone(); + String mode = randomMode(); + String sqlRequest = + "{\"query\":\"SELECT DATE_PART('TZOFFSET', date) AS tz FROM test_date_timezone ORDER BY date\"," + + "\"time_zone\":\"" + zoneId.getId() + "\", " + + "\"mode\":\"" + mode + "\", " + + "\"fetch_size\":2}"; + + String cursor = null; + for (int i = 0; i <= datetimes.length; i += 2) { + Map expected = new HashMap<>(); + Map response; + + if (i == 0) { + expected.put("columns", singletonList(columnInfo(mode, "tz", "integer", JDBCType.INTEGER, 11))); + response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode); + } else { + response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + "}", + ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode); + } + + List values = new ArrayList<>(2); + for (int j = 0; j < (i < datetimes.length - 1 ? 2 : 1); j++) { + values.add(singletonList(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i + j]), zoneId) + .getOffset().getTotalSeconds() / 60)); + } + expected.put("rows", values); + cursor = (String) response.remove("cursor"); + assertResponse(expected, response); + assertNotNull(cursor); + } + Map expected = new HashMap<>(); + expected.put("rows", emptyList()); + assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + "}", + ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode)); + } + @AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074") public void testTimeZone() throws IOException { String mode = randomMode(); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java index 6d3802c341f1d..b6ffb45a36198 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java @@ -43,7 +43,7 @@ protected void doExecute(Task task, SqlClearCursorRequest request, ActionListene public static void operation(PlanExecutor planExecutor, SqlClearCursorRequest request, ActionListener listener) { - Cursor cursor = Cursors.decodeFromString(request.getCursor()); + Cursor cursor = Cursors.decodeFromStringWithZone(request.getCursor()).v1(); planExecutor.cleanCursor( new Configuration(DateUtils.UTC, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT, Protocol.PAGE_TIMEOUT, null, request.mode(), StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY, Protocol.FIELD_MULTI_VALUE_LENIENCY, diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java index 97da20902a0d3..52bb1f297cb2c 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; @@ -25,12 +26,14 @@ import org.elasticsearch.xpack.sql.proto.ColumnInfo; import org.elasticsearch.xpack.sql.proto.Mode; import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Cursors; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.type.Schema; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; @@ -67,7 +70,7 @@ protected void doExecute(Task task, SqlQueryRequest request, ActionListener listener, + static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener listener, String username, String clusterName) { // The configuration is always created however when dealing with the next page, only the timeouts are relevant // the rest having default values (since the query is already created) @@ -79,13 +82,14 @@ public static void operation(PlanExecutor planExecutor, SqlQueryRequest request, planExecutor.sql(cfg, request.query(), request.params(), wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure)); } else { - planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()), - wrap(p -> listener.onResponse(createResponse(request, null, p)), + Tuple decoded = Cursors.decodeFromStringWithZone(request.cursor()); + planExecutor.nextPage(cfg, decoded.v1(), + wrap(p -> listener.onResponse(createResponse(request, decoded.v2(), null, p)), listener::onFailure)); } } - static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) { + private static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) { RowSet rset = page.rowSet(); if ((rset instanceof SchemaRowSet) == false) { throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass()); @@ -101,10 +105,10 @@ static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page p } } columns = unmodifiableList(columns); - return createResponse(request, columns, page); + return createResponse(request, request.zoneId(), columns, page); } - static SqlQueryResponse createResponse(SqlQueryRequest request, List header, Page page) { + private static SqlQueryResponse createResponse(SqlQueryRequest request, ZoneId zoneId, List header, Page page) { List> rows = new ArrayList<>(); page.rowSet().forEachRow(rowView -> { List row = new ArrayList<>(rowView.columnCount()); @@ -113,7 +117,7 @@ static SqlQueryResponse createResponse(SqlQueryRequest request, List }); return new SqlQueryResponse( - Cursors.encodeToString(page.next(), request.zoneId()), + Cursors.encodeToString(page.next(), zoneId), request.mode(), request.columnar(), header, diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java index 6f1ee47f4da34..3282f0bb99641 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java @@ -83,14 +83,6 @@ static String encodeToString(Cursor info, Version version, ZoneId zoneId) { } } - - /** - * Read a {@linkplain Cursor} from a string. - */ - public static Cursor decodeFromString(String base64) { - return decodeFromStringWithZone(base64).v1(); - } - /** * Read a {@linkplain Cursor} from a string. */ diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java index ca135d5170fd4..26d9679157eac 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests; import org.elasticsearch.xpack.sql.execution.search.extractor.ConstantExtractorTests; import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor; +import org.elasticsearch.xpack.sql.plugin.CursorTests; import org.elasticsearch.xpack.sql.session.Cursors; import java.io.IOException; @@ -68,6 +69,6 @@ protected ScrollCursor copyInstance(ScrollCursor instance, Version version) thro if (randomBoolean()) { return super.copyInstance(instance, version); } - return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone())); + return (ScrollCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone())); } } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java index 28d9c278ed163..0523af135686e 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java @@ -104,14 +104,18 @@ static Cursor randomNonEmptyCursor() { public void testVersionHandling() { Cursor cursor = randomNonEmptyCursor(); - assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(cursor, randomZone()))); + assertEquals(cursor, decodeFromString(Cursors.encodeToString(cursor, randomZone()))); Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000); String encodedWithWrongVersion = CursorsTestUtil.encodeToString(cursor, nextMinorVersion, randomZone()); - SqlException exception = expectThrows(SqlException.class, () -> Cursors.decodeFromString(encodedWithWrongVersion)); + SqlException exception = expectThrows(SqlException.class, () -> decodeFromString(encodedWithWrongVersion)); assertEquals(LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", nextMinorVersion, Version.CURRENT), exception.getMessage()); } -} \ No newline at end of file + + public static Cursor decodeFromString(String base64) { + return Cursors.decodeFromStringWithZone(base64).v1(); + } +} diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java index 2fd88e3c0bc8c..bf3fc548ef6e7 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java @@ -7,14 +7,16 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.test.AbstractWireTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase; +import org.elasticsearch.xpack.sql.plugin.CursorTests; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -public class ListCursorTests extends AbstractWireTestCase { +public class ListCursorTests extends AbstractSqlWireSerializingTestCase { public static ListCursor randomPagingListCursor() { int size = between(1, 20); int depth = between(1, 20); @@ -44,6 +46,11 @@ protected ListCursor createTestInstance() { return randomPagingListCursor(); } + @Override + protected Writeable.Reader instanceReader() { + return ListCursor::new; + } + @Override protected ListCursor copyInstance(ListCursor instance, Version version) throws IOException { /* Randomly choose between internal protocol round trip and String based @@ -51,6 +58,6 @@ protected ListCursor copyInstance(ListCursor instance, Version version) throws I if (randomBoolean()) { return copyWriteable(instance, getNamedWriteableRegistry(), ListCursor::new, version); } - return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone())); + return (ListCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone())); } -} \ No newline at end of file +}