Use duckdb for parquet reading
flarco committed Dec 30, 2024
1 parent 8db9dc9 · commit 5450820
Showing 5 changed files with 135 additions and 5 deletions.
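
The gist of the change: plain parquet reads now go through DuckDB instead of the native Go parquet reader, and a list of files becomes a single DuckDB scan. A sketch of the rendered SQL shape (bucket and file names are hypothetical):

```go
// Illustrative only: the shape the new parquet_scanner template renders to
// for a two-file read. sling builds the real path list from the FileNodes
// it lists at the source; these paths are made up.
const scanSQL = `select * from read_parquet(
	['s3://bucket/part-1.parquet', 's3://bucket/part-2.parquet'])`
```
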
83 changes: 81 additions & 2 deletions core/dbio/filesys/fs.go
@@ -51,6 +51,7 @@ type FileSysClient interface {
 	WriteDataflowReady(df *iop.Dataflow, url string, fileReadyChn chan FileReady, sc iop.StreamConfig) (bw int64, err error)
 	GetProp(key string, keys ...string) (val string)
 	SetProp(key string, val string)
+	Props() map[string]string
 	MkdirAll(path string) (err error)
 	GetPath(uri string) (path string, err error)
 	Query(uri, sql string) (data iop.Dataset, err error)
@@ -565,7 +566,7 @@ func (fs *BaseFileSysClient) ReadDataflow(url string, cfg ...iop.FileStreamConfig) (df *iop.Dataflow, err error) {
 	}
 
 	var nodes FileNodes
-	if Cfg.ShouldUseDuckDB() {
+	if g.In(Cfg.Format, dbio.FileTypeIceberg, dbio.FileTypeDelta) {
 		nodes = FileNodes{FileNode{URI: url}}
 	} else {
 		g.Trace("listing path: %s", url)
@@ -576,7 +577,15 @@ func (fs *BaseFileSysClient) ReadDataflow(url string, cfg ...iop.FileStreamConfig) (df *iop.Dataflow, err error) {
 		}
 	}
 
-	df, err = GetDataflow(fs.Self(), nodes, Cfg)
+	if Cfg.Format == dbio.FileTypeNone {
+		Cfg.Format = nodes.InferFormat()
+	}
+
+	if g.In(Cfg.Format, dbio.FileTypeParquet) && Cfg.ComputeWithDuckDB() {
+		df, err = GetDataflowViaDuckDB(fs.Self(), url, nodes, Cfg)
+	} else {
+		df, err = GetDataflow(fs.Self(), nodes, Cfg)
+	}
 	if err != nil {
 		err = g.Error(err, "error getting dataflow")
 		return
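
This hunk is the heart of the commit: when no format is forced, one is inferred from the listed files, and parquet then takes the DuckDB path unless the user opts out. A condensed, self-contained sketch of the routing decision (string formats stand in for sling's dbio.FileType):

```go
package main

import "fmt"

// route mirrors the new branch in ReadDataflow: parquet goes through DuckDB
// when DuckDB compute is enabled; every other format keeps the existing
// native path. Sketch only, not sling's actual types.
func route(format string, computeWithDuckDB bool) string {
	if format == "parquet" && computeWithDuckDB {
		return "GetDataflowViaDuckDB"
	}
	return "GetDataflow"
}

func main() {
	fmt.Println(route("parquet", true))  // GetDataflowViaDuckDB
	fmt.Println(route("parquet", false)) // GetDataflow (SLING_DUCKDB_COMPUTE=false)
	fmt.Println(route("csv", true))      // GetDataflow
}
```
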
@@ -1057,6 +1066,76 @@ func GetDataflow(fs FileSysClient, nodes FileNodes, cfg iop.FileStreamConfig) (df *iop.Dataflow, err error) {
 	return df, nil
 }
 
+// GetDataflowViaDuckDB returns a dataflow from specified paths in specified FileSysClient
+func GetDataflowViaDuckDB(fs FileSysClient, uri string, nodes FileNodes, cfg iop.FileStreamConfig) (df *iop.Dataflow, err error) {
+	if len(nodes.Files()) == 0 {
+		err = g.Error("Provided 0 files for: %#v", nodes)
+		return
+	}
+
+	df = iop.NewDataflowContext(fs.Context().Ctx, cfg.Limit)
+	dsCh := make(chan *iop.Datastream)
+	fs.setDf(df)
+
+	go func() {
+		defer close(dsCh)
+
+		ds := iop.NewDatastreamContext(fs.Context().Ctx, nil)
+		ds.SafeInference = true
+		ds.SetMetadata(fs.GetProp("METADATA"))
+		ds.Metadata.StreamURL.Value = uri
+		ds.SetConfig(fs.Props())
+
+		go func() {
+			// recover from panic
+			defer func() {
+				if r := recover(); r != nil {
+					err := g.Error("panic occurred! %#v\n%s", r, string(debug.Stack()))
+					ds.Context.CaptureErr(err)
+				}
+			}()
+
+			// manage concurrency
+			defer fs.Context().Wg.Read.Done()
+			fs.Context().Wg.Read.Add()
+
+			g.Debug("reading datastream from %s [format=%s, nodes=%d]", uri, cfg.Format, len(nodes.Files()))
+
+			uris := strings.Join(nodes.Files().URIs(), iop.DuckDbURISeparator)
+
+			// no reader needed for iceberg, delta, duckdb will handle it
+			cfg.Props = map[string]string{"fs_props": g.Marshal(fs.Props())}
+			switch cfg.Format {
+			case dbio.FileTypeParquet:
+				err = ds.ConsumeParquetReaderDuckDb(uris, cfg)
+			case dbio.FileTypeCsv:
+				err = ds.ConsumeCsvReaderDuckDb(uris, cfg)
+			default:
+				err = g.Error("unhandled format %s", cfg.Format)
+			}
+
+			if err != nil {
+				ds.Context.CaptureErr(g.Error(err, "Error consuming reader for %s", uri))
+			}
+
+		}()
+
+		dsCh <- ds
+
+	}()
+
+	go df.PushStreamChan(dsCh)
+
+	// wait for first ds to start streaming.
+	// columns need to be populated
+	err = df.WaitReady()
+	if err != nil {
+		return df, g.Error(err)
+	}
+
+	return df, nil
+}
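
The new function mirrors GetDataflow's shape: an outer goroutine builds one datastream and hands it over a channel, an inner goroutine feeds it via the DuckDB consumer, and WaitReady blocks the caller until columns are known. A self-contained toy version of that handoff pattern, with plain channels standing in for sling's Dataflow and Datastream types:

```go
package main

import "fmt"

// stream is a toy stand-in for iop.Datastream: ready closes once the
// "columns" are known, rows carries the data.
type stream struct {
	ready chan struct{}
	rows  chan int
}

func main() {
	streams := make(chan *stream)

	go func() { // like the goroutine that builds ds and sends it on dsCh
		defer close(streams)
		s := &stream{ready: make(chan struct{}), rows: make(chan int)}
		go func() { // like the inner goroutine calling ConsumeParquetReaderDuckDb
			defer close(s.rows)
			close(s.ready) // columns populated; WaitReady may return
			for i := 0; i < 3; i++ {
				s.rows <- i
			}
		}()
		streams <- s
	}()

	for s := range streams { // like df.PushStreamChan(dsCh)
		<-s.ready // like df.WaitReady()
		for r := range s.rows {
			fmt.Println("row", r)
		}
	}
}
```
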

 // MakeDatastream create a datastream from a reader
 func MakeDatastream(reader io.Reader, cfg map[string]string) (ds *iop.Datastream, err error) {
 
33 changes: 33 additions & 0 deletions core/dbio/filesys/fs_file_node.go
@@ -10,6 +10,7 @@ import (
 	"github.com/flarco/g"
 	"github.com/flarco/g/net"
 	"github.com/gobwas/glob"
+	"github.com/samber/lo"
 	"github.com/slingdata-io/sling-cli/core/dbio"
 	"github.com/slingdata-io/sling-cli/core/dbio/iop"
 	"github.com/spf13/cast"
@@ -212,6 +213,38 @@ func (fns FileNodes) Folders() (nodes FileNodes) {
 	return
 }
 
+// InferFormat returns the most common file format
+func (fns FileNodes) InferFormat() (format dbio.FileType) {
+	typeCntMap := map[dbio.FileType]int{
+		dbio.FileTypeCsv:     0,
+		dbio.FileTypeJson:    0,
+		dbio.FileTypeParquet: 0,
+	}
+
+	for _, fileType := range lo.Keys(typeCntMap) {
+		ext := fileType.Ext()
+		for _, node := range fns {
+			if node.IsDir {
+				continue
+			}
+			path := node.Path()
+			if strings.HasSuffix(path, ext) || strings.Contains(path, ext+".") {
+				typeCntMap[fileType]++
+			}
+		}
+	}
+
+	max := 0
+	for fileType, cnt := range typeCntMap {
+		if cnt > max {
+			max = cnt
+			format = fileType
+		}
+	}
+
+	return
+}
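
InferFormat takes a simple majority vote over file extensions. Note the second clause of the match: it also counts an extension followed by another dot, so compressed files still vote for their inner format. A small sketch of the rule (assuming FileType.Ext() yields a dotted extension like ".csv"):

```go
package main

import (
	"fmt"
	"strings"
)

// matches applies InferFormat's test: the path ends with the extension, or
// contains it followed by another dot (e.g. data.csv.gz still counts as csv).
func matches(path, ext string) bool {
	return strings.HasSuffix(path, ext) || strings.Contains(path, ext+".")
}

func main() {
	fmt.Println(matches("s3://bucket/data.csv", ".csv"))     // true
	fmt.Println(matches("s3://bucket/data.csv.gz", ".csv"))  // true
	fmt.Println(matches("s3://bucket/data.parquet", ".csv")) // false
}
```
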

 // Columns returns a column map
 func (fns FileNodes) Columns() map[string]iop.Column {
 	columns := map[string]iop.Column{}
10 changes: 10 additions & 0 deletions core/dbio/iop/datastream.go
@@ -85,7 +85,17 @@ type FileStreamConfig struct {
 	Props map[string]string `json:"props"`
 }
 
+func (sc *FileStreamConfig) ComputeWithDuckDB() bool {
+	if val := os.Getenv("SLING_DUCKDB_COMPUTE"); val != "" {
+		return cast.ToBool(val)
+	}
+	return true
+}
+
 func (sc *FileStreamConfig) ShouldUseDuckDB() bool {
+	if val := sc.ComputeWithDuckDB(); !val {
+		return val
+	}
 	return g.In(sc.Format, dbio.FileTypeIceberg, dbio.FileTypeDelta) || sc.SQL != ""
 }
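
Two gates now exist: ComputeWithDuckDB makes DuckDB compute opt-out (on unless SLING_DUCKDB_COMPUTE disables it), while ShouldUseDuckDB still forces the DuckDB path for iceberg, delta, and custom-SQL reads. A runnable sketch of the environment gate, with strconv.ParseBool standing in for cast.ToBool (which accepts a few more spellings):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// computeWithDuckDB mirrors the method above: DuckDB compute is the default,
// and SLING_DUCKDB_COMPUTE acts as a kill switch.
func computeWithDuckDB() bool {
	if val := os.Getenv("SLING_DUCKDB_COMPUTE"); val != "" {
		b, _ := strconv.ParseBool(val)
		return b
	}
	return true
}

func main() {
	fmt.Println(computeWithDuckDB()) // true: DuckDB path by default

	os.Setenv("SLING_DUCKDB_COMPUTE", "false")
	fmt.Println(computeWithDuckDB()) // false: fall back to the native readers
}
```
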

10 changes: 9 additions & 1 deletion core/dbio/iop/duckdb.go
@@ -29,6 +29,7 @@ var (
 	duckDbReadOnlyHint = "/* -readonly */"
 	duckDbSOFMarker    = "___start_of_duckdb_result___"
 	duckDbEOFMarker    = "___end_of_duckdb_result___"
+	DuckDbURISeparator = "|-|+|"
 )
 
 // DuckDb is a Duck DB compute layer
@@ -120,7 +121,7 @@ func (duck *DuckDb) PrepareFsSecretAndURI(uri string) string {
 
 	switch scheme {
 	case dbio.TypeFileLocal:
-		return strings.TrimPrefix(uri, "file://")
+		return strings.ReplaceAll(uri, "file://", "")
 
 	case dbio.TypeFileS3:
 		secretKeyMap = map[string]string{
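
The TrimPrefix to ReplaceAll swap is not cosmetic: the incoming uri may now be several file:// URIs packed together with DuckDbURISeparator, and TrimPrefix would strip the scheme from only the first one. A quick demonstration (paths hypothetical):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Two local URIs joined with the new separator, as the multi-file
	// read path now produces.
	joined := "file:///tmp/a.parquet|-|+|file:///tmp/b.parquet"

	fmt.Println(strings.TrimPrefix(joined, "file://"))
	// /tmp/a.parquet|-|+|file:///tmp/b.parquet  <- second scheme survives

	fmt.Println(strings.ReplaceAll(joined, "file://", ""))
	// /tmp/a.parquet|-|+|/tmp/b.parquet         <- what DuckDB should see
}
```
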
@@ -959,6 +960,11 @@ func (duck *DuckDb) MakeScanQuery(format dbio.FileType, uri string, fsc FileStreamConfig)
 	where := ""
 	incrementalWhereCond := "1=1"
 
+	uris := strings.Split(uri, DuckDbURISeparator)
+	for i, val := range uris {
+		uris[i] = g.F("'%s'", val) // add quotes
+	}
+
 	if fsc.IncrementalKey != "" && fsc.IncrementalValue != "" {
 		incrementalWhereCond = g.F("%s > %s", dbio.TypeDbDuckDb.Quote(fsc.IncrementalKey), fsc.IncrementalValue)
 		where = g.F("where %s", incrementalWhereCond)
@@ -975,6 +981,7 @@ func (duck *DuckDb) MakeScanQuery(format dbio.FileType, uri string, fsc FileStreamConfig)
 		"incremental_where_cond", incrementalWhereCond,
 		"incremental_value", fsc.IncrementalValue,
 		"uri", uri,
+		"uris", strings.Join(uris, ", "),
 	)
 
 	if fsc.Limit > 0 {
@@ -999,6 +1006,7 @@ func (duck *DuckDb) MakeScanQuery(format dbio.FileType, uri string, fsc FileStreamConfig)
 		g.R(selectStreamScanner, "stream_scanner", streamScanner),
 		"fields", strings.Join(fields, ","),
 		"uri", uri,
+		"uris", strings.Join(uris, ", "),
 		"where", where,
 	))
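
These two MakeScanQuery additions are the receiving end of the separator: the packed uri from GetDataflowViaDuckDB is split, each path is quoted, and the comma-joined list fills the new {uris} placeholder in the scanner templates below. A self-contained round trip, with strings.NewReplacer standing in for the g.R template helper:

```go
package main

import (
	"fmt"
	"strings"
)

const duckDbURISeparator = "|-|+|" // mirrors iop.DuckDbURISeparator

func main() {
	// Packed by GetDataflowViaDuckDB: strings.Join(uris, DuckDbURISeparator)
	uri := "s3://bucket/a.parquet" + duckDbURISeparator + "s3://bucket/b.parquet"

	// Unpacked and quoted in MakeScanQuery
	uris := strings.Split(uri, duckDbURISeparator)
	for i, val := range uris {
		uris[i] = fmt.Sprintf("'%s'", val)
	}

	// The comma-joined list fills {uris} in the duckdb.yaml templates
	tmpl := "read_parquet([{uris}])"
	fmt.Println(strings.NewReplacer("{uris}", strings.Join(uris, ", ")).Replace(tmpl))
	// read_parquet(['s3://bucket/a.parquet', 's3://bucket/b.parquet'])
}
```
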

4 changes: 2 additions & 2 deletions core/dbio/templates/duckdb.yaml
@@ -174,9 +174,9 @@ function:
 
   iceberg_scanner: iceberg_scan('{uri}', allow_moved_paths = true)
   delta_scanner: delta_scan('{uri}')
-  parquet_scanner: parquet_scan('{uri}')
+  parquet_scanner: read_parquet([{uris}])
   # csv_scanner: read_csv('{uri}', delim='{delimiter}', header={header}, columns={columns}, max_line_size=134217728, parallel=true, quote='{quote}', escape='{escape}', nullstr='{null_if}')
-  csv_scanner: read_csv('{uri}', delim='{delimiter}', header={header}, max_line_size=134217728, parallel=true, quote='{quote}', escape='{escape}', nullstr='{null_if}')
+  csv_scanner: read_csv([{uris}], delim='{delimiter}', header={header}, max_line_size=134217728, parallel=true, quote='{quote}', escape='{escape}', nullstr='{null_if}')
 
 variable:
   bool_as: integer
