v1.0.68 (#99)
* Update BG Truncate

* add transform "parse_bit"

* add "parse_bit" transform as default for MySQL

* fix comments

* use github.com/parquet-go/parquet-go

* split out stream_file_ext

* improve stream_file_ext

* add file stream wildcard matching

* improve GetFormatMap

* add tests

* update flarco/dbio

* fix stream_file_folder

* improve logging

* add expand env_var for dbt connections

* add target object name in log with row count

* fix jsonStream.nextFunc

* add custom

* fix bigquery default credentials

* clean up

* improve error for config

* update flarco/dbio

* fix test r.07.yaml
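
The config.go hunk further down only registers "parse_bit" as a default source transform for MySQL; the transform body itself ships in flarco/dbio and is not part of this diff. As a loose, hypothetical sketch of what such a transform might do (MySQL drivers typically return BIT(n) columns as big-endian byte slices), with parseBit being an illustrative name rather than the actual implementation:

package main

import "fmt"

// parseBit (hypothetical) folds a BIT(n) byte slice into an integer so the
// value stays portable across targets; non-byte-slice values pass through.
func parseBit(val any) any {
	b, ok := val.([]byte)
	if !ok {
		return val // not a BIT value, leave untouched
	}
	var n uint64
	for _, by := range b {
		n = n<<8 | uint64(by)
	}
	return n
}

func main() {
	fmt.Println(parseBit([]byte{0x01}))       // 1
	fmt.Println(parseBit([]byte{0x01, 0x00})) // 256
	fmt.Println(parseBit("already parsed"))   // already parsed
}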
flarco authored Jan 10, 2024
1 parent bb455a2 commit 2b87350
Showing 13 changed files with 332 additions and 600 deletions.
13 changes: 13 additions & 0 deletions .github/FUNDING.yml
@@ -0,0 +1,13 @@
# These are supported funding model platforms

github: # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2]
patreon: # Replace with a single Patreon username
open_collective: # Replace with a single Open Collective username
ko_fi: # Replace with a single Ko-fi username
tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel
community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry
liberapay: # Replace with a single Liberapay username
issuehunt: # Replace with a single IssueHunt username
otechie: # Replace with a single Otechie username
lfx_crowdfunding: # Replace with a single LFX Crowdfunding project-name e.g., cloud-foundry
custom: ['https://www.paypal.com/donate/?hosted_button_id=98DL44Z6JJVWS']
10 changes: 10 additions & 0 deletions cmd/sling/tests/replications/r.07.yaml
@@ -0,0 +1,10 @@
source: local://
target: SQLITE

defaults:
mode: full-refresh
object: 'main.{stream_file_folder}_{stream_file_name}'

streams:
file://./cmd/sling/tests/files/parquet/*:
file://./cmd/sling/tests/files/*.csv:
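
For illustration only (this is not sling code): a rough sketch of how the object template above could resolve for one of the files matched by these wildcard streams. The real sanitizing lives in cleanUp/GetFormatMap in core/sling/config.go and may differ in detail.

package main

import (
	"fmt"
	"strings"
)

// clean is a stand-in for sling's cleanUp: lower-case, separators to "_".
func clean(s string) string {
	return strings.ToLower(strings.NewReplacer(".", "_", "-", "_").Replace(s))
}

func main() {
	stream := "file://./cmd/sling/tests/files/parquet/test1.parquet"
	parts := strings.Split(strings.TrimPrefix(stream, "file://"), "/")
	fileName := parts[len(parts)-1] // "test1.parquet"
	folder := parts[len(parts)-2]   // "parquet"

	// stream_file_name has its extension token trimmed, per GetFormatMap
	tokens := strings.Split(fileName, ".")
	ext := tokens[len(tokens)-1]
	name := strings.TrimSuffix(clean(fileName), "_"+ext)

	fmt.Printf("main.%s_%s\n", clean(folder), name) // main.parquet_test1
}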
9 changes: 9 additions & 0 deletions cmd/sling/tests/replications/r.08.yaml
@@ -0,0 +1,9 @@
source: DO_SPACES
target: SQLITE

defaults:
mode: full-refresh
object: 'main.do_{stream_file_name}'

streams:
s3://ocral/test.fs.write/*:
9 changes: 9 additions & 0 deletions cmd/sling/tests/replications/r.09.yaml
@@ -0,0 +1,9 @@
source: postgres
target: sqlite

defaults:
mode: full-refresh
object: 'main.pg_{stream_table}'

streams:
public.my_table*:
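
A toy rendering (again, not sling's code) of how the new database wildcard expansion treats public.my_table*: list the schema's tables, keep the ones the pattern matches, and derive the object name from the template. A plain prefix match stands in for g.WildCardMatch here.

package main

import (
	"fmt"
	"strings"
)

func main() {
	pattern := "public.my_table*"
	tables := []string{"public.my_table_a", "public.my_table_b", "public.other"}

	prefix := strings.ToLower(strings.TrimSuffix(pattern, "*"))
	for _, t := range tables {
		if strings.HasPrefix(strings.ToLower(t), prefix) {
			streamTable := strings.TrimPrefix(t, "public.")
			// object template 'main.pg_{stream_table}' from the defaults above
			fmt.Printf("%s -> main.pg_%s\n", t, streamTable)
		}
	}
}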
61 changes: 47 additions & 14 deletions core/sling/config.go
@@ -4,7 +4,6 @@ import (
"database/sql/driver"
"io"
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
@@ -122,6 +121,13 @@ func (cfg *Config) SetDefault() {
cfg.Target.Options.MaxDecimals = g.Int(11)
}

// set default transforms
switch cfg.SrcConn.Type {
case dbio.TypeDbMySQL:
// parse_bit for MySQL
cfg.Source.Options.Transforms = append(cfg.Source.Options.Transforms, "parse_bit")
}

// set vars
for k, v := range cfg.Env {
os.Setenv(k, v)
@@ -141,7 +147,8 @@ func (cfg *Config) SetDefault() {
// Unmarshal parse a configuration file path or config text
func (cfg *Config) Unmarshal(cfgStr string) error {
cfgBytes := []byte(cfgStr)
if _, err := os.Stat(cfgStr); err == nil {
_, errStat := os.Stat(cfgStr)
if errStat == nil {
cfgFile, err := os.Open(cfgStr)
if err != nil {
return g.Error(err, "Unable to open cfgStr: "+cfgStr)
@@ -155,7 +162,10 @@ func (cfg *Config) Unmarshal(cfgStr string) error {

err := yaml.Unmarshal(cfgBytes, cfg)
if err != nil {
return g.Error(err, "Error parsing cfgBytes")
if errStat != nil {
return g.Error(errStat, "Error parsing config. Invalid path or raw config provided")
}
return g.Error(err, "Error parsing config")
}

if cfg.Env == nil {
@@ -575,37 +585,60 @@ func (cfg *Config) GetFormatMap() (m map[string]any, err error) {
}
m["stream_name"] = strings.ToLower(cfg.Source.Stream)

filePath := cleanUp(strings.TrimPrefix(url.Path(), "/"))
filePath := strings.TrimPrefix(url.Path(), "/")
pathArr := strings.Split(strings.TrimSuffix(url.Path(), "/"), "/")
fileName := cleanUp(pathArr[len(pathArr)-1])
fileFolder := cleanUp(lo.Ternary(len(pathArr) > 1, pathArr[len(pathArr)-2], ""))
fileName := pathArr[len(pathArr)-1]
fileFolder := lo.Ternary(len(pathArr) > 1, pathArr[len(pathArr)-2], "")

switch cfg.SrcConn.Type {
case dbio.TypeFileS3, dbio.TypeFileGoogle:
m["source_bucket"] = cfg.SrcConn.Data["bucket"]
if fileFolder != "" {
m["stream_file_folder"] = fileFolder
m["stream_file_folder"] = cleanUp(fileFolder)
}
m["stream_file_name"] = fileName
m["stream_file_name"] = cleanUp(fileName)
case dbio.TypeFileAzure:
m["source_account"] = cfg.SrcConn.Data["account"]
m["source_container"] = cfg.SrcConn.Data["container"]
if fileFolder != "" {
m["stream_file_folder"] = fileFolder
m["stream_file_folder"] = cleanUp(fileFolder)
}
m["stream_file_name"] = fileName
filePath = strings.TrimPrefix(filePath, cast.ToString(m["source_container"])+"_")
m["stream_file_name"] = cleanUp(fileName)
filePath = strings.TrimPrefix(cleanUp(filePath), cast.ToString(m["source_container"])+"_")
case dbio.TypeFileLocal:
path := strings.TrimPrefix(cfg.Source.Stream, "file://")
path = strings.ReplaceAll(path, `\`, `/`)
path = strings.TrimSuffix(path, "/")
path = strings.TrimSuffix(path, "\\")
pathArr = strings.Split(path, "/")

fileName = pathArr[len(pathArr)-1]
fileFolder = lo.Ternary(len(pathArr) > 1, pathArr[len(pathArr)-2], "")

fileFolder, fileName := filepath.Split(path)
m["stream_file_folder"] = cleanUp(strings.TrimPrefix(fileFolder, "/"))
m["stream_file_name"] = cleanUp(strings.TrimPrefix(fileName, "/"))
filePath = cleanUp(strings.TrimPrefix(path, "/"))
}
if filePath != "" {
m["stream_file_path"] = filePath
m["stream_file_path"] = cleanUp(filePath)
}

if fileNameArr := strings.Split(fileName, "."); len(fileNameArr) > 1 {
// remove extension
m["stream_file_ext"] = fileNameArr[len(fileNameArr)-1]
if len(fileNameArr) >= 3 {
// in case of compression (2 extension tokens)
for _, suff := range []string{"gz", "zst", "snappy"} {
if m["stream_file_ext"] == suff {
m["stream_file_ext"] = fileNameArr[len(fileNameArr)-2] + "_" + fileNameArr[len(fileNameArr)-1]
break
}
}
}

m["stream_file_name"] = strings.TrimSuffix(
cast.ToString(m["stream_file_name"]),
"_"+cast.ToString(m["stream_file_ext"]),
)
}
}

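The stream_file_ext hunk above lifts out cleanly; the following standalone rendering of the same logic (reconstructed from the diff, so assumed faithful) shows the compression-suffix handling:

package main

import (
	"fmt"
	"strings"
)

// fileExt mirrors the stream_file_ext logic added to GetFormatMap: the last
// extension token wins, unless it is a compression suffix, in which case the
// last two tokens are joined (e.g. "csv_gz").
func fileExt(fileName string) string {
	tokens := strings.Split(fileName, ".")
	if len(tokens) < 2 {
		return ""
	}
	ext := tokens[len(tokens)-1]
	if len(tokens) >= 3 {
		for _, suff := range []string{"gz", "zst", "snappy"} {
			if ext == suff {
				return tokens[len(tokens)-2] + "_" + tokens[len(tokens)-1]
			}
		}
	}
	return ext
}

func main() {
	fmt.Println(fileExt("orders.csv"))    // csv
	fmt.Println(fileExt("orders.csv.gz")) // csv_gz
	fmt.Println(fileExt("README"))        // (empty)
}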
83 changes: 73 additions & 10 deletions core/sling/replication.go
@@ -1,6 +1,7 @@
package sling

import (
"context"
"database/sql/driver"
"io"
"os"
@@ -78,7 +79,7 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) {
wildcardNames := []string{}
for name := range rd.Streams {
if name == "*" {
return g.Error("Must specify schema when using wildcard: 'my_schema.*', not '*'")
return g.Error("Must specify schema or path when using wildcard: 'my_schema.*', 'file://./my_folder/*', not '*'")
} else if strings.Contains(name, "*") {
wildcardNames = append(wildcardNames, name)
}
@@ -92,12 +93,28 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) {
return strings.ToLower(c.Connection.Name)
})
c, ok := connsMap[strings.ToLower(rd.Source)]
if !ok || !c.Connection.Type.IsDb() {
// wildcards only apply to database source connections
return
if !ok {
if strings.EqualFold(rd.Source, "local://") || strings.EqualFold(rd.Source, "file://") {
c = connection.LocalFileConnEntry()
} else {
return
}
}

g.Debug("processing wildcards for %s", rd.Source)
if c.Connection.Type.IsDb() {
return rd.ProcessWildcardsDatabase(c, wildcardNames)
}

if c.Connection.Type.IsFile() {
return rd.ProcessWildcardsFile(c, wildcardNames)
}

return g.Error("invalid connection for wildcards: %s", rd.Source)
}

func (rd *ReplicationConfig) ProcessWildcardsDatabase(c connection.ConnEntry, wildcardNames []string) (err error) {

g.DebugLow("processing wildcards for %s", rd.Source)

conn, err := c.Connection.AsDatabase()
if err != nil {
@@ -114,7 +131,7 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) {
continue
}

if schemaT.Name == "*" {
if strings.Contains(schemaT.Name, "*") {
// get all tables in schema
g.Debug("getting tables for %s", wildcardName)
data, err := conn.GetTables(schemaT.Schema)
@@ -134,10 +151,15 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) {
}

// add to stream map
newCfg := ReplicationStreamConfig{}
g.Unmarshal(g.Marshal(rd.Streams[wildcardName]), &newCfg) // copy config over
rd.Streams[table.FullName()] = &newCfg
rd.streamsOrdered = append(rd.streamsOrdered, table.FullName())
if g.WildCardMatch(
strings.ToLower(table.FullName()),
[]string{strings.ToLower(schemaT.FullName())},
) {
newCfg := ReplicationStreamConfig{}
g.Unmarshal(g.Marshal(rd.Streams[wildcardName]), &newCfg) // copy config over
rd.Streams[table.FullName()] = &newCfg
rd.streamsOrdered = append(rd.streamsOrdered, table.FullName())
}
}

// delete * from stream map
@@ -148,6 +170,47 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) {

}
}
return
}

func (rd *ReplicationConfig) ProcessWildcardsFile(c connection.ConnEntry, wildcardNames []string) (err error) {
g.DebugLow("processing wildcards for %s", rd.Source)

fs, err := c.Connection.AsFile()
if err != nil {
return g.Error(err, "could not init connection for wildcard processing: %s", rd.Source)
} else if err = fs.Init(context.Background()); err != nil {
return g.Error(err, "could not connect to file system for wildcard processing: %s", rd.Source)
}

for _, wildcardName := range wildcardNames {
nameParts := strings.Split(wildcardName, "/")
lastPart := nameParts[len(nameParts)-1]

if strings.Contains(lastPart, "*") {
parent := strings.TrimSuffix(wildcardName, lastPart)

paths, err := fs.ListRecursive(parent)
if err != nil {
return g.Error(err, "could not list %s", parent)
}

for _, path := range paths {
if g.WildCardMatch(path, []string{lastPart}) && !rd.HasStream(path) {
newCfg := ReplicationStreamConfig{}
g.Unmarshal(g.Marshal(rd.Streams[wildcardName]), &newCfg) // copy config over
rd.Streams[path] = &newCfg
rd.streamsOrdered = append(rd.streamsOrdered, path)
}
}

// delete from stream map
delete(rd.Streams, wildcardName)
rd.streamsOrdered = lo.Filter(rd.streamsOrdered, func(v string, i int) bool {
return v != wildcardName && v != parent
})
}
}

return
}
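
A minimal standalone sketch of the matching that ProcessWildcardsFile performs. The prefix/suffix matcher below only approximates g.WildCardMatch from github.com/flarco/g, and the listed paths are made up:

package main

import (
	"fmt"
	"strings"
)

// match handles a single "*" in the pattern as prefix*suffix matching;
// the real matcher in github.com/flarco/g may cover more cases.
func match(val, pattern string) bool {
	parts := strings.SplitN(pattern, "*", 2)
	if len(parts) == 1 {
		return val == pattern
	}
	return strings.HasPrefix(val, parts[0]) && strings.HasSuffix(val, parts[1])
}

func main() {
	wildcardName := "s3://ocral/test.fs.write/*"
	nameParts := strings.Split(wildcardName, "/")
	lastPart := nameParts[len(nameParts)-1] // "*"

	// stand-ins for what fs.ListRecursive(parent) might return
	paths := []string{
		"s3://ocral/test.fs.write/part.01.csv",
		"s3://ocral/test.fs.write/part.02.csv",
	}
	for _, p := range paths {
		if match(p, lastPart) {
			fmt.Println("expanding stream:", p)
		}
	}
}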
22 changes: 22 additions & 0 deletions core/sling/task.go
@@ -259,6 +259,28 @@ func (t *TaskExecution) isUsingPool() bool {
return cast.ToBool(os.Getenv("SLING_CLI")) && t.Config.ReplicationMode
}

func (t *TaskExecution) getTargetObjectValue() string {

switch t.Type {
case FileToDB:
return t.Config.Target.Object
case DbToDb:
return t.Config.Target.Object
case DbToFile:
if t.Config.Options.StdOut {
return "stdout"
}
return t.Config.TgtConn.URL()
case FileToFile:
if t.Config.Options.StdOut {
return "stdout"
}
return t.Config.TgtConn.URL()
}

return ""
}

func (t *TaskExecution) AddCleanupTask(f func()) {
t.Context.Mux.Lock()
defer t.Context.Mux.Unlock()
15 changes: 9 additions & 6 deletions core/sling/task_run.go
@@ -4,11 +4,13 @@ import (
"context"
"net/http"
"os"
"runtime"
"strings"
"time"

_ "net/http/pprof"

"github.com/slingdata-io/sling-cli/core"
"github.com/slingdata-io/sling-cli/core/env"

"github.com/flarco/dbio"
@@ -72,6 +74,7 @@ func (t *TaskExecution) Execute() error {
// update into store
StoreUpdate(t)

g.DebugLow("Sling version: %s (%s %s)", core.Version, runtime.GOOS, runtime.GOARCH)
g.DebugLow("type is %s", t.Type)
g.Debug("using source options: %s", g.Marshal(t.Config.Source.Options))
g.Debug("using target options: %s", g.Marshal(t.Config.Target.Options))
@@ -280,7 +283,7 @@ func (t *TaskExecution) runDbToFile() (err error) {
return
}

t.SetProgress("wrote %d rows [%s r/s]", cnt, getRate(cnt))
t.SetProgress("wrote %d rows [%s r/s] to %s", cnt, getRate(cnt), t.getTargetObjectValue())

err = t.df.Err()
return
@@ -326,7 +329,7 @@ func (t *TaskExecution) runAPIToFile() (err error) {
return
}

t.SetProgress("wrote %d rows [%s r/s]", cnt, getRate(cnt))
t.SetProgress("wrote %d rows [%s r/s] to %s", cnt, getRate(cnt), t.getTargetObjectValue())

err = t.df.Err()
return
@@ -423,7 +426,7 @@ func (t *TaskExecution) runAPIToDB() (err error) {
}

elapsed := int(time.Since(start).Seconds())
t.SetProgress("inserted %d rows in %d secs [%s r/s]", cnt, elapsed, getRate(cnt))
t.SetProgress("inserted %d rows into %s in %d secs [%s r/s]", cnt, t.getTargetObjectValue(), elapsed, getRate(cnt))

if err != nil {
err = g.Error(t.df.Err(), "error in transfer")
@@ -504,7 +507,7 @@ func (t *TaskExecution) runFileToDB() (err error) {
}

elapsed := int(time.Since(start).Seconds())
t.SetProgress("inserted %d rows in %d secs [%s r/s]", cnt, elapsed, getRate(cnt))
t.SetProgress("inserted %d rows into %s in %d secs [%s r/s]", cnt, t.getTargetObjectValue(), elapsed, getRate(cnt))

if err != nil {
err = g.Error(t.df.Err(), "error in transfer")
@@ -550,7 +553,7 @@ func (t *TaskExecution) runFileToFile() (err error) {
return
}

t.SetProgress("wrote %d rows [%s r/s]", cnt, getRate(cnt))
t.SetProgress("wrote %d rows to %s [%s r/s]", cnt, t.getTargetObjectValue(), getRate(cnt))

if t.df.Err() != nil {
err = g.Error(t.df.Err(), "Error in runFileToFile")
@@ -643,7 +646,7 @@ func (t *TaskExecution) runDbToDb() (err error) {
bytesStr = "[" + val + "]"
}
elapsed := int(time.Since(start).Seconds())
t.SetProgress("inserted %d rows in %d secs [%s r/s] %s", cnt, elapsed, getRate(cnt), bytesStr)
t.SetProgress("inserted %d rows into %s in %d secs [%s r/s] %s", cnt, t.getTargetObjectValue(), elapsed, getRate(cnt), bytesStr)

if t.df.Err() != nil {
err = g.Error(t.df.Err(), "Error running runDbToDb")