Skip to content

Commit

Permalink
Merge branch 'apache:main' into apachegh-35709-doc-updates
Browse files Browse the repository at this point in the history
  • Loading branch information
dgreiss authored Jun 2, 2023
2 parents 8a656b1 + 5a55fb4 commit fc17eaf
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 34 deletions.
10 changes: 10 additions & 0 deletions go/arrow/array/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,4 +611,14 @@ var (
_ arrow.Array = (*LargeList)(nil)
_ Builder = (*ListBuilder)(nil)
_ Builder = (*LargeListBuilder)(nil)

_ ListLike = (*List)(nil)
_ ListLike = (*LargeList)(nil)
_ ListLike = (*FixedSizeList)(nil)
_ ListLike = (*Map)(nil)

_ ListLikeBuilder = (*ListBuilder)(nil)
_ ListLikeBuilder = (*LargeListBuilder)(nil)
_ ListLikeBuilder = (*FixedSizeListBuilder)(nil)
_ ListLikeBuilder = (*MapBuilder)(nil)
)
72 changes: 46 additions & 26 deletions go/arrow/datatype_nested.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ import (
"github.com/apache/arrow/go/v13/arrow/internal/debug"
)

type NestedType interface {
DataType
type (
NestedType interface {
DataType

// Fields method provides a copy of NestedType fields
// (so it can be safely mutated and will not result in updating the NestedType).
Fields() []Field
}
// Fields method provides a copy of NestedType fields
// (so it can be safely mutated and will not result in updating the NestedType).
Fields() []Field
}

ListLikeType interface {
DataType
Elem() DataType
}
)

// ListType describes a nested type in which each array slot contains
// a variable-size sequence of values, all having the same relative type.
Expand Down Expand Up @@ -97,11 +104,11 @@ func (t *ListType) ElemField() Field {

// Fields returns a one-element slice holding the list's element field,
// as produced by ElemField.
func (t *ListType) Fields() []Field {
	return []Field{t.ElemField()}
}

func (ListType) Layout() DataTypeLayout {
func (*ListType) Layout() DataTypeLayout {
return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(Int32SizeBytes)}}
}

func (ListType) OffsetTypeTraits() OffsetTraits { return Int32Traits }
func (*ListType) OffsetTypeTraits() OffsetTraits { return Int32Traits }

type LargeListType struct {
ListType
Expand All @@ -121,11 +128,11 @@ func (t *LargeListType) Fingerprint() string {
return ""
}

func (LargeListType) Layout() DataTypeLayout {
func (*LargeListType) Layout() DataTypeLayout {
return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(Int64SizeBytes)}}
}

func (LargeListType) OffsetTypeTraits() OffsetTraits { return Int64Traits }
func (*LargeListType) OffsetTypeTraits() OffsetTraits { return Int64Traits }

func LargeListOfField(f Field) *LargeListType {
if f.Type == nil {
Expand All @@ -134,18 +141,18 @@ func LargeListOfField(f Field) *LargeListType {
return &LargeListType{ListType{elem: f}}
}

// ListOf returns the list type with element type t.
// For example, if t represents int32, ListOf(t) represents []int32.
// LargeListOf returns the list type with element type t.
// For example, if t represents int32, LargeListOf(t) represents []int32.
//
// ListOf panics if t is nil or invalid. NullableElem defaults to true
// LargeListOf panics if t is nil or invalid. NullableElem defaults to true
func LargeListOf(t DataType) *LargeListType {
	// A list type cannot be described without an element type.
	if t == nil {
		panic("arrow: nil DataType")
	}

	// The element field is always named "item" and is nullable by default.
	item := Field{Name: "item", Type: t, Nullable: true}
	return &LargeListType{ListType{elem: item}}
}

// ListOfNonNullable is like ListOf but NullableElem defaults to false, indicating
// LargeListOfNonNullable is like LargeListOf but NullableElem defaults to false, indicating
// that the child type should be marked as non-nullable.
func LargeListOfNonNullable(t DataType) *LargeListType {
if t == nil {
Expand Down Expand Up @@ -230,7 +237,7 @@ func (t *FixedSizeListType) Fingerprint() string {

// Fields returns a one-element slice holding the fixed-size list's element
// field, as produced by ElemField.
func (t *FixedSizeListType) Fields() []Field {
	return []Field{t.ElemField()}
}

func (FixedSizeListType) Layout() DataTypeLayout {
func (*FixedSizeListType) Layout() DataTypeLayout {
return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap()}}
}

Expand Down Expand Up @@ -330,7 +337,7 @@ func (t *StructType) Fingerprint() string {
return b.String()
}

func (StructType) Layout() DataTypeLayout {
func (*StructType) Layout() DataTypeLayout {
return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap()}}
}

Expand Down Expand Up @@ -389,12 +396,10 @@ func (t *MapType) KeyType() DataType { return t.KeyField().Type }
// ItemField returns the field at index 1 of the struct type that backs the map.
func (t *MapType) ItemField() Field { return t.value.Elem().(*StructType).Field(1) }

// ItemType returns the data type of the field returned by ItemField.
func (t *MapType) ItemType() DataType { return t.ItemField().Type }

// ValueType returns the element type of the map's underlying value,
// asserted to *StructType.
func (t *MapType) ValueType() *StructType { return t.value.Elem().(*StructType) }
func (t *MapType) ValueField() Field {
return Field{
Name: "entries",
Type: t.ValueType(),
}
}
func (t *MapType) ValueField() Field { return Field{Name: "entries", Type: t.ValueType()} }

// Elem reports the element type of the map when it is used as a
// ListLikeType; this is the struct type returned by ValueType.
func (t *MapType) Elem() DataType {
	return t.ValueType()
}

func (t *MapType) SetItemNullable(nullable bool) {
t.value.Elem().(*StructType).fields[1].Nullable = nullable
Expand All @@ -420,7 +425,7 @@ func (t *MapType) Layout() DataTypeLayout {
return t.value.Layout()
}

func (MapType) OffsetTypeTraits() OffsetTraits { return Int32Traits }
func (*MapType) OffsetTypeTraits() OffsetTraits { return Int32Traits }

type (
// UnionTypeCode is an alias to int8 which is the type of the ids
Expand Down Expand Up @@ -502,14 +507,14 @@ func (t *unionType) init(fields []Field, typeCodes []UnionTypeCode) {

// Fields method provides a copy of union type fields
// (so it can be safely mutated and will not result in updating the union type).
func (t unionType) Fields() []Field {
func (t *unionType) Fields() []Field {
fields := make([]Field, len(t.children))
copy(fields, t.children)
return fields
}

func (t unionType) TypeCodes() []UnionTypeCode { return t.typeCodes }
func (t unionType) ChildIDs() []int { return t.childIDs[:] }
func (t *unionType) TypeCodes() []UnionTypeCode { return t.typeCodes }
func (t *unionType) ChildIDs() []int { return t.childIDs[:] }

func (t *unionType) validate(fields []Field, typeCodes []UnionTypeCode, _ UnionMode) error {
if len(fields) != len(typeCodes) {
Expand Down Expand Up @@ -767,7 +772,22 @@ func (f Field) String() string {

// Compile-time assertions: each line fails to build if the pointer type on
// the right stops satisfying the interface on the left.
var (
// Every nested type is a DataType.
_ DataType = (*ListType)(nil)
_ DataType = (*LargeListType)(nil)
_ DataType = (*FixedSizeListType)(nil)
_ DataType = (*StructType)(nil)
_ DataType = (*MapType)(nil)
_ DataType = (*DenseUnionType)(nil)
_ DataType = (*SparseUnionType)(nil)

// Types asserted to expose their child fields through NestedType.
_ NestedType = (*ListType)(nil)
_ NestedType = (*LargeListType)(nil)
_ NestedType = (*FixedSizeListType)(nil)
_ NestedType = (*MapType)(nil)
_ NestedType = (*DenseUnionType)(nil)
_ NestedType = (*SparseUnionType)(nil)

// Types asserted to expose an element type through ListLikeType.
_ ListLikeType = (*ListType)(nil)
_ ListLikeType = (*LargeListType)(nil)
_ ListLikeType = (*FixedSizeListType)(nil)
_ ListLikeType = (*MapType)(nil)
)
7 changes: 1 addition & 6 deletions go/arrow/ipc/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,12 +595,7 @@ func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) arrow.ArrayData {
return array.NewData(dt, int(field.Length()), buffers, []arrow.ArrayData{sub}, int(field.NullCount()), 0)
}

type listLike interface {
arrow.DataType
Elem() arrow.DataType
}

func (ctx *arrayLoaderContext) loadList(dt listLike) arrow.ArrayData {
func (ctx *arrayLoaderContext) loadList(dt arrow.ListLikeType) arrow.ArrayData {
field, buffers := ctx.loadCommon(dt.ID(), 2)
buffers = append(buffers, ctx.buffer())
defer releaseBuffers(buffers)
Expand Down
48 changes: 47 additions & 1 deletion java/dataset/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <mutex>
#include <utility>
#include <unordered_map>

#include "arrow/array.h"
Expand All @@ -25,6 +26,7 @@
#include "arrow/dataset/api.h"
#include "arrow/dataset/file_base.h"
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/engine/substrait/util.h"
#include "arrow/ipc/api.h"
#include "arrow/util/iterator.h"
Expand Down Expand Up @@ -569,7 +571,7 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffe
* Signature: (Ljava/lang/String;I)J
*/
JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I(
JNIEnv* env, jobject, jstring uri, jint file_format_id) {
JNI_METHOD_START
std::shared_ptr<arrow::dataset::FileFormat> file_format =
Expand All @@ -582,6 +584,50 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
JNI_METHOD_END(-1L)
}

/*
 * Class:     org_apache_arrow_dataset_file_JniWrapper
 * Method:    makeFileSystemDatasetFactory
 * Signature: ([Ljava/lang/String;I)J
 *
 * Overload taking an array of URIs. The filesystem is resolved from the first
 * URI only; every remaining URI is converted to a path on that same filesystem
 * via PathFromUri. Returns the native reference created for the resulting
 * DatasetFactory. JNI_METHOD_END(-1L) supplies the value for the error path
 * (see the macro definition for how failures are reported to Java).
 */
JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I(
    JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) {
  JNI_METHOD_START

  std::shared_ptr<arrow::dataset::FileFormat> file_format =
      JniGetOrThrow(GetFileFormat(file_format_id));
  arrow::dataset::FileSystemFactoryOptions options;

  std::vector<std::string> uri_vec = ToStringVector(env, uris);
  if (uri_vec.empty()) {
    JniThrow("No URIs provided.");
  }

  // Every entry must look like a URI; report the first one that does not.
  if (auto elem = std::find_if_not(uri_vec.begin(), uri_vec.end(),
                                   arrow::fs::internal::IsLikelyUri);
      elem != uri_vec.end()) {
    JniThrow("Unrecognized file type in URI: " + *elem);
  }

  std::vector<std::string> output_paths;
  output_paths.reserve(uri_vec.size());

  // uri_vec is known to be non-empty from the check above, so index 0 is valid.
  // The first URI both selects the filesystem and yields the first path.
  std::string first_path;
  auto fs = JniGetOrThrow(arrow::fs::FileSystemFromUri(uri_vec[0], &first_path));
  output_paths.push_back(std::move(first_path));

  // Resolve the remaining URIs against the filesystem chosen above.
  // The result is returned directly: wrapping a local in std::move on return
  // only inhibits copy elision.
  std::transform(uri_vec.begin() + 1, uri_vec.end(), std::back_inserter(output_paths),
                 [&](const auto& s) -> std::string {
                   return JniGetOrThrow(fs->PathFromUri(s));
                 });

  std::shared_ptr<arrow::dataset::DatasetFactory> d =
      JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make(
          std::move(fs), std::move(output_paths), file_format, options));
  return CreateNativeRef(d);
  JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: writeFromScannerToFile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,17 @@ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memo
super(allocator, memoryPool, createNative(format, uri));
}

/**
 * Creates a dataset factory over several files at once.
 *
 * @param allocator passed through to the superclass constructor
 * @param memoryPool passed through to the superclass constructor
 * @param format file format; its id is forwarded to the native layer
 * @param uris URIs forwarded to the native layer to build the native factory
 */
public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format,
String[] uris) {
super(allocator, memoryPool, createNative(format, uris));
}

// Asks the JNI wrapper to build a native factory for a single URI and
// returns the value it hands back.
private static long createNative(FileFormat format, String uri) {
  final JniWrapper wrapper = JniWrapper.get();
  return wrapper.makeFileSystemDatasetFactory(uri, format.id());
}

// Asks the JNI wrapper to build a native factory for an array of URIs and
// returns the value it hands back.
private static long createNative(FileFormat format, String[] uris) {
  final JniWrapper wrapper = JniWrapper.get();
  return wrapper.makeFileSystemDatasetFactory(uris, format.id());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class JniWrapper {

private static final JniWrapper INSTANCE = new JniWrapper();

public static JniWrapper get() {
JniLoader.get().ensureLoaded();
return INSTANCE;
Expand All @@ -45,6 +45,17 @@ private JniWrapper() {
*/
public native long makeFileSystemDatasetFactory(String uri, int fileFormat);

/**
* Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to an
* intermediate shared_ptr of the factory instance.
*
* @param uris List of file URIs to read, each path pointing to an individual file
* @param fileFormat file format ID
* @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
* @see FileFormat
*/
public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat);

/**
* Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally
* depends on C++ write API: FileSystemDataset::Write.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,49 @@ public void testBaseParquetRead() throws Exception {
AutoCloseables.close(factory);
}

@Test
public void testMultipleParquetReadFromUris() throws Exception {
  // Two Parquet files, each written into its own temporary folder.
  ParquetWriteSupport firstFile =
      ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
  ParquetWriteSupport secondFile =
      ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 2, "b");
  String[] uris = {firstFile.getOutputURI(), secondFile.getOutputURI()};
  String expectedJsonUnordered = "[[1,\"a\"],[2,\"b\"]]";

  ScanOptions options = new ScanOptions(1);
  FileSystemDatasetFactory factory = new FileSystemDatasetFactory(
      rootAllocator(), NativeMemoryPool.getDefault(), FileFormat.PARQUET, uris);
  Schema schema = inferResultSchemaFromFactory(factory, options);
  List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);

  // Expect one batch per file, each holding exactly one row.
  assertScanBatchesProduced(factory, options);
  assertEquals(2, datum.size());
  for (ArrowRecordBatch batch : datum) {
    assertEquals(1, batch.getLength());
  }
  checkParquetReadResult(schema, expectedJsonUnordered, datum);

  AutoCloseables.close(datum);
  AutoCloseables.close(factory);
}


@Test
public void testMultipleParquetInvalidUri() throws Exception {
  // Construction must fail, and the exception message must name the https URI.
  final String[] uris = {"https://example.com", "file:///test/location"};
  RuntimeException exc = assertThrows(RuntimeException.class,
      () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
          FileFormat.PARQUET, uris));
  final String message = exc.getMessage();
  Assertions.assertEquals("Unrecognized filesystem type in URI: https://example.com", message);
}

@Test
public void testMultipleParquetMultipleFilesystemTypes() throws Exception {
  // Mixing a file: URI with an s3: URI must be rejected when the factory is built.
  final String[] uris = {"file:///test/location", "s3:///test/bucket/file"};
  RuntimeException exc = assertThrows(RuntimeException.class,
      () -> new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
          FileFormat.PARQUET, uris));
  final String message = exc.getMessage();
  Assertions.assertTrue(
      message.startsWith("The filesystem expected a URI with one of the schemes (file) but received s3"));
}

@Test
public void testParquetProjectSingleColumn() throws Exception {
ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
Expand Down

0 comments on commit fc17eaf

Please sign in to comment.