Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Fix JDBC response for delete query #337

Merged
merged 6 commits into from
Jan 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.executor.format;

import com.amazon.opendistroforelasticsearch.sql.domain.Delete;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.reindex.BulkByScrollResponse;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class DeleteResultSet extends ResultSet {
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
private Delete query;
private Object queryResult;

public static final String DELETED = "deleted_rows";

public DeleteResultSet(Client client, Delete query, Object queryResult) {
this.client = client;
this.query = query;
this.queryResult = queryResult;
this.schema = new Schema(loadColumns());
this.dataRows = new DataRows(loadRows());
}

private List<Schema.Column> loadColumns() {
return Collections.singletonList(new Schema.Column(DELETED, null, Schema.Type.LONG));
chloe-zh marked this conversation as resolved.
Show resolved Hide resolved
}

private List<DataRows.Row> loadRows() {
return Collections.singletonList(new DataRows.Row(loadDeletedData()));
}

private Map<String, Object> loadDeletedData(){
return Collections.singletonMap(DELETED, ((BulkByScrollResponse) queryResult).getDeleted());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.sql.executor.format;

import com.amazon.opendistroforelasticsearch.sql.domain.Delete;
import com.amazon.opendistroforelasticsearch.sql.domain.IndexStatement;
import com.amazon.opendistroforelasticsearch.sql.domain.Query;
import com.amazon.opendistroforelasticsearch.sql.domain.QueryStatement;
Expand Down Expand Up @@ -57,7 +58,9 @@ public Protocol(Exception e) {
}

private ResultSet loadResultSet(Client client, QueryStatement queryStatement, Object queryResult) {
if (queryStatement instanceof Query) {
if (queryStatement instanceof Delete) {
return new DeleteResultSet(client, (Delete) queryStatement, queryResult);
} else if (queryStatement instanceof Query) {
return new SelectResultSet(client, (Query) queryStatement, queryResult);
} else if (queryStatement instanceof IndexStatement) {
IndexStatement statement = (IndexStatement) queryStatement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public Schema(IndexStatement statement, List<Column> columns) {
this.columns = columns;
}

public Schema(List<Column> columns){
this.columns = columns;
}

public String getIndexName() {
return indexName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

package com.amazon.opendistroforelasticsearch.sql.esintgtest;

import com.amazon.opendistroforelasticsearch.sql.utils.StringUtils;
import org.json.JSONObject;
import org.junit.Test;

import java.io.IOException;
import java.util.Locale;

import static org.hamcrest.core.IsEqual.equalTo;

Expand All @@ -33,18 +33,11 @@ protected void init() throws Exception {

@Test
public void deleteAllTest() throws IOException, InterruptedException {
String selectQuery = String.format(
Locale.ROOT,
"SELECT * FROM %s/account",
TestsConstants.TEST_INDEX_ACCOUNT
);
String selectQuery = StringUtils.format("SELECT * FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT);
JSONObject response = executeRequest(makeRequest(selectQuery));
int totalHits = getTotalHits(response);

String deleteQuery = String.format(
Locale.ROOT,
"DELETE FROM %s/account",
TestsConstants.TEST_INDEX_ACCOUNT);
String deleteQuery = StringUtils.format("DELETE FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT);
response = executeRequest(makeRequest(deleteQuery));
assertThat(response.getInt("deleted"), equalTo(totalHits));

Expand All @@ -58,16 +51,14 @@ public void deleteAllTest() throws IOException, InterruptedException {

@Test
public void deleteWithConditionTest() throws IOException, InterruptedException {
String selectQuery = String.format(
Locale.ROOT,
String selectQuery = StringUtils.format(
"SELECT * FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')",
TestsConstants.TEST_INDEX_PHRASE
);
JSONObject response = executeRequest(makeRequest(selectQuery));
int totalHits = getTotalHits(response);

String deleteQuery = String.format(
Locale.ROOT,
String deleteQuery = StringUtils.format(
"DELETE FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')",
TestsConstants.TEST_INDEX_PHRASE
);
Expand All @@ -77,12 +68,76 @@ public void deleteWithConditionTest() throws IOException, InterruptedException {
// To prevent flakiness, the minimum value of 2000 msec works fine.
Thread.sleep(2000);

selectQuery = String.format(
Locale.ROOT,
"SELECT * FROM %s/phrase",
selectQuery = StringUtils.format("SELECT * FROM %s/phrase", TestsConstants.TEST_INDEX_PHRASE);

response = executeRequest(makeRequest(selectQuery));
assertThat(getTotalHits(response), equalTo(5));
}

@Test
public void deleteAllWithJdbcFormat() throws IOException, InterruptedException {
String selectQuery = StringUtils.format("SELECT * FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT);
JSONObject response = executeRequest(makeRequest(selectQuery));
int totalHits = getTotalHits(response);

String deleteQuery = StringUtils.format("DELETE FROM %s/account", TestsConstants.TEST_INDEX_ACCOUNT);

response = new JSONObject(executeQuery(deleteQuery, "jdbc"));
System.out.println(response);
assertThat(response.query("/schema/0/name"), equalTo("deleted_rows"));
assertThat(response.query("/schema/0/type"), equalTo("long"));
assertThat(response.query("/datarows/0/0"), equalTo(totalHits));
assertThat(response.query("/total"), equalTo(1));
assertThat(response.query("/status"), equalTo(200));
assertThat(response.query("/size"), equalTo(1));
dai-chen marked this conversation as resolved.
Show resolved Hide resolved

// The documents are not deleted immediately, causing the next search call to return all results.
// To prevent flakiness, the minimum value of 2000 msec works fine.
Thread.sleep(2000);

response = executeRequest(makeRequest(selectQuery));
assertThat(getTotalHits(response), equalTo(0));

// Multiple invocation of delete query should return deleted == 0
response = new JSONObject(executeQuery(deleteQuery, "jdbc"));
assertThat(response.query("/datarows/0/0"), equalTo(0));
}

@Test
abbashus marked this conversation as resolved.
Show resolved Hide resolved
public void deleteWithConditionTestJdbcFormat() throws IOException, InterruptedException {
String selectQuery = StringUtils.format(
"SELECT * FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')",
TestsConstants.TEST_INDEX_PHRASE
);

JSONObject response = executeRequest(makeRequest(selectQuery));
int totalHits = getTotalHits(response);

String deleteQuery = StringUtils.format(
"DELETE FROM %s/phrase WHERE match_phrase(phrase, 'quick fox here')",
TestsConstants.TEST_INDEX_PHRASE
);

response = new JSONObject(executeQuery(deleteQuery, "jdbc"));
System.out.println(response);
assertThat(response.query("/schema/0/name"), equalTo("deleted_rows"));
assertThat(response.query("/schema/0/type"), equalTo("long"));
assertThat(response.query("/datarows/0/0"), equalTo(totalHits));
assertThat(response.query("/total"), equalTo(1));
assertThat(response.query("/status"), equalTo(200));
assertThat(response.query("/size"), equalTo(1));

// The documents are not deleted immediately, causing the next search call to return all results.
// To prevent flakiness, the minimum value of 2000 msec works fine.
Thread.sleep(2000);

selectQuery = StringUtils.format("SELECT * FROM %s/phrase", TestsConstants.TEST_INDEX_PHRASE);

response = executeRequest(makeRequest(selectQuery));
assertThat(getTotalHits(response), equalTo(5));

// Multiple invocation of delete query should return deleted == 0
response = new JSONObject(executeQuery(deleteQuery, "jdbc"));
assertThat(response.query("/datarows/0/0"), equalTo(0));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.unittest.executor;

import com.amazon.opendistroforelasticsearch.sql.domain.Delete;
import com.amazon.opendistroforelasticsearch.sql.executor.format.DataRows;
import com.amazon.opendistroforelasticsearch.sql.executor.format.DeleteResultSet;
import com.amazon.opendistroforelasticsearch.sql.executor.format.Schema;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.client.node.NodeClient;

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.BulkByScrollResponse;

import org.junit.Test;
import org.mockito.Mock;

import java.io.IOException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;


public class DeleteResultSetTest {

@Mock
NodeClient client;

@Mock
Delete deleteQuery;

@Test
public void testDeleteResponseForJdbcFormat() throws IOException {

String jsonDeleteResponse = "{\n" +
" \"took\" : 73,\n" +
" \"timed_out\" : false,\n" +
" \"total\" : 1,\n" +
" \"updated\" : 0,\n" +
" \"created\" : 0,\n" +
" \"deleted\" : 10,\n" +
" \"batches\" : 1,\n" +
" \"version_conflicts\" : 0,\n" +
" \"noops\" : 0,\n" +
" \"retries\" : {\n" +
" \"bulk\" : 0,\n" +
" \"search\" : 0\n" +
" },\n" +
" \"throttled_millis\" : 0,\n" +
" \"requests_per_second\" : -1.0,\n" +
" \"throttled_until_millis\" : 0,\n" +
" \"failures\" : [ ]\n" +
"}\n";

XContentType xContentType = XContentType.JSON;
XContentParser parser = xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
jsonDeleteResponse
);

BulkByScrollResponse deleteResponse = BulkByScrollResponse.fromXContent(parser);
DeleteResultSet deleteResultSet = new DeleteResultSet(client, deleteQuery, deleteResponse);
Schema schema = deleteResultSet.getSchema();
DataRows dataRows = deleteResultSet.getDataRows();

assertThat(schema.getHeaders().size(), equalTo(1));
assertThat(dataRows.getSize(), equalTo(1L));
assertThat(dataRows.iterator().next().getData(DeleteResultSet.DELETED), equalTo(10L));
}

}