Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add error propagation to the reader #573

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ example/output/*

# exception to the rule
!example/output/.gitkeep
vendor/
.vscode/
.github/
22 changes: 12 additions & 10 deletions reader/columnbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,17 @@ func (cbt *ColumnBufferType) ReadPageForSkip() (*layout.Page, error) {
}
}

func (cbt *ColumnBufferType) SkipRows(num int64) int64 {
func (cbt *ColumnBufferType) SkipRows(num int64) (int64, error) {
var (
err error
page *layout.Page
)

for cbt.DataTableNumRows < num && err == nil {
page, err = cbt.ReadPageForSkip()
if err != nil {
return 0, err
}
}

if num > cbt.DataTableNumRows {
Expand All @@ -207,7 +210,7 @@ func (cbt *ColumnBufferType) SkipRows(num int64) int64 {

if page != nil {
if err = page.GetValueFromRawData(cbt.SchemaHandler); err != nil {
return 0
return 0, err
}

page.Decode(cbt.DictPage)
Expand All @@ -226,18 +229,17 @@ func (cbt *ColumnBufferType) SkipRows(num int64) int64 {
cbt.DataTable.Merge(tmp)
}

return num
return num, nil
}

func (cbt *ColumnBufferType) ReadRows(num int64) (*layout.Table, int64) {
if cbt.Footer.NumRows == 0 {
return &layout.Table{}, 0
}

func (cbt *ColumnBufferType) ReadRows(num int64) (*layout.Table, int64, error) {
var err error

for cbt.DataTableNumRows < num && err == nil {
err = cbt.ReadPage()
if err != nil {
return nil, 0, err
}
}

if cbt.DataTableNumRows < 0 {
Expand All @@ -252,11 +254,11 @@ func (cbt *ColumnBufferType) ReadRows(num int64) (*layout.Table, int64) {
res := cbt.DataTable.Pop(num)
cbt.DataTableNumRows -= num

if cbt.DataTableNumRows <= 0 { //release previous slice memory
if cbt.DataTableNumRows <= 0 { // release previous slice memory
tmp := cbt.DataTable
cbt.DataTable = layout.NewTableFromTable(tmp)
cbt.DataTable.Merge(tmp)
}
return res, num
return res, num, nil

}
4 changes: 2 additions & 2 deletions reader/columnreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func (pr *ParquetReader) ReadColumnByPath(pathStr string, num int64) (values []i
}

if cb, ok := pr.ColumnBuffers[pathStr]; ok {
table, _ := cb.ReadRows(int64(num))
return table.Values, table.RepetitionLevels, table.DefinitionLevels, nil
table, _, err := cb.ReadRows(int64(num))
return table.Values, table.RepetitionLevels, table.DefinitionLevels, err
}
return []interface{}{}, []int32{}, []int32{}, errPathNotFound
}
Expand Down
45 changes: 27 additions & 18 deletions reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package reader
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"reflect"
"strings"
Expand Down Expand Up @@ -30,7 +32,7 @@ type ParquetReader struct {
ObjPartialType reflect.Type
}

//Create a parquet reader: obj is a object with schema tags or a JSON schema string
// Create a parquet reader: obj is a object with schema tags or a JSON schema string
func NewParquetReader(pFile source.ParquetFile, obj interface{}, np int64) (*ParquetReader, error) {
var err error
res := new(ParquetReader)
Expand Down Expand Up @@ -95,7 +97,7 @@ func (pr *ParquetReader) SetSchemaHandlerFromJSON(jsonSchema string) error {
return nil
}

//Rename schema name to inname
// Rename schema name to inname
func (pr *ParquetReader) RenameSchema() {
for i := 0; i < len(pr.SchemaHandler.Infos); i++ {
pr.Footer.Schema[i].Name = pr.SchemaHandler.Infos[i].InName
Expand All @@ -118,7 +120,7 @@ func (pr *ParquetReader) GetNumRows() int64 {
return pr.Footer.GetNumRows()
}

//Get the footer size
// Get the footer size
func (pr *ParquetReader) GetFooterSize() (uint32, error) {
var err error
buf := make([]byte, 4)
Expand All @@ -132,7 +134,7 @@ func (pr *ParquetReader) GetFooterSize() (uint32, error) {
return size, err
}

//Read footer from parquet file
// Read footer from parquet file
func (pr *ParquetReader) ReadFooter() error {
size, err := pr.GetFooterSize()
if err != nil {
Expand All @@ -143,13 +145,11 @@ func (pr *ParquetReader) ReadFooter() error {
}
pr.Footer = parquet.NewFileMetaData()
pf := thrift.NewTCompactProtocolFactory()
thriftReader := thrift.NewStreamTransportR(pr.PFile)
bufferReader := thrift.NewTBufferedTransport(thriftReader, int(size))
protocol := pf.GetProtocol(bufferReader)
protocol := pf.GetProtocol(thrift.NewStreamTransportR(pr.PFile))
return pr.Footer.Read(context.TODO(), protocol)
}

//Skip rows of parquet file
// Skip rows of parquet file
func (pr *ParquetReader) SkipRows(num int64) error {
var err error
if num <= 0 {
Expand Down Expand Up @@ -195,7 +195,7 @@ func (pr *ParquetReader) SkipRows(num int64) error {
return err
}

//Read rows of parquet file and unmarshal all to dst
// Read rows of parquet file and unmarshal all to dst
func (pr *ParquetReader) Read(dstInterface interface{}) error {
return pr.read(dstInterface, "")
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (pr *ParquetReader) ReadByNumber(maxReadNumber int) ([]interface{}, error)
return ret, nil
}

//Read rows of parquet file and unmarshal all to dst
// Read rows of parquet file and unmarshal all to dst
func (pr *ParquetReader) ReadPartial(dstInterface interface{}, prefixPath string) error {
prefixPath, err := pr.SchemaHandler.ConvertToInPathStr(prefixPath)
if err != nil {
Expand Down Expand Up @@ -262,7 +262,7 @@ func (pr *ParquetReader) ReadPartialByNumber(maxReadNumber int, prefixPath strin
return ret, nil
}

//Read rows of parquet file with a prefixPath
// Read rows of parquet file with a prefixPath
func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error {
var err error
tmap := make(map[string]*layout.Table)
Expand All @@ -273,7 +273,7 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error
return nil
}

doneChan := make(chan int, pr.NP)
doneChan := make(chan error, pr.NP)
taskChan := make(chan string, len(pr.ColumnBuffers))
stopChan := make(chan int)

Expand All @@ -285,7 +285,11 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error
return
case pathStr := <-taskChan:
cb := pr.ColumnBuffers[pathStr]
table, _ := cb.ReadRows(int64(num))
table, _, err := cb.ReadRows(int64(num))
if err != nil {
doneChan <- err
return
}
locker.Lock()
if _, ok := tmap[pathStr]; ok {
tmap[pathStr].Merge(table)
Expand All @@ -294,7 +298,7 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error
tmap[pathStr].Merge(table)
}
locker.Unlock()
doneChan <- 0
doneChan <- nil
}
}
}()
Expand All @@ -307,14 +311,19 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error
readNum++
}
}
errs := make([]error, 0)
for i := 0; i < readNum; i++ {
<-doneChan
err := <-doneChan
if err != nil {
errs = append(errs, err)
}
}

for i := int64(0); i < pr.NP; i++ {
stopChan <- 0
}

if err = errors.Join(errs...); err != nil {
return fmt.Errorf("error reading parquet file: %w", err)
}
dstList := make([]interface{}, pr.NP)
delta := (int64(num) + pr.NP - 1) / pr.NP

Expand Down Expand Up @@ -352,7 +361,7 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error
return err
}

//Stop Read
// Stop Read
func (pr *ParquetReader) ReadStop() {
for _, cb := range pr.ColumnBuffers {
if cb != nil {
Expand Down