Skip to content

Commit

Permalink
Fix scroll cleaning.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Mar 10, 2023
1 parent 304616d commit 1b5ab7e
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class OpenSearchNodeClient implements OpenSearchClient {
private final NodeClient client;

/**
* Constructor of ElasticsearchNodeClient.
* Constructor of OpenSearchNodeClient.
*/
public OpenSearchNodeClient(NodeClient client) {
this.client = client;
Expand Down Expand Up @@ -172,7 +172,14 @@ public Map<String, String> meta() {

@Override
public void cleanup(OpenSearchRequest request) {
request.clean(scrollId -> {}/* client.prepareClearScroll().addScrollId(scrollId).get() */);
request.clean(scrollId -> {
try {
client.prepareClearScroll().addScrollId(scrollId).get();
} catch (Exception e) {
throw new IllegalStateException(
"Failed to clean up resources for search request " + request, e);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,17 +174,16 @@ public Map<String, String> meta() {

@Override
public void cleanup(OpenSearchRequest request) {
request.clean(scrollId -> {/*
request.clean(scrollId -> {
try {
ClearScrollRequest clearRequest = new ClearScrollRequest();
clearRequest.addScrollId(scrollId);
client.clearScroll(clearRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
} catch (Exception e) {
throw new IllegalStateException(
"Failed to clean up resources for search request " + request, e);
}*/
}
});

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class ContinueScrollRequest implements OpenSearchRequest {
@Getter
private final OpenSearchExprValueFactory exprValueFactory;

@EqualsAndHashCode.Exclude
private boolean scrollFinished = false;

public ContinueScrollRequest(String scrollId, OpenSearchExprValueFactory exprValueFactory) {
this.initialScrollId = scrollId;
this.exprValueFactory = exprValueFactory;
Expand All @@ -44,9 +47,9 @@ public OpenSearchResponse search(Function<SearchRequest, SearchResponse> searchA

// TODO if terminated_early - something went wrong, e.g. no scroll returned.
var response = new OpenSearchResponse(openSearchResponse, exprValueFactory);
if (!response.isEmpty()) {
responseScrollId = openSearchResponse.getScrollId();
} // else - last empty page, we should ignore the scroll even if it is returned
// on the last empty page, we should close the scroll
scrollFinished = response.isEmpty();
responseScrollId = openSearchResponse.getScrollId();
return response;
}

Expand All @@ -63,6 +66,7 @@ public SearchSourceBuilder getSourceBuilder() {

@Override
public String toCursor() {
return responseScrollId;
// on the last page, we shouldn't return the scroll to user, it is kept for closing (clean)
return scrollFinished ? null : responseScrollId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ public OpenSearchResponse search(Function<SearchRequest, SearchResponse> searchA
}

var response = new OpenSearchResponse(openSearchResponse, exprValueFactory);
if (!response.isEmpty()) {
setScrollId(openSearchResponse.getScrollId());
} // else - last empty page, we should ignore the scroll even if it is returned
setScrollId(openSearchResponse.getScrollId());
return response;
}

Expand All @@ -99,6 +97,7 @@ public void clean(Consumer<String> cleanAction) {
try {
if (isScrollStarted()) {
cleanAction.accept(getScrollId());
setScrollId(null);
}
} finally {
reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class OpenSearchPagedIndexScan extends TableScanOperator {
@ToString.Include
private OpenSearchRequest request;
private Iterator<ExprValue> iterator;
private boolean needClean = false;

public OpenSearchPagedIndexScan(OpenSearchClient client,
PagedRequestBuilder requestBuilder) {
Expand Down Expand Up @@ -56,15 +57,18 @@ public void open() {
if (!response.isEmpty()) {
iterator = response.iterator();
} else {
needClean = true;
iterator = Collections.emptyIterator();
}
}

@Override
public void close() {
super.close();

client.cleanup(request);
if (needClean) {
// clean on the last page only, to prevent closing the scroll/cursor in the middle of paging.
client.cleanup(request);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.search.TotalHits;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InOrder;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.opensearch.sql.opensearch.response.OpenSearchResponse;

@ExtendWith(MockitoExtension.class)
@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
class OpenSearchNodeClientTest {

private static final String TEST_MAPPING_FILE = "mappings/accounts.json";
Expand Down Expand Up @@ -104,7 +106,7 @@ void setUp() {
}

@Test
void isIndexExist() {
void is_index_exist() {
when(nodeClient.admin().indices()
.exists(any(IndicesExistsRequest.class)).actionGet())
.thenReturn(new IndicesExistsResponse(true));
Expand All @@ -113,7 +115,7 @@ void isIndexExist() {
}

@Test
void isIndexNotExist() {
void is_index_not_exist() {
String indexName = "test";
when(nodeClient.admin().indices()
.exists(any(IndicesExistsRequest.class)).actionGet())
Expand All @@ -123,14 +125,14 @@ void isIndexNotExist() {
}

@Test
void isIndexExistWithException() {
void is_index_exist_with_exception() {
when(nodeClient.admin().indices().exists(any())).thenThrow(RuntimeException.class);

assertThrows(IllegalStateException.class, () -> client.exists("test"));
}

@Test
void createIndex() {
void create_index() {
String indexName = "test";
Map<String, Object> mappings = ImmutableMap.of(
"properties",
Expand All @@ -143,15 +145,15 @@ void createIndex() {
}

@Test
void createIndexWithException() {
void create_index_with_exception() {
when(nodeClient.admin().indices().create(any())).thenThrow(RuntimeException.class);

assertThrows(IllegalStateException.class,
() -> client.createIndex("test", ImmutableMap.of()));
}

@Test
void getIndexMappings() throws IOException {
void get_index_mappings() throws IOException {
URL url = Resources.getResource(TEST_MAPPING_FILE);
String mappings = Resources.toString(url, Charsets.UTF_8);
String indexName = "test";
Expand Down Expand Up @@ -183,7 +185,7 @@ void getIndexMappings() throws IOException {
}

@Test
void getIndexMappingsWithEmptyMapping() {
void get_index_mappings_with_empty_mapping() {
String indexName = "test";
mockNodeClientIndicesMappings(indexName, "");
Map<String, IndexMapping> indexMappings = client.getIndexMappings(indexName);
Expand All @@ -194,15 +196,15 @@ void getIndexMappingsWithEmptyMapping() {
}

@Test
void getIndexMappingsWithIOException() {
void get_index_mappings_with_IOException() {
String indexName = "test";
when(nodeClient.admin().indices()).thenThrow(RuntimeException.class);

assertThrows(IllegalStateException.class, () -> client.getIndexMappings(indexName));
}

@Test
void getIndexMappingsWithNonExistIndex() {
void get_index_mappings_with_non_exist_index() {
when(nodeClient.admin().indices()
.prepareGetMappings(any())
.setLocal(anyBoolean())
Expand All @@ -213,7 +215,7 @@ void getIndexMappingsWithNonExistIndex() {
}

@Test
void getIndexMaxResultWindows() throws IOException {
void get_index_max_result_windows() throws IOException {
URL url = Resources.getResource(TEST_MAPPING_SETTINGS_FILE);
String indexMetadata = Resources.toString(url, Charsets.UTF_8);
String indexName = "accounts";
Expand All @@ -227,7 +229,7 @@ void getIndexMaxResultWindows() throws IOException {
}

@Test
void getIndexMaxResultWindowsWithDefaultSettings() throws IOException {
void get_index_max_result_windows_with_default_settings() throws IOException {
URL url = Resources.getResource(TEST_MAPPING_FILE);
String indexMetadata = Resources.toString(url, Charsets.UTF_8);
String indexName = "accounts";
Expand All @@ -241,7 +243,7 @@ void getIndexMaxResultWindowsWithDefaultSettings() throws IOException {
}

@Test
void getIndexMaxResultWindowsWithIOException() {
void get_index_max_result_windows_with_IOException() {
String indexName = "test";
when(nodeClient.admin().indices()).thenThrow(RuntimeException.class);

Expand All @@ -250,7 +252,7 @@ void getIndexMaxResultWindowsWithIOException() {

/** Jacoco enforce this constant lambda be tested. */
@Test
void testAllFieldsPredicate() {
void test_all_fields_predicate() {
assertTrue(OpenSearchNodeClient.ALL_FIELDS.apply("any_index").test("any_field"));
}

Expand All @@ -272,8 +274,7 @@ void search() {
// Mock second scroll request followed
SearchResponse scrollResponse = mock(SearchResponse.class);
when(nodeClient.searchScroll(any()).actionGet()).thenReturn(scrollResponse);
// TODO commented out because scroll clean-up is disabled
//when(scrollResponse.getScrollId()).thenReturn("scroll456");
when(scrollResponse.getScrollId()).thenReturn("scroll456");
when(scrollResponse.getHits()).thenReturn(SearchHits.empty());

// Verify response for first scroll request
Expand Down Expand Up @@ -314,23 +315,30 @@ void cleanup() {
client.cleanup(request);
assertFalse(request.isScrollStarted());

/* TODO: Scroll cleaning is temporary disabled
InOrder inOrder = Mockito.inOrder(nodeClient, requestBuilder);
inOrder.verify(nodeClient).prepareClearScroll();
inOrder.verify(requestBuilder).addScrollId("scroll123");
inOrder.verify(requestBuilder).get();
*/
}

@Test
void cleanupWithoutScrollId() {
void cleanup_without_scrollId() {
OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory);
client.cleanup(request);
verify(nodeClient, never()).prepareClearScroll();
}

@Test
void getIndices() {
void cleanup_rethrows_exception() {
when(nodeClient.prepareClearScroll()).thenThrow(new RuntimeException());

OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory);
request.setScrollId("scroll123");
assertThrows(IllegalStateException.class, () -> client.cleanup(request));
}

@Test
void get_indices() {
AliasMetadata aliasMetadata = mock(AliasMetadata.class);
ImmutableOpenMap.Builder<String, List<AliasMetadata>> builder = ImmutableOpenMap.builder();
builder.fPut("index",Arrays.asList(aliasMetadata));
Expand Down
Loading

0 comments on commit 1b5ab7e

Please sign in to comment.