Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

[PPL] Add ES rest client to support standalone mode #491

1 change: 1 addition & 0 deletions elasticsearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ repositories {
dependencies {
compile project(':core')
compile group: 'org.elasticsearch', name: 'elasticsearch', version: "${es_version}"
compile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-high-level-client', version:"${es_version}"

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testCompile group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public interface ElasticsearchClient {
*/
ElasticsearchResponse search(ElasticsearchRequest request);

/**
* Clean up resources related to the search request, for example scroll context.
* @param request search request
*/
void cleanup(ElasticsearchRequest request);

/**
* Schedule a task to run.
* @param task task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,20 @@ public ElasticsearchResponse search(ElasticsearchRequest request) {
esResponse = client.searchScroll(request.scrollRequest()).actionGet();
} else {
esResponse = client.search(request.searchRequest()).actionGet();
request.setScrollId(esResponse.getScrollId());
}
request.setScrollId(esResponse.getScrollId());

ElasticsearchResponse response = new ElasticsearchResponse(esResponse);
if (response.isEmpty()) {
return new ElasticsearchResponse(esResponse);
}

@Override
public void cleanup(ElasticsearchRequest request) {
if (request.isScrollStarted()) {
client.prepareClearScroll().
addScrollId(esResponse.getScrollId()).
addScrollId(request.getScrollId()).
get();
request.reset();
}
return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.elasticsearch.client;

import com.amazon.opendistroforelasticsearch.sql.elasticsearch.mapping.IndexMapping;
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.request.ElasticsearchRequest;
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.response.ElasticsearchResponse;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;

import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Elasticsearch REST client to support standalone mode that runs entire engine from remote.
*
* TODO: Support for authN and authZ with AWS Sigv4 or security plugin.
*/
@RequiredArgsConstructor
public class ElasticsearchRestClient implements ElasticsearchClient {

/**
* Elasticsearch high level REST client
*/
private final RestHighLevelClient client;


@Override
public Map<String, IndexMapping> getIndexMappings(String indexExpression) {
GetMappingsRequest request = new GetMappingsRequest().indices(indexExpression);
try {
GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT);
return response.mappings().entrySet().stream().
collect(Collectors.toMap(
Map.Entry::getKey,
e -> new IndexMapping(e.getValue()))
);
} catch (IOException e) {
throw new IllegalStateException(
"Failed to get index mappings for " + indexExpression, e);
}
}

@Override
public ElasticsearchResponse search(ElasticsearchRequest request) {
try {
SearchResponse esResponse;
if (request.isScrollStarted()) {
esResponse = client.scroll(request.scrollRequest(), RequestOptions.DEFAULT);
} else {
esResponse = client.search(request.searchRequest(), RequestOptions.DEFAULT);
}
request.setScrollId(esResponse.getScrollId());

return new ElasticsearchResponse(esResponse);
} catch (IOException e) {
throw new IllegalStateException(
"Failed to perform search operation with request " + request, e);
}
}

@Override
public void cleanup(ElasticsearchRequest request) {
try {
if (!request.isScrollStarted()) {
return;
}

ClearScrollRequest clearRequest = new ClearScrollRequest();
clearRequest.addScrollId(request.getScrollId());
client.clearScroll(clearRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException(
"Failed to clean up resources for search request " + request, e);
} finally {
request.reset();
}
}

@Override
public void schedule(Runnable task) {
task.run();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@
import java.util.Objects;

/**
* Elasticsearch search request
* Elasticsearch search request. This has to be stateful because it needs to:
*
* 1) Accumulate search source builder when visiting logical plan to push down operation
* 2) Maintain scroll ID between calls to client search method
*/
@EqualsAndHashCode
@RequiredArgsConstructor
@Getter
@ToString
public class ElasticsearchRequest {

Expand All @@ -56,7 +60,6 @@ public class ElasticsearchRequest {
/**
* Search request source builder.
*/
@Getter
private final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

/**
Expand Down Expand Up @@ -87,4 +90,12 @@ public SearchScrollRequest scrollRequest() {
scrollId(scrollId);
}

/**
* Reset internal state in case any stale data. However, ideally the same instance
* is not supposed to be reused across different physical plan.
*/
public void reset() {
scrollId = null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public ElasticsearchIndexScan(ElasticsearchClient client, String indexName) {

@Override
public void open() {
super.open();

// For now pull all results immediately once open
List<ElasticsearchResponse> responses = new ArrayList<>();
ElasticsearchResponse response = client.search(request);
Expand All @@ -83,4 +85,11 @@ public ExprValue next() {
return ExprValueUtils.fromObjectValue(hits.next().getSourceAsMap());
}

@Override
public void close() {
super.close();

client.cleanup(request);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.io.Resources;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.ClearScrollRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -32,7 +33,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -43,7 +43,9 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import java.io.IOException;
Expand All @@ -60,6 +62,8 @@
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -156,9 +160,6 @@ public void search() {
when(scrollResponse.getScrollId()).thenReturn("scroll456");
when(scrollResponse.getHits()).thenReturn(SearchHits.empty());

// Mock clear scroll request
when(nodeClient.prepareClearScroll().addScrollId("scroll456").get()).thenReturn(null);

// Verify response for first scroll request
ElasticsearchRequest request = new ElasticsearchRequest("test");
ElasticsearchResponse response1 = client.search(request);
Expand All @@ -178,9 +179,8 @@ public void search() {
void schedule() {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.preserveContext(any())).then(invocation -> invocation.getArgument(0));
when(nodeClient.threadPool()).thenReturn(threadPool);

// Instantiate NodeClient because Mockito cannot mock final method threadPool()
nodeClient = new NodeClient(Settings.EMPTY, threadPool);
doAnswer(invocation -> {
Runnable task = invocation.getArgument(0);
task.run();
Expand All @@ -194,6 +194,37 @@ void schedule() {
assertTrue(isRun.get());
}

@Test
void cleanup() {
ElasticsearchNodeClient client = new ElasticsearchNodeClient(mock(ClusterService.class),
nodeClient);

ClearScrollRequestBuilder requestBuilder = mock(ClearScrollRequestBuilder.class);
when(nodeClient.prepareClearScroll()).thenReturn(requestBuilder);
when(requestBuilder.addScrollId(any())).thenReturn(requestBuilder);
when(requestBuilder.get()).thenReturn(null);

ElasticsearchRequest request = new ElasticsearchRequest("test");
request.setScrollId("scroll123");
client.cleanup(request);
assertFalse(request.isScrollStarted());

InOrder inOrder = Mockito.inOrder(nodeClient, requestBuilder);
inOrder.verify(nodeClient).prepareClearScroll();
inOrder.verify(requestBuilder).addScrollId("scroll123");
inOrder.verify(requestBuilder).get();
}

@Test
void cleanupWithoutScrollId() {
ElasticsearchNodeClient client = new ElasticsearchNodeClient(mock(ClusterService.class),
nodeClient);

ElasticsearchRequest request = new ElasticsearchRequest("test");
client.cleanup(request);
verify(nodeClient, never()).prepareClearScroll();
}

private ElasticsearchNodeClient mockClient(String indexName, String mappings) {
ClusterService clusterService = mockClusterService(indexName, mappings);
return new ElasticsearchNodeClient(clusterService, nodeClient);
Expand Down
Loading