SQL: Fix issue with timezone when paginating
Previously, when the specified (or default) fetchSize led to
subsequent HTTP requests and the usage of cursors, those subsequent
requests no longer used the client timezone specified in the initial
SQL query. As a consequence, even though the query was executed once
(with the correct timezone), the processing of the query results by
the HitExtractors in the next pages was done using the default
timezone `Z`. This could lead to incorrect results.

Fix the issue by correctly using the initially specified timezone,
which is recovered when the cursor string is deserialised.

Fixes: elastic#51258
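
The sketch below illustrates the mechanism the fix relies on: the cursor string handed back to the client carries the original time zone, so a follow-up page request can recover it instead of defaulting to UTC. The encoding used here (zone id + '|' + opaque scroll state, base64-wrapped) and all names are assumptions for illustration only; the real Cursors.encodeToString / Cursors.decodeFromStringWithZone use their own wire format.

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Base64;

public class CursorZoneRoundTrip {

    // Hypothetical encoding: bundle the client zone with the scroll state.
    static String encode(String scrollState, ZoneId zone) {
        String payload = zone.getId() + "|" + scrollState;
        return Base64.getEncoder().encodeToString(payload.getBytes(StandardCharsets.UTF_8));
    }

    // Mirrors the shape of decodeFromStringWithZone: returns both state and zone.
    static String[] decode(String cursor) {
        String payload = new String(Base64.getDecoder().decode(cursor), StandardCharsets.UTF_8);
        return payload.split("\\|", 2); // [0] = zone id, [1] = scroll state
    }

    public static void main(String[] args) {
        ZoneId clientZone = ZoneId.of("Europe/Athens");
        String cursor = encode("scroll-page-2", clientZone); // built while serving page 1

        // The follow-up request sends only the cursor, yet the zone survives the round trip.
        ZoneId zoneForNextPage = ZoneId.of(decode(cursor)[0]);

        Instant hit = Instant.ofEpochMilli(10_000_000L);
        // Before the fix, pages after the first were rendered as if this were UTC ("Z").
        System.out.println(ZonedDateTime.ofInstant(hit, zoneForNextPage));
    }
}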
matriv committed Feb 8, 2020
1 parent 749b623 commit a488512
Showing 3 changed files with 135 additions and 9 deletions.
@@ -16,7 +16,12 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Properties;

import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_TIMEZONE;
import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.assertNoSearchContexts;

/**
@@ -102,6 +107,51 @@ public void testIncompleteScroll() throws Exception {
        assertNoSearchContexts();
    }

    public void testScrollWithDatetimeAndTimezoneParam() throws IOException, SQLException {
        Request request = new Request("PUT", "/test_date_timezone");
        XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
        createIndex.startObject("mappings");
        {
            createIndex.startObject("properties");
            {
                createIndex.startObject("date").field("type", "date").field("format", "epoch_millis");
                createIndex.endObject();
            }
            createIndex.endObject();
        }
        createIndex.endObject().endObject();
        request.setJsonEntity(Strings.toString(createIndex));
        client().performRequest(request);

        request = new Request("PUT", "/test_date_timezone/_bulk");
        request.addParameter("refresh", "true");
        StringBuilder bulk = new StringBuilder();
        long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
        for (long datetime : datetimes) {
            bulk.append("{\"index\":{}}\n");
            bulk.append("{\"date\":").append(datetime).append("}\n");
        }
        request.setJsonEntity(bulk.toString());
        assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

        ZoneId zoneId = randomZone();
        Properties connectionProperties = connectionProperties();
        connectionProperties.put(JDBC_TIMEZONE, zoneId.toString());
        try (Connection c = esJdbc(connectionProperties);
             Statement s = c.createStatement()) {
            s.setFetchSize(2);
            try (ResultSet rs =
                     s.executeQuery("SELECT DATE_PART('TZOFFSET', date) FROM test_date_timezone ORDER BY date")) {
                for (int i = 0; i < datetimes.length; i++) {
                    assertEquals(2, rs.getFetchSize());
                    assertTrue("No more entries left at " + i, rs.next());
                    assertEquals(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i]), zoneId).getOffset()
                        .getTotalSeconds() / 60, rs.getInt(1));
                }
                assertFalse(rs.next());
            }
        }
    }

    /**
     * Test for {@code SELECT} that is implemented as an aggregation.
@@ -237,4 +287,4 @@ private void addPivotData() throws Exception {
        request.setJsonEntity(bulk.toString());
        assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
    }
}
@@ -6,7 +6,6 @@
package org.elasticsearch.xpack.sql.qa.rest;

import com.fasterxml.jackson.core.io.JsonStringEncoder;

import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
@@ -15,9 +14,11 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.NotEqualMessageBuilder;
@@ -31,6 +32,9 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.JDBCType;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -151,6 +155,74 @@ public void testNextPage() throws IOException {
            ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
    }

    public void testNextPageWithDatetimeAndTimezoneParam() throws IOException {
        Request request = new Request("PUT", "/test_date_timezone");
        XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
        createIndex.startObject("mappings");
        {
            createIndex.startObject("properties");
            {
                createIndex.startObject("date").field("type", "date").field("format", "epoch_millis");
                createIndex.endObject();
            }
            createIndex.endObject();
        }
        createIndex.endObject().endObject();
        request.setJsonEntity(Strings.toString(createIndex));
        client().performRequest(request);

        request = new Request("PUT", "/test_date_timezone/_bulk");
        request.addParameter("refresh", "true");
        StringBuilder bulk = new StringBuilder();
        long[] datetimes = new long[] { 1_000, 10_000, 100_000, 1_000_000, 10_000_000 };
        for (long datetime : datetimes) {
            bulk.append("{\"index\":{}}\n");
            bulk.append("{\"date\":").append(datetime).append("}\n");
        }
        request.setJsonEntity(bulk.toString());
        assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());

        ZoneId zoneId = randomZone();
        String mode = randomMode();
        String sqlRequest =
            "{\"query\":\"SELECT DATE_PART('TZOFFSET', date) AS tz FROM test_date_timezone ORDER BY date\","
                + "\"time_zone\":\"" + zoneId.getId() + "\", "
                + "\"mode\":\"" + mode + "\", "
                + "\"fetch_size\":2}";

        String cursor = null;
        for (int i = 0; i <= datetimes.length; i += 2) {
            Map<String, Object> response;
            if (i == 0) {
                response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode);
            } else {
                response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + "}",
                    ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode);
            }

            Map<String, Object> expected = new HashMap<>();
            if (i == 0) {
                expected.put("columns", singletonList(
                    columnInfo(mode, "tz", "integer", JDBCType.INTEGER, 11)));
            }

            List<Object> values = new ArrayList<>(2);
            for (int j = 0; j < (i < datetimes.length - 1 ? 2 : 1); j++) {
                values.add(singletonList(ZonedDateTime.ofInstant(Instant.ofEpochMilli(datetimes[i + j]), zoneId)
                    .getOffset().getTotalSeconds() / 60));
            }
            expected.put("rows", values);
            cursor = (String) response.remove("cursor");
            assertResponse(expected, response);
            assertNotNull(cursor);
        }
        Map<String, Object> expected = new HashMap<>();
        expected.put("rows", emptyList());
        assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + "}",
            ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode));
    }

    @AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074")
    public void testTimeZone() throws IOException {
        String mode = randomMode();
@@ -10,6 +10,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
@@ -26,12 +27,14 @@
import org.elasticsearch.xpack.sql.proto.ColumnInfo;
import org.elasticsearch.xpack.sql.proto.Mode;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.elasticsearch.xpack.sql.session.Cursor.Page;
import org.elasticsearch.xpack.sql.session.Cursors;
import org.elasticsearch.xpack.sql.session.RowSet;
import org.elasticsearch.xpack.sql.session.SchemaRowSet;
import org.elasticsearch.xpack.sql.type.SqlDataTypes;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;

@@ -68,7 +71,7 @@ protected void doExecute(Task task, SqlQueryRequest request, ActionListener<SqlQ
    /**
     * Actual implementation of the action. Statically available to support embedded mode.
     */
-    public static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener<SqlQueryResponse> listener,
+    private static void operation(PlanExecutor planExecutor, SqlQueryRequest request, ActionListener<SqlQueryResponse> listener,
            String username, String clusterName) {
        // The configuration is always created however when dealing with the next page, only the timeouts are relevant
        // the rest having default values (since the query is already created)
@@ -80,13 +83,14 @@ public static void operation(PlanExecutor planExecutor, SqlQueryRequest request,
            planExecutor.sql(cfg, request.query(), request.params(),
                wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure));
        } else {
-            planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
-                wrap(p -> listener.onResponse(createResponse(request, null, p)),
+            Tuple<Cursor, ZoneId> decoded = Cursors.decodeFromStringWithZone(request.cursor());
+            planExecutor.nextPage(cfg, decoded.v1(),
+                wrap(p -> listener.onResponse(createResponse(request, decoded.v2(), null, p)),
                listener::onFailure));
        }
    }

-    static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
+    private static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page page) {
        RowSet rset = page.rowSet();
        if ((rset instanceof SchemaRowSet) == false) {
            throw new SqlIllegalArgumentException("No schema found inside {}", rset.getClass());
@@ -102,10 +106,10 @@ static SqlQueryResponse createResponseWithSchema(SqlQueryRequest request, Page p
            }
        }
        columns = unmodifiableList(columns);
-        return createResponse(request, columns, page);
+        return createResponse(request, request.zoneId(), columns, page);
    }

-    static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo> header, Page page) {
+    private static SqlQueryResponse createResponse(SqlQueryRequest request, ZoneId zoneId, List<ColumnInfo> header, Page page) {
        List<List<Object>> rows = new ArrayList<>();
        page.rowSet().forEachRow(rowView -> {
            List<Object> row = new ArrayList<>(rowView.columnCount());
@@ -114,7 +118,7 @@ static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo>
        });

        return new SqlQueryResponse(
-            Cursors.encodeToString(page.next(), request.zoneId()),
+            Cursors.encodeToString(page.next(), zoneId),
            request.mode(),
            request.columnar(),
            header,
