From 31f8624fc541d9c7220bb0c5dc24f55f087dd145 Mon Sep 17 00:00:00 2001 From: Abbas Hussain <46726078+abbashus@users.noreply.github.com> Date: Mon, 13 Jan 2020 13:36:13 -0800 Subject: [PATCH] Fix JDBC response for delete query (#337) * Fix JDBC response for Delete queries * Add integration tests * Add unit tests --- .../sql/executor/format/DeleteResultSet.java | 51 +++++++++++ .../sql/executor/format/Protocol.java | 5 +- .../sql/executor/format/Schema.java | 4 + .../sql/esintgtest/DeleteIT.java | 89 +++++++++++++++---- .../executor/DeleteResultSetTest.java | 89 +++++++++++++++++++ 5 files changed, 220 insertions(+), 18 deletions(-) create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/DeleteResultSet.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/executor/DeleteResultSetTest.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/DeleteResultSet.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/DeleteResultSet.java new file mode 100644 index 0000000000..a40c57edfd --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/DeleteResultSet.java @@ -0,0 +1,51 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.executor.format; + +import com.amazon.opendistroforelasticsearch.sql.domain.Delete; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.reindex.BulkByScrollResponse; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class DeleteResultSet extends ResultSet { + private Delete query; + private Object queryResult; + + public static final String DELETED = "deleted_rows"; + + public DeleteResultSet(Client client, Delete query, Object queryResult) { + this.client = client; + this.query = query; + this.queryResult = queryResult; + this.schema = new Schema(loadColumns()); + this.dataRows = new DataRows(loadRows()); + } + + private List loadColumns() { + return Collections.singletonList(new Schema.Column(DELETED, null, Schema.Type.LONG)); + } + + private List loadRows() { + return Collections.singletonList(new DataRows.Row(loadDeletedData())); + } + + private Map loadDeletedData(){ + return Collections.singletonMap(DELETED, ((BulkByScrollResponse) queryResult).getDeleted()); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Protocol.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Protocol.java index 6b1376c0a4..dace008c2a 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Protocol.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Protocol.java @@ -15,6 +15,7 @@ package com.amazon.opendistroforelasticsearch.sql.executor.format; +import com.amazon.opendistroforelasticsearch.sql.domain.Delete; import com.amazon.opendistroforelasticsearch.sql.domain.IndexStatement; import com.amazon.opendistroforelasticsearch.sql.domain.Query; import com.amazon.opendistroforelasticsearch.sql.domain.QueryStatement; @@ -57,7 +58,9 @@ public Protocol(Exception e) { } private ResultSet loadResultSet(Client client, QueryStatement queryStatement, Object queryResult) { - if (queryStatement instanceof Query) { + if (queryStatement instanceof Delete) { + return new DeleteResultSet(client, (Delete) queryStatement, queryResult); + } else if (queryStatement instanceof Query) { return new SelectResultSet(client, (Query) queryStatement, queryResult); } else if (queryStatement instanceof IndexStatement) { IndexStatement statement = (IndexStatement) queryStatement; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Schema.java b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Schema.java index 94552fe12a..2ac042fa3a 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Schema.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/sql/executor/format/Schema.java @@ -46,6 +46,10 @@ public Schema(IndexStatement statement, List columns) { this.columns = columns; } + public Schema(List columns){ + this.columns = columns; + } + public String getIndexName() { return indexName; } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/DeleteIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/DeleteIT.java index cf8478be8f..fc327a070b 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/DeleteIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/esintgtest/DeleteIT.java @@ -15,11 +15,11 @@ package com.amazon.opendistroforelasticsearch.sql.esintgtest; +import com.amazon.opendistroforelasticsearch.sql.utils.StringUtils; import org.json.JSONObject; import org.junit.Test; import java.io.IOException; -import java.util.Locale; import static org.hamcrest.core.IsEqual.equalTo; @@ -33,18 +33,11 @@ protected void init() throws Exception { @Test public void deleteAllTest() throws IOException, InterruptedException { - String selectQuery = String.format( - Locale.ROOT, - "SELECT * FROM %s/account", - TestsConstants.TEST_INDEX_ACCOUNT - ); + String selectQuery = StringUtils.format("SELECT * FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT); JSONObject response = executeRequest(makeRequest(selectQuery)); int totalHits = getTotalHits(response); - String deleteQuery = String.format( - Locale.ROOT, - "DELETE FROM %s/account", - TestsConstants.TEST_INDEX_ACCOUNT); + String deleteQuery = StringUtils.format("DELETE FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT); response = executeRequest(makeRequest(deleteQuery)); assertThat(response.getInt("deleted"), equalTo(totalHits)); @@ -58,16 +51,14 @@ public void deleteAllTest() throws IOException, InterruptedException { @Test public void deleteWithConditionTest() throws IOException, InterruptedException { - String selectQuery = String.format( - Locale.ROOT, + String selectQuery = StringUtils.format( "SELECT * FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')", TestsConstants.TEST_INDEX_PHRASE ); JSONObject response = executeRequest(makeRequest(selectQuery)); int totalHits = getTotalHits(response); - String deleteQuery = String.format( - Locale.ROOT, + String deleteQuery = StringUtils.format( "DELETE FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')", TestsConstants.TEST_INDEX_PHRASE ); @@ -77,12 +68,76 @@ public void deleteWithConditionTest() throws IOException, InterruptedException { // To prevent flakiness, the minimum value of 2000 msec works fine. Thread.sleep(2000); - selectQuery = String.format( - Locale.ROOT, - "SELECT * FROM %s/phrase", + selectQuery = StringUtils.format("SELECT * FROM %s/phrase", TestsConstants.TEST_INDEX_PHRASE); + + response = executeRequest(makeRequest(selectQuery)); + assertThat(getTotalHits(response), equalTo(5)); + } + + @Test + public void deleteAllWithJdbcFormat() throws IOException, InterruptedException { + String selectQuery = StringUtils.format("SELECT * FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT); + JSONObject response = executeRequest(makeRequest(selectQuery)); + int totalHits = getTotalHits(response); + + String deleteQuery = StringUtils.format("DELETE FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT); + + response = new JSONObject(executeQuery(deleteQuery, "jdbc")); + System.out.println(response); + assertThat(response.query("/schema/0/name"), equalTo("deleted_rows")); + assertThat(response.query("/schema/0/type"), equalTo("long")); + assertThat(response.query("/datarows/0/0"), equalTo(totalHits)); + assertThat(response.query("/total"), equalTo(1)); + assertThat(response.query("/status"), equalTo(200)); + assertThat(response.query("/size"), equalTo(1)); + + // The documents are not deleted immediately, causing the next search call to return all results. + // To prevent flakiness, the minimum value of 2000 msec works fine. + Thread.sleep(2000); + + response = executeRequest(makeRequest(selectQuery)); + assertThat(getTotalHits(response), equalTo(0)); + + // Multiple invocation of delete query should return deleted == 0 + response = new JSONObject(executeQuery(deleteQuery, "jdbc")); + assertThat(response.query("/datarows/0/0"), equalTo(0)); + } + + @Test + public void deleteWithConditionTestJdbcFormat() throws IOException, InterruptedException { + String selectQuery = StringUtils.format( + "SELECT * FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')", TestsConstants.TEST_INDEX_PHRASE ); + + JSONObject response = executeRequest(makeRequest(selectQuery)); + int totalHits = getTotalHits(response); + + String deleteQuery = StringUtils.format( + "DELETE FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')", + TestsConstants.TEST_INDEX_PHRASE + ); + + response = new JSONObject(executeQuery(deleteQuery, "jdbc")); + System.out.println(response); + assertThat(response.query("/schema/0/name"), equalTo("deleted_rows")); + assertThat(response.query("/schema/0/type"), equalTo("long")); + assertThat(response.query("/datarows/0/0"), equalTo(totalHits)); + assertThat(response.query("/total"), equalTo(1)); + assertThat(response.query("/status"), equalTo(200)); + assertThat(response.query("/size"), equalTo(1)); + + // The documents are not deleted immediately, causing the next search call to return all results. + // To prevent flakiness, the minimum value of 2000 msec works fine. + Thread.sleep(2000); + + selectQuery = StringUtils.format("SELECT * FROM %s/phrase", TestsConstants.TEST_INDEX_PHRASE); + response = executeRequest(makeRequest(selectQuery)); assertThat(getTotalHits(response), equalTo(5)); + + // Multiple invocation of delete query should return deleted == 0 + response = new JSONObject(executeQuery(deleteQuery, "jdbc")); + assertThat(response.query("/datarows/0/0"), equalTo(0)); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/executor/DeleteResultSetTest.java b/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/executor/DeleteResultSetTest.java new file mode 100644 index 0000000000..d23f8b04c8 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/sql/unittest/executor/DeleteResultSetTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.sql.unittest.executor; + +import com.amazon.opendistroforelasticsearch.sql.domain.Delete; +import com.amazon.opendistroforelasticsearch.sql.executor.format.DataRows; +import com.amazon.opendistroforelasticsearch.sql.executor.format.DeleteResultSet; +import com.amazon.opendistroforelasticsearch.sql.executor.format.Schema; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.client.node.NodeClient; + +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.reindex.BulkByScrollResponse; + +import org.junit.Test; +import org.mockito.Mock; + +import java.io.IOException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + + +public class DeleteResultSetTest { + + @Mock + NodeClient client; + + @Mock + Delete deleteQuery; + + @Test + public void testDeleteResponseForJdbcFormat() throws IOException { + + String jsonDeleteResponse = "{\n" + + " \"took\" : 73,\n" + + " \"timed_out\" : false,\n" + + " \"total\" : 1,\n" + + " \"updated\" : 0,\n" + + " \"created\" : 0,\n" + + " \"deleted\" : 10,\n" + + " \"batches\" : 1,\n" + + " \"version_conflicts\" : 0,\n" + + " \"noops\" : 0,\n" + + " \"retries\" : {\n" + + " \"bulk\" : 0,\n" + + " \"search\" : 0\n" + + " },\n" + + " \"throttled_millis\" : 0,\n" + + " \"requests_per_second\" : -1.0,\n" + + " \"throttled_until_millis\" : 0,\n" + + " \"failures\" : [ ]\n" + + "}\n"; + + XContentType xContentType = XContentType.JSON; + XContentParser parser = xContentType.xContent().createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + jsonDeleteResponse + ); + + BulkByScrollResponse deleteResponse = BulkByScrollResponse.fromXContent(parser); + DeleteResultSet deleteResultSet = new DeleteResultSet(client, deleteQuery, deleteResponse); + Schema schema = deleteResultSet.getSchema(); + DataRows dataRows = deleteResultSet.getDataRows(); + + assertThat(schema.getHeaders().size(), equalTo(1)); + assertThat(dataRows.getSize(), equalTo(1L)); + assertThat(dataRows.iterator().next().getData(DeleteResultSet.DELETED), equalTo(10L)); + } + +}