Skip to content

Commit

Permalink
ARROW-17274: [GO] Remove panic from parquet.file.RowGroupReader.Column(index int) (#13767)
Browse files Browse the repository at this point in the history

Remove panic from `parquet.file.RowGroupReader.Column(index int)`

- parquet.file.RowGroupReader.Column(index int) panics if the provided column index is invalid.
- Return an error instead, as the rest of the functions in the codebase return an error as well. For example, `GetColumnPageReader` returns `(PageReader, error)`.
- Fixed usage in tests.

https://github.com/apache/arrow/blob/master/go/parquet/file/row_group_reader.go#L58



Authored-by: ggodik <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
ggodik authored Aug 1, 2022
1 parent 877ed5b commit c517318
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 24 deletions.
24 changes: 18 additions & 6 deletions go/parquet/cmd/parquet_reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,12 @@ func main() {
scanners := make([]*Dumper, len(selectedColumns))
fields := make([]string, len(selectedColumns))
for idx, c := range selectedColumns {
scanners[idx] = createDumper(rgr.Column(c))
fields[idx] = rgr.Column(c).Descriptor().Path()
col, err := rgr.Column(c)
if err != nil {
log.Fatalf("unable to fetch column=%d err=%s", c, err)
}
scanners[idx] = createDumper(col)
fields[idx] = col.Descriptor().Path()
}

var line string
Expand Down Expand Up @@ -283,8 +287,12 @@ func main() {
if idx > 0 {
fmt.Fprint(dataOut, ",")
}
scanners[idx] = createDumper(rgr.Column(c))
fmt.Fprintf(dataOut, "%q", rgr.Column(c).Descriptor().Path())
col, err := rgr.Column(c)
if err != nil {
log.Fatalf("unable to fetch col=%d err=%s", c, err)
}
scanners[idx] = createDumper(col)
fmt.Fprintf(dataOut, "%q", col.Descriptor().Path())
}
fmt.Fprintln(dataOut)

Expand Down Expand Up @@ -334,8 +342,12 @@ func main() {

scanners := make([]*Dumper, len(selectedColumns))
for idx, c := range selectedColumns {
scanners[idx] = createDumper(rgr.Column(c))
fmt.Fprintf(dataOut, fmt.Sprintf("%%-%ds|", colwidth), rgr.Column(c).Descriptor().Name())
col, err := rgr.Column(c)
if err != nil {
log.Fatalf("unable to fetch column=%d err=%s", c, err)
}
scanners[idx] = createDumper(col)
fmt.Fprintf(dataOut, fmt.Sprintf("%%-%ds|", colwidth), col.Descriptor().Name())
}
fmt.Fprintln(dataOut)

Expand Down
35 changes: 28 additions & 7 deletions go/parquet/encryption_read_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
rowsRead := int64(0)

// get col reader for boolean column
colReader := rowGroupReader.Column(0)
colReader, err := rowGroupReader.Column(0)
if err != nil {
panic(err)
}
boolReader := colReader.(*file.BooleanColumnChunkReader)

// get column chunk metadata for boolean column
Expand All @@ -210,7 +213,10 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
d.EqualValues(i, boolMd.NumValues())

// Get column reader for int32 column
colReader = rowGroupReader.Column(1)
colReader, err = rowGroupReader.Column(1)
if err != nil {
panic(err)
}
int32reader := colReader.(*file.Int32ColumnChunkReader)

int32md, _ := rgMeta.ColumnChunk(1)
Expand All @@ -232,7 +238,10 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
d.EqualValues(i, int32md.NumValues())

// Get column reader for int64 column
colReader = rowGroupReader.Column(2)
colReader, err = rowGroupReader.Column(2)
if err != nil {
panic(err)
}
int64reader := colReader.(*file.Int64ColumnChunkReader)

int64md, _ := rgMeta.ColumnChunk(2)
Expand Down Expand Up @@ -265,7 +274,10 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
d.EqualValues(i, int64md.NumValues())

// Get column reader for int96 column
colReader = rowGroupReader.Column(3)
colReader, err = rowGroupReader.Column(3)
if err != nil {
panic(err)
}
int96reader := colReader.(*file.Int96ColumnChunkReader)

int96md, _ := rgMeta.ColumnChunk(3)
Expand Down Expand Up @@ -297,7 +309,10 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
// try to read them during the plaintext test.
if props.FileDecryptProps != nil {
// Get column reader for the float column
colReader = rowGroupReader.Column(4)
colReader, err = rowGroupReader.Column(4)
if err != nil {
panic(err)
}
floatReader := colReader.(*file.Float32ColumnChunkReader)

floatmd, _ := rgMeta.ColumnChunk(4)
Expand All @@ -320,7 +335,10 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
d.EqualValues(i, floatmd.NumValues())

// Get column reader for the double column
colReader = rowGroupReader.Column(5)
colReader, err = rowGroupReader.Column(5)
if err != nil {
panic(err)
}
dblReader := colReader.(*file.Float64ColumnChunkReader)

dblmd, _ := rgMeta.ColumnChunk(5)
Expand All @@ -343,7 +361,10 @@ func (d *TestDecryptionSuite) decryptFile(filename string, decryptConfigNum int)
d.EqualValues(i, dblmd.NumValues())
}

colReader = rowGroupReader.Column(6)
colReader, err = rowGroupReader.Column(6)
if err != nil {
panic(err)
}
bareader := colReader.(*file.ByteArrayColumnChunkReader)

bamd, _ := rgMeta.ColumnChunk(6)
Expand Down
11 changes: 8 additions & 3 deletions go/parquet/file/file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func (t *SerializeTestSuite) fileSerializeTest(codec compress.Compression, expec
t.False(chunk.HasIndexPage())
t.DefLevelsOut = make([]int16, t.rowsPerRG)
t.RepLevelsOut = make([]int16, t.rowsPerRG)
colReader := rgr.Column(i)
colReader, err := rgr.Column(i)
t.NoError(err)
t.SetupValuesOut(int64(t.rowsPerRG))
valuesRead = t.ReadBatch(colReader, int64(t.rowsPerRG), 0, t.DefLevelsOut, t.RepLevelsOut)
t.EqualValues(t.rowsPerRG, valuesRead)
Expand Down Expand Up @@ -310,7 +311,9 @@ func TestBufferedMultiPageDisabledDictionary(t *testing.T) {
assert.EqualValues(t, valueCount, rgr.NumRows())

var totalRead int64
colReader := rgr.Column(0).(*file.Int32ColumnChunkReader)
col, err := rgr.Column(0)
assert.NoError(t, err)
colReader := col.(*file.Int32ColumnChunkReader)
for colReader.HasNext() {
total, _, _ := colReader.ReadBatch(valueCount-totalRead, valuesOut[totalRead:], nil, nil)
totalRead += total
Expand Down Expand Up @@ -350,7 +353,9 @@ func TestAllNulls(t *testing.T) {
assert.NoError(t, err)

rgr := reader.RowGroup(0)
cr := rgr.Column(0).(*file.Int32ColumnChunkReader)
col, err := rgr.Column(0)
assert.NoError(t, err)
cr := col.(*file.Int32ColumnChunkReader)

defLevels[0] = -1
defLevels[1] = -1
Expand Down
8 changes: 4 additions & 4 deletions go/parquet/file/row_group_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@ func (r *RowGroupReader) ByteSize() int64 { return r.rgMetadata.TotalByteSize()
// Column returns a column reader for the requested (0-indexed) column
//
// panics if passed a column not in the range [0, NumColumns)
func (r *RowGroupReader) Column(i int) ColumnChunkReader {
func (r *RowGroupReader) Column(i int) (ColumnChunkReader, error) {
if i >= r.NumColumns() || i < 0 {
panic(fmt.Errorf("parquet: trying to read column index %d but row group metadata only has %d columns", i, r.rgMetadata.NumColumns()))
return nil, fmt.Errorf("parquet: trying to read column index %d but row group metadata only has %d columns", i, r.rgMetadata.NumColumns())
}

descr := r.fileMetadata.Schema.Column(i)
pageRdr, err := r.GetColumnPageReader(i)
if err != nil {
panic(fmt.Errorf("parquet: unable to initialize page reader: %w", err))
return nil, fmt.Errorf("parquet: unable to initialize page reader: %w", err)
}
return NewColumnReader(descr, pageRdr, r.props.Allocator())
return NewColumnReader(descr, pageRdr, r.props.Allocator()), nil
}

func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
Expand Down
12 changes: 8 additions & 4 deletions go/parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,14 @@ func TestWriteArrowCols(t *testing.T) {
var (
total int64
read int
err error
defLevelsOut = make([]int16, int(expected.NumRows()))
arr = expected.Column(i).Data().Chunk(0)
)
switch expected.Schema().Field(i).Type.(arrow.FixedWidthDataType).BitWidth() {
case 32:
colReader := rgr.Column(i).(*file.Int32ColumnChunkReader)
col, err := rgr.Column(i)
assert.NoError(t, err)
colReader := col.(*file.Int32ColumnChunkReader)
vals := make([]int32, int(expected.NumRows()))
total, read, err = colReader.ReadBatch(expected.NumRows(), vals, defLevelsOut, nil)
require.NoError(t, err)
Expand All @@ -191,7 +192,9 @@ func TestWriteArrowCols(t *testing.T) {
}
}
case 64:
colReader := rgr.Column(i).(*file.Int64ColumnChunkReader)
col, err := rgr.Column(i)
assert.NoError(t, err)
colReader := col.(*file.Int64ColumnChunkReader)
vals := make([]int64, int(expected.NumRows()))
total, read, err = colReader.ReadBatch(expected.NumRows(), vals, defLevelsOut, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -258,7 +261,8 @@ func TestWriteArrowInt96(t *testing.T) {
assert.EqualValues(t, 1, reader.NumRowGroups())

rgr := reader.RowGroup(0)
tsRdr := rgr.Column(3)
tsRdr, err := rgr.Column(3)
assert.NoError(t, err)
assert.Equal(t, parquet.Types.Int96, tsRdr.Type())

rdr := tsRdr.(*file.Int96ColumnChunkReader)
Expand Down

0 comments on commit c517318

Please sign in to comment.