From 5450820a1f95a58f0563ef6590fc53eec39df39f Mon Sep 17 00:00:00 2001
From: Fritz Larco
Date: Sun, 29 Dec 2024 23:50:37 -0300
Subject: [PATCH] Use duckdb for parquet reading

---
 core/dbio/filesys/fs.go           | 83 ++++++++++++++++++++++++++++++-
 core/dbio/filesys/fs_file_node.go | 33 ++++++++++++
 core/dbio/iop/datastream.go       | 10 ++++
 core/dbio/iop/duckdb.go           | 10 +++-
 core/dbio/templates/duckdb.yaml   |  4 +-
 5 files changed, 135 insertions(+), 5 deletions(-)

diff --git a/core/dbio/filesys/fs.go b/core/dbio/filesys/fs.go
index 70cfd782..94c3c882 100755
--- a/core/dbio/filesys/fs.go
+++ b/core/dbio/filesys/fs.go
@@ -51,6 +51,7 @@ type FileSysClient interface {
 	WriteDataflowReady(df *iop.Dataflow, url string, fileReadyChn chan FileReady, sc iop.StreamConfig) (bw int64, err error)
 	GetProp(key string, keys ...string) (val string)
 	SetProp(key string, val string)
+	Props() map[string]string
 	MkdirAll(path string) (err error)
 	GetPath(uri string) (path string, err error)
 	Query(uri, sql string) (data iop.Dataset, err error)
@@ -565,7 +566,7 @@ func (fs *BaseFileSysClient) ReadDataflow(url string, cfg ...iop.FileStreamConfi
 	}
 
 	var nodes FileNodes
-	if Cfg.ShouldUseDuckDB() {
+	if g.In(Cfg.Format, dbio.FileTypeIceberg, dbio.FileTypeDelta) {
 		nodes = FileNodes{FileNode{URI: url}}
 	} else {
 		g.Trace("listing path: %s", url)
@@ -576,7 +577,15 @@ func (fs *BaseFileSysClient) ReadDataflow(url string, cfg ...iop.FileStreamConfi
 		}
 	}
 
-	df, err = GetDataflow(fs.Self(), nodes, Cfg)
+	if Cfg.Format == dbio.FileTypeNone {
+		Cfg.Format = nodes.InferFormat()
+	}
+
+	if g.In(Cfg.Format, dbio.FileTypeParquet) && Cfg.ComputeWithDuckDB() {
+		df, err = GetDataflowViaDuckDB(fs.Self(), url, nodes, Cfg)
+	} else {
+		df, err = GetDataflow(fs.Self(), nodes, Cfg)
+	}
 	if err != nil {
 		err = g.Error(err, "error getting dataflow")
 		return
@@ -1057,6 +1066,76 @@ func GetDataflow(fs FileSysClient, nodes FileNodes, cfg iop.FileStreamConfig) (d
 	return df, nil
 }
 
+// GetDataflowViaDuckDB returns a dataflow from specified paths in specified FileSysClient
+func GetDataflowViaDuckDB(fs FileSysClient, uri string, nodes FileNodes, cfg iop.FileStreamConfig) (df *iop.Dataflow, err error) {
+	if len(nodes.Files()) == 0 {
+		err = g.Error("Provided 0 files for: %#v", nodes)
+		return
+	}
+
+	df = iop.NewDataflowContext(fs.Context().Ctx, cfg.Limit)
+	dsCh := make(chan *iop.Datastream)
+	fs.setDf(df)
+
+	go func() {
+		defer close(dsCh)
+
+		ds := iop.NewDatastreamContext(fs.Context().Ctx, nil)
+		ds.SafeInference = true
+		ds.SetMetadata(fs.GetProp("METADATA"))
+		ds.Metadata.StreamURL.Value = uri
+		ds.SetConfig(fs.Props())
+
+		go func() {
+			// recover from panic
+			defer func() {
+				if r := recover(); r != nil {
+					err := g.Error("panic occurred! %#v\n%s", r, string(debug.Stack()))
%#v\n%s", r, string(debug.Stack())) + ds.Context.CaptureErr(err) + } + }() + + // manage concurrency + defer fs.Context().Wg.Read.Done() + fs.Context().Wg.Read.Add() + + g.Debug("reading datastream from %s [format=%s, nodes=%d]", uri, cfg.Format, len(nodes.Files())) + + uris := strings.Join(nodes.Files().URIs(), iop.DuckDbURISeparator) + + // no reader needed for iceberg, delta, duckdb will handle it + cfg.Props = map[string]string{"fs_props": g.Marshal(fs.Props())} + switch cfg.Format { + case dbio.FileTypeParquet: + err = ds.ConsumeParquetReaderDuckDb(uris, cfg) + case dbio.FileTypeCsv: + err = ds.ConsumeCsvReaderDuckDb(uris, cfg) + default: + err = g.Error("unhandled format %s", cfg.Format) + } + + if err != nil { + ds.Context.CaptureErr(g.Error(err, "Error consuming reader for %s", uri)) + } + + }() + + dsCh <- ds + + }() + + go df.PushStreamChan(dsCh) + + // wait for first ds to start streaming. + // columns need to be populated + err = df.WaitReady() + if err != nil { + return df, g.Error(err) + } + + return df, nil +} + // MakeDatastream create a datastream from a reader func MakeDatastream(reader io.Reader, cfg map[string]string) (ds *iop.Datastream, err error) { diff --git a/core/dbio/filesys/fs_file_node.go b/core/dbio/filesys/fs_file_node.go index 9dd75d70..a072cb2b 100644 --- a/core/dbio/filesys/fs_file_node.go +++ b/core/dbio/filesys/fs_file_node.go @@ -10,6 +10,7 @@ import ( "github.com/flarco/g" "github.com/flarco/g/net" "github.com/gobwas/glob" + "github.com/samber/lo" "github.com/slingdata-io/sling-cli/core/dbio" "github.com/slingdata-io/sling-cli/core/dbio/iop" "github.com/spf13/cast" @@ -212,6 +213,38 @@ func (fns FileNodes) Folders() (nodes FileNodes) { return } +// InferFormat returns the most common file format +func (fns FileNodes) InferFormat() (format dbio.FileType) { + typeCntMap := map[dbio.FileType]int{ + dbio.FileTypeCsv: 0, + dbio.FileTypeJson: 0, + dbio.FileTypeParquet: 0, + } + + for _, fileType := range lo.Keys(typeCntMap) { + ext := fileType.Ext() + for _, node := range fns { + if node.IsDir { + continue + } + path := node.Path() + if strings.HasSuffix(path, ext) || strings.Contains(path, ext+".") { + typeCntMap[fileType]++ + } + } + } + + max := 0 + for fileType, cnt := range typeCntMap { + if cnt > max { + max = cnt + format = fileType + } + } + + return +} + // Columns returns a column map func (fns FileNodes) Columns() map[string]iop.Column { columns := map[string]iop.Column{} diff --git a/core/dbio/iop/datastream.go b/core/dbio/iop/datastream.go index e6af17c0..3b92ad7c 100644 --- a/core/dbio/iop/datastream.go +++ b/core/dbio/iop/datastream.go @@ -85,7 +85,17 @@ type FileStreamConfig struct { Props map[string]string `json:"props"` } +func (sc *FileStreamConfig) ComputeWithDuckDB() bool { + if val := os.Getenv("SLING_DUCKDB_COMPUTE"); val != "" { + return cast.ToBool(val) + } + return true +} + func (sc *FileStreamConfig) ShouldUseDuckDB() bool { + if val := sc.ComputeWithDuckDB(); !val { + return val + } return g.In(sc.Format, dbio.FileTypeIceberg, dbio.FileTypeDelta) || sc.SQL != "" } diff --git a/core/dbio/iop/duckdb.go b/core/dbio/iop/duckdb.go index ac190697..d4ee48ef 100644 --- a/core/dbio/iop/duckdb.go +++ b/core/dbio/iop/duckdb.go @@ -29,6 +29,7 @@ var ( duckDbReadOnlyHint = "/* -readonly */" duckDbSOFMarker = "___start_of_duckdb_result___" duckDbEOFMarker = "___end_of_duckdb_result___" + DuckDbURISeparator = "|-|+|" ) // DuckDb is a Duck DB compute layer @@ -120,7 +121,7 @@ func (duck *DuckDb) PrepareFsSecretAndURI(uri string) string { switch 
scheme { case dbio.TypeFileLocal: - return strings.TrimPrefix(uri, "file://") + return strings.ReplaceAll(uri, "file://", "") case dbio.TypeFileS3: secretKeyMap = map[string]string{ @@ -959,6 +960,11 @@ func (duck *DuckDb) MakeScanQuery(format dbio.FileType, uri string, fsc FileStre where := "" incrementalWhereCond := "1=1" + uris := strings.Split(uri, DuckDbURISeparator) + for i, val := range uris { + uris[i] = g.F("'%s'", val) // add quotes + } + if fsc.IncrementalKey != "" && fsc.IncrementalValue != "" { incrementalWhereCond = g.F("%s > %s", dbio.TypeDbDuckDb.Quote(fsc.IncrementalKey), fsc.IncrementalValue) where = g.F("where %s", incrementalWhereCond) @@ -975,6 +981,7 @@ func (duck *DuckDb) MakeScanQuery(format dbio.FileType, uri string, fsc FileStre "incremental_where_cond", incrementalWhereCond, "incremental_value", fsc.IncrementalValue, "uri", uri, + "uris", strings.Join(uris, ", "), ) if fsc.Limit > 0 { @@ -999,6 +1006,7 @@ func (duck *DuckDb) MakeScanQuery(format dbio.FileType, uri string, fsc FileStre g.R(selectStreamScanner, "stream_scanner", streamScanner), "fields", strings.Join(fields, ","), "uri", uri, + "uris", strings.Join(uris, ", "), "where", where, )) diff --git a/core/dbio/templates/duckdb.yaml b/core/dbio/templates/duckdb.yaml index ec529d4d..a2af753e 100755 --- a/core/dbio/templates/duckdb.yaml +++ b/core/dbio/templates/duckdb.yaml @@ -174,9 +174,9 @@ function: iceberg_scanner: iceberg_scan('{uri}', allow_moved_paths = true) delta_scanner: delta_scan('{uri}') - parquet_scanner: parquet_scan('{uri}') + parquet_scanner: read_parquet([{uris}]) # csv_scanner: read_csv('{uri}', delim='{delimiter}', header={header}, columns={columns}, max_line_size=134217728, parallel=true, quote='{quote}', escape='{escape}', nullstr='{null_if}') - csv_scanner: read_csv('{uri}', delim='{delimiter}', header={header}, max_line_size=134217728, parallel=true, quote='{quote}', escape='{escape}', nullstr='{null_if}') + csv_scanner: read_csv([{uris}], delim='{delimiter}', header={header}, max_line_size=134217728, parallel=true, quote='{quote}', escape='{escape}', nullstr='{null_if}') variable: bool_as: integer
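
A note on the multi-URI plumbing above: GetDataflowViaDuckDB joins all file URIs into a single string with DuckDbURISeparator, and MakeScanQuery splits them back out, quotes each one, and feeds the list to the read_parquet([{uris}]) / read_csv([{uris}]) templates, so one DuckDB scan covers every file. Below is a minimal standalone sketch of that round trip; renderParquetScan is a hypothetical stand-in for the MakeScanQuery-plus-template pipeline, and only the separator token and quoting mirror the patch.

package main

import (
	"fmt"
	"strings"
)

// uriSeparator mirrors iop.DuckDbURISeparator: a token unlikely to occur
// inside a real URI, so multiple URIs can travel through one string.
const uriSeparator = "|-|+|"

// renderParquetScan is a hypothetical stand-in for MakeScanQuery plus the
// parquet_scanner template "read_parquet([{uris}])".
func renderParquetScan(joinedURIs string) string {
	uris := strings.Split(joinedURIs, uriSeparator)
	for i, val := range uris {
		uris[i] = fmt.Sprintf("'%s'", val) // add quotes, as MakeScanQuery does
	}
	return fmt.Sprintf("select * from read_parquet([%s])", strings.Join(uris, ", "))
}

func main() {
	// GetDataflowViaDuckDB side: join the node URIs into one string.
	joined := strings.Join([]string{
		"s3://bucket/path/part1.parquet",
		"s3://bucket/path/part2.parquet",
	}, uriSeparator)

	// MakeScanQuery side: split, quote, and render the scanner call.
	fmt.Println(renderParquetScan(joined))
	// select * from read_parquet(['s3://bucket/path/part1.parquet', 's3://bucket/path/part2.parquet'])
}

This also appears to be why PrepareFsSecretAndURI moves from strings.TrimPrefix to strings.ReplaceAll: with several file:// URIs embedded in one joined string, every occurrence of the scheme prefix needs stripping, not just the leading one.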
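On format inference: InferFormat takes a simple majority vote over csv, json, and parquet nodes, and the strings.Contains(path, ext+".") clause is what lets compressed files such as data.csv.gz count toward csv. A self-contained sketch of the same counting logic, using plain string slices instead of FileNodes and no samber/lo dependency (extensions are assumed to include the leading dot, as dbio.FileType.Ext() does):

package main

import (
	"fmt"
	"strings"
)

// inferFormat mirrors FileNodes.InferFormat with plain strings: tally how
// many paths match each known extension and return the most common one.
// An empty result means no format could be inferred.
func inferFormat(paths []string) (format string) {
	typeCntMap := map[string]int{".csv": 0, ".json": 0, ".parquet": 0}

	for ext := range typeCntMap {
		for _, path := range paths {
			// suffix match catches "part1.csv"; the Contains check catches
			// compressed variants such as "part1.csv.gz"
			if strings.HasSuffix(path, ext) || strings.Contains(path, ext+".") {
				typeCntMap[ext]++
			}
		}
	}

	max := 0
	for ext, cnt := range typeCntMap {
		if cnt > max {
			max = cnt
			format = ext
		}
	}
	return
}

func main() {
	paths := []string{"a/part1.csv.gz", "a/part2.csv.gz", "a/_meta.json"}
	fmt.Println(inferFormat(paths)) // .csv
}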
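Finally, the SLING_DUCKDB_COMPUTE toggle defaults to on: ComputeWithDuckDB returns true unless the variable is set to a falsy value, and ShouldUseDuckDB now short-circuits on it too, so SLING_DUCKDB_COMPUTE=false restores the native Go readers. A sketch of that default-on behavior; strconv.ParseBool stands in for cast.ToBool here, with unparseable values treated as false, which may differ slightly from cast's handling:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// computeWithDuckDB mirrors FileStreamConfig.ComputeWithDuckDB: DuckDB
// compute is the default, and the env var is only consulted when set.
func computeWithDuckDB() bool {
	if val := os.Getenv("SLING_DUCKDB_COMPUTE"); val != "" {
		b, err := strconv.ParseBool(val) // stands in for cast.ToBool
		return err == nil && b
	}
	return true // default: let DuckDB do the reading
}

func main() {
	fmt.Println(computeWithDuckDB()) // true (unset -> default on)

	os.Setenv("SLING_DUCKDB_COMPUTE", "false")
	fmt.Println(computeWithDuckDB()) // false (opt out to native readers)
}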