diff --git a/go/arrow/array/list.go b/go/arrow/array/list.go index d61d980aac0e3..ddec6c5307e90 100644 --- a/go/arrow/array/list.go +++ b/go/arrow/array/list.go @@ -611,4 +611,14 @@ var ( _ arrow.Array = (*LargeList)(nil) _ Builder = (*ListBuilder)(nil) _ Builder = (*LargeListBuilder)(nil) + + _ ListLike = (*List)(nil) + _ ListLike = (*LargeList)(nil) + _ ListLike = (*FixedSizeList)(nil) + _ ListLike = (*Map)(nil) + + _ ListLikeBuilder = (*ListBuilder)(nil) + _ ListLikeBuilder = (*LargeListBuilder)(nil) + _ ListLikeBuilder = (*FixedSizeListBuilder)(nil) + _ ListLikeBuilder = (*MapBuilder)(nil) ) diff --git a/go/arrow/datatype_nested.go b/go/arrow/datatype_nested.go index 368527b4b2de9..314172a21b47f 100644 --- a/go/arrow/datatype_nested.go +++ b/go/arrow/datatype_nested.go @@ -25,13 +25,20 @@ import ( "github.com/apache/arrow/go/v13/arrow/internal/debug" ) -type NestedType interface { - DataType +type ( + NestedType interface { + DataType - // Fields method provides a copy of NestedType fields - // (so it can be safely mutated and will not result in updating the NestedType). - Fields() []Field -} + // Fields method provides a copy of NestedType fields + // (so it can be safely mutated and will not result in updating the NestedType). + Fields() []Field + } + + ListLikeType interface { + DataType + Elem() DataType + } +) // ListType describes a nested type in which each array slot contains // a variable-size sequence of values, all having the same relative type. 
@@ -97,11 +104,11 @@ func (t *ListType) ElemField() Field { func (t *ListType) Fields() []Field { return []Field{t.ElemField()} } -func (ListType) Layout() DataTypeLayout { +func (*ListType) Layout() DataTypeLayout { return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(Int32SizeBytes)}} } -func (ListType) OffsetTypeTraits() OffsetTraits { return Int32Traits } +func (*ListType) OffsetTypeTraits() OffsetTraits { return Int32Traits } type LargeListType struct { ListType @@ -121,11 +128,11 @@ func (t *LargeListType) Fingerprint() string { return "" } -func (LargeListType) Layout() DataTypeLayout { +func (*LargeListType) Layout() DataTypeLayout { return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(Int64SizeBytes)}} } -func (LargeListType) OffsetTypeTraits() OffsetTraits { return Int64Traits } +func (*LargeListType) OffsetTypeTraits() OffsetTraits { return Int64Traits } func LargeListOfField(f Field) *LargeListType { if f.Type == nil { @@ -134,10 +141,10 @@ func LargeListOfField(f Field) *LargeListType { return &LargeListType{ListType{elem: f}} } -// ListOf returns the list type with element type t. -// For example, if t represents int32, ListOf(t) represents []int32. +// LargeListOf returns the list type with element type t. +// For example, if t represents int32, LargeListOf(t) represents []int32. // -// ListOf panics if t is nil or invalid. NullableElem defaults to true +// LargeListOf panics if t is nil or invalid. NullableElem defaults to true func LargeListOf(t DataType) *LargeListType { if t == nil { panic("arrow: nil DataType") @@ -145,7 +152,7 @@ func LargeListOf(t DataType) *LargeListType { return &LargeListType{ListType{elem: Field{Name: "item", Type: t, Nullable: true}}} } -// ListOfNonNullable is like ListOf but NullableElem defaults to false, indicating +// LargeListOfNonNullable is like LargeListOf but NullableElem defaults to false, indicating // that the child type should be marked as non-nullable. 
func LargeListOfNonNullable(t DataType) *LargeListType { if t == nil { @@ -230,7 +237,7 @@ func (t *FixedSizeListType) Fingerprint() string { func (t *FixedSizeListType) Fields() []Field { return []Field{t.ElemField()} } -func (FixedSizeListType) Layout() DataTypeLayout { +func (*FixedSizeListType) Layout() DataTypeLayout { return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap()}} } @@ -330,7 +337,7 @@ func (t *StructType) Fingerprint() string { return b.String() } -func (StructType) Layout() DataTypeLayout { +func (*StructType) Layout() DataTypeLayout { return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap()}} } @@ -389,12 +396,10 @@ func (t *MapType) KeyType() DataType { return t.KeyField().Type } func (t *MapType) ItemField() Field { return t.value.Elem().(*StructType).Field(1) } func (t *MapType) ItemType() DataType { return t.ItemField().Type } func (t *MapType) ValueType() *StructType { return t.value.Elem().(*StructType) } -func (t *MapType) ValueField() Field { - return Field{ - Name: "entries", - Type: t.ValueType(), - } -} +func (t *MapType) ValueField() Field { return Field{Name: "entries", Type: t.ValueType()} } + +// Elem returns the MapType's element type (if treating MapType as ListLikeType) +func (t *MapType) Elem() DataType { return t.ValueType() } func (t *MapType) SetItemNullable(nullable bool) { t.value.Elem().(*StructType).fields[1].Nullable = nullable @@ -420,7 +425,7 @@ func (t *MapType) Layout() DataTypeLayout { return t.value.Layout() } -func (MapType) OffsetTypeTraits() OffsetTraits { return Int32Traits } +func (*MapType) OffsetTypeTraits() OffsetTraits { return Int32Traits } type ( // UnionTypeCode is an alias to int8 which is the type of the ids @@ -502,14 +507,14 @@ func (t *unionType) init(fields []Field, typeCodes []UnionTypeCode) { // Fields method provides a copy of union type fields // (so it can be safely mutated and will not result in updating the union type). 
-func (t unionType) Fields() []Field { +func (t *unionType) Fields() []Field { fields := make([]Field, len(t.children)) copy(fields, t.children) return fields } -func (t unionType) TypeCodes() []UnionTypeCode { return t.typeCodes } -func (t unionType) ChildIDs() []int { return t.childIDs[:] } +func (t *unionType) TypeCodes() []UnionTypeCode { return t.typeCodes } +func (t *unionType) ChildIDs() []int { return t.childIDs[:] } func (t *unionType) validate(fields []Field, typeCodes []UnionTypeCode, _ UnionMode) error { if len(fields) != len(typeCodes) { @@ -767,7 +772,22 @@ func (f Field) String() string { var ( _ DataType = (*ListType)(nil) + _ DataType = (*LargeListType)(nil) _ DataType = (*FixedSizeListType)(nil) _ DataType = (*StructType)(nil) _ DataType = (*MapType)(nil) + _ DataType = (*DenseUnionType)(nil) + _ DataType = (*SparseUnionType)(nil) + + _ NestedType = (*ListType)(nil) + _ NestedType = (*LargeListType)(nil) + _ NestedType = (*FixedSizeListType)(nil) + _ NestedType = (*MapType)(nil) + _ NestedType = (*DenseUnionType)(nil) + _ NestedType = (*SparseUnionType)(nil) + + _ ListLikeType = (*ListType)(nil) + _ ListLikeType = (*LargeListType)(nil) + _ ListLikeType = (*FixedSizeListType)(nil) + _ ListLikeType = (*MapType)(nil) ) diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go index 658d79d330684..a561352960a66 100644 --- a/go/arrow/ipc/file_reader.go +++ b/go/arrow/ipc/file_reader.go @@ -595,12 +595,7 @@ func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) arrow.ArrayData { return array.NewData(dt, int(field.Length()), buffers, []arrow.ArrayData{sub}, int(field.NullCount()), 0) } -type listLike interface { - arrow.DataType - Elem() arrow.DataType -} - -func (ctx *arrayLoaderContext) loadList(dt listLike) arrow.ArrayData { +func (ctx *arrayLoaderContext) loadList(dt arrow.ListLikeType) arrow.ArrayData { field, buffers := ctx.loadCommon(dt.ID(), 2) buffers = append(buffers, ctx.buffer()) defer releaseBuffers(buffers) diff --git 
a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 48191eac495d0..cba2b4a0dbf42 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include "arrow/array.h" @@ -25,6 +26,7 @@ #include "arrow/dataset/api.h" #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" +#include "arrow/filesystem/path_util.h" #include "arrow/engine/substrait/util.h" #include "arrow/ipc/api.h" #include "arrow/util/iterator.h" @@ -569,7 +571,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffe * Signature: (Ljava/lang/String;II)J */ JNIEXPORT jlong JNICALL -Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( +Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I( JNIEnv* env, jobject, jstring uri, jint file_format_id) { JNI_METHOD_START std::shared_ptr file_format = @@ -582,6 +584,50 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( JNI_METHOD_END(-1L) } +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: makeFileSystemDatasetFactory + * Signature: ([Ljava/lang/String;II)J + */ +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I( + JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) { + JNI_METHOD_START + + std::shared_ptr file_format = + JniGetOrThrow(GetFileFormat(file_format_id)); + arrow::dataset::FileSystemFactoryOptions options; + + std::vector uri_vec = ToStringVector(env, uris); + if (uri_vec.size() == 0) { + JniThrow("No URIs provided."); + } + + // If not all URIs, throw exception + if (auto elem = std::find_if_not(uri_vec.begin(), uri_vec.end(), arrow::fs::internal::IsLikelyUri); + elem != uri_vec.end()) { + JniThrow("Unrecognized file type in URI: " + *elem); + } + + std::vector 
output_paths; + std::string first_path; + // We know that uri_vec isn't empty, from the conditional above + auto fs = JniGetOrThrow(arrow::fs::FileSystemFromUri(uri_vec[0], &first_path)); + output_paths.push_back(first_path); + + std::transform(uri_vec.begin() + 1, uri_vec.end(), std::back_inserter(output_paths), + [&](const auto& s) -> std::string { + auto result = JniGetOrThrow(fs->PathFromUri(s)); + return std::move(result); + }); + + std::shared_ptr d = + JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make( + std::move(fs), std::move(output_paths), file_format, options)); + return CreateNativeRef(d); + JNI_METHOD_END(-1L) +} + /* * Class: org_apache_arrow_dataset_file_JniWrapper * Method: writeFromScannerToFile diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java index 1268d11fe10a9..aa315690592ee 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java @@ -31,8 +31,17 @@ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memo super(allocator, memoryPool, createNative(format, uri)); } + public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, + String[] uris) { + super(allocator, memoryPool, createNative(format, uris)); + } + private static long createNative(FileFormat format, String uri) { return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id()); } + private static long createNative(FileFormat format, String[] uris) { + return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id()); + } + } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index 18560a46a5c8d..c3a1a4e58a140 
100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -25,7 +25,7 @@ public class JniWrapper { private static final JniWrapper INSTANCE = new JniWrapper(); - + public static JniWrapper get() { JniLoader.get().ensureLoaded(); return INSTANCE; @@ -45,6 +45,17 @@ private JniWrapper() { */ public native long makeFileSystemDatasetFactory(String uri, int fileFormat); + /** + * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to an + * intermediate shared_ptr of the factory instance. + * + * @param uris List of file uris to read, each path pointing to an individual file + * @param fileFormat file format ID + * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. + * @see FileFormat + */ + public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat); + /** * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally * depends on C++ write API: FileSystemDataset::Write. 
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index b8a13937a8aad..735b3ae61106d 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -101,6 +101,49 @@ public void testBaseParquetRead() throws Exception { AutoCloseables.close(factory); } + @Test + public void testMultipleParquetReadFromUris() throws Exception { + ParquetWriteSupport writeSupport1 = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 1, "a"); + ParquetWriteSupport writeSupport2 = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), + 2, "b"); + String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"]]"; + + ScanOptions options = new ScanOptions(1); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{writeSupport1.getOutputURI(), writeSupport2.getOutputURI()}); + Schema schema = inferResultSchemaFromFactory(factory, options); + List datum = collectResultFromFactory(factory, options); + + assertScanBatchesProduced(factory, options); + assertEquals(2, datum.size()); + datum.forEach(batch -> assertEquals(1, batch.getLength())); + checkParquetReadResult(schema, expectedJsonUnordered, datum); + + AutoCloseables.close(datum); + AutoCloseables.close(factory); + } + + + @Test + public void testMultipleParquetInvalidUri() throws Exception { + RuntimeException exc = assertThrows(RuntimeException.class, + () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{"https://example.com", "file:///test/location"})); + Assertions.assertEquals("Unrecognized filesystem type in URI: https://example.com", exc.getMessage()); + } + + @Test + public void 
testMultipleParquetMultipleFilesystemTypes() throws Exception { + RuntimeException exc = assertThrows(RuntimeException.class, + () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(), + FileFormat.PARQUET, new String[]{"file:///test/location", "s3:///test/bucket/file" })); + Assertions.assertTrue( + exc.getMessage().startsWith("The filesystem expected a URI with one of the schemes (file) but received s3" + ) + ); + } + @Test public void testParquetProjectSingleColumn() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");