Skip to content

Commit

Permalink
v1.0.65 (#90)
Browse files Browse the repository at this point in the history
* update README

* update test replications

* clean up

* connection data pointer fix

* createSchemaIfNotExists fix

* some formatting

* decimal precision/scale improvement

* improve precision logic

* improve max_decimals logic

* set max_decimal

* update max_decimals

* improve scale/precision

* fix ddlMaxDecScale

* fix MySQL col.Stats.MaxDecLen

* fix precision/scale sqlserver

* update snowflake decimal type logic

* improve CastVal
  • Loading branch information
flarco authored Dec 16, 2023
1 parent a0f2ddc commit e5cb5a5
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 19 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ scoop install sling
sling -h
```

#### Binary on Linux

```powershell
curl -LO 'https://github.com/slingdata-io/sling-cli/releases/latest/download/sling_linux_amd64.tar.gz' \
&& tar xf sling_linux_amd64.tar.gz \
&& rm -f sling_linux_amd64.tar.gz \
&& chmod +x sling
# You're good to go!
sling -h
```

### Compiling From Source

#### Linux or Mac
Expand Down
8 changes: 4 additions & 4 deletions cmd/sling/sling_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func runReplication(cfgPath string, selectStreams ...string) (err error) {
}

if len(selectStreams) > 0 && !g.IsMatched(selectStreams, name) {
g.Debug("skipping stream `%s` since it is not selected", name)
g.Debug("skipping stream %s since it is not selected", name)
continue
}
counter++
Expand Down Expand Up @@ -347,16 +347,16 @@ func runReplication(cfgPath string, selectStreams ...string) (err error) {
println()

if stream.Disabled {
g.Info("[%d / %d] skipping stream `%s` since it is disabled", counter, streamCnt, name)
g.Info("[%d / %d] skipping stream %s since it is disabled", counter, streamCnt, name)
continue
} else {
g.Info("[%d / %d] running stream `%s`", counter, streamCnt, name)
g.Info("[%d / %d] running stream %s", counter, streamCnt, name)
}

telemetryMap["run_mode"] = "replication"
err = runTask(&cfg, &replication)
if err != nil {
err = g.Error(err, "error for stream `%s`", name)
err = g.Error(err, "error for stream %s", name)
g.LogError(err)
eG.Capture(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/sling/sling_prompt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func completer(in prompt.Document) []prompt.Suggest {
localSuggestions = append(localSuggestions, prompt.Suggest{Text: conn.Name, Description: conn.Description})
}
return prompt.FilterHasPrefix(localSuggestions, w, true)
case g.In(prevWord, cast.ToSlice(stringFlags)...):
case g.In(prevWord, stringFlags...):
return []prompt.Suggest{}
}

Expand Down
8 changes: 6 additions & 2 deletions cmd/sling/tests/replications/r.11.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ target: aws_s3

defaults:
mode: full-refresh
object: 'test1k/{YEAR}'
object: 'test/{stream_schema}.{stream_table}'
target_options:
file_max_rows: 1000000 # multiple files
compression: gzip
Expand All @@ -13,7 +13,11 @@ streams:
sql: |
select * from public.test1k
where date_trunc('year', create_date) = '{YEAR}-01-01'
object: 'test1k/{YEAR}'

public.accounts:
public.plans:

env:
# pass in env vars, e.g. YEAR=2005
YEAR: $YEAR
14 changes: 14 additions & 0 deletions cmd/sling/tests/replications/r.12.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
source: postgres
target: snowflake

defaults:
mode: incremental
object: 'public.{stream_schema}_{stream_table}'
target_options:
add_new_columns: true
datetime_format: 2006-01-02 15:04:05.000000 -07

streams:
public.test1k:
primary_key: [id]
update_key: create_date
20 changes: 15 additions & 5 deletions core/sling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ func (cfg *Config) SetDefault() {
cfg.Target.Options.AdjustColumnType = g.Bool(false)
}

// set max_decimals
switch cfg.TgtConn.Type {
case dbio.TypeDbBigQuery, dbio.TypeDbBigTable:
cfg.Source.Options.MaxDecimals = g.Int(9)
cfg.Target.Options.MaxDecimals = g.Int(9)
case dbio.TypeDbClickhouse:
cfg.Source.Options.MaxDecimals = g.Int(11)
cfg.Target.Options.MaxDecimals = g.Int(11)
}

// set vars
for k, v := range cfg.Env {
os.Setenv(k, v)
Expand Down Expand Up @@ -336,7 +346,7 @@ func (cfg *Config) Prepare() (err error) {
if cfg.Target.Data == nil || len(cfg.Target.Data) == 0 {
cfg.Target.Data = g.M()
if c, ok := connsMap[strings.ToLower(cfg.Target.Conn)]; ok {
cfg.TgtConn = c.Connection
cfg.TgtConn = *c.Connection.Copy()
} else if !strings.Contains(cfg.Target.Conn, "://") && cfg.Target.Conn != "" && cfg.TgtConn.Data == nil {
return g.Error("could not find connection %s", cfg.Target.Conn)
} else if cfg.TgtConn.Data == nil {
Expand Down Expand Up @@ -385,7 +395,7 @@ func (cfg *Config) Prepare() (err error) {
if cfg.Source.Data == nil || len(cfg.Source.Data) == 0 {
cfg.Source.Data = g.M()
if c, ok := connsMap[strings.ToLower(cfg.Source.Conn)]; ok {
cfg.SrcConn = c.Connection
cfg.SrcConn = *c.Connection.Copy()
} else if !strings.Contains(cfg.Source.Conn, "://") && cfg.Source.Conn != "" && cfg.SrcConn.Data == nil {
return g.Error("could not find connection %s", cfg.Source.Conn)
} else if cfg.SrcConn.Data == nil {
Expand Down Expand Up @@ -770,21 +780,21 @@ var SourceFileOptionsDefault = SourceOptions{
DatetimeFormat: "AUTO",
SkipBlankLines: g.Bool(false),
Delimiter: ",",
MaxDecimals: g.Int(9),
MaxDecimals: g.Int(-1),
}

var SourceDBOptionsDefault = SourceOptions{
EmptyAsNull: g.Bool(true),
NullIf: g.String("NULL"),
DatetimeFormat: "AUTO",
MaxDecimals: g.Int(9),
MaxDecimals: g.Int(-1),
}

var SourceAPIOptionsDefault = SourceOptions{
EmptyAsNull: g.Bool(true),
NullIf: g.String("NULL"),
DatetimeFormat: "AUTO",
MaxDecimals: g.Int(9),
MaxDecimals: g.Int(-1),
}

var TargetFileOptionsDefault = TargetOptions{
Expand Down
8 changes: 4 additions & 4 deletions core/sling/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ type TaskExecution struct {
prevByteCount uint64
lastIncrement time.Time // the time of last row increment (to determine stalling)

Replication *ReplicationConfig
ProgressHist []string `json:"progress_hist"`
PBar *ProgressBar `json:"-"`
ProcStatsStart g.ProcStats `json:"-"` // process stats at beginning
Replication *ReplicationConfig `json:"replication"`
ProgressHist []string `json:"progress_hist"`
PBar *ProgressBar `json:"-"`
ProcStatsStart g.ProcStats `json:"-"` // process stats at beginning
cleanupFuncs []func()
}

Expand Down
2 changes: 1 addition & 1 deletion core/sling/task_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func createSchemaIfNotExists(conn database.Connection, schemaName string) (creat
schemas = lo.Map(schemas, func(v string, i int) string { return strings.ToLower(v) })
schemaName = strings.ToLower(schemaName)
if schemaName == "" {
schemaName = conn.GetProp("schema")
schemaName = strings.ToLower(conn.GetProp("schema"))
if schemaName == "" {
return false, g.Error("did not specify schema. Please specify schema in object name.")
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ require (
github.com/denisbrodbeck/machineid v1.0.1
github.com/dustin/go-humanize v1.0.1
github.com/fatih/color v1.13.0
github.com/flarco/dbio v0.4.40
github.com/flarco/g v0.1.66
github.com/flarco/dbio v0.4.50
github.com/flarco/g v0.1.67
github.com/getsentry/sentry-go v0.11.0
github.com/google/uuid v1.3.0
github.com/integrii/flaggy v1.5.2
Expand Down

0 comments on commit e5cb5a5

Please sign in to comment.