Skip to content

Commit

Permalink
v1.0.62 (#73)
Browse files Browse the repository at this point in the history
* improve getIncrementalValue

* improve getIncrementalValue

* improve update-key column case detection

* allow BigQuery Default Credentials

* improve CompareChecksums
  • Loading branch information
flarco authored Nov 30, 2023
1 parent 70da2cd commit 6ec355d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 6 deletions.
2 changes: 2 additions & 0 deletions core/sling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,8 @@ type Source struct {
UpdateKey string `json:"update_key,omitempty" yaml:"update_key,omitempty"`
Options *SourceOptions `json:"options,omitempty" yaml:"options,omitempty"`
Data map[string]interface{} `json:"data,omitempty" yaml:"data,omitempty"`

columns iop.Columns `json:"-" yaml:"-"`
}

func (s *Source) Limit() int {
Expand Down
25 changes: 22 additions & 3 deletions core/sling/task_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ func createTableIfNotExists(conn database.Connection, data iop.Dataset, tableNam
return true, nil
}

func pullSourceTableColumns(cfg *Config, srcConn database.Connection, table string) (cols iop.Columns, err error) {
cfg.Source.columns, err = srcConn.GetColumns(table)
if err != nil {
err = g.Error(err, "could not get column list for "+table)
return
}
return cfg.Source.columns, nil
}

func pullTargetTableColumns(cfg *Config, tgtConn database.Connection, force bool) (cols iop.Columns, err error) {
if len(cfg.Target.columns) == 0 || force {
cfg.Target.columns, err = tgtConn.GetColumns(cfg.Target.Object)
Expand Down Expand Up @@ -188,16 +197,26 @@ func getIncrementalValue(cfg *Config, tgtConn database.Connection, srcConnVarMap
tgtUpdateKey = applyColumnCasing(tgtUpdateKey, *cc == SnakeColumnCasing, tgtConn.GetType())
}

// get target columns to match update-key
// in case column casing needs adjustment
targetCols, _ := pullTargetTableColumns(cfg, tgtConn, false)
if updateCol := targetCols.GetColumn(tgtUpdateKey); updateCol.Name != "" {
tgtUpdateKey = updateCol.Name // overwrite with correct casing
}

sql := g.F(
"select max(%s) as max_val from %s",
tgtConn.Quote(tgtUpdateKey),
tgtConn.Quote(tgtUpdateKey, false),
table.FDQN(),
)

data, err := tgtConn.Query(sql)
if err != nil {
if strings.Contains(strings.ToLower(err.Error()), "exist") ||
strings.Contains(strings.ToLower(err.Error()), "not found") {
errMsg := strings.ToLower(err.Error())
if strings.Contains(errMsg, "exist") ||
strings.Contains(errMsg, "not found") ||
strings.Contains(errMsg, "no such table") ||
strings.Contains(errMsg, "invalid object") {
// table does not exists, will be create later
// set val to blank for full load
return "", nil
Expand Down
11 changes: 9 additions & 2 deletions core/sling/task_run_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df
// default true value
incrementalWhereCond := "1=1"

// get source columns to match update-key
// in case column casing needs adjustment
sourceCols, _ := pullSourceTableColumns(t.Config, srcConn, sTable.FullName())
if updateCol := sourceCols.GetColumn(cfg.Source.UpdateKey); updateCol.Name != "" {
cfg.Source.UpdateKey = updateCol.Name // overwrite with correct casing
}

// select only records that have been modified after last max value
if cfg.IncrementalVal != "" {
// if primary key is defined, use greater than or equal
Expand All @@ -66,7 +73,7 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df

incrementalWhereCond = g.R(
"{update_key} {gt} {value}",
"update_key", srcConn.Quote(cfg.Source.UpdateKey),
"update_key", srcConn.Quote(cfg.Source.UpdateKey, false),
"value", cfg.IncrementalVal,
"gt", greaterThan,
)
Expand All @@ -92,7 +99,7 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df
sTable.SQL = g.R(
sTable.SQL,
"incremental_where_cond", incrementalWhereCond,
"update_key", srcConn.Quote(cfg.Source.UpdateKey),
"update_key", srcConn.Quote(cfg.Source.UpdateKey, false),
"incremental_value", cfg.IncrementalVal,
)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/c-bata/go-prompt v0.2.6
github.com/denisbrodbeck/machineid v1.0.1
github.com/dustin/go-humanize v1.0.1
github.com/flarco/dbio v0.4.29
github.com/flarco/dbio v0.4.32
github.com/flarco/g v0.1.64
github.com/getsentry/sentry-go v0.11.0
github.com/google/uuid v1.3.0
Expand Down

0 comments on commit 6ec355d

Please sign in to comment.