Skip to content

Commit

Permalink
V1.0.61 (#67)
Browse files Browse the repository at this point in the history
* apply quotes in update-key expression

* improve ErrorHelper

* improve applyColumnCasing

* add source/target options debug log

* allow all columns coercion

* improve sourceOptionsMap to handle map[any]any
  • Loading branch information
flarco authored Nov 27, 2023
1 parent 2a6417a commit 70da2cd
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 58 deletions.
25 changes: 1 addition & 24 deletions cmd/sling/sling.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -494,7 +493,7 @@ func cliInit() int {
// eid := sentry.CaptureException(err)
}

if eh := errorHelper(err); eh != "" {
if eh := sling.ErrorHelper(err); eh != "" {
println()
println(color.MagentaString(eh))
println()
Expand Down Expand Up @@ -525,25 +524,3 @@ func getErrString(err error) (errString string) {
}
return
}

// errorHelper inspects an error and, when it matches a known failure
// pattern, returns a human-friendly hint pointing at the relevant
// documentation. An empty string means no hint applies.
func errorHelper(err error) (helpString string) {
	if err == nil {
		return
	}

	// prefer the full debug text when available, so hints can match
	// details that the short message omits
	msg := strings.ToLower(err.Error())
	if e, ok := err.(*g.ErrType); ok && e.Debug() != "" {
		msg = strings.ToLower(e.Full())
	}

	has := func(sub string) bool { return strings.Contains(msg, sub) }

	switch {
	case has("utf8") || has("ascii"):
		helpString = "Perhaps the 'transforms' source option could help with encodings? See https://docs.slingdata.io/sling-cli/running-tasks#advanced-options"
	case has("failed to verify certificate"):
		helpString = "Perhaps specifying `encrypt=true` and `TrustServerCertificate=true` properties could help? See https://docs.slingdata.io/connections/database-connections/sqlserver"
	case has("ssl is not enabled on the server"):
		helpString = "Perhaps setting the 'sslmode' option could help? See https://docs.slingdata.io/connections/database-connections/postgres"
	case has("invalid input syntax for type"):
		helpString = "Perhaps setting a higher 'SAMPLE_SIZE' environment variable could help? This represents the number of records to process in order to infer column types (especially for file sources). The default is 900. Try 2000 or even higher."
	}
	return
}
10 changes: 5 additions & 5 deletions core/sling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,23 @@ func TestColumnCasing(t *testing.T) {
targetCasing := TargetColumnCasing

df.Columns = iop.NewColumns(iop.Column{Name: "myCol"})
applyColumnCasing(df, dbio.TypeDbSnowflake, &sourceCasing)
applyColumnCasingToDf(df, dbio.TypeDbSnowflake, &sourceCasing)
assert.Equal(t, "myCol", df.Columns[0].Name)

df.Columns = iop.NewColumns(iop.Column{Name: "myCol"}, iop.Column{Name: "hey-hey"})
applyColumnCasing(df, dbio.TypeDbSnowflake, &snakeCasing)
applyColumnCasingToDf(df, dbio.TypeDbSnowflake, &snakeCasing)
assert.Equal(t, "MY_COL", df.Columns[0].Name)
assert.Equal(t, "HEY_HEY", df.Columns[1].Name)

df.Columns = iop.NewColumns(iop.Column{Name: "myCol"})
applyColumnCasing(df, dbio.TypeDbSnowflake, &targetCasing)
applyColumnCasingToDf(df, dbio.TypeDbSnowflake, &targetCasing)
assert.Equal(t, "MYCOL", df.Columns[0].Name)

df.Columns = iop.NewColumns(iop.Column{Name: "DHL OriginalTracking-Number"})
applyColumnCasing(df, dbio.TypeDbDuckDb, &targetCasing)
applyColumnCasingToDf(df, dbio.TypeDbDuckDb, &targetCasing)
assert.Equal(t, "dhl_originaltracking_number", df.Columns[0].Name)

df.Columns = iop.NewColumns(iop.Column{Name: "DHL OriginalTracking-Number"})
applyColumnCasing(df, dbio.TypeDbDuckDb, &snakeCasing)
applyColumnCasingToDf(df, dbio.TypeDbDuckDb, &snakeCasing)
assert.Equal(t, "dhl_original_tracking_number", df.Columns[0].Name)
}
72 changes: 51 additions & 21 deletions core/sling/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,14 @@ func (t *TaskExecution) sourceOptionsMap() (options map[string]any) {
}
columns = append(columns, col)
}
case map[any]any:
for colName, colType := range colsCasted {
col := iop.Column{
Name: cast.ToString(colName),
Type: iop.ColumnType(cast.ToString(colType)),
}
columns = append(columns, col)
}
case []map[string]any:
for _, colItem := range colsCasted {
col := iop.Column{}
Expand All @@ -305,7 +313,7 @@ func (t *TaskExecution) sourceOptionsMap() (options map[string]any) {
case iop.Columns:
columns = colsCasted
default:
g.Warn("Config.Source.Options.Columns not handled")
g.Warn("Config.Source.Options.Columns not handled: %T", t.Config.Source.Options.Columns)
}

// set as string so that StreamProcessor parses it
Expand All @@ -321,41 +329,63 @@ func (t *TaskExecution) sourceOptionsMap() (options map[string]any) {
var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")

// apply column casing
func applyColumnCasing(df *iop.Dataflow, connType dbio.Type, casing *ColumnCasing) {
func applyColumnCasingToDf(df *iop.Dataflow, connType dbio.Type, casing *ColumnCasing) {

if casing == nil || *casing == SourceColumnCasing {
return
}

applyCase := func(name string, toSnake bool) string {
// convert to snake case
if toSnake {
name = matchAllCap.ReplaceAllString(name, "${1}_${2}")
}

// clean up other weird chars
name = iop.CleanName(name)

// lower case for target file system
if connType.DBNameUpperCase() {
return strings.ToUpper(name)
}
return strings.ToLower(name)
}

// convert to target system casing
for i := range df.Columns {
df.Columns[i].Name = applyCase(df.Columns[i].Name, *casing == SnakeColumnCasing)
df.Columns[i].Name = applyColumnCasing(df.Columns[i].Name, *casing == SnakeColumnCasing, connType)
}

// propagate names
for _, ds := range df.Streams {
for i := range ds.Columns {
ds.Columns[i].Name = applyCase(ds.Columns[i].Name, *casing == SnakeColumnCasing)
ds.Columns[i].Name = applyColumnCasing(ds.Columns[i].Name, *casing == SnakeColumnCasing, connType)
}

for i := range ds.CurrentBatch.Columns {
ds.CurrentBatch.Columns[i].Name = applyCase(ds.CurrentBatch.Columns[i].Name, *casing == SnakeColumnCasing)
ds.CurrentBatch.Columns[i].Name = applyColumnCasing(ds.CurrentBatch.Columns[i].Name, *casing == SnakeColumnCasing, connType)
}
}
}

// applyColumnCasing normalizes a single column name for the target system.
// When toSnake is true, camelCase boundaries are first split with
// underscores; the name is then cleaned of problematic characters and
// upper- or lower-cased to match the connection type's convention.
func applyColumnCasing(name string, toSnake bool, connType dbio.Type) string {
	// snake-case conversion: insert "_" between a lower/digit and an upper rune
	if toSnake {
		name = matchAllCap.ReplaceAllString(name, "${1}_${2}")
	}

	// strip/normalize characters that are unsafe in identifiers
	cleaned := iop.CleanName(name)

	// pick the casing function matching the target system's convention
	casing := strings.ToLower
	if connType.DBNameUpperCase() {
		casing = strings.ToUpper
	}
	return casing(cleaned)
}

// ErrorHelper inspects an error and, when it matches a known failure
// pattern, returns a human-friendly hint pointing at the relevant
// documentation. An empty string means no hint applies.
func ErrorHelper(err error) (helpString string) {
	if err == nil {
		return
	}

	// prefer the full debug text when available, so hints can match
	// details that the short message omits
	msg := strings.ToLower(err.Error())
	if e, ok := err.(*g.ErrType); ok && e.Debug() != "" {
		msg = strings.ToLower(e.Full())
	}

	has := func(sub string) bool { return strings.Contains(msg, sub) }

	switch {
	case has("utf8") || has("ascii"):
		helpString = "Perhaps the 'transforms' source option could help with encodings? See https://docs.slingdata.io/sling-cli/running-tasks#advanced-options"
	case has("failed to verify certificate"):
		helpString = "Perhaps specifying `encrypt=true` and `TrustServerCertificate=true` properties could help? See https://docs.slingdata.io/connections/database-connections/sqlserver"
	case has("ssl is not enabled on the server"):
		helpString = "Perhaps setting the 'sslmode' option could help? See https://docs.slingdata.io/connections/database-connections/postgres"
	case has("invalid input syntax for type"), has(" value ") && has("is not recognized"):
		helpString = "Perhaps setting a higher 'SAMPLE_SIZE' environment variable could help? This represents the number of records to process in order to infer column types (especially for file sources). The default is 900. Try 2000 or even higher.\nYou can also manually specify the column types with the `columns` source option. See https://docs.slingdata.io/sling-cli/running-tasks#advanced-options"
	}
	return
}
9 changes: 7 additions & 2 deletions core/sling/task_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,14 @@ func getIncrementalValue(cfg *Config, tgtConn database.Connection, srcConnVarMap
return
}

tgtUpdateKey := cfg.Source.UpdateKey
if cc := cfg.Target.Options.ColumnCasing; cc != nil && *cc != SourceColumnCasing {
tgtUpdateKey = applyColumnCasing(tgtUpdateKey, *cc == SnakeColumnCasing, tgtConn.GetType())
}

sql := g.F(
"select max(%s) as max_val from %s",
tgtConn.Quote(cfg.Source.UpdateKey),
tgtConn.Quote(tgtUpdateKey),
table.FDQN(),
)

Expand All @@ -197,7 +202,7 @@ func getIncrementalValue(cfg *Config, tgtConn database.Connection, srcConnVarMap
// set val to blank for full load
return "", nil
}
err = g.Error(err, "could not get max value for "+cfg.Source.UpdateKey)
err = g.Error(err, "could not get max value for "+tgtUpdateKey)
return
}
if len(data.Rows) == 0 {
Expand Down
3 changes: 3 additions & 0 deletions core/sling/task_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (t *TaskExecution) Execute() error {
}

g.DebugLow("type is %s", t.Type)
g.Debug("using source options: %s", g.Marshal(t.Config.Source.Options))
g.Debug("using target options: %s", g.Marshal(t.Config.Target.Options))

switch t.Type {
case DbSQL:
t.Err = t.runDbSQL()
Expand Down
4 changes: 2 additions & 2 deletions core/sling/task_run_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df

incrementalWhereCond = g.R(
"{update_key} {gt} {value}",
"update_key", cfg.Source.UpdateKey,
"update_key", srcConn.Quote(cfg.Source.UpdateKey),
"value", cfg.IncrementalVal,
"gt", greaterThan,
)
Expand All @@ -92,7 +92,7 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df
sTable.SQL = g.R(
sTable.SQL,
"incremental_where_cond", incrementalWhereCond,
"update_key", cfg.Source.UpdateKey,
"update_key", srcConn.Quote(cfg.Source.UpdateKey),
"incremental_value", cfg.IncrementalVal,
)
}
Expand Down
6 changes: 3 additions & 3 deletions core/sling/task_run_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (t *TaskExecution) WriteToFile(cfg *Config, df *iop.Dataflow) (cnt uint64,
}

// apply column casing
applyColumnCasing(df, fs.FsType(), t.Config.Target.Options.ColumnCasing)
applyColumnCasingToDf(df, fs.FsType(), t.Config.Target.Options.ColumnCasing)

bw, err = fs.WriteDataflow(df, cfg.TgtConn.URL())
if err != nil {
Expand All @@ -52,7 +52,7 @@ func (t *TaskExecution) WriteToFile(cfg *Config, df *iop.Dataflow) (cnt uint64,
cnt = df.Count()
} else if cfg.Options.StdOut {
// apply column casing
applyColumnCasing(df, dbio.TypeFileLocal, t.Config.Target.Options.ColumnCasing)
applyColumnCasingToDf(df, dbio.TypeFileLocal, t.Config.Target.Options.ColumnCasing)

options := map[string]string{"delimiter": ","}
g.Unmarshal(g.Marshal(cfg.Target.Options), &options)
Expand Down Expand Up @@ -148,7 +148,7 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
}

// apply column casing
applyColumnCasing(df, tgtConn.GetType(), t.Config.Target.Options.ColumnCasing)
applyColumnCasingToDf(df, tgtConn.GetType(), t.Config.Target.Options.ColumnCasing)

sampleData := iop.NewDataset(df.Columns)
sampleData.Rows = df.Buffer
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/c-bata/go-prompt v0.2.6
github.com/denisbrodbeck/machineid v1.0.1
github.com/dustin/go-humanize v1.0.1
github.com/flarco/dbio v0.4.27
github.com/flarco/dbio v0.4.29
github.com/flarco/g v0.1.64
github.com/getsentry/sentry-go v0.11.0
github.com/google/uuid v1.3.0
Expand Down

0 comments on commit 70da2cd

Please sign in to comment.