Skip to content

Commit

Permalink
apacheGH-37845: [Go][Parquet] Check the number of logical fields inst…
Browse files Browse the repository at this point in the history
…ead of physical columns (apache#37846)

### Rationale for this change

This makes it so trying to read with a column chunk reader consistently returns an error if the index is outside the bounds of the logical fields (currently it panics in some cases and returns an error in others).

### What changes are included in this PR?

This makes it so the column chunk reader checks the number of logical fields instead of the number of physical columns when checking if an index is out of range.

### Are these changes tested?

The new test panics without the accompanying code change.

### Are there any user-facing changes?

Applications that used to panic will now have an error to handle instead.

* Closes: apache#37845

Authored-by: Tim Schaub <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
tschaub authored Sep 26, 2023
1 parent 5978729 commit 2895af4
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 2 deletions.
4 changes: 2 additions & 2 deletions go/parquet/pqarrow/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in
}

func (fr *FileReader) getColumnReader(ctx context.Context, i int, colFactory itrFactory) (*ColumnReader, error) {
if i < 0 || i >= fr.rdr.MetaData().Schema.NumColumns() {
return nil, fmt.Errorf("invalid column index chosen %d, there are only %d columns", i, fr.rdr.MetaData().Schema.NumColumns())
if i < 0 || i >= len(fr.Manifest.Fields) {
return nil, fmt.Errorf("invalid column index chosen %d, there are only %d columns", i, len(fr.Manifest.Fields))
}

ctx = context.WithValue(ctx, rdrCtxKey{}, readerCtx{
Expand Down
67 changes: 67 additions & 0 deletions go/parquet/pqarrow/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package pqarrow_test
import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"testing"

"github.com/apache/arrow/go/v14/arrow"
Expand Down Expand Up @@ -216,3 +218,68 @@ func TestFileReaderWriterMetadata(t *testing.T) {
assert.Equal(t, []string{"foo", "bar"}, kvMeta.Keys())
assert.Equal(t, []string{"bar", "baz"}, kvMeta.Values())
}

// TestFileReaderColumnChunkBoundsErrors writes a small Parquet file whose
// Arrow schema has two top-level fields (a float64 and a struct of three
// float64 children) and then checks that the row group column readers return
// an error, rather than succeed or panic, for indexes that fall outside the
// range of top-level fields.
func TestFileReaderColumnChunkBoundsErrors(t *testing.T) {
	structType := arrow.StructOf(
		arrow.Field{Name: "one", Type: arrow.PrimitiveTypes.Float64},
		arrow.Field{Name: "two", Type: arrow.PrimitiveTypes.Float64},
		arrow.Field{Name: "three", Type: arrow.PrimitiveTypes.Float64},
	)
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "zero", Type: arrow.PrimitiveTypes.Float64},
		{Name: "g", Type: structType},
	}, nil)
	numFields := schema.NumFields()

	// Two rows of input; the struct field "g" contributes three leaf columns,
	// so the file has four columns behind only two logical fields.
	jsonRows := `[
{
"zero": 1,
"g": {
"one": 1,
"two": 1,
"three": 1
}
},
{
"zero": 2,
"g": {
"one": 2,
"two": 2,
"three": 2
}
}
]`

	rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, strings.NewReader(jsonRows))
	require.NoError(t, err)

	// Serialize the record to an in-memory Parquet file.
	buf := &bytes.Buffer{}
	w, err := pqarrow.NewFileWriter(schema, buf, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
	require.NoError(t, err)

	require.NoError(t, w.Write(rec))
	require.NoError(t, w.Close())

	// Re-open the bytes that were just written, first as a Parquet file and
	// then through the Arrow-aware reader.
	pqFile, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)

	rdr, err := pqarrow.NewFileReader(pqFile, pqarrow.ArrowReadProperties{BatchSize: 1024}, memory.DefaultAllocator)
	require.NoError(t, err)

	ctx := pqarrow.NewArrowWriteContext(context.Background(), nil)

	// Guard against the loop below silently running zero iterations.
	assert.Greater(t, pqFile.NumRowGroups(), 0)
	for rg := 0; rg < pqFile.NumRowGroups(); rg++ {
		rowGroup := rdr.RowGroup(rg)

		// Every in-range logical field index must be readable.
		for idx := 0; idx < numFields; idx++ {
			_, readErr := rowGroup.Column(idx).Read(ctx)
			assert.NoError(t, readErr, "reading field num: %d", idx)
		}

		// A negative index is rejected.
		_, belowErr := rowGroup.Column(-1).Read(ctx)
		assert.Error(t, belowErr)

		// The first index past the logical fields is rejected, and the
		// message reports the logical field count.
		_, aboveErr := rowGroup.Column(numFields).Read(ctx)
		assert.ErrorContains(t, aboveErr, fmt.Sprintf("there are only %d columns", numFields))
	}
}

0 comments on commit 2895af4

Please sign in to comment.