Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Fix JDBC response for delete query (#337)
Browse files Browse the repository at this point in the history
* Fix JDBC response for Delete queries
* Add integration tests
* Add unit tests
  • Loading branch information
abbashus authored Jan 13, 2020
1 parent 4b729a1 commit 31f8624
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.executor.format;

import com.amazon.opendistroforelasticsearch.sql.domain.Delete;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.reindex.BulkByScrollResponse;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class DeleteResultSet extends ResultSet {
private Delete query;
private Object queryResult;

public static final String DELETED = "deleted_rows";

public DeleteResultSet(Client client, Delete query, Object queryResult) {
this.client = client;
this.query = query;
this.queryResult = queryResult;
this.schema = new Schema(loadColumns());
this.dataRows = new DataRows(loadRows());
}

private List<Schema.Column> loadColumns() {
return Collections.singletonList(new Schema.Column(DELETED, null, Schema.Type.LONG));
}

private List<DataRows.Row> loadRows() {
return Collections.singletonList(new DataRows.Row(loadDeletedData()));
}

private Map<String, Object> loadDeletedData(){
return Collections.singletonMap(DELETED, ((BulkByScrollResponse) queryResult).getDeleted());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.sql.executor.format;

import com.amazon.opendistroforelasticsearch.sql.domain.Delete;
import com.amazon.opendistroforelasticsearch.sql.domain.IndexStatement;
import com.amazon.opendistroforelasticsearch.sql.domain.Query;
import com.amazon.opendistroforelasticsearch.sql.domain.QueryStatement;
Expand Down Expand Up @@ -57,7 +58,9 @@ public Protocol(Exception e) {
}

private ResultSet loadResultSet(Client client, QueryStatement queryStatement, Object queryResult) {
if (queryStatement instanceof Query) {
if (queryStatement instanceof Delete) {
return new DeleteResultSet(client, (Delete) queryStatement, queryResult);
} else if (queryStatement instanceof Query) {
return new SelectResultSet(client, (Query) queryStatement, queryResult);
} else if (queryStatement instanceof IndexStatement) {
IndexStatement statement = (IndexStatement) queryStatement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public Schema(IndexStatement statement, List<Column> columns) {
this.columns = columns;
}

public Schema(List<Column> columns){
this.columns = columns;
}

public String getIndexName() {
return indexName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

package com.amazon.opendistroforelasticsearch.sql.esintgtest;

import com.amazon.opendistroforelasticsearch.sql.utils.StringUtils;
import org.json.JSONObject;
import org.junit.Test;

import java.io.IOException;
import java.util.Locale;

import static org.hamcrest.core.IsEqual.equalTo;

Expand All @@ -33,18 +33,11 @@ protected void init() throws Exception {

@Test
public void deleteAllTest() throws IOException, InterruptedException {
String selectQuery = String.format(
Locale.ROOT,
"SELECT * FROM %s/account",
TestsConstants.TEST_INDEX_ACCOUNT
);
String selectQuery = StringUtils.format("SELECT * FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT);
JSONObject response = executeRequest(makeRequest(selectQuery));
int totalHits = getTotalHits(response);

String deleteQuery = String.format(
Locale.ROOT,
"DELETE FROM %s/account",
TestsConstants.TEST_INDEX_ACCOUNT);
String deleteQuery = StringUtils.format("DELETE FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT);
response = executeRequest(makeRequest(deleteQuery));
assertThat(response.getInt("deleted"), equalTo(totalHits));

Expand All @@ -58,16 +51,14 @@ public void deleteAllTest() throws IOException, InterruptedException {

@Test
public void deleteWithConditionTest() throws IOException, InterruptedException {
String selectQuery = String.format(
Locale.ROOT,
String selectQuery = StringUtils.format(
"SELECT * FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')",
TestsConstants.TEST_INDEX_PHRASE
);
JSONObject response = executeRequest(makeRequest(selectQuery));
int totalHits = getTotalHits(response);

String deleteQuery = String.format(
Locale.ROOT,
String deleteQuery = StringUtils.format(
"DELETE FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')",
TestsConstants.TEST_INDEX_PHRASE
);
Expand All @@ -77,12 +68,76 @@ public void deleteWithConditionTest() throws IOException, InterruptedException {
// To prevent flakiness, the minimum value of 2000 msec works fine.
Thread.sleep(2000);

selectQuery = String.format(
Locale.ROOT,
"SELECT * FROM %s/phrase",
selectQuery = StringUtils.format("SELECT * FROM %s/phrase", TestsConstants.TEST_INDEX_PHRASE);

response = executeRequest(makeRequest(selectQuery));
assertThat(getTotalHits(response), equalTo(5));
}

@Test
public void deleteAllWithJdbcFormat() throws IOException, InterruptedException {
String selectQuery = StringUtils.format("SELECT * FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT);
JSONObject response = executeRequest(makeRequest(selectQuery));
int totalHits = getTotalHits(response);

String deleteQuery = StringUtils.format("DELETE FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT);

response = new JSONObject(executeQuery(deleteQuery, "jdbc"));
System.out.println(response);
assertThat(response.query("/schema/0/name"), equalTo("deleted_rows"));
assertThat(response.query("/schema/0/type"), equalTo("long"));
assertThat(response.query("/datarows/0/0"), equalTo(totalHits));
assertThat(response.query("/total"), equalTo(1));
assertThat(response.query("/status"), equalTo(200));
assertThat(response.query("/size"), equalTo(1));

// The documents are not deleted immediately, causing the next search call to return all results.
// To prevent flakiness, the minimum value of 2000 msec works fine.
Thread.sleep(2000);

response = executeRequest(makeRequest(selectQuery));
assertThat(getTotalHits(response), equalTo(0));

// Multiple invocation of delete query should return deleted == 0
response = new JSONObject(executeQuery(deleteQuery, "jdbc"));
assertThat(response.query("/datarows/0/0"), equalTo(0));
}

@Test
public void deleteWithConditionTestJdbcFormat() throws IOException, InterruptedException {
String selectQuery = StringUtils.format(
"SELECT * FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')",
TestsConstants.TEST_INDEX_PHRASE
);

JSONObject response = executeRequest(makeRequest(selectQuery));
int totalHits = getTotalHits(response);

String deleteQuery = StringUtils.format(
"DELETE FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')",
TestsConstants.TEST_INDEX_PHRASE
);

response = new JSONObject(executeQuery(deleteQuery, "jdbc"));
System.out.println(response);
assertThat(response.query("/schema/0/name"), equalTo("deleted_rows"));
assertThat(response.query("/schema/0/type"), equalTo("long"));
assertThat(response.query("/datarows/0/0"), equalTo(totalHits));
assertThat(response.query("/total"), equalTo(1));
assertThat(response.query("/status"), equalTo(200));
assertThat(response.query("/size"), equalTo(1));

// The documents are not deleted immediately, causing the next search call to return all results.
// To prevent flakiness, the minimum value of 2000 msec works fine.
Thread.sleep(2000);

selectQuery = StringUtils.format("SELECT * FROM %s/phrase", TestsConstants.TEST_INDEX_PHRASE);

response = executeRequest(makeRequest(selectQuery));
assertThat(getTotalHits(response), equalTo(5));

// Multiple invocation of delete query should return deleted == 0
response = new JSONObject(executeQuery(deleteQuery, "jdbc"));
assertThat(response.query("/datarows/0/0"), equalTo(0));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.unittest.executor;

import com.amazon.opendistroforelasticsearch.sql.domain.Delete;
import com.amazon.opendistroforelasticsearch.sql.executor.format.DataRows;
import com.amazon.opendistroforelasticsearch.sql.executor.format.DeleteResultSet;
import com.amazon.opendistroforelasticsearch.sql.executor.format.Schema;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.client.node.NodeClient;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.BulkByScrollResponse;

import org.junit.Test;
import org.mockito.Mock;

import java.io.IOException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;


public class DeleteResultSetTest {

@Mock
NodeClient client;

@Mock
Delete deleteQuery;

@Test
public void testDeleteResponseForJdbcFormat() throws IOException {

String jsonDeleteResponse = "{\n" +
" \"took\" : 73,\n" +
" \"timed_out\" : false,\n" +
" \"total\" : 1,\n" +
" \"updated\" : 0,\n" +
" \"created\" : 0,\n" +
" \"deleted\" : 10,\n" +
" \"batches\" : 1,\n" +
" \"version_conflicts\" : 0,\n" +
" \"noops\" : 0,\n" +
" \"retries\" : {\n" +
" \"bulk\" : 0,\n" +
" \"search\" : 0\n" +
" },\n" +
" \"throttled_millis\" : 0,\n" +
" \"requests_per_second\" : -1.0,\n" +
" \"throttled_until_millis\" : 0,\n" +
" \"failures\" : [ ]\n" +
"}\n";

XContentType xContentType = XContentType.JSON;
XContentParser parser = xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
jsonDeleteResponse
);

BulkByScrollResponse deleteResponse = BulkByScrollResponse.fromXContent(parser);
DeleteResultSet deleteResultSet = new DeleteResultSet(client, deleteQuery, deleteResponse);
Schema schema = deleteResultSet.getSchema();
DataRows dataRows = deleteResultSet.getDataRows();

assertThat(schema.getHeaders().size(), equalTo(1));
assertThat(dataRows.getSize(), equalTo(1L));
assertThat(dataRows.iterator().next().getData(DeleteResultSet.DELETED), equalTo(10L));
}

}

0 comments on commit 31f8624

Please sign in to comment.