From 1b5ab7e0a37e17402243c7c7dc12bdbf7fc490e1 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 10 Mar 2023 13:01:14 -0800 Subject: [PATCH] Fix scroll cleaning. Signed-off-by: Yury-Fridlyand --- .../client/OpenSearchNodeClient.java | 11 +++- .../client/OpenSearchRestClient.java | 7 ++- .../request/ContinueScrollRequest.java | 12 +++-- .../request/OpenSearchScrollRequest.java | 5 +- .../scan/OpenSearchPagedIndexScan.java | 8 ++- .../client/OpenSearchNodeClientTest.java | 48 ++++++++++------- .../client/OpenSearchRestClientTest.java | 47 ++++++++-------- .../scan/OpenSearchPagedIndexScanTest.java | 53 ++++++++++++++----- 8 files changed, 119 insertions(+), 72 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java index 30584581e3..e4f25dabbd 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java @@ -43,7 +43,7 @@ public class OpenSearchNodeClient implements OpenSearchClient { private final NodeClient client; /** - * Constructor of ElasticsearchNodeClient. + * Constructor of OpenSearchNodeClient. */ public OpenSearchNodeClient(NodeClient client) { this.client = client; @@ -172,7 +172,14 @@ public Map meta() { @Override public void cleanup(OpenSearchRequest request) { - request.clean(scrollId -> {}/* client.prepareClearScroll().addScrollId(scrollId).get() */); + request.clean(scrollId -> { + try { + client.prepareClearScroll().addScrollId(scrollId).get(); + } catch (Exception e) { + throw new IllegalStateException( + "Failed to clean up resources for search request " + request, e); + } + }); } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java index 096899516f..41efc4e5ef 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java @@ -174,17 +174,16 @@ public Map meta() { @Override public void cleanup(OpenSearchRequest request) { - request.clean(scrollId -> {/* + request.clean(scrollId -> { try { ClearScrollRequest clearRequest = new ClearScrollRequest(); clearRequest.addScrollId(scrollId); client.clearScroll(clearRequest, RequestOptions.DEFAULT); - } catch (IOException e) { + } catch (Exception e) { throw new IllegalStateException( "Failed to clean up resources for search request " + request, e); - }*/ + } }); - } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinueScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinueScrollRequest.java index 1ec5960b21..cf51680b7e 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinueScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/ContinueScrollRequest.java @@ -31,6 +31,9 @@ public class ContinueScrollRequest implements OpenSearchRequest { @Getter private final OpenSearchExprValueFactory exprValueFactory; + @EqualsAndHashCode.Exclude + private boolean scrollFinished = false; + public ContinueScrollRequest(String scrollId, OpenSearchExprValueFactory exprValueFactory) { this.initialScrollId = scrollId; this.exprValueFactory = exprValueFactory; @@ -44,9 +47,9 @@ public OpenSearchResponse search(Function searchA // TODO if terminated_early - something went wrong, e.g. no scroll returned. var response = new OpenSearchResponse(openSearchResponse, exprValueFactory); - if (!response.isEmpty()) { - responseScrollId = openSearchResponse.getScrollId(); - } // else - last empty page, we should ignore the scroll even if it is returned + // on the last empty page, we should close the scroll + scrollFinished = response.isEmpty(); + responseScrollId = openSearchResponse.getScrollId(); return response; } @@ -63,6 +66,7 @@ public SearchSourceBuilder getSourceBuilder() { @Override public String toCursor() { - return responseScrollId; + // on the last page, we shouldn't return the scroll to user, it is kept for closing (clean) + return scrollFinished ? null : responseScrollId; } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java index 8eef94c837..555d520d7f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java @@ -88,9 +88,7 @@ public OpenSearchResponse search(Function searchA } var response = new OpenSearchResponse(openSearchResponse, exprValueFactory); - if (!response.isEmpty()) { - setScrollId(openSearchResponse.getScrollId()); - } // else - last empty page, we should ignore the scroll even if it is returned + setScrollId(openSearchResponse.getScrollId()); return response; } @@ -99,6 +97,7 @@ public void clean(Consumer cleanAction) { try { if (isScrollStarted()) { cleanAction.accept(getScrollId()); + setScrollId(null); } } finally { reset(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScan.java index 5ab2fca393..1dc455cfd2 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScan.java @@ -26,6 +26,7 @@ public class OpenSearchPagedIndexScan extends TableScanOperator { @ToString.Include private OpenSearchRequest request; private Iterator iterator; + private boolean needClean = false; public OpenSearchPagedIndexScan(OpenSearchClient client, PagedRequestBuilder requestBuilder) { @@ -56,6 +57,7 @@ public void open() { if (!response.isEmpty()) { iterator = response.iterator(); } else { + needClean = true; iterator = Collections.emptyIterator(); } } @@ -63,8 +65,10 @@ public void open() { @Override public void close() { super.close(); - - client.cleanup(request); + if (needClean) { + // clean on the last page only, to prevent closing the scroll/cursor in the middle of paging. + client.cleanup(request); + } } @Override diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java index ab4171ad22..206e1748f3 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClientTest.java @@ -34,7 +34,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.search.TotalHits; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InOrder; @@ -73,6 +74,7 @@ import org.opensearch.sql.opensearch.response.OpenSearchResponse; @ExtendWith(MockitoExtension.class) +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) class OpenSearchNodeClientTest { private static final String TEST_MAPPING_FILE = "mappings/accounts.json"; @@ -104,7 +106,7 @@ void setUp() { } @Test - void isIndexExist() { + void is_index_exist() { when(nodeClient.admin().indices() .exists(any(IndicesExistsRequest.class)).actionGet()) .thenReturn(new IndicesExistsResponse(true)); @@ -113,7 +115,7 @@ void isIndexExist() { } @Test - void isIndexNotExist() { + void is_index_not_exist() { String indexName = "test"; when(nodeClient.admin().indices() .exists(any(IndicesExistsRequest.class)).actionGet()) @@ -123,14 +125,14 @@ void isIndexNotExist() { } @Test - void isIndexExistWithException() { + void is_index_exist_with_exception() { when(nodeClient.admin().indices().exists(any())).thenThrow(RuntimeException.class); assertThrows(IllegalStateException.class, () -> client.exists("test")); } @Test - void createIndex() { + void create_index() { String indexName = "test"; Map mappings = ImmutableMap.of( "properties", @@ -143,7 +145,7 @@ void createIndex() { } @Test - void createIndexWithException() { + void create_index_with_exception() { when(nodeClient.admin().indices().create(any())).thenThrow(RuntimeException.class); assertThrows(IllegalStateException.class, @@ -151,7 +153,7 @@ void createIndexWithException() { } @Test - void getIndexMappings() throws IOException { + void get_index_mappings() throws IOException { URL url = Resources.getResource(TEST_MAPPING_FILE); String mappings = Resources.toString(url, Charsets.UTF_8); String indexName = "test"; @@ -183,7 +185,7 @@ void getIndexMappings() throws IOException { } @Test - void getIndexMappingsWithEmptyMapping() { + void get_index_mappings_with_empty_mapping() { String indexName = "test"; mockNodeClientIndicesMappings(indexName, ""); Map indexMappings = client.getIndexMappings(indexName); @@ -194,7 +196,7 @@ void getIndexMappingsWithEmptyMapping() { } @Test - void getIndexMappingsWithIOException() { + void get_index_mappings_with_IOException() { String indexName = "test"; when(nodeClient.admin().indices()).thenThrow(RuntimeException.class); @@ -202,7 +204,7 @@ void getIndexMappingsWithIOException() { } @Test - void getIndexMappingsWithNonExistIndex() { + void get_index_mappings_with_non_exist_index() { when(nodeClient.admin().indices() .prepareGetMappings(any()) .setLocal(anyBoolean()) @@ -213,7 +215,7 @@ void getIndexMappingsWithNonExistIndex() { } @Test - void getIndexMaxResultWindows() throws IOException { + void get_index_max_result_windows() throws IOException { URL url = Resources.getResource(TEST_MAPPING_SETTINGS_FILE); String indexMetadata = Resources.toString(url, Charsets.UTF_8); String indexName = "accounts"; @@ -227,7 +229,7 @@ void getIndexMaxResultWindows() throws IOException { } @Test - void getIndexMaxResultWindowsWithDefaultSettings() throws IOException { + void get_index_max_result_windows_with_default_settings() throws IOException { URL url = Resources.getResource(TEST_MAPPING_FILE); String indexMetadata = Resources.toString(url, Charsets.UTF_8); String indexName = "accounts"; @@ -241,7 +243,7 @@ void getIndexMaxResultWindowsWithDefaultSettings() throws IOException { } @Test - void getIndexMaxResultWindowsWithIOException() { + void get_index_max_result_windows_with_IOException() { String indexName = "test"; when(nodeClient.admin().indices()).thenThrow(RuntimeException.class); @@ -250,7 +252,7 @@ void getIndexMaxResultWindowsWithIOException() { /** Jacoco enforce this constant lambda be tested. */ @Test - void testAllFieldsPredicate() { + void test_all_fields_predicate() { assertTrue(OpenSearchNodeClient.ALL_FIELDS.apply("any_index").test("any_field")); } @@ -272,8 +274,7 @@ void search() { // Mock second scroll request followed SearchResponse scrollResponse = mock(SearchResponse.class); when(nodeClient.searchScroll(any()).actionGet()).thenReturn(scrollResponse); - // TODO commented out because scroll clean-up is disabled - //when(scrollResponse.getScrollId()).thenReturn("scroll456"); + when(scrollResponse.getScrollId()).thenReturn("scroll456"); when(scrollResponse.getHits()).thenReturn(SearchHits.empty()); // Verify response for first scroll request @@ -314,23 +315,30 @@ void cleanup() { client.cleanup(request); assertFalse(request.isScrollStarted()); - /* TODO: Scroll cleaning is temporary disabled InOrder inOrder = Mockito.inOrder(nodeClient, requestBuilder); inOrder.verify(nodeClient).prepareClearScroll(); inOrder.verify(requestBuilder).addScrollId("scroll123"); inOrder.verify(requestBuilder).get(); - */ } @Test - void cleanupWithoutScrollId() { + void cleanup_without_scrollId() { OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); client.cleanup(request); verify(nodeClient, never()).prepareClearScroll(); } @Test - void getIndices() { + void cleanup_rethrows_exception() { + when(nodeClient.prepareClearScroll()).thenThrow(new RuntimeException()); + + OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); + request.setScrollId("scroll123"); + assertThrows(IllegalStateException.class, () -> client.cleanup(request)); + } + + @Test + void get_indices() { AliasMetadata aliasMetadata = mock(AliasMetadata.class); ImmutableOpenMap.Builder> builder = ImmutableOpenMap.builder(); builder.fPut("index",Arrays.asList(aliasMetadata)); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java index 67c6f51c98..9fcf7a0079 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java @@ -29,7 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.search.TotalHits; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -64,6 +65,7 @@ import org.opensearch.sql.opensearch.response.OpenSearchResponse; @ExtendWith(MockitoExtension.class) +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) class OpenSearchRestClientTest { private static final String TEST_MAPPING_FILE = "mappings/accounts.json"; @@ -91,7 +93,7 @@ void setUp() { } @Test - void isIndexExist() throws IOException { + void is_index_exist() throws IOException { when(restClient.indices() .exists(any(), any())) // use any() because missing equals() in GetIndexRequest .thenReturn(true); @@ -100,7 +102,7 @@ void isIndexExist() throws IOException { } @Test - void isIndexNotExist() throws IOException { + void is_index_not_exist() throws IOException { when(restClient.indices() .exists(any(), any())) // use any() because missing equals() in GetIndexRequest .thenReturn(false); @@ -109,14 +111,14 @@ void isIndexNotExist() throws IOException { } @Test - void isIndexExistWithException() throws IOException { + void is_index_exist_with_exception() throws IOException { when(restClient.indices().exists(any(), any())).thenThrow(IOException.class); assertThrows(IllegalStateException.class, () -> client.exists("test")); } @Test - void createIndex() throws IOException { + void create_index() throws IOException { String indexName = "test"; Map mappings = ImmutableMap.of( "properties", @@ -129,7 +131,7 @@ void createIndex() throws IOException { } @Test - void createIndexWithIOException() throws IOException { + void create_index_with_IOException() throws IOException { when(restClient.indices().create(any(), any())).thenThrow(IOException.class); assertThrows(IllegalStateException.class, @@ -137,7 +139,7 @@ void createIndexWithIOException() throws IOException { } @Test - void getIndexMappings() throws IOException { + void get_index_mappings() throws IOException { URL url = Resources.getResource(TEST_MAPPING_FILE); String mappings = Resources.toString(url, Charsets.UTF_8); String indexName = "test"; @@ -173,14 +175,14 @@ void getIndexMappings() throws IOException { } @Test - void getIndexMappingsWithIOException() throws IOException { + void get_index_mappings_with_IOException() throws IOException { when(restClient.indices().getMapping(any(GetMappingsRequest.class), any())) .thenThrow(new IOException()); assertThrows(IllegalStateException.class, () -> client.getIndexMappings("test")); } @Test - void getIndexMaxResultWindowsSettings() throws IOException { + void get_index_max_result_windows_settings() throws IOException { String indexName = "test"; Integer maxResultWindow = 1000; @@ -204,7 +206,7 @@ void getIndexMaxResultWindowsSettings() throws IOException { } @Test - void getIndexMaxResultWindowsDefaultSettings() throws IOException { + void get_index_max_result_windows_default_settings() throws IOException { String indexName = "test"; Integer maxResultWindow = 10000; @@ -228,7 +230,7 @@ void getIndexMaxResultWindowsDefaultSettings() throws IOException { } @Test - void getIndexMaxResultWindowsWithIOException() throws IOException { + void get_index_max_result_windows_with_IOException() throws IOException { when(restClient.indices().getSettings(any(GetSettingsRequest.class), any())) .thenThrow(new IOException()); assertThrows(IllegalStateException.class, () -> client.getIndexMaxResultWindows("test")); @@ -252,8 +254,7 @@ void search() throws IOException { // Mock second scroll request followed SearchResponse scrollResponse = mock(SearchResponse.class); when(restClient.scroll(any(), any())).thenReturn(scrollResponse); - // TODO commented out because scroll clean-up is disabled - //when(scrollResponse.getScrollId()).thenReturn("scroll456"); + when(scrollResponse.getScrollId()).thenReturn("scroll456"); when(scrollResponse.getHits()).thenReturn(SearchHits.empty()); // Verify response for first scroll request @@ -273,7 +274,7 @@ void search() throws IOException { } @Test - void searchWithIOException() throws IOException { + void search_with_IOException() throws IOException { when(restClient.search(any(), any())).thenThrow(new IOException()); assertThrows( IllegalStateException.class, @@ -281,7 +282,7 @@ void searchWithIOException() throws IOException { } @Test - void scrollWithIOException() throws IOException { + void scroll_with_IOException() throws IOException { // Mock first scroll request SearchResponse searchResponse = mock(SearchResponse.class); when(restClient.search(any(), any())).thenReturn(searchResponse); @@ -318,21 +319,19 @@ void cleanup() throws IOException { OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); request.setScrollId("scroll123"); client.cleanup(request); - // TODO: Scroll cleaning is temporary disabled - //verify(restClient).clearScroll(any(), any()); + verify(restClient).clearScroll(any(), any()); assertFalse(request.isScrollStarted()); } @Test - void cleanupWithoutScrollId() throws IOException { + void cleanup_without_scrollId() throws IOException { OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); client.cleanup(request); verify(restClient, never()).clearScroll(any(), any()); } - @Disabled("TODO: Scroll cleaning is temporary disabled") @Test - void cleanupWithIOException() throws IOException { + void cleanup_with_IOException() throws IOException { when(restClient.clearScroll(any(), any())).thenThrow(new IOException()); OpenSearchScrollRequest request = new OpenSearchScrollRequest("test", factory); @@ -341,7 +340,7 @@ void cleanupWithIOException() throws IOException { } @Test - void getIndices() throws IOException { + void get_indices() throws IOException { when(restClient.indices().get(any(GetIndexRequest.class), any(RequestOptions.class))) .thenReturn(getIndexResponse); when(getIndexResponse.getIndices()).thenReturn(new String[] {"index"}); @@ -351,7 +350,7 @@ void getIndices() throws IOException { } @Test - void getIndicesWithIOException() throws IOException { + void get_indices_with_IOException() throws IOException { when(restClient.indices().get(any(GetIndexRequest.class), any(RequestOptions.class))) .thenThrow(new IOException()); assertThrows(IllegalStateException.class, () -> client.indices()); @@ -370,7 +369,7 @@ void meta() throws IOException { } @Test - void metaWithIOException() throws IOException { + void meta_with_IOException() throws IOException { when(restClient.cluster().getSettings(any(), any(RequestOptions.class))) .thenThrow(new IOException()); @@ -378,7 +377,7 @@ void metaWithIOException() throws IOException { } @Test - void mlWithException() { + void ml_with_exception() { assertThrows(UnsupportedOperationException.class, () -> client.getNodeClient()); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanTest.java index 9006a0573d..c13e63f01a 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchPagedIndexScanTest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.opensearch.storage.scan; +import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -12,6 +13,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.withSettings; import static org.opensearch.sql.data.type.ExprCoreType.STRING; @@ -31,6 +33,7 @@ import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.request.InitialPageRequestBuilder; import org.opensearch.sql.opensearch.request.OpenSearchRequest; +import org.opensearch.sql.opensearch.request.PagedRequestBuilder; import org.opensearch.sql.opensearch.request.SubsequentPageRequestBuilder; @ExtendWith(MockitoExtension.class) @@ -64,19 +67,31 @@ void query_all_results_initial_scroll_request() { employee(2, "Smith", "HR"), employee(3, "Allen", "IT")}); - InitialPageRequestBuilder builder = new InitialPageRequestBuilder( + PagedRequestBuilder builder = new InitialPageRequestBuilder( new OpenSearchRequest.IndexName("test"), 3, settings, exprValueFactory); try (OpenSearchPagedIndexScan indexScan = new OpenSearchPagedIndexScan(client, builder)) { indexScan.open(); - assertTrue(indexScan.hasNext()); - assertEquals(employee(1, "John", "IT"), indexScan.next()); + assertAll( + () -> assertTrue(indexScan.hasNext()), + () -> assertEquals(employee(1, "John", "IT"), indexScan.next()), + + () -> assertTrue(indexScan.hasNext()), + () -> assertEquals(employee(2, "Smith", "HR"), indexScan.next()), - assertTrue(indexScan.hasNext()); - assertEquals(employee(2, "Smith", "HR"), indexScan.next()); + () -> assertTrue(indexScan.hasNext()), + () -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()), + + () -> assertFalse(indexScan.hasNext()) + ); + } + // cleanup should be called on empty response only + verify(client, never()).cleanup(any()); - assertTrue(indexScan.hasNext()); - assertEquals(employee(3, "Allen", "IT"), indexScan.next()); + builder = new SubsequentPageRequestBuilder( + new OpenSearchRequest.IndexName("test"), "scroll", exprValueFactory); + try (OpenSearchPagedIndexScan indexScan = new OpenSearchPagedIndexScan(client, builder)) { + indexScan.open(); assertFalse(indexScan.hasNext()); } @@ -95,14 +110,26 @@ void query_all_results_continuation_scroll_request() { try (OpenSearchPagedIndexScan indexScan = new OpenSearchPagedIndexScan(client, builder)) { indexScan.open(); - assertTrue(indexScan.hasNext()); - assertEquals(employee(1, "John", "IT"), indexScan.next()); + assertAll( + () -> assertTrue(indexScan.hasNext()), + () -> assertEquals(employee(1, "John", "IT"), indexScan.next()), - assertTrue(indexScan.hasNext()); - assertEquals(employee(2, "Smith", "HR"), indexScan.next()); + () -> assertTrue(indexScan.hasNext()), + () -> assertEquals(employee(2, "Smith", "HR"), indexScan.next()), - assertTrue(indexScan.hasNext()); - assertEquals(employee(3, "Allen", "IT"), indexScan.next()); + () -> assertTrue(indexScan.hasNext()), + () -> assertEquals(employee(3, "Allen", "IT"), indexScan.next()), + + () -> assertFalse(indexScan.hasNext()) + ); + } + // cleanup should be called on empty response only + verify(client, never()).cleanup(any()); + + builder = new SubsequentPageRequestBuilder( + new OpenSearchRequest.IndexName("test"), "scroll", exprValueFactory); + try (OpenSearchPagedIndexScan indexScan = new OpenSearchPagedIndexScan(client, builder)) { + indexScan.open(); assertFalse(indexScan.hasNext()); }