From 27fc1f3723bc1d28ed610c4fa1e559be0f179c74 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Mon, 4 Dec 2023 11:29:18 -0300 Subject: [PATCH] V1.0.63 (#77) * use FileTypeNone as default * add avro reading * update dbio * improve replication HasStream * clean up docs linking * add r.11 test * add windows path fix * rename source.columns to source.select to avoid confusion * update test tasks * update test names * add sas7bdat file format * add store history keeping * improve store reliability * update github.com/flarco/dbio v0.4.38 * improve store --- cmd/sling/sling.go | 8 +- cmd/sling/sling_logic.go | 42 ++++- cmd/sling/sling_test.go | 4 +- cmd/sling/tests/replications/r.11.yaml | 19 ++ cmd/sling/tests/tasks/task.16.json | 2 +- cmd/sling/tests/tasks/task.17.json | 2 +- core/sling/config.go | 33 +++- core/sling/replication.go | 55 ++++-- core/sling/task.go | 13 +- core/sling/task_run.go | 9 + core/sling/task_run_read.go | 8 +- core/store/db.go | 53 ++++++ core/store/store.go | 233 +++++++++++++++++++++++++ go.mod | 8 +- go.sum | 4 + 15 files changed, 455 insertions(+), 38 deletions(-) create mode 100644 cmd/sling/tests/replications/r.11.yaml create mode 100644 core/store/db.go create mode 100644 core/store/store.go diff --git a/cmd/sling/sling.go b/cmd/sling/sling.go index ca001b5d..b581df27 100755 --- a/cmd/sling/sling.go +++ b/cmd/sling/sling.go @@ -48,20 +48,20 @@ var sentryOptions = sentry.ClientOptions{ var cliRun = &g.CliSC{ Name: "run", - Description: "Execute an ad-hoc task", + Description: "Execute a run", AdditionalHelpPrepend: "\nSee more examples and configuration details at https://docs.slingdata.io/sling-cli/", Flags: []g.Flag{ { Name: "config", ShortName: "c", Type: "string", - Description: "The config string or file to use (JSON or YAML).", + Description: "The task config string or file to use (JSON or YAML).", }, { Name: "replication", ShortName: "r", Type: "string", - Description: "The replication config file to use to run multiple tasks (YAML).\n", + Description: "The replication config file to use (JSON or YAML).\n", }, { Name: "src-conn", @@ -308,7 +308,7 @@ var cliConns = &g.CliSC{ Name: "key=value properties...", ShortName: "", Type: "string", - Description: "The key=value properties to set. See https://docs.slingdata.io/sling-cli/environment#local-connections", + Description: "The key=value properties to set. See https://docs.slingdata.io/sling-cli/environment#set-connections", }, }, }, diff --git a/cmd/sling/sling_logic.go b/cmd/sling/sling_logic.go index 1ea2cf2d..adb73cc8 100755 --- a/cmd/sling/sling_logic.go +++ b/cmd/sling/sling_logic.go @@ -2,6 +2,8 @@ package main import ( "os" + "os/exec" + "path/filepath" "runtime/debug" "strings" "time" @@ -13,6 +15,7 @@ import ( "github.com/flarco/dbio/connection" "github.com/slingdata-io/sling-cli/core/env" "github.com/slingdata-io/sling-cli/core/sling" + "github.com/slingdata-io/sling-cli/core/store" "github.com/flarco/g" "github.com/spf13/cast" @@ -36,6 +39,9 @@ func init() { if masterServerURL == "" { masterServerURL = "https://api.slingdata.io" } + + // init sqlite + store.InitDB() } func processRun(c *g.CliSC) (ok bool, err error) { @@ -170,7 +176,7 @@ func processRun(c *g.CliSC) (ok bool, err error) { // run task telemetryMap["run_mode"] = "task" - err = runTask(cfg) + err = runTask(cfg, nil) if err != nil { return ok, g.Error(err, "failure running task (see docs @ https://docs.slingdata.io/sling-cli)") } @@ -178,7 +184,7 @@ func processRun(c *g.CliSC) (ok bool, err error) { return ok, nil } -func runTask(cfg *sling.Config) (err error) { +func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error) { var task *sling.TaskExecution // track usage @@ -242,7 +248,16 @@ func runTask(cfg *sling.Config) (err error) { return } + // try to get project_id + setProjectID(cfg.Env["SLING_CONFIG_PATH"]) + cfg.Env["SLING_PROJECT_ID"] = projectID + task = sling.NewTask(0, cfg) + task.Replication = replication + + // insert into store for history keeping + sling.StoreInsert(task) + if task.Err != nil { err = g.Error(task.Err) return @@ -308,7 +323,7 @@ func runReplication(cfgPath string, selectStreams ...string) (err error) { Source: sling.Source{ Conn: replication.Source, Stream: name, - Columns: stream.Columns, + Select: stream.Select, PrimaryKeyI: stream.PrimaryKey(), UpdateKey: stream.UpdateKey, }, @@ -339,7 +354,7 @@ func runReplication(cfgPath string, selectStreams ...string) (err error) { } telemetryMap["run_mode"] = "replication" - err = runTask(&cfg) + err = runTask(&cfg, &replication) if err != nil { err = g.Error(err, "error for stream `%s`", name) g.LogError(err) @@ -480,3 +495,22 @@ func parsePayload(payload string, validate bool) (options map[string]any, err er return options, nil } + +// setProjectID attempts to get the first sha of the repo +func setProjectID(cfgPath string) { + if cfgPath == "" { + return + } + + cfgPath, _ = filepath.Abs(cfgPath) + + if fs, err := os.Stat(cfgPath); err == nil && !fs.IsDir() { + // get first sha + cmd := exec.Command("git", "rev-list", "--max-parents=0", "HEAD") + cmd.Dir = filepath.Dir(cfgPath) + out, err := cmd.Output() + if err == nil { + projectID = strings.TrimSpace(string(out)) + } + } +} diff --git a/cmd/sling/sling_test.go b/cmd/sling/sling_test.go index f92fce12..90b41ff2 100755 --- a/cmd/sling/sling_test.go +++ b/cmd/sling/sling_test.go @@ -577,7 +577,7 @@ func TestCfgPath(t *testing.T) { g.AssertNoError(t, err) } -func testOneTask(t *testing.T) { +func Test1Task(t *testing.T) { os.Setenv("SLING_CLI", "TRUE") config := &sling.Config{} cfgStr := ` @@ -603,7 +603,7 @@ options: } } -func TestOneReplication(t *testing.T) { +func Test1Replication(t *testing.T) { sling.ShowProgress = false os.Setenv("_DEBUG", "LOW") os.Setenv("SLING_CLI", "TRUE") diff --git a/cmd/sling/tests/replications/r.11.yaml b/cmd/sling/tests/replications/r.11.yaml new file mode 100644 index 00000000..89f15a69 --- /dev/null +++ b/cmd/sling/tests/replications/r.11.yaml @@ -0,0 +1,19 @@ +source: postgres +target: aws_s3 + +defaults: + mode: full-refresh + object: 'test1k/{YEAR}' + target_options: + file_max_rows: 1000000 # multiple files + compression: gzip + +streams: + test1: + sql: | + select * from public.test1k + where date_trunc('year', create_date) = '{YEAR}-01-01' + +env: + # pass in env vars, e.g. YEAR=2005 + YEAR: $YEAR \ No newline at end of file diff --git a/cmd/sling/tests/tasks/task.16.json b/cmd/sling/tests/tasks/task.16.json index 16c2ca29..11b52070 100755 --- a/cmd/sling/tests/tasks/task.16.json +++ b/cmd/sling/tests/tasks/task.16.json @@ -1 +1 @@ -{"env":{},"mode":"full-refresh","options":{},"source":{"conn":"","options":{},"primary_key":[],"stream":"file://tests/files/test1.csv","update_key":""},"target":{"conn":"POSTGRES","object":"public.test1","options":{"add_new_columns":true,"adjust_column_type":true}}} \ No newline at end of file +{"env":{},"mode":"full-refresh","options":{},"source":{"conn":"","options":{},"primary_key":[],"stream":"file://tests/files/test1.csv","update_key":""},"target":{"conn":"POSTGRES","object":"public.test1","options":{"add_new_columns":true,"adjust_column_type":true,"use_bulk":true}}} \ No newline at end of file diff --git a/cmd/sling/tests/tasks/task.17.json b/cmd/sling/tests/tasks/task.17.json index 549aabab..643a521f 100755 --- a/cmd/sling/tests/tasks/task.17.json +++ b/cmd/sling/tests/tasks/task.17.json @@ -1 +1 @@ -{"env":{"validation_cols":"0,1,2,3,4,6","validation_file":"file://tests/files/test1.result.csv"},"mode":"incremental","options":{},"source":{"conn":"","options":{},"primary_key":["id"],"stream":"file://tests/files/test1.upsert.csv","update_key":"create_dt"},"target":{"conn":"POSTGRES","object":"public.test1","options":{"add_new_columns":true,"adjust_column_type":true}}} \ No newline at end of file +{"env":{"validation_cols":"0,1,2,3,4,6","validation_file":"file://tests/files/test1.result.csv"},"mode":"incremental","options":{},"source":{"conn":"","options":{},"primary_key":["id"],"stream":"file://tests/files/test1.upsert.csv","update_key":"create_dt"},"target":{"conn":"POSTGRES","object":"public.test1","options":{"add_new_columns":true,"adjust_column_type":true,"use_bulk":true}}} \ No newline at end of file diff --git a/core/sling/config.go b/core/sling/config.go index 802081f8..12a34093 100644 --- a/core/sling/config.go +++ b/core/sling/config.go @@ -2,7 +2,7 @@ package sling import ( "database/sql/driver" - "io/ioutil" + "io" "os" "path/filepath" "regexp" @@ -135,7 +135,7 @@ func (cfg *Config) Unmarshal(cfgStr string) error { return g.Error(err, "Unable to open cfgStr: "+cfgStr) } - cfgBytes, err = ioutil.ReadAll(cfgFile) + cfgBytes, err = io.ReadAll(cfgFile) if err != nil { return g.Error(err, "could not read from cfgFile") } @@ -166,6 +166,11 @@ func (cfg *Config) Unmarshal(cfgStr string) error { cfg.TgtConn.Data = g.M() } + // add config path + if _, err := os.Stat(cfgStr); err == nil && !cfg.ReplicationMode { + cfg.Env["SLING_CONFIG_PATH"] = cfgStr + } + return nil } @@ -221,7 +226,7 @@ func (cfg *Config) DetermineType() (Type JobType, err error) { // need to loaded_at column for file incremental cfg.MetadataLoadedAt = true } else if cfg.Source.UpdateKey == "" && len(cfg.Source.PrimaryKey()) == 0 { - err = g.Error("must specify value for 'update_key' and/or 'primary_key' for incremental mode. See docs for more details: https://docs.slingdata.io/sling-cli/configuration#mode") + err = g.Error("must specify value for 'update_key' and/or 'primary_key' for incremental mode. See docs for more details: https://docs.slingdata.io/sling-cli/run/configuration") return } } else if cfg.Mode == SnapshotMode { @@ -318,6 +323,10 @@ func (cfg *Config) Prepare() (err error) { } // set from shell env variable, if value starts with $ and found + if cfg.Env == nil { + cfg.Env = map[string]string{} + } + for k, v := range cfg.Env { cfg.Env[k] = os.ExpandEnv(v) } @@ -641,7 +650,17 @@ func (cfg *Config) Scan(value interface{}) error { // Value return json value, implement driver.Valuer interface func (cfg Config) Value() (driver.Value, error) { - return g.JSONValuer(cfg, "{}") + jBytes, err := json.Marshal(cfg) + if err != nil { + return nil, g.Error(err, "could not marshal") + } + + out := string(jBytes) + out = strings.ReplaceAll(out, `,"_src_conn":{}`, ``) + out = strings.ReplaceAll(out, `,"_tgt_conn":{}`, ``) + out = strings.ReplaceAll(out, `,"primary_key":null`, ``) + + return []byte(out), err } // ConfigOptions are configuration options @@ -655,7 +674,7 @@ type ConfigOptions struct { type Source struct { Conn string `json:"conn" yaml:"conn"` Stream string `json:"stream,omitempty" yaml:"stream,omitempty"` - Columns []string `json:"columns,omitempty" yaml:"columns,omitempty"` + Select []string `json:"select,omitempty" yaml:"select,omitempty"` // Select columns PrimaryKeyI any `json:"primary_key,omitempty" yaml:"primary_key,omitempty"` UpdateKey string `json:"update_key,omitempty" yaml:"update_key,omitempty"` Options *SourceOptions `json:"options,omitempty" yaml:"options,omitempty"` @@ -790,7 +809,7 @@ var TargetFileOptionsDefault = TargetOptions{ cast.ToInt64(os.Getenv("FILE_MAX_BYTES")), 0, ), - Format: filesys.FileTypeCsv, + Format: filesys.FileTypeNone, UseBulk: g.Bool(true), AddNewColumns: g.Bool(true), DatetimeFormat: "auto", @@ -885,7 +904,7 @@ func (o *TargetOptions) SetDefaults(targetOptions TargetOptions) { if o.Compression == nil { o.Compression = targetOptions.Compression } - if string(o.Format) == "" { + if o.Format == filesys.FileTypeNone { o.Format = targetOptions.Format } if o.Concurrency == 0 { diff --git a/core/sling/replication.go b/core/sling/replication.go index fd57732d..e3ece574 100644 --- a/core/sling/replication.go +++ b/core/sling/replication.go @@ -8,7 +8,6 @@ import ( "github.com/flarco/dbio/connection" "github.com/flarco/dbio/database" - "github.com/flarco/dbio/iop" "github.com/flarco/g" "github.com/samber/lo" "github.com/spf13/cast" @@ -23,6 +22,12 @@ type ReplicationConfig struct { Env map[string]any `json:"env,omitempty" yaml:"env,omitempty"` streamsOrdered []string + originalCfg string +} + +// OriginalCfg returns original config +func (rd *ReplicationConfig) OriginalCfg() string { + return rd.originalCfg } // Scan scan value into Jsonb, implements sql.Scanner interface @@ -32,7 +37,16 @@ func (rd *ReplicationConfig) Scan(value interface{}) error { // Value return json value, implement driver.Valuer interface func (rd ReplicationConfig) Value() (driver.Value, error) { - return g.JSONValuer(rd, "{}") + if rd.OriginalCfg() != "" { + return []byte(rd.OriginalCfg()), nil + } + + jBytes, err := json.Marshal(rd) + if err != nil { + return nil, g.Error(err, "could not marshal") + } + + return jBytes, err } // StreamsOrdered returns the stream names as ordered in the YAML file @@ -40,6 +54,24 @@ func (rd ReplicationConfig) StreamsOrdered() []string { return rd.streamsOrdered } +// HasStream returns true if the stream name exists +func (rd ReplicationConfig) HasStream(name string) bool { + + normalize := func(n string) string { + n = strings.ReplaceAll(n, "`", "") + n = strings.ReplaceAll(n, `"`, "") + n = strings.ToLower(n) + return n + } + + for streamName := range rd.Streams { + if normalize(streamName) == normalize(name) { + return true + } + } + return false +} + // ProcessWildcards process the streams using wildcards // such as `my_schema.*` or `my_schema.my_prefix_*` or `my_schema.*_my_suffix` func (rd *ReplicationConfig) ProcessWildcards() (err error) { @@ -97,6 +129,10 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) { Dialect: conn.GetType(), } + if rd.HasStream(table.FullName()) { + continue + } + // add to stream map newCfg := ReplicationStreamConfig{} g.Unmarshal(g.Marshal(rd.Streams[wildcardName]), &newCfg) // copy config over @@ -119,7 +155,7 @@ func (rd *ReplicationConfig) ProcessWildcards() (err error) { type ReplicationStreamConfig struct { Mode Mode `json:"mode,omitempty" yaml:"mode,omitempty"` Object string `json:"object,omitempty" yaml:"object,omitempty"` - Columns []string `json:"columns,omitempty" yaml:"columns,flow,omitempty"` + Select []string `json:"select,omitempty" yaml:"select,flow,omitempty"` PrimaryKeyI any `json:"primary_key,omitempty" yaml:"primary_key,flow,omitempty"` UpdateKey string `json:"update_key,omitempty" yaml:"update_key,omitempty"` SQL string `json:"sql,omitempty" yaml:"sql,omitempty"` @@ -270,17 +306,13 @@ func UnmarshalReplication(replicYAML string) (config ReplicationConfig, err erro return } -func getSourceOptionsColumns(sourceOptionsNodes yaml.MapSlice) (columns iop.Columns) { +func getSourceOptionsColumns(sourceOptionsNodes yaml.MapSlice) (columns map[string]any) { + columns = map[string]any{} for _, sourceOptionsNode := range sourceOptionsNodes { if cast.ToString(sourceOptionsNode.Key) == "columns" { for _, columnNode := range sourceOptionsNode.Value.(yaml.MapSlice) { - col := iop.Column{ - Name: cast.ToString(columnNode.Key), - Type: iop.ColumnType(cast.ToString(columnNode.Value)), - } - columns = append(columns, col) + columns[cast.ToString(columnNode.Key)] = cast.ToString(columnNode.Value) } - columns = iop.NewColumns(columns...) } } @@ -305,5 +337,8 @@ func LoadReplicationConfig(cfgPath string) (config ReplicationConfig, err error) err = g.Error(err, "Error parsing replication config") } + config.originalCfg = g.Marshal(config) + config.Env["SLING_CONFIG_PATH"] = cfgPath + return } diff --git a/core/sling/task.go b/core/sling/task.go index 0b68a542..c30851b7 100644 --- a/core/sling/task.go +++ b/core/sling/task.go @@ -14,6 +14,9 @@ import ( "github.com/spf13/cast" ) +// Set in the store/store.go file for history keeping +var StoreInsert, StoreUpdate func(t *TaskExecution) + // TaskExecution is a sling ELT task run, synonymous to an execution type TaskExecution struct { ExecID int64 `json:"exec_id"` @@ -32,6 +35,7 @@ type TaskExecution struct { prevByteCount uint64 lastIncrement time.Time // the time of last row increment (to determine stalling) + Replication *ReplicationConfig ProgressHist []string `json:"progress_hist"` PBar *ProgressBar `json:"-"` ProcStatsStart g.ProcStats `json:"-"` // process stats at beginning @@ -81,6 +85,8 @@ func NewTask(execID int64, cfg *Config) (t *TaskExecution) { t.PBar = NewPBar(time.Second) ticker := time.NewTicker(1 * time.Second) go func() { + defer ticker.Stop() + for { select { case <-ticker.C: @@ -93,6 +99,9 @@ func NewTask(execID int64, cfg *Config) (t *TaskExecution) { t.PBar.bar.Set("rowRate", g.F("%s r/s", humanize.Comma(rowRate))) t.PBar.bar.Set("byteRate", g.F("%s/s", humanize.Bytes(cast.ToUint64(byteRate)))) } + + // update rows every 1sec + StoreUpdate(t) default: time.Sleep(100 * time.Millisecond) if t.PBar.finished || t.df.Err() != nil { @@ -378,13 +387,13 @@ func ErrorHelper(err error) (helpString string) { switch { case strings.Contains(errString, "utf8") || strings.Contains(errString, "ascii"): - helpString = "Perhaps the 'transforms' source option could help with encodings? See https://docs.slingdata.io/sling-cli/running-tasks#advanced-options" + helpString = "Perhaps the 'transforms' source option could help with encodings? See https://docs.slingdata.io/sling-cli/run/configuration#source" case strings.Contains(errString, "failed to verify certificate"): helpString = "Perhaps specifying `encrypt=true` and `TrustServerCertificate=true` properties could help? See https://docs.slingdata.io/connections/database-connections/sqlserver" case strings.Contains(errString, "ssl is not enabled on the server"): helpString = "Perhaps setting the 'sslmode' option could help? See https://docs.slingdata.io/connections/database-connections/postgres" case strings.Contains(errString, "invalid input syntax for type") || (strings.Contains(errString, " value ") && strings.Contains(errString, "is not recognized")): - helpString = "Perhaps setting a higher 'SAMPLE_SIZE' environment variable could help? This represents the number of records to process in order to infer column types (especially for file sources). The default is 900. Try 2000 or even higher.\nYou can also manually specify the column types with the `columns` source option. See https://docs.slingdata.io/sling-cli/running-tasks#advanced-options" + helpString = "Perhaps setting a higher 'SAMPLE_SIZE' environment variable could help? This represents the number of records to process in order to infer column types (especially for file sources). The default is 900. Try 2000 or even higher.\nYou can also manually specify the column types with the `columns` source option. See https://docs.slingdata.io/sling-cli/run/configuration#source" } } return diff --git a/core/sling/task_run.go b/core/sling/task_run.go index 08f9f8cd..349f479f 100644 --- a/core/sling/task_run.go +++ b/core/sling/task_run.go @@ -75,6 +75,9 @@ func (t *TaskExecution) Execute() error { return } + // update into store + StoreUpdate(t) + g.DebugLow("type is %s", t.Type) g.Debug("using source options: %s", g.Marshal(t.Config.Source.Options)) g.Debug("using target options: %s", g.Marshal(t.Config.Target.Options)) @@ -98,6 +101,9 @@ func (t *TaskExecution) Execute() error { t.SetProgress("task execution configuration is invalid") t.Err = g.Error("Cannot Execute. Task Type is not specified") } + + // update into store + StoreUpdate(t) }() select { @@ -134,6 +140,9 @@ func (t *TaskExecution) Execute() error { now2 := time.Now() t.EndTime = &now2 + // update into store + StoreUpdate(t) + return t.Err } diff --git a/core/sling/task_run_read.go b/core/sling/task_run_read.go index bf1b903f..552b0809 100644 --- a/core/sling/task_run_read.go +++ b/core/sling/task_run_read.go @@ -46,8 +46,8 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df } } - if len(cfg.Source.Columns) > 0 { - fields := lo.Map(cfg.Source.Columns, func(f string, i int) string { + if len(cfg.Source.Select) > 0 { + fields := lo.Map(cfg.Source.Select, func(f string, i int) string { return f }) fieldsStr = strings.Join(fields, ", ") @@ -142,7 +142,7 @@ func (t *TaskExecution) ReadFromAPI(cfg *Config, client *airbyte.Airbyte) (df *i if cfg.SrcConn.Type.IsAirbyte() { config := airbyte.StreamConfig{ - Columns: cfg.Source.Columns, + Columns: cfg.Source.Select, StartDate: cfg.IncrementalVal, } stream, err = client.Stream(cfg.Source.Stream, config) @@ -183,7 +183,7 @@ func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error) return t.df, err } - fsCfg := filesys.FileStreamConfig{Columns: cfg.Source.Columns, Limit: cfg.Source.Limit()} + fsCfg := filesys.FileStreamConfig{Columns: cfg.Source.Select, Limit: cfg.Source.Limit()} df, err = fs.ReadDataflow(cfg.SrcConn.URL(), fsCfg) if err != nil { err = g.Error(err, "Could not FileSysReadDataflow for %s", cfg.SrcConn.Type) diff --git a/core/store/db.go b/core/store/db.go new file mode 100644 index 00000000..6dcc4890 --- /dev/null +++ b/core/store/db.go @@ -0,0 +1,53 @@ +package store + +import ( + "github.com/flarco/dbio/database" + "github.com/flarco/g" + "github.com/jmoiron/sqlx" + "github.com/slingdata-io/sling-cli/core/env" + "gorm.io/gorm" +) + +var ( + // Db is the main databse connection + Db *gorm.DB + Dbx *sqlx.DB + Conn database.Connection + + // DropAll signifies to drop all tables and recreate them + DropAll = false +) + +// InitDB initializes the database +func InitDB() { + var err error + + dbURL := g.F("sqlite://%s/.sling.db?cache=shared&mode=rwc&_journal_mode=WAL", env.HomeDir) + Conn, err = database.NewConn(dbURL) + if err != nil { + g.Debug("could not initialize local .sling.db. %s", err.Error()) + return + } + + g.LogError(err, "Could not initialize sqlite connection: %s", dbURL) + + Db, err = Conn.GetGormConn(&gorm.Config{}) + g.LogError(err, "Could not connect to sqlite database: %s", dbURL) + + allTables := []interface{}{ + &Execution{}, + &Task{}, + &Replication{}, + } + + for _, table := range allTables { + dryDB := Db.Session(&gorm.Session{DryRun: true}) + tableName := dryDB.Find(table).Statement.Table + if DropAll { + Db.Exec(g.F(`drop table if exists "%s"`, tableName)) + } + g.Debug("Creating table: " + tableName) + err = Db.AutoMigrate(table) + g.LogError(err, "error AutoMigrating table: "+tableName) + } +} diff --git a/core/store/store.go b/core/store/store.go new file mode 100644 index 00000000..dab5db35 --- /dev/null +++ b/core/store/store.go @@ -0,0 +1,233 @@ +package store + +import ( + "strings" + "time" + + "github.com/flarco/dbio/connection" + "github.com/flarco/g" + "github.com/slingdata-io/sling-cli/core/sling" + "github.com/spf13/cast" + "gorm.io/gorm/clause" +) + +func init() { + sling.StoreInsert = StoreInsert + sling.StoreUpdate = StoreUpdate +} + +type Execution struct { + // ID auto-increments + ID int64 `json:"id" gorm:"primaryKey"` + + // StreamID represents the stream inside the replication that is running. + // Is an MD5 construct:`md5(Source, Target, Stream)`. + StreamID string `json:"stream_id" sql:"not null" gorm:"index"` + + // ConfigMD5 points to config table. not null + TaskMD5 string `json:"task_md5" sql:"not null" gorm:"index"` + ReplicationMD5 string `json:"replication_md5" sql:"not null" gorm:"index"` + + Status sling.ExecStatus `json:"status" gorm:"index"` + Err *string `json:"error"` + StartTime *time.Time `json:"start_time" gorm:"index"` + EndTime *time.Time `json:"end_time" gorm:"index"` + Bytes uint64 `json:"bytes"` + ExitCode int `json:"exit_code"` + Output string `json:"output" sql:"default ''"` + Rows uint64 `json:"rows"` + + // ProjectID represents the project or the repository. + // If .git exists, grab first commit with `git rev-list --max-parents=0 HEAD`. + // if not, use md5 of path of folder. Can be `null` if using task. + ProjectID *string `json:"project_id" gorm:"index"` + + // FilePath represents the path to a file. + // We would need this to refer back to the same file, even if + // the contents change. So this should just be the relative path + // of the replication.yaml or task.yaml from the root of the project. + // If Ad-hoc from CLI flags, let it be `null`. + FilePath *string `json:"file_path" gorm:"index"` + + CreatedDt time.Time `json:"created_dt" gorm:"autoCreateTime"` + UpdatedDt time.Time `json:"updated_dt" gorm:"autoUpdateTime"` +} + +type Task struct { + // MD5 is MD5 of Config json string + MD5 string `json:"md5" gorm:"primaryKey"` + + Type sling.JobType `json:"type" gorm:"index"` + + Task sling.Config `json:"task"` + + CreatedDt time.Time `json:"created_dt" gorm:"autoCreateTime"` + UpdatedDt time.Time `json:"updated_dt" gorm:"autoUpdateTime"` +} + +type Replication struct { + // MD5 is MD5 of Config json string + MD5 string `json:"md5" gorm:"primaryKey"` + + Type sling.JobType `json:"type" gorm:"index"` + + Replication sling.ReplicationConfig `json:"replication"` + + CreatedDt time.Time `json:"created_dt" gorm:"autoCreateTime"` + UpdatedDt time.Time `json:"updated_dt" gorm:"autoUpdateTime"` +} + +// Store saves the task into the local sqlite +func ToExecutionObject(t *sling.TaskExecution) *Execution { + + exec := Execution{ + ID: t.ExecID, + StreamID: g.MD5(t.Config.Source.Conn, t.Config.Target.Conn, t.Config.Source.Stream), + Status: t.Status, + StartTime: t.StartTime, + EndTime: t.EndTime, + Bytes: t.Bytes, + Output: "", + Rows: t.GetCount(), + ProjectID: g.String(t.Config.Env["SLING_PROJECT_ID"]), + FilePath: g.String(t.Config.Env["SLING_CONFIG_PATH"]), + } + + if t.Err != nil { + exec.Err = g.String(t.Err.Error()) + } + + if t.Replication != nil && t.Replication.Env["SLING_CONFIG_PATH"] != nil { + exec.FilePath = g.String(cast.ToString(t.Replication.Env["SLING_CONFIG_PATH"])) + } + + return &exec +} + +func ToConfigObject(t *sling.TaskExecution) (task *Task, replication *Replication) { + if t.Config == nil { + return + } + + task = &Task{ + Type: t.Type, + Task: *t.Config, + } + + if t.Replication != nil { + replication = &Replication{ + Type: t.Type, + Replication: *t.Replication, + } + + payload := replication.Replication.OriginalCfg() + + // clean up + if strings.Contains(replication.Replication.Source, "://") { + cleanSource := strings.Split(replication.Replication.Source, "://")[0] + "://" + payload = strings.ReplaceAll(payload, replication.Replication.Source, cleanSource) + } + + if strings.Contains(replication.Replication.Target, "://") { + cleanTarget := strings.Split(replication.Replication.Target, "://")[0] + "://" + payload = strings.ReplaceAll(payload, replication.Replication.Target, cleanTarget) + } + + // set md5 + replication.MD5 = g.MD5(payload) + } + + // clean up + if strings.Contains(task.Task.Source.Conn, "://") { + task.Task.Source.Conn = strings.Split(task.Task.Source.Conn, "://")[0] + "://" + } + + if strings.Contains(task.Task.Target.Conn, "://") { + task.Task.Target.Conn = strings.Split(task.Task.Target.Conn, "://")[0] + "://" + } + + task.Task.Source.Data = nil + task.Task.Target.Data = nil + + task.Task.SrcConn = connection.Connection{} + task.Task.TgtConn = connection.Connection{} + + task.Task.Prepared = false + + delete(task.Task.Env, "SLING_PROJECT_ID") + delete(task.Task.Env, "SLING_CONFIG_PATH") + + // set md5 + task.MD5 = g.MD5(g.Marshal(task.Task)) + + return +} + +// Store saves the task into the local sqlite +func StoreInsert(t *sling.TaskExecution) { + if Db == nil { + return + } + + // make execution + exec := ToExecutionObject(t) + + // insert config + task, replication := ToConfigObject(t) + err := Db.Clauses(clause.OnConflict{DoNothing: true}). + Create(task).Error + if err != nil { + g.Debug("could not insert task config into local .sling.db. %s", err.Error()) + return + } + exec.TaskMD5 = task.MD5 + + if replication != nil { + err := Db.Clauses(clause.OnConflict{DoNothing: true}). + Create(replication).Error + if err != nil { + g.Debug("could not insert replication config into local .sling.db. %s", err.Error()) + return + } + exec.ReplicationMD5 = replication.MD5 + } + + // insert execution + err = Db.Create(exec).Error + if err != nil { + g.Debug("could not insert execution into local .sling.db. %s", err.Error()) + return + } + + t.ExecID = exec.ID +} + +// Store saves the task into the local sqlite +func StoreUpdate(t *sling.TaskExecution) { + if Db == nil { + return + } + + exec := &Execution{ID: t.ExecID} + err := Db.First(exec).Error + if err != nil { + g.Debug("could not select execution from local .sling.db. %s", err.Error()) + return + } + execNew := ToExecutionObject(t) + + exec.StartTime = execNew.StartTime + exec.EndTime = execNew.EndTime + exec.Status = execNew.Status + exec.Err = execNew.Err + exec.Bytes = execNew.Bytes + exec.Rows = execNew.Rows + exec.Output = execNew.Output + + err = Db.Updates(exec).Error + if err != nil { + g.Debug("could not update execution into local .sling.db. %s", err.Error()) + return + } + +} diff --git a/go.mod b/go.mod index 76c0b1a0..d89adbbd 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,9 @@ require ( github.com/c-bata/go-prompt v0.2.6 github.com/denisbrodbeck/machineid v1.0.1 github.com/dustin/go-humanize v1.0.1 - github.com/flarco/dbio v0.4.32 - github.com/flarco/g v0.1.64 + github.com/fatih/color v1.13.0 + github.com/flarco/dbio v0.4.38 + github.com/flarco/g v0.1.66 github.com/getsentry/sentry-go v0.11.0 github.com/google/uuid v1.3.0 github.com/integrii/flaggy v1.5.2 @@ -85,7 +86,6 @@ require ( github.com/dvsekhvalnov/jose2go v1.5.0 // indirect github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1 // indirect github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect - github.com/fatih/color v1.13.0 // indirect github.com/flarco/bigquery v0.0.9 // indirect github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect github.com/francoispqt/gojay v1.2.13 // indirect @@ -129,7 +129,9 @@ require ( github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid/v2 v2.2.3 // indirect github.com/kr/fs v0.1.0 // indirect + github.com/kshedden/datareader v0.0.0-20210325133423-816b6ffdd011 // indirect github.com/lib/pq v1.10.2 // indirect + github.com/linkedin/goavro/v2 v2.12.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-ieproxy v0.0.9 // indirect diff --git a/go.sum b/go.sum index f14f8303..1f268f0f 100644 --- a/go.sum +++ b/go.sum @@ -598,6 +598,8 @@ github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kshedden/datareader v0.0.0-20210325133423-816b6ffdd011 h1:PNO6bcxsCMsX4R8nN/78gswn9F3We86Pi80NV0YPkLc= +github.com/kshedden/datareader v0.0.0-20210325133423-816b6ffdd011/go.mod h1:oTL9FJaM6f+gPQyrBN/Dd274KKAEkHw9ATjZ+7GD86U= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= @@ -607,6 +609,8 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg= +github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=