From 13e13c425c2426bd975ee99ef94620ee7bca670c Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 9 Nov 2021 10:56:04 +0800 Subject: [PATCH 1/2] IllegalStateException reading data with struct type --- .../arrow/dataset/jni/NativeScanner.java | 13 +++- .../dataset/file/TestFileSystemDataset.java | 62 ++++++++++++++++++ .../resources/data/struct_example.parquet | Bin 0 -> 1250 bytes 3 files changed, 73 insertions(+), 2 deletions(-) create mode 100644 java/dataset/src/test/resources/data/struct_example.parquet diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java index a8374f1deb3e8..56029b593617d 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -48,6 +48,8 @@ public class NativeScanner implements Scanner { private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock writeLock = lock.writeLock(); private final Lock readLock = lock.readLock(); + + private Schema schema = null; private boolean closed = false; public NativeScanner(NativeContext context, ScanOptions options, long scannerId) { @@ -92,7 +94,7 @@ public boolean hasNext() { } peek = UnsafeRecordBatchSerializer.deserializeUnsafe(context.getAllocator(), bytes); if (options.getColumns() != null) { - Preconditions.checkState(peek.getNodes().size() == options.getColumns().length); + Preconditions.checkState(schema().getFields().size() == options.getColumns().length); } return true; } @@ -122,12 +124,19 @@ public Iterable scan() { @Override public Schema schema() { + if (schema != null) { + return schema; + } readLock.lock(); + if (schema != null) { + return schema; + } try { if (closed) { throw new NativeInstanceReleasedException(); } - return SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator()); + schema = SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator()); + return schema; } catch (IOException e) { throw new RuntimeException(e); } finally { diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index a2ebe67b8b1d2..3480a3cd8c6cd 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -200,6 +200,68 @@ public void testCsvReadTab() throws Exception { AutoCloseables.close(vsr, allocator); } + + @Test + public void testStructTypeRead() throws Exception { + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator, + NativeMemoryPool.getDefault(), ParquetFileFormat.createDefault(), "file://" + resourcePath("data/struct_example.parquet")); + ScanOptions options = new ScanOptions(new String[] {"_1"}, Filter.EMPTY, 100); + Schema schema = factory.inspect(); + NativeDataset dataset = factory.finish(schema); + NativeScanner nativeScanner = dataset.newScan(options); + List scanTasks = collect(nativeScanner.scan()); + Assert.assertEquals(1, scanTasks.size()); + ScanTask scanTask = scanTasks.get(0); + ScanTask.BatchIterator itr = scanTask.execute(); + + VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator); + VectorLoader loader = new VectorLoader(vsr); + int rowCount = 0; + while (itr.hasNext()) { + try (ArrowRecordBatch next = itr.next()) { + loader.load(next); + } + rowCount += vsr.getRowCount(); + + } + Assert.assertEquals(50, rowCount); + assertEquals(1, schema.getFields().size()); + assertEquals("_1", schema.getFields().get(0).getName()); + AutoCloseables.close(vsr, allocator); + } + + @Test + public void testStructTypeReadWithEmptyProjector() throws Exception { + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator, + NativeMemoryPool.getDefault(), ParquetFileFormat.createDefault(), "file://" + resourcePath("data/struct_example.parquet")); + ScanOptions options = new ScanOptions(new String[0], Filter.EMPTY, 100); + Schema schema = factory.inspect(); + NativeDataset dataset = factory.finish(schema); + NativeScanner nativeScanner = dataset.newScan(options); + List scanTasks = collect(nativeScanner.scan()); + Assert.assertEquals(1, scanTasks.size()); + ScanTask scanTask = scanTasks.get(0); + ScanTask.BatchIterator itr = scanTask.execute(); + Schema scannerSchema = nativeScanner.schema(); + VectorSchemaRoot vsr = VectorSchemaRoot.create(scannerSchema, allocator); + VectorLoader loader = new VectorLoader(vsr); + int rowCount = 0; + while (itr.hasNext()) { + try (ArrowRecordBatch next = itr.next()) { + loader.load(next); + } + rowCount += vsr.getRowCount(); + + } + Assert.assertEquals(50, rowCount); + assertEquals(1, schema.getFields().size()); + assertEquals("_1", schema.getFields().get(0).getName()); + assertEquals(0, scannerSchema.getFields().size()); + AutoCloseables.close(vsr, allocator); + } + @Test public void testReadPartialFile() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); diff --git a/java/dataset/src/test/resources/data/struct_example.parquet b/java/dataset/src/test/resources/data/struct_example.parquet new file mode 100644 index 0000000000000000000000000000000000000000..cf94819defc3a294b4e132238a1fcc4f1e03f62c GIT binary patch literal 1250 zcmb8vQE!t_6bJA#3Y5*(t-6b#s1?`RDMKhNEz4efGd`Hb_@prnEp#^0b^|)IEPL@I znCOQvKAFV_KY<^^58$)U{>!>)GIg%$FWlUFIOm=VVe`Qgn-TjcvcsIg0t`haX^Ldi z(NB?P1Cpushgqz?&ndJ3fDk`>u>{ZLK$v>4!2a}UAIc}WJ4VsXh0KOXhFL)&wFC= z8E;!`fo!OQ0}W_`3oU4aSDLN||HVNZ@J_w-W;*z*2G;(;Xtc#yGYyY%YV@#>Z2R%6 zPSSfB4t>T^lT)~k&3IybB^w^i)EUP_YVryn4U!abtbpZw7=^=dlvar3K-E;EDrt;Y znnh{PJ}{4)i1op3TlN!^Zo0Qd45y?(u;;42eVQR+X z*XA%x{lHIszOE-}Jn5ynq4x)26eRlk^L>5f52s5V8`DQ&=M2}Q@n{P- zk0w#%cOxW2mC(?SH=wWY9~>CJ3e`^+^5fXw!}RQNJw4g&Aj{kvk5YdyN*<%aQTBr~ z%Dp%{fRsnG<60ZW<=Od`ak!Rgmuub0APUNU+Y9>bzUMjht{e7U*Kq>NwOlW3T0O7Z Y@>*6m><0F#YG_3G(;;4nO?=&c0$&vG1poj5 literal 0 HcmV?d00001 From b4a837bfdd69782d2728c47ba00291eed19fbaf7 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 9 Nov 2021 11:10:46 +0800 Subject: [PATCH 2/2] fixup --- .../java/org/apache/arrow/dataset/jni/NativeScanner.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java index 56029b593617d..8745dd7cba034 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -128,10 +128,10 @@ public Schema schema() { return schema; } readLock.lock(); - if (schema != null) { - return schema; - } try { + if (schema != null) { + return schema; + } if (closed) { throw new NativeInstanceReleasedException(); }