Skip to content

Commit

Permalink
v1.0.70 (#120)
Browse files Browse the repository at this point in the history
* Fix file partitioning notation (*), update clickhouse array type mapping

* add Replace0x00

* improve column length logic

* improve error helper

* add ReplaceNonPrint

* update clickhouse type mappings, handle array/maps

* handle dots in column names

* add TableDDL check

* typo

* clean up

* add incremental custom sql limit

* improve castVal for decimals

* fix CastVal for int casting

* improve clickhouse decimals

* improve clickhouse decimal casting
  • Loading branch information
flarco authored Jan 19, 2024
1 parent 5af9f29 commit b0b85f5
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 15 deletions.
4 changes: 2 additions & 2 deletions cmd/sling/sling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func Test1Replication(t *testing.T) {
os.Setenv("SLING_CLI", "TRUE")
os.Setenv("SLING_LOADED_AT_COLUMN", "TRUE")
os.Setenv("CONCURENCY_LIMIT", "2")
replicationCfgPath := "/Users/fritz/Downloads/mlops.slack.replication.local.yml"
replicationCfgPath := g.UserHomeDir() + "/Downloads/mlops.slack.replication.local.yml"
err := runReplication(replicationCfgPath)
if g.AssertNoError(t, err) {
return
Expand All @@ -624,6 +624,6 @@ func TestExtract(t *testing.T) {

printUpdateAvailable()

err := ExtractTarGz("/Users/fritz/Downloads/sling/sling_1.0.44_darwin_all.tar.gz", "/Users/fritz/Downloads/sling")
err := ExtractTarGz(g.UserHomeDir()+"/Downloads/sling/sling_1.0.44_darwin_all.tar.gz", g.UserHomeDir()+"/Downloads/sling")
g.AssertNoError(t, err)
}
42 changes: 41 additions & 1 deletion core/sling/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,44 @@ func (t *TaskExecution) sourceOptionsMap() (options map[string]any) {
g.Warn("Config.Source.Options.Columns not handled: %T", t.Config.Source.Options.Columns)
}

// parse lenght, precision, scale
regexExtract := `[a-zA-Z]+ *\( *(\d+) *(, *\d+)* *\)`
for i, col := range columns {
colType := strings.TrimSpace(string(col.Type))
if !strings.HasSuffix(colType, ")") {
continue
}

// fix type value
parts := strings.Split(colType, "(")
col.Type = iop.ColumnType(strings.TrimSpace(parts[0]))

matches := g.Matches(colType, regexExtract)
if len(matches) == 1 {
vals := matches[0].Group

if len(vals) > 0 {
vals[0] = strings.TrimSpace(vals[0])
// grab length or precision
if col.Type.IsString() {
col.Stats.MaxLen = cast.ToInt(vals[0])
} else if col.Type.IsNumber() {
col.DbPrecision = cast.ToInt(vals[0])
}
}

if len(vals) > 1 {
vals[1] = strings.TrimSpace(strings.ReplaceAll(vals[1], ",", ""))
// grab scale
if col.Type.IsNumber() {
col.DbScale = cast.ToInt(vals[1])
}
}

columns[i] = col
}
}

// set as string so that StreamProcessor parses it
options["columns"] = g.Marshal(iop.NewColumns(columns...))
}
Expand Down Expand Up @@ -414,10 +452,12 @@ func ErrorHelper(err error) (helpString string) {
helpString = "Perhaps specifying `encrypt=true` and `TrustServerCertificate=true` properties could help? See https://docs.slingdata.io/connections/database-connections/sqlserver"
case strings.Contains(errString, "ssl is not enabled on the server"):
helpString = "Perhaps setting the 'sslmode' option could help? See https://docs.slingdata.io/connections/database-connections/postgres"
case strings.Contains(errString, "invalid input syntax for type") || (strings.Contains(errString, " value ") && strings.Contains(errString, "is not recognized")) || strings.Contains(errString, "invalid character value"):
case strings.Contains(errString, "invalid input syntax for type") || (strings.Contains(errString, " value ") && strings.Contains(errString, "is not recognized")) || strings.Contains(errString, "invalid character value") && strings.Contains(errString, " exceeds "):
helpString = "Perhaps setting a higher 'SAMPLE_SIZE' environment variable could help? This represents the number of records to process in order to infer column types (especially for file sources). The default is 900. Try 2000 or even higher.\nYou can also manually specify the column types with the `columns` source option. See https://docs.slingdata.io/sling-cli/run/configuration#source"
case strings.Contains(errString, "bcp import"):
helpString = "If facing issues with Microsoft's BCP, try disabling Bulk Loading with `use_bulk=false`. See https://docs.slingdata.io/sling-cli/run/configuration#target"
case strings.Contains(errString, "[AppendRow]: converting"):
helpString = "Perhaps using the `adjust_column_type: true` target option could help? See https://docs.slingdata.io/sling-cli/run/configuration#target"
}
}
return
Expand Down
12 changes: 12 additions & 0 deletions core/sling/task_run_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df
"update_key", srcConn.Quote(cfg.Source.UpdateKey, false),
"incremental_value", cfg.IncrementalVal,
)

if cfg.Source.Limit() > 0 {
sTable.SQL = g.R(
srcConn.Template().Core["limit"],
"sql", sTable.SQL,
"limit", cast.ToString(cfg.Source.Limit()),
)
}
}
} else if cfg.Source.Limit() > 0 && sTable.SQL == "" {
sTable.SQL = g.R(
Expand Down Expand Up @@ -163,6 +171,8 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df
return t.df, err
}

g.Trace("%#v", df.Columns.Types())

return
}

Expand Down Expand Up @@ -241,5 +251,7 @@ func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)
return df, g.Error("Could not read columns")
}

g.Trace("%#v", df.Columns.Types())

return
}
10 changes: 8 additions & 2 deletions core/sling/task_run_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
df.Columns = sampleData.Columns
}

// check table ddl
if cfg.Target.Options.TableDDL != "" && !strings.Contains(cfg.Target.Options.TableDDL, targetTable) {
err = g.Error("The Table DDL provided needs to contains the exact object table name: %s", targetTable)
return
}

_, err = createTableIfNotExists(
tgtConn,
sampleData,
Expand Down Expand Up @@ -248,9 +254,9 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
if err != nil {
tgtConn.Rollback()
if cast.ToBool(os.Getenv("SLING_CLI")) && cfg.sourceIsFile() {
err = g.Error(err, "could not insert into %s.", targetTable)
err = g.Error(err, "could not insert into %s.", cfg.Target.Options.TableTmp)
} else {
err = g.Error(err, "could not insert into "+targetTable)
err = g.Error(err, "could not insert into "+cfg.Target.Options.TableTmp)
}
return
}
Expand Down
31 changes: 22 additions & 9 deletions core/sling/tranforms.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sling
import (
"fmt"
"strings"
"unicode"

"github.com/flarco/dbio/iop"
"github.com/flarco/g"
Expand All @@ -18,15 +19,17 @@ var decWindows1250 = charmap.Windows1250.NewDecoder()
var decWindows1252 = charmap.Windows1252.NewDecoder()

var transforms = map[string]iop.TransformFunc{
"replace_accents": func(sp *iop.StreamProcessor, val string) (string, error) { return iop.ReplaceAccents(sp, val) },
"trim_space": func(sp *iop.StreamProcessor, val string) (string, error) { return strings.TrimSpace(val), nil },
"parse_uuid": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseUUID(sp, val) },
"parse_bit": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseBit(sp, val) },
"decode_latin1": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_1, val) },
"decode_latin5": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_5, val) },
"decode_latin9": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_15, val) },
"decode_windows1250": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decWindows1250, val) },
"decode_windows1252": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decWindows1252, val) },
"replace_accents": func(sp *iop.StreamProcessor, val string) (string, error) { return iop.ReplaceAccents(sp, val) },
"replace_0x00": func(sp *iop.StreamProcessor, val string) (string, error) { return Replace0x00(sp, val) },
"replace_non_printable": func(sp *iop.StreamProcessor, val string) (string, error) { return ReplaceNonPrint(sp, val) },
"trim_space": func(sp *iop.StreamProcessor, val string) (string, error) { return strings.TrimSpace(val), nil },
"parse_uuid": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseUUID(sp, val) },
"parse_bit": func(sp *iop.StreamProcessor, val string) (string, error) { return ParseBit(sp, val) },
"decode_latin1": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_1, val) },
"decode_latin5": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_5, val) },
"decode_latin9": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decISO8859_15, val) },
"decode_windows1250": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decWindows1250, val) },
"decode_windows1252": func(sp *iop.StreamProcessor, val string) (string, error) { return Decode(sp, decWindows1252, val) },
}

func init() {
Expand Down Expand Up @@ -61,3 +64,13 @@ func ParseBit(sp *iop.StreamProcessor, val string) (string, error) {
}
return val, nil
}

func Replace0x00(sp *iop.StreamProcessor, val string) (string, error) {
return strings.ReplaceAll(val, "\x00", ""), nil // replace the NUL character
}

func ReplaceNonPrint(sp *iop.StreamProcessor, val string) (string, error) {
return strings.TrimFunc(val, func(r rune) bool {
return !unicode.IsGraphic(r)
}), nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/denisbrodbeck/machineid v1.0.1
github.com/dustin/go-humanize v1.0.1
github.com/fatih/color v1.13.0
github.com/flarco/dbio v0.4.71
github.com/flarco/dbio v0.4.79
github.com/flarco/g v0.1.67
github.com/getsentry/sentry-go v0.11.0
github.com/google/uuid v1.5.0
Expand Down

0 comments on commit b0b85f5

Please sign in to comment.