Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

GH-38503: [Go][Parquet] Make the arrow column writer internal #38727

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions go/parquet/pqarrow/encode_arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,25 @@ func nullableRoot(manifest *SchemaManifest, field *SchemaField) bool {
return nullable
}

// ArrowColumnWriter is a convenience object for easily writing arrow data to a specific
// arrowColumnWriter is a convenience object for easily writing arrow data to a specific
// set of columns in a parquet file. Since a single arrow array can itself be a nested type
// consisting of multiple columns of data, this will write to all of the appropriate leaves in
// the parquet file, allowing easy writing of nested columns.
type ArrowColumnWriter struct {
type arrowColumnWriter struct {
builders []*multipathLevelBuilder
leafCount int
colIdx int
rgw file.RowGroupWriter
}

// NewArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns,
// newArrowColumnWriter returns a new writer using the chunked array to determine the number of leaf columns,
// and the provided schema manifest to determine the paths for writing the columns.
//
// Using an arrow column writer is a convenience to avoid having to process the arrow array yourself
// and determine the correct definition and repetition levels manually.
func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (ArrowColumnWriter, error) {
func newArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *SchemaManifest, rgw file.RowGroupWriter, leafColIdx int) (arrowColumnWriter, error) {
if data.Len() == 0 {
return ArrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
return arrowColumnWriter{leafCount: calcLeafCount(data.DataType()), rgw: rgw}, nil
}

var (
Expand All @@ -109,7 +109,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
}

if absPos >= int64(data.Len()) {
return ArrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array")
return arrowColumnWriter{}, errors.New("cannot write data at offset past end of chunked array")
}

leafCount := calcLeafCount(data.DataType())
Expand All @@ -120,7 +120,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch

schemaField, err := manifest.GetColumnField(leafColIdx)
if err != nil {
return ArrowColumnWriter{}, err
return arrowColumnWriter{}, err
}
isNullable = nullableRoot(manifest, schemaField)

Expand All @@ -138,10 +138,10 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
if arrToWrite.Len() > 0 {
bldr, err := newMultipathLevelBuilder(arrToWrite, isNullable)
if err != nil {
return ArrowColumnWriter{}, nil
return arrowColumnWriter{}, nil
}
if leafCount != bldr.leafCount() {
return ArrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount())
return arrowColumnWriter{}, fmt.Errorf("data type leaf_count != builder leafcount: %d - %d", leafCount, bldr.leafCount())
}
builders = append(builders, bldr)
}
Expand All @@ -153,14 +153,10 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size int64, manifest *Sch
values += chunkWriteSize
}

return ArrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
return arrowColumnWriter{builders: builders, leafCount: leafCount, rgw: rgw, colIdx: leafColIdx}, nil
}

func (acw *ArrowColumnWriter) LeafCount() int {
return acw.leafCount
}

func (acw *ArrowColumnWriter) Write(ctx context.Context) error {
func (acw *arrowColumnWriter) Write(ctx context.Context) error {
arrCtx := arrowCtxFromContext(ctx)
for leafIdx := 0; leafIdx < acw.leafCount; leafIdx++ {
var (
Expand Down
80 changes: 32 additions & 48 deletions go/parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,24 @@ func TestWriteArrowCols(t *testing.T) {
tbl := makeDateTimeTypesTable(mem, false, false)
defer tbl.Release()

psc, err := pqarrow.ToParquet(tbl.Schema(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
require.NoError(t, err)

manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
require.NoError(t, err)

sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()
writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4))))

srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)
fileWriter, err := pqarrow.NewFileWriter(
tbl.Schema(),
sink,
parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4)),
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)),
)
require.NoError(t, err)

colIdx := 0
fileWriter.NewRowGroup()
for i := int64(0); i < tbl.NumCols(); i++ {
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
colChunk := tbl.Column(int(i)).Data()
err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx = colIdx + acw.LeafCount()
}
require.NoError(t, srgw.Close())
require.NoError(t, writer.Close())
require.NoError(t, fileWriter.Close())

expected := makeDateTimeTypesTable(mem, true, false)
defer expected.Release()
Expand Down Expand Up @@ -235,31 +231,24 @@ func TestWriteArrowInt96(t *testing.T) {
tbl := makeDateTimeTypesTable(mem, false, false)
defer tbl.Release()

props := pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem))

psc, err := pqarrow.ToParquet(tbl.Schema(), nil, props)
require.NoError(t, err)

manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
require.NoError(t, err)

sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()

writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithAllocator(mem))))

srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
fileWriter, err := pqarrow.NewFileWriter(
tbl.Schema(),
sink,
parquet.NewWriterProperties(parquet.WithAllocator(mem)),
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), pqarrow.WithAllocator(mem)),
)
require.NoError(t, err)

colIdx := 0
fileWriter.NewRowGroup()
for i := int64(0); i < tbl.NumCols(); i++ {
acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, colIdx)
colChunk := tbl.Column(int(i)).Data()
err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx += acw.LeafCount()
}
require.NoError(t, srgw.Close())
require.NoError(t, writer.Close())
require.NoError(t, fileWriter.Close())

expected := makeDateTimeTypesTable(mem, false, false)
defer expected.Release()
Expand Down Expand Up @@ -296,33 +285,28 @@ func TestWriteArrowInt96(t *testing.T) {
func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, rowGroupSize int64, props pqarrow.ArrowWriterProperties) *memory.Buffer {
sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()
wrprops := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
psc, err := pqarrow.ToParquet(tbl.Schema(), wrprops, props)
require.NoError(t, err)

manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
fileWriter, err := pqarrow.NewFileWriter(
tbl.Schema(),
sink,
parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0)),
props,
)
require.NoError(t, err)

writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(wrprops))
ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)

offset := int64(0)
for offset < tbl.NumRows() {
sz := utils.Min(rowGroupSize, tbl.NumRows()-offset)
srgw := writer.AppendRowGroup()
colIdx := 0
fileWriter.NewRowGroup()
for i := 0; i < int(tbl.NumCols()); i++ {
col := tbl.Column(i)
acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, colIdx)
colChunk := tbl.Column(i).Data()
err := fileWriter.WriteColumnChunked(colChunk, 0, int64(colChunk.Len()))
require.NoError(t, err)
require.NoError(t, acw.Write(ctx))
colIdx = colIdx + acw.LeafCount()
}
srgw.Close()
offset += sz
}
writer.Close()

require.NoError(t, fileWriter.Close())
return sink.Finish()
}

Expand Down
2 changes: 1 addition & 1 deletion go/parquet/pqarrow/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (fw *FileWriter) Close() error {
// building of writing columns to a file via arrow data without needing to already have
// a record or table.
func (fw *FileWriter) WriteColumnChunked(data *arrow.Chunked, offset, size int64) error {
acw, err := NewArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
acw, err := newArrowColumnWriter(data, offset, size, fw.manifest, fw.rgw, fw.colIdx)
if err != nil {
return err
}
Expand Down
Loading