From 204d086266f5e130b944a753cad1cb6044b47ecb Mon Sep 17 00:00:00 2001 From: Marios Trivyzas Date: Tue, 11 Feb 2020 14:59:06 +0100 Subject: [PATCH] SQL: Fix issue with timezone when paginating (#52101) Previously, when the specified (or default) fetchSize led to subsequent HTTP requests and the usage of cursors, those subsequent were no longer using the client timezone specified in the initial SQL query. As a consequence, Even though the query is executed once (with the correct timezone) the processing of the query results by the HitExtractors in the next pages was done using the default timezone Z. This could lead to incorrect results. Fix the issue by correctly using the initially specified timezone, which is found in the deserialisation of the cursor string. Fixes: #51258 (cherry picked from commit 8f7afbdeb9295999b48a6c36db5b31cbe0cee432) --- .../xpack/sql/qa/jdbc/FetchSizeTestCase.java | 52 +++++++++++++- .../xpack/sql/qa/rest/RestSqlTestCase.java | 70 ++++++++++++++++++- .../plugin/TransportSqlClearCursorAction.java | 2 +- .../sql/plugin/TransportSqlQueryAction.java | 18 +++-- .../xpack/sql/session/Cursors.java | 8 --- .../execution/search/ScrollCursorTests.java | 3 +- .../xpack/sql/plugin/CursorTests.java | 10 ++- .../xpack/sql/session/ListCursorTests.java | 15 ++-- 8 files changed, 152 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java index 61cd6e93c1831..4d1d55c1b3771 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/FetchSizeTestCase.java @@ -16,7 +16,12 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Properties; +import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_TIMEZONE; import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearchContexts; /** @@ -102,6 +107,51 @@ public void testIncompleteScroll() throws Exception { assertNoSearchContexts(); } + public void testScrollWithDatetimeAndTimezoneParam() throws IOException, SQLException { + Request request = new Request("PUT", "/test_date_timezone"); + XContentBuilder createIndex = JsonXContent.contentBuilder().startObject(); + createIndex.startObject("mappings"); + { + createIndex.startObject("properties"); + { + createIndex.startObject("date").field("type", "date").field("format", "epoch_millis"); + createIndex.endObject(); + } + createIndex.endObject(); + } + createIndex.endObject().endObject(); + request.setJsonEntity(Strings.toString(createIndex)); + client().performRequest(request); + + request = new Request("PUT", "/test_date_timezone/_bulk"); + request.addParameter("refresh", "true"); + StringBuilder bulk = new StringBuilder(); + long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; + for (long datetime : datetimes) { + bulk.append("{\"index\":{}}\n"); + bulk.append("{\"date\":").append(datetime).append("}\n"); + } + request.setJsonEntity(bulk.toString()); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + + ZoneId zoneId = randomZone(); + Properties connectionProperties = connectionProperties(); + connectionProperties.put(JDBC_TIMEZONE, zoneId.toString()); + try (Connection c = esJdbc(connectionProperties); + Statement s = c.createStatement()) { + s.setFetchSize(2); + try (ResultSet rs = + s.executeQuery("SELECT DATE_PART('TZOFFSET', date) FROM test_date_timezone ORDER BY date")) { + for (int i = 0; i < datetimes.length; i++) { + assertEquals(2, rs.getFetchSize()); + assertTrue("No more entries left at " + i, rs.next()); + assertEquals(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i]), zoneId).getOffset() + .getTotalSeconds()/ 60, rs.getInt(1)); + } + assertFalse(rs.next()); + } + } + } /** * Test for {@code SELECT} that is implemented as an aggregation. @@ -237,4 +287,4 @@ private void addPivotData() throws Exception { request.setJsonEntity(bulk.toString()); assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index efffb069f2206..f4cc628779c41 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.sql.qa.rest; import com.fasterxml.jackson.core.io.JsonStringEncoder; - import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; @@ -15,8 +14,10 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.NotEqualMessageBuilder; @@ -31,6 +32,9 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.sql.JDBCType; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -151,6 +155,70 @@ public void testNextPage() throws IOException { ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode)); } + public void testNextPageWithDatetimeAndTimezoneParam() throws IOException { + Request request = new Request("PUT", "/test_date_timezone"); + XContentBuilder createIndex = JsonXContent.contentBuilder().startObject(); + createIndex.startObject("mappings"); + { + createIndex.startObject("properties"); + { + createIndex.startObject("date").field("type", "date").field("format", "epoch_millis"); + createIndex.endObject(); + } + createIndex.endObject(); + } + createIndex.endObject().endObject(); + request.setJsonEntity(Strings.toString(createIndex)); + client().performRequest(request); + + request = new Request("PUT", "/test_date_timezone/_bulk"); + request.addParameter("refresh", "true"); + StringBuilder bulk = new StringBuilder(); + long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; + for (long datetime : datetimes) { + bulk.append("{\"index\":{}}\n"); + bulk.append("{\"date\":").append(datetime).append("}\n"); + } + request.setJsonEntity(bulk.toString()); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + + ZoneId zoneId = randomZone(); + String mode = randomMode(); + String sqlRequest = + "{\"query\":\"SELECT DATE_PART('TZOFFSET', date) AS tz FROM test_date_timezone ORDER BY date\"," + + "\"time_zone\":\"" + zoneId.getId() + "\", " + + "\"mode\":\"" + mode + "\", " + + "\"fetch_size\":2}"; + + String cursor = null; + for (int i = 0; i <= datetimes.length; i += 2) { + Map expected = new HashMap<>(); + Map response; + + if (i == 0) { + expected.put("columns", singletonList(columnInfo(mode, "tz", "integer", JDBCType.INTEGER, 11))); + response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode); + } else { + response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + "}", + ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode); + } + + List values = new ArrayList<>(2); + for (int j = 0; j < (i < datetimes.length - 1 ? 2 : 1); j++) { + values.add(singletonList(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i + j]), zoneId) + .getOffset().getTotalSeconds() / 60)); + } + expected.put("rows", values); + cursor = (String) response.remove("cursor"); + assertResponse(expected, response); + assertNotNull(cursor); + } + Map expected = new HashMap<>(); + expected.put("rows", emptyList()); + assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + "}", + ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode)); + } + @AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074") public void testTimeZone() throws IOException { String mode = randomMode(); diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java index 55224edbc8a2a..7bd87e7865827 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlClearCursorAction.java @@ -43,7 +43,7 @@ protected void doExecute(Task task, SqlClearCursorRequest request, ActionListene public static void operation(PlanExecutor planExecutor, SqlClearCursorRequest request, ActionListener listener) { - Cursor cursor = Cursors.decodeFromString(request.getCursor()); + Cursor cursor = Cursors.decodeFromStringWithZone(request.getCursor()).v1(); planExecutor.cleanCursor( new Configuration(DateUtils.UTC, Protocol.FETCH_SIZE, Protocol.REQUEST_TIMEOUT, Protocol.PAGE_TIMEOUT, null, request.mode(), StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY, Protocol.FIELD_MULTI_VALUE_LENIENCY, diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java index cc0e8088dc522..3b6493654a2c6 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; @@ -26,12 +27,14 @@ import org.elasticsearch.xpack.sql.proto.ColumnInfo; import org.elasticsearch.xpack.sql.proto.Mode; import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor; import org.elasticsearch.xpack.sql.session.Cursor.Page; import org.elasticsearch.xpack.sql.session.Cursors; import org.elasticsearch.xpack.sql.session.RowSet; import org.elasticsearch.xpack.sql.session.SchemaRowSet; import org.elasticsearch.xpack.sql.type.SqlDataTypes; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; @@ -68,7 +71,7 @@ protected void doExecute(Task task, SqlQueryRequest request, ActionListener listener, + static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener listener, String username, String clusterName) { // The configuration is always created however when dealing with the next page, only the timeouts are relevant // the rest having default values (since the query is already created) @@ -80,13 +83,14 @@ public static void operation(PlanExecutor planExecutor, SqlQueryRequest request, planExecutor.sql(cfg, request.query(), request.params(), wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure)); } else { - planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()), - wrap(p -> listener.onResponse(createResponse(request, null, p)), + Tuple decoded = Cursors.decodeFromStringWithZone(request.cursor()); + planExecutor.nextPage(cfg, decoded.v1(), + wrap(p -> listener.onResponse(createResponse(request, decoded.v2(), null, p)), listener::onFailure)); } } - static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) { + private static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) { RowSet rset = page.rowSet(); if ((rset instanceof SchemaRowSet) == false) { throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass()); @@ -102,10 +106,10 @@ static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page p } } columns = unmodifiableList(columns); - return createResponse(request, columns, page); + return createResponse(request, request.zoneId(), columns, page); } - static SqlQueryResponse createResponse(SqlQueryRequest request, List header, Page page) { + private static SqlQueryResponse createResponse(SqlQueryRequest request, ZoneId zoneId, List header, Page page) { List> rows = new ArrayList<>(); page.rowSet().forEachRow(rowView -> { List row = new ArrayList<>(rowView.columnCount()); @@ -114,7 +118,7 @@ static SqlQueryResponse createResponse(SqlQueryRequest request, List }); return new SqlQueryResponse( - Cursors.encodeToString(page.next(), request.zoneId()), + Cursors.encodeToString(page.next(), zoneId), request.mode(), request.columnar(), header, diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java index a5a3ad82dd178..f4ff89695db0e 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java @@ -83,14 +83,6 @@ static String encodeToString(Cursor info, Version version, ZoneId zoneId) { } } - - /** - * Read a {@linkplain Cursor} from a string. - */ - public static Cursor decodeFromString(String base64) { - return decodeFromStringWithZone(base64).v1(); - } - /** * Read a {@linkplain Cursor} from a string. */ diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java index e45e030054dc8..a0dd09df08664 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor; import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase; import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests; +import org.elasticsearch.xpack.sql.plugin.CursorTests; import org.elasticsearch.xpack.sql.session.Cursors; import java.io.IOException; @@ -68,6 +69,6 @@ protected ScrollCursor copyInstance(ScrollCursor instance, Version version) thro if (randomBoolean()) { return super.copyInstance(instance, version); } - return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone())); + return (ScrollCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone())); } } diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java index dc08c5d552784..640a11dfa31a2 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java @@ -104,15 +104,19 @@ static Cursor randomNonEmptyCursor() { public void testVersionHandling() { Cursor cursor = randomNonEmptyCursor(); - assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(cursor, randomZone()))); + assertEquals(cursor, decodeFromString(Cursors.encodeToString(cursor, randomZone()))); Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000); String encodedWithWrongVersion = CursorsTestUtil.encodeToString(cursor, nextMinorVersion, randomZone()); SqlIllegalArgumentException exception = expectThrows(SqlIllegalArgumentException.class, - () -> Cursors.decodeFromString(encodedWithWrongVersion)); + () -> decodeFromString(encodedWithWrongVersion)); assertEquals(LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", nextMinorVersion, Version.CURRENT), exception.getMessage()); } -} \ No newline at end of file + + public static Cursor decodeFromString(String base64) { + return Cursors.decodeFromStringWithZone(base64).v1(); + } +} diff --git a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java index 2fd88e3c0bc8c..bf3fc548ef6e7 100644 --- a/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java +++ b/x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java @@ -7,14 +7,16 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.test.AbstractWireTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase; +import org.elasticsearch.xpack.sql.plugin.CursorTests; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -public class ListCursorTests extends AbstractWireTestCase { +public class ListCursorTests extends AbstractSqlWireSerializingTestCase { public static ListCursor randomPagingListCursor() { int size = between(1, 20); int depth = between(1, 20); @@ -44,6 +46,11 @@ protected ListCursor createTestInstance() { return randomPagingListCursor(); } + @Override + protected Writeable.Reader instanceReader() { + return ListCursor::new; + } + @Override protected ListCursor copyInstance(ListCursor instance, Version version) throws IOException { /* Randomly choose between internal protocol round trip and String based @@ -51,6 +58,6 @@ protected ListCursor copyInstance(ListCursor instance, Version version) throws I if (randomBoolean()) { return copyWriteable(instance, getNamedWriteableRegistry(), ListCursor::new, version); } - return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone())); + return (ListCursor) CursorTests.decodeFromString(Cursors.encodeToString(instance, randomZone())); } -} \ No newline at end of file +}