Skip to content

Commit

Permalink
Merge pull request #583 from LeeHyeonHee/feature/read_exname
Browse files Browse the repository at this point in the history
(feature) Added column case sensitivity option when reading parquet read
  • Loading branch information
xitongsys authored May 20, 2024
2 parents 8ca067b + a435aa0 commit 75e935f
Showing 1 changed file with 40 additions and 14 deletions.
54 changes: 40 additions & 14 deletions reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"github.com/xitongsys/parquet-go/source"
)

type ParquetReaderOptions struct {
CaseInsensitive bool
}

type ParquetReader struct {
SchemaHandler *schema.SchemaHandler
NP int64 //parallel number
Expand All @@ -28,14 +32,23 @@ type ParquetReader struct {
//One reader can only read one type objects
ObjType reflect.Type
ObjPartialType reflect.Type

//Determines whether case sensitivity is enabled
CaseInsensitive bool
}

//Create a parquet reader: obj is a object with schema tags or a JSON schema string
func NewParquetReader(pFile source.ParquetFile, obj interface{}, np int64) (*ParquetReader, error) {
// Create a parquet reader: obj is a object with schema tags or a JSON schema string
func NewParquetReader(pFile source.ParquetFile, obj interface{}, np int64, opts ...ParquetReaderOptions) (*ParquetReader, error) {
var caseInsensitive bool
if len(opts) > 0 {
caseInsensitive = opts[0].CaseInsensitive
}

var err error
res := new(ParquetReader)
res.NP = np
res.PFile = pFile
res.CaseInsensitive = caseInsensitive
if err = res.ReadFooter(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -95,19 +108,32 @@ func (pr *ParquetReader) SetSchemaHandlerFromJSON(jsonSchema string) error {
return nil
}

//Rename schema name to inname
// Rename schema name to inname
func (pr *ParquetReader) RenameSchema() {
for i := 0; i < len(pr.SchemaHandler.Infos); i++ {
pr.Footer.Schema[i].Name = pr.SchemaHandler.Infos[i].InName
}

exPathToInPath := make(map[string]string)
if pr.CaseInsensitive {
for exPath, inPath := range pr.SchemaHandler.ExPathToInPath {
lowerCaseKey := strings.ToLower(exPath)
exPathToInPath[lowerCaseKey] = inPath
}
} else {
exPathToInPath = pr.SchemaHandler.ExPathToInPath
}

for _, rowGroup := range pr.Footer.RowGroups {
for _, chunk := range rowGroup.Columns {
exPath := make([]string, 0)
exPath = append(exPath, pr.SchemaHandler.GetRootExName())
exPath = append(exPath, chunk.MetaData.GetPathInSchema()...)
exPath := append([]string{pr.SchemaHandler.GetRootExName()}, chunk.MetaData.GetPathInSchema()...)
exPathStr := common.PathToStr(exPath)

inPathStr := pr.SchemaHandler.ExPathToInPath[exPathStr]
if pr.CaseInsensitive {
exPathStr = strings.ToLower(exPathStr)
}

inPathStr := exPathToInPath[exPathStr]
inPath := common.StrToPath(inPathStr)[1:]
chunk.MetaData.PathInSchema = inPath
}
Expand All @@ -118,7 +144,7 @@ func (pr *ParquetReader) GetNumRows() int64 {
return pr.Footer.GetNumRows()
}

//Get the footer size
// Get the footer size
func (pr *ParquetReader) GetFooterSize() (uint32, error) {
var err error
buf := make([]byte, 4)
Expand All @@ -132,7 +158,7 @@ func (pr *ParquetReader) GetFooterSize() (uint32, error) {
return size, err
}

//Read footer from parquet file
// Read footer from parquet file
func (pr *ParquetReader) ReadFooter() error {
size, err := pr.GetFooterSize()
if err != nil {
Expand All @@ -149,7 +175,7 @@ func (pr *ParquetReader) ReadFooter() error {
return pr.Footer.Read(context.TODO(), protocol)
}

//Skip rows of parquet file
// Skip rows of parquet file
func (pr *ParquetReader) SkipRows(num int64) error {
var err error
if num <= 0 {
Expand Down Expand Up @@ -195,7 +221,7 @@ func (pr *ParquetReader) SkipRows(num int64) error {
return err
}

//Read rows of parquet file and unmarshal all to dst
// Read rows of parquet file and unmarshal all to dst
func (pr *ParquetReader) Read(dstInterface interface{}) error {
return pr.read(dstInterface, "")
}
Expand Down Expand Up @@ -226,7 +252,7 @@ func (pr *ParquetReader) ReadByNumber(maxReadNumber int) ([]interface{}, error)
return ret, nil
}

//Read rows of parquet file and unmarshal all to dst
// Read rows of parquet file and unmarshal all to dst
func (pr *ParquetReader) ReadPartial(dstInterface interface{}, prefixPath string) error {
prefixPath, err := pr.SchemaHandler.ConvertToInPathStr(prefixPath)
if err != nil {
Expand Down Expand Up @@ -262,7 +288,7 @@ func (pr *ParquetReader) ReadPartialByNumber(maxReadNumber int, prefixPath strin
return ret, nil
}

//Read rows of parquet file with a prefixPath
// Read rows of parquet file with a prefixPath
func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error {
var err error
tmap := make(map[string]*layout.Table)
Expand Down Expand Up @@ -352,7 +378,7 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error
return err
}

//Stop Read
// Stop Read
func (pr *ParquetReader) ReadStop() {
for _, cb := range pr.ColumnBuffers {
if cb != nil {
Expand Down

0 comments on commit 75e935f

Please sign in to comment.