From b0b85f55544aae3707cece30ccc29dbd8d558aa0 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Fri, 19 Jan 2024 07:34:37 -0300 Subject: [PATCH] v1.0.70 (#120) * Fix file partitioning notation (*), update clickhouse array type mapping * add Replace0x00 * improve column length logic * improve error helper * add ReplaceNonPrint * update clickhouse type mappings, handle array/maps * handle dots in column names * add TableDDL check * typo * clean up * add incremental custom sql limit * improve castVal for decimals * fix CastVal for int casting * improve clickhouse decimals * improve clickhouse decimal casting --- cmd/sling/sling_test.go | 4 ++-- core/sling/task.go | 42 +++++++++++++++++++++++++++++++++++- core/sling/task_run_read.go | 12 +++++++++++ core/sling/task_run_write.go | 10 +++++++-- core/sling/tranforms.go | 31 ++++++++++++++++++-------- go.mod | 2 +- 6 files changed, 86 insertions(+), 15 deletions(-) diff --git a/cmd/sling/sling_test.go b/cmd/sling/sling_test.go index 90b41ff2..c5b7580d 100755 --- a/cmd/sling/sling_test.go +++ b/cmd/sling/sling_test.go @@ -609,7 +609,7 @@ func Test1Replication(t *testing.T) { os.Setenv("SLING_CLI", "TRUE") os.Setenv("SLING_LOADED_AT_COLUMN", "TRUE") os.Setenv("CONCURENCY_LIMIT", "2") - replicationCfgPath := "/Users/fritz/Downloads/mlops.slack.replication.local.yml" + replicationCfgPath := g.UserHomeDir() + "/Downloads/mlops.slack.replication.local.yml" err := runReplication(replicationCfgPath) if g.AssertNoError(t, err) { return @@ -624,6 +624,6 @@ func TestExtract(t *testing.T) { printUpdateAvailable() - err := ExtractTarGz("/Users/fritz/Downloads/sling/sling_1.0.44_darwin_all.tar.gz", "/Users/fritz/Downloads/sling") + err := ExtractTarGz(g.UserHomeDir()+"/Downloads/sling/sling_1.0.44_darwin_all.tar.gz", g.UserHomeDir()+"/Downloads/sling") g.AssertNoError(t, err) } diff --git a/core/sling/task.go b/core/sling/task.go index a1aae3b5..5e6e0fb2 100644 --- a/core/sling/task.go +++ b/core/sling/task.go @@ -347,6 +347,44 @@ func (t *TaskExecution) sourceOptionsMap() (options map[string]any) { g.Warn("Config.Source.Options.Columns not handled: %T", t.Config.Source.Options.Columns) } + // parse lenght, precision, scale + regexExtract := `[a-zA-Z]+ *\( *(\d+) *(, *\d+)* *\)` + for i, col := range columns { + colType := strings.TrimSpace(string(col.Type)) + if !strings.HasSuffix(colType, ")") { + continue + } + + // fix type value + parts := strings.Split(colType, "(") + col.Type = iop.ColumnType(strings.TrimSpace(parts[0])) + + matches := g.Matches(colType, regexExtract) + if len(matches) == 1 { + vals := matches[0].Group + + if len(vals) > 0 { + vals[0] = strings.TrimSpace(vals[0]) + // grab length or precision + if col.Type.IsString() { + col.Stats.MaxLen = cast.ToInt(vals[0]) + } else if col.Type.IsNumber() { + col.DbPrecision = cast.ToInt(vals[0]) + } + } + + if len(vals) > 1 { + vals[1] = strings.TrimSpace(strings.ReplaceAll(vals[1], ",", "")) + // grab scale + if col.Type.IsNumber() { + col.DbScale = cast.ToInt(vals[1]) + } + } + + columns[i] = col + } + } + // set as string so that StreamProcessor parses it options["columns"] = g.Marshal(iop.NewColumns(columns...)) } @@ -414,10 +452,12 @@ func ErrorHelper(err error) (helpString string) { helpString = "Perhaps specifying `encrypt=true` and `TrustServerCertificate=true` properties could help? See https://docs.slingdata.io/connections/database-connections/sqlserver" case strings.Contains(errString, "ssl is not enabled on the server"): helpString = "Perhaps setting the 'sslmode' option could help? See https://docs.slingdata.io/connections/database-connections/postgres" - case strings.Contains(errString, "invalid input syntax for type") || (strings.Contains(errString, " value ") && strings.Contains(errString, "is not recognized")) || strings.Contains(errString, "invalid character value"): + case strings.Contains(errString, "invalid input syntax for type") || (strings.Contains(errString, " value ") && strings.Contains(errString, "is not recognized")) || strings.Contains(errString, "invalid character value") && strings.Contains(errString, " exceeds "): helpString = "Perhaps setting a higher 'SAMPLE_SIZE' environment variable could help? This represents the number of records to process in order to infer column types (especially for file sources). The default is 900. Try 2000 or even higher.\nYou can also manually specify the column types with the `columns` source option. See https://docs.slingdata.io/sling-cli/run/configuration#source" case strings.Contains(errString, "bcp import"): helpString = "If facing issues with Microsoft's BCP, try disabling Bulk Loading with `use_bulk=false`. See https://docs.slingdata.io/sling-cli/run/configuration#target" + case strings.Contains(errString, "[AppendRow]: converting"): + helpString = "Perhaps using the `adjust_column_type: true` target option could help? See https://docs.slingdata.io/sling-cli/run/configuration#target" } } return diff --git a/core/sling/task_run_read.go b/core/sling/task_run_read.go index cbac81db..b80937d1 100644 --- a/core/sling/task_run_read.go +++ b/core/sling/task_run_read.go @@ -134,6 +134,14 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df "update_key", srcConn.Quote(cfg.Source.UpdateKey, false), "incremental_value", cfg.IncrementalVal, ) + + if cfg.Source.Limit() > 0 { + sTable.SQL = g.R( + srcConn.Template().Core["limit"], + "sql", sTable.SQL, + "limit", cast.ToString(cfg.Source.Limit()), + ) + } } } else if cfg.Source.Limit() > 0 && sTable.SQL == "" { sTable.SQL = g.R( @@ -163,6 +171,8 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df return t.df, err } + g.Trace("%#v", df.Columns.Types()) + return } @@ -241,5 +251,7 @@ func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error) return df, g.Error("Could not read columns") } + g.Trace("%#v", df.Columns.Types()) + return } diff --git a/core/sling/task_run_write.go b/core/sling/task_run_write.go index 07fafadf..1ef4dd3b 100644 --- a/core/sling/task_run_write.go +++ b/core/sling/task_run_write.go @@ -156,6 +156,12 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas df.Columns = sampleData.Columns } + // check table ddl + if cfg.Target.Options.TableDDL != "" && !strings.Contains(cfg.Target.Options.TableDDL, targetTable) { + err = g.Error("The Table DDL provided needs to contains the exact object table name: %s", targetTable) + return + } + _, err = createTableIfNotExists( tgtConn, sampleData, @@ -248,9 +254,9 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas if err != nil { tgtConn.Rollback() if cast.ToBool(os.Getenv("SLING_CLI")) && cfg.sourceIsFile() { - err = g.Error(err, "could not insert into %s.", targetTable) + err = g.Error(err, "could not insert into %s.", cfg.Target.Options.TableTmp) } else { - err = g.Error(err, "could not insert into "+targetTable) + err = g.Error(err, "could not insert into "+cfg.Target.Options.TableTmp) } return } diff --git a/core/sling/tranforms.go b/core/sling/tranforms.go index ff125740..494d8998 100644 --- a/core/sling/tranforms.go +++ b/core/sling/tranforms.go @@ -3,6 +3,7 @@ package sling import ( "fmt" "strings" + "unicode" "github.com/flarco/dbio/iop" "github.com/flarco/g" @@ -18,15 +19,17 @@ var decWindows1250 = charmap.Windows1250.NewDecoder() var decWindows1252 = charmap.Windows1252.NewDecoder() var transforms = map[string]iop.TransformFunc{ - "replace_accents": func(sp *iop.StreamProcessor, val string) (string, error) { return iop.ReplaceAccents(sp, val) }, - "trim_space": func(sp *iop.StreamProcessor, val string) (string, error) { return strings.TrimSpace(val), nil }, - "parse_uuid": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseUUID(sp, val) }, - "parse_bit": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseBit(sp, val) }, - "decode_latin1": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_1, val) }, - "decode_latin5": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_5, val) }, - "decode_latin9": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_15, val) }, - "decode_windows1250": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decWindows1250, val) }, - "decode_windows1252": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decWindows1252, val) }, + "replace_accents": func(sp *iop.StreamProcessor, val string) (string, error) { return iop.ReplaceAccents(sp, val) }, + "replace_0x00": func(sp *iop.StreamProcessor, val string) (string, error) { return Replace0x00(sp, val) }, + "replace_non_printable": func(sp *iop.StreamProcessor, val string) (string, error) { return ReplaceNonPrint(sp, val) }, + "trim_space": func(sp *iop.StreamProcessor, val string) (string, error) { return strings.TrimSpace(val), nil }, + "parse_uuid": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseUUID(sp, val) }, + "parse_bit": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseBit(sp, val) }, + "decode_latin1": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_1, val) }, + "decode_latin5": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_5, val) }, + "decode_latin9": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_15, val) }, + "decode_windows1250": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decWindows1250, val) }, + "decode_windows1252": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decWindows1252, val) }, } func init() { @@ -61,3 +64,13 @@ func ParseBit(sp *iop.StreamProcessor, val string) (string, error) { } return val, nil } + +func Replace0x00(sp *iop.StreamProcessor, val string) (string, error) { + return strings.ReplaceAll(val, "\x00", ""), nil // replace the NUL character +} + +func ReplaceNonPrint(sp *iop.StreamProcessor, val string) (string, error) { + return strings.TrimFunc(val, func(r rune) bool { + return !unicode.IsGraphic(r) + }), nil +} diff --git a/go.mod b/go.mod index 375c90aa..23a0d07b 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/denisbrodbeck/machineid v1.0.1 github.com/dustin/go-humanize v1.0.1 github.com/fatih/color v1.13.0 - github.com/flarco/dbio v0.4.71 + github.com/flarco/dbio v0.4.79 github.com/flarco/g v0.1.67 github.com/getsentry/sentry-go v0.11.0 github.com/google/uuid v1.5.0