From 3f53904a3bf45c87642084f1d5c862434221bfe1 Mon Sep 17 00:00:00 2001 From: andreaskulicke Date: Sat, 20 Apr 2024 00:30:54 +0200 Subject: [PATCH] Fix pagination for many columns (#2440) (#2441) Signed-off-by: Andreas Kulicke --- .../storage/scan/OpenSearchIndexScan.java | 13 +++++-- .../storage/scan/OpenSearchIndexScanTest.java | 35 +++++++++++++++++-- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java index b2e9319bb1..b1e4ccc463 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java @@ -108,7 +108,14 @@ public OpenSearchIndexScan() {} public void readExternal(ObjectInput in) throws IOException { int reqSize = in.readInt(); byte[] requestStream = new byte[reqSize]; - in.read(requestStream); + int read = 0; + do { + int currentRead = in.read(requestStream, read, reqSize - read); + if (currentRead == -1) { + throw new IOException(); + } + read += currentRead; + } while (read < reqSize); var engine = (OpenSearchStorageEngine) @@ -137,8 +144,8 @@ public void writeExternal(ObjectOutput out) throws IOException { var reqAsBytes = reqOut.bytes().toBytesRef().bytes; // 3. Write out the byte[] to object output stream. - out.writeInt(reqAsBytes.length); - out.write(reqAsBytes); + out.writeInt(reqOut.size()); + out.write(reqAsBytes, 0, reqOut.size()); out.writeInt(maxResponseSize); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index ac1e9038fb..f813d8f551 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -18,18 +18,25 @@ import static org.opensearch.search.sort.SortOrder.ASC; import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.SneakyThrows; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayNameGeneration; import org.junit.jupiter.api.DisplayNameGenerator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.jupiter.MockitoExtension; @@ -100,9 +107,10 @@ void throws_no_cursor_exception() { } } - @Test @SneakyThrows - void serialize() { + @ParameterizedTest + @ValueSource(ints = {0, 150}) + void serialize(Integer numberOfIncludes) { var searchSourceBuilder = new SearchSourceBuilder().size(4); var factory = mock(OpenSearchExprValueFactory.class); @@ -110,9 +118,14 @@ void serialize() { var index = mock(OpenSearchIndex.class); when(engine.getClient()).thenReturn(client); when(engine.getTable(any(), any())).thenReturn(index); + var includes = + Stream.iterate(1, i -> i + 1) + .limit(numberOfIncludes) + .map(i -> "column" + i) + .collect(Collectors.toList()); var request = new OpenSearchScrollRequest( - INDEX_NAME, CURSOR_KEEP_ALIVE, searchSourceBuilder, factory, List.of()); + INDEX_NAME, CURSOR_KEEP_ALIVE, searchSourceBuilder, factory, includes); request.setScrollId("valid-id"); // make a response, so OpenSearchResponse::isEmpty would return true and unset needClean var response = mock(SearchResponse.class); @@ -131,6 +144,22 @@ void serialize() { } } + @SneakyThrows + @Test + void throws_io_exception_if_too_short() { + var request = mock(OpenSearchRequest.class); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + ObjectOutputStream objectOutput = new ObjectOutputStream(output); + objectOutput.writeInt(4); + objectOutput.flush(); + ObjectInputStream objectInput = + new ObjectInputStream(new ByteArrayInputStream(output.toByteArray())); + + try (var indexScan = new OpenSearchIndexScan(client, QUERY_SIZE, request)) { + assertThrows(IOException.class, () -> indexScan.readExternal(objectInput)); + } + } + @Test void plan_for_serialization() { var request = mock(OpenSearchRequest.class);