Skip to content

Commit

Permalink
V1.0.63 (#77)
Browse files Browse the repository at this point in the history
* use FileTypeNone as default

* add avro reading

* update dbio

* improve replication HasStream

* clean up docs linking

* add r.11 test

* add  windows path fix

* rename source.columns to source.select to avoid confusion

* update test tasks

* update test names

* add sas7bdat file format

* add store history keeping

* improve store reliability

* update github.com/flarco/dbio v0.4.38

* improve store
  • Loading branch information
flarco authored Dec 4, 2023
1 parent 6ec355d commit 27fc1f3
Show file tree
Hide file tree
Showing 15 changed files with 455 additions and 38 deletions.
8 changes: 4 additions & 4 deletions cmd/sling/sling.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,20 @@ var sentryOptions = sentry.ClientOptions{

var cliRun = &g.CliSC{
Name: "run",
Description: "Execute an ad-hoc task",
Description: "Execute a run",
AdditionalHelpPrepend: "\nSee more examples and configuration details at https://docs.slingdata.io/sling-cli/",
Flags: []g.Flag{
{
Name: "config",
ShortName: "c",
Type: "string",
Description: "The config string or file to use (JSON or YAML).",
Description: "The task config string or file to use (JSON or YAML).",
},
{
Name: "replication",
ShortName: "r",
Type: "string",
Description: "The replication config file to use to run multiple tasks (YAML).\n",
Description: "The replication config file to use (JSON or YAML).\n",
},
{
Name: "src-conn",
Expand Down Expand Up @@ -308,7 +308,7 @@ var cliConns = &g.CliSC{
Name: "key=value properties...",
ShortName: "",
Type: "string",
Description: "The key=value properties to set. See https://docs.slingdata.io/sling-cli/environment#local-connections",
Description: "The key=value properties to set. See https://docs.slingdata.io/sling-cli/environment#set-connections",
},
},
},
Expand Down
42 changes: 38 additions & 4 deletions cmd/sling/sling_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"os"
"os/exec"
"path/filepath"
"runtime/debug"
"strings"
"time"
Expand All @@ -13,6 +15,7 @@ import (
"github.com/flarco/dbio/connection"
"github.com/slingdata-io/sling-cli/core/env"
"github.com/slingdata-io/sling-cli/core/sling"
"github.com/slingdata-io/sling-cli/core/store"

"github.com/flarco/g"
"github.com/spf13/cast"
Expand All @@ -36,6 +39,9 @@ func init() {
if masterServerURL == "" {
masterServerURL = "https://api.slingdata.io"
}

// init sqlite
store.InitDB()
}

func processRun(c *g.CliSC) (ok bool, err error) {
Expand Down Expand Up @@ -170,15 +176,15 @@ func processRun(c *g.CliSC) (ok bool, err error) {

// run task
telemetryMap["run_mode"] = "task"
err = runTask(cfg)
err = runTask(cfg, nil)
if err != nil {
return ok, g.Error(err, "failure running task (see docs @ https://docs.slingdata.io/sling-cli)")
}

return ok, nil
}

func runTask(cfg *sling.Config) (err error) {
func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error) {
var task *sling.TaskExecution

// track usage
Expand Down Expand Up @@ -242,7 +248,16 @@ func runTask(cfg *sling.Config) (err error) {
return
}

// try to get project_id
setProjectID(cfg.Env["SLING_CONFIG_PATH"])
cfg.Env["SLING_PROJECT_ID"] = projectID

task = sling.NewTask(0, cfg)
task.Replication = replication

// insert into store for history keeping
sling.StoreInsert(task)

if task.Err != nil {
err = g.Error(task.Err)
return
Expand Down Expand Up @@ -308,7 +323,7 @@ func runReplication(cfgPath string, selectStreams ...string) (err error) {
Source: sling.Source{
Conn: replication.Source,
Stream: name,
Columns: stream.Columns,
Select: stream.Select,
PrimaryKeyI: stream.PrimaryKey(),
UpdateKey: stream.UpdateKey,
},
Expand Down Expand Up @@ -339,7 +354,7 @@ func runReplication(cfgPath string, selectStreams ...string) (err error) {
}

telemetryMap["run_mode"] = "replication"
err = runTask(&cfg)
err = runTask(&cfg, &replication)
if err != nil {
err = g.Error(err, "error for stream `%s`", name)
g.LogError(err)
Expand Down Expand Up @@ -480,3 +495,22 @@ func parsePayload(payload string, validate bool) (options map[string]any, err er

return options, nil
}

// setProjectID attempts to get the first sha of the repo
func setProjectID(cfgPath string) {
if cfgPath == "" {
return
}

cfgPath, _ = filepath.Abs(cfgPath)

if fs, err := os.Stat(cfgPath); err == nil && !fs.IsDir() {
// get first sha
cmd := exec.Command("git", "rev-list", "--max-parents=0", "HEAD")
cmd.Dir = filepath.Dir(cfgPath)
out, err := cmd.Output()
if err == nil {
projectID = strings.TrimSpace(string(out))
}
}
}
4 changes: 2 additions & 2 deletions cmd/sling/sling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func TestCfgPath(t *testing.T) {
g.AssertNoError(t, err)
}

func testOneTask(t *testing.T) {
func Test1Task(t *testing.T) {
os.Setenv("SLING_CLI", "TRUE")
config := &sling.Config{}
cfgStr := `
Expand All @@ -603,7 +603,7 @@ options:
}
}

func TestOneReplication(t *testing.T) {
func Test1Replication(t *testing.T) {
sling.ShowProgress = false
os.Setenv("_DEBUG", "LOW")
os.Setenv("SLING_CLI", "TRUE")
Expand Down
19 changes: 19 additions & 0 deletions cmd/sling/tests/replications/r.11.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
source: postgres
target: aws_s3

defaults:
mode: full-refresh
object: 'test1k/{YEAR}'
target_options:
file_max_rows: 1000000 # multiple files
compression: gzip

streams:
test1:
sql: |
select * from public.test1k
where date_trunc('year', create_date) = '{YEAR}-01-01'
env:
# pass in env vars, e.g. YEAR=2005
YEAR: $YEAR
2 changes: 1 addition & 1 deletion cmd/sling/tests/tasks/task.16.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"env":{},"mode":"full-refresh","options":{},"source":{"conn":"","options":{},"primary_key":[],"stream":"file://tests/files/test1.csv","update_key":""},"target":{"conn":"POSTGRES","object":"public.test1","options":{"add_new_columns":true,"adjust_column_type":true}}}
{"env":{},"mode":"full-refresh","options":{},"source":{"conn":"","options":{},"primary_key":[],"stream":"file://tests/files/test1.csv","update_key":""},"target":{"conn":"POSTGRES","object":"public.test1","options":{"add_new_columns":true,"adjust_column_type":true,"use_bulk":true}}}
2 changes: 1 addition & 1 deletion cmd/sling/tests/tasks/task.17.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"env":{"validation_cols":"0,1,2,3,4,6","validation_file":"file://tests/files/test1.result.csv"},"mode":"incremental","options":{},"source":{"conn":"","options":{},"primary_key":["id"],"stream":"file://tests/files/test1.upsert.csv","update_key":"create_dt"},"target":{"conn":"POSTGRES","object":"public.test1","options":{"add_new_columns":true,"adjust_column_type":true}}}
{"env":{"validation_cols":"0,1,2,3,4,6","validation_file":"file://tests/files/test1.result.csv"},"mode":"incremental","options":{},"source":{"conn":"","options":{},"primary_key":["id"],"stream":"file://tests/files/test1.upsert.csv","update_key":"create_dt"},"target":{"conn":"POSTGRES","object":"public.test1","options":{"add_new_columns":true,"adjust_column_type":true,"use_bulk":true}}}
33 changes: 26 additions & 7 deletions core/sling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sling

import (
"database/sql/driver"
"io/ioutil"
"io"
"os"
"path/filepath"
"regexp"
Expand Down Expand Up @@ -135,7 +135,7 @@ func (cfg *Config) Unmarshal(cfgStr string) error {
return g.Error(err, "Unable to open cfgStr: "+cfgStr)
}

cfgBytes, err = ioutil.ReadAll(cfgFile)
cfgBytes, err = io.ReadAll(cfgFile)
if err != nil {
return g.Error(err, "could not read from cfgFile")
}
Expand Down Expand Up @@ -166,6 +166,11 @@ func (cfg *Config) Unmarshal(cfgStr string) error {
cfg.TgtConn.Data = g.M()
}

// add config path
if _, err := os.Stat(cfgStr); err == nil && !cfg.ReplicationMode {
cfg.Env["SLING_CONFIG_PATH"] = cfgStr
}

return nil
}

Expand Down Expand Up @@ -221,7 +226,7 @@ func (cfg *Config) DetermineType() (Type JobType, err error) {
// need to loaded_at column for file incremental
cfg.MetadataLoadedAt = true
} else if cfg.Source.UpdateKey == "" && len(cfg.Source.PrimaryKey()) == 0 {
err = g.Error("must specify value for 'update_key' and/or 'primary_key' for incremental mode. See docs for more details: https://docs.slingdata.io/sling-cli/configuration#mode")
err = g.Error("must specify value for 'update_key' and/or 'primary_key' for incremental mode. See docs for more details: https://docs.slingdata.io/sling-cli/run/configuration")
return
}
} else if cfg.Mode == SnapshotMode {
Expand Down Expand Up @@ -318,6 +323,10 @@ func (cfg *Config) Prepare() (err error) {
}

// set from shell env variable, if value starts with $ and found
if cfg.Env == nil {
cfg.Env = map[string]string{}
}

for k, v := range cfg.Env {
cfg.Env[k] = os.ExpandEnv(v)
}
Expand Down Expand Up @@ -641,7 +650,17 @@ func (cfg *Config) Scan(value interface{}) error {

// Value return json value, implement driver.Valuer interface
func (cfg Config) Value() (driver.Value, error) {
return g.JSONValuer(cfg, "{}")
jBytes, err := json.Marshal(cfg)
if err != nil {
return nil, g.Error(err, "could not marshal")
}

out := string(jBytes)
out = strings.ReplaceAll(out, `,"_src_conn":{}`, ``)
out = strings.ReplaceAll(out, `,"_tgt_conn":{}`, ``)
out = strings.ReplaceAll(out, `,"primary_key":null`, ``)

return []byte(out), err
}

// ConfigOptions are configuration options
Expand All @@ -655,7 +674,7 @@ type ConfigOptions struct {
type Source struct {
Conn string `json:"conn" yaml:"conn"`
Stream string `json:"stream,omitempty" yaml:"stream,omitempty"`
Columns []string `json:"columns,omitempty" yaml:"columns,omitempty"`
Select []string `json:"select,omitempty" yaml:"select,omitempty"` // Select columns
PrimaryKeyI any `json:"primary_key,omitempty" yaml:"primary_key,omitempty"`
UpdateKey string `json:"update_key,omitempty" yaml:"update_key,omitempty"`
Options *SourceOptions `json:"options,omitempty" yaml:"options,omitempty"`
Expand Down Expand Up @@ -790,7 +809,7 @@ var TargetFileOptionsDefault = TargetOptions{
cast.ToInt64(os.Getenv("FILE_MAX_BYTES")),
0,
),
Format: filesys.FileTypeCsv,
Format: filesys.FileTypeNone,
UseBulk: g.Bool(true),
AddNewColumns: g.Bool(true),
DatetimeFormat: "auto",
Expand Down Expand Up @@ -885,7 +904,7 @@ func (o *TargetOptions) SetDefaults(targetOptions TargetOptions) {
if o.Compression == nil {
o.Compression = targetOptions.Compression
}
if string(o.Format) == "" {
if o.Format == filesys.FileTypeNone {
o.Format = targetOptions.Format
}
if o.Concurrency == 0 {
Expand Down
55 changes: 45 additions & 10 deletions core/sling/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/flarco/dbio/connection"
"github.com/flarco/dbio/database"
"github.com/flarco/dbio/iop"
"github.com/flarco/g"
"github.com/samber/lo"
"github.com/spf13/cast"
Expand All @@ -23,6 +22,12 @@ type ReplicationConfig struct {
Env map[string]any `json:"env,omitempty" yaml:"env,omitempty"`

streamsOrdered []string
originalCfg string
}

// OriginalCfg returns original config
func (rd *ReplicationConfig) OriginalCfg() string {
return rd.originalCfg
}

// Scan scan value into Jsonb, implements sql.Scanner interface
Expand All @@ -32,14 +37,41 @@ func (rd *ReplicationConfig) Scan(value interface{}) error {

// Value return json value, implement driver.Valuer interface
func (rd ReplicationConfig) Value() (driver.Value, error) {
return g.JSONValuer(rd, "{}")
if rd.OriginalCfg() != "" {
return []byte(rd.OriginalCfg()), nil
}

jBytes, err := json.Marshal(rd)
if err != nil {
return nil, g.Error(err, "could not marshal")
}

return jBytes, err
}

// StreamsOrdered returns the stream names as ordered in the YAML file
func (rd ReplicationConfig) StreamsOrdered() []string {
return rd.streamsOrdered
}

// HasStream returns true if the stream name exists
func (rd ReplicationConfig) HasStream(name string) bool {

normalize := func(n string) string {
n = strings.ReplaceAll(n, "`", "")
n = strings.ReplaceAll(n, `"`, "")
n = strings.ToLower(n)
return n
}

for streamName := range rd.Streams {
if normalize(streamName) == normalize(name) {
return true
}
}
return false
}

// ProcessWildcards process the streams using wildcards
// such as `my_schema.*` or `my_schema.my_prefix_*` or `my_schema.*_my_suffix`
func (rd *ReplicationConfig) ProcessWildcards() (err error) {
Expand Down Expand Up @@ -97,6 +129,10 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) {
Dialect: conn.GetType(),
}

if rd.HasStream(table.FullName()) {
continue
}

// add to stream map
newCfg := ReplicationStreamConfig{}
g.Unmarshal(g.Marshal(rd.Streams[wildcardName]), &newCfg) // copy config over
Expand All @@ -119,7 +155,7 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) {
type ReplicationStreamConfig struct {
Mode Mode `json:"mode,omitempty" yaml:"mode,omitempty"`
Object string `json:"object,omitempty" yaml:"object,omitempty"`
Columns []string `json:"columns,omitempty" yaml:"columns,flow,omitempty"`
Select []string `json:"select,omitempty" yaml:"select,flow,omitempty"`
PrimaryKeyI any `json:"primary_key,omitempty" yaml:"primary_key,flow,omitempty"`
UpdateKey string `json:"update_key,omitempty" yaml:"update_key,omitempty"`
SQL string `json:"sql,omitempty" yaml:"sql,omitempty"`
Expand Down Expand Up @@ -270,17 +306,13 @@ func UnmarshalReplication(replicYAML string) (config ReplicationConfig, err erro
return
}

func getSourceOptionsColumns(sourceOptionsNodes yaml.MapSlice) (columns iop.Columns) {
func getSourceOptionsColumns(sourceOptionsNodes yaml.MapSlice) (columns map[string]any) {
columns = map[string]any{}
for _, sourceOptionsNode := range sourceOptionsNodes {
if cast.ToString(sourceOptionsNode.Key) == "columns" {
for _, columnNode := range sourceOptionsNode.Value.(yaml.MapSlice) {
col := iop.Column{
Name: cast.ToString(columnNode.Key),
Type: iop.ColumnType(cast.ToString(columnNode.Value)),
}
columns = append(columns, col)
columns[cast.ToString(columnNode.Key)] = cast.ToString(columnNode.Value)
}
columns = iop.NewColumns(columns...)
}
}

Expand All @@ -305,5 +337,8 @@ func LoadReplicationConfig(cfgPath string) (config ReplicationConfig, err error)
err = g.Error(err, "Error parsing replication config")
}

config.originalCfg = g.Marshal(config)
config.Env["SLING_CONFIG_PATH"] = cfgPath

return
}
Loading

0 comments on commit 27fc1f3

Please sign in to comment.