From febc5bd5fab68b6dd8c450c909b12796413b9660 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Mate=CC=8Cjka?= Date: Mon, 26 Sep 2022 16:59:50 +0200 Subject: [PATCH] Generate dbt sources --- go.mod | 2 +- go.sum | 6 +- internal/pkg/cli/cmd/dbt/generate/profile.go | 2 +- internal/pkg/cli/cmd/dbt/generate/sources.go | 17 +- internal/pkg/cli/dialog/dbt.go | 43 ++-- .../dbt/generate/profile/operation.go | 13 +- .../dbt/generate/sources/operation.go | 121 ++++++++++ .../dbt/generate/sources/operation_test.go | 207 ++++++++++++++++++ .../help/dbt-generate-sources/expected-stdout | 3 +- 9 files changed, 366 insertions(+), 48 deletions(-) create mode 100644 pkg/lib/operation/dbt/generate/sources/operation.go create mode 100644 pkg/lib/operation/dbt/generate/sources/operation_test.go diff --git a/go.mod b/go.mod index 77f4986b77..48749bd192 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/jarcoal/httpmock v1.2.0 github.com/joho/godotenv v1.3.0 github.com/jpillora/longestcommon v0.0.0-20161227235612-adb9d91ee629 - github.com/keboola/go-client v0.5.2 + github.com/keboola/go-client v0.5.3 github.com/keboola/go-utils v0.4.1 github.com/kylelemons/godebug v1.1.0 github.com/nhatthm/aferocopy v1.2.0 diff --git a/go.sum b/go.sum index d7f2ab982c..da5c0f67bf 100644 --- a/go.sum +++ b/go.sum @@ -375,10 +375,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/keboola/go-client v0.5.1 h1:7MJoDLmOIVyXYVj0RdzAF+RwZ36WUyh8jTt/RLPBqnc= -github.com/keboola/go-client v0.5.1/go.mod h1:YPQenBSf8SQhXu8/KvpX/rcPgI7nE6p4iiI7sY5hL6U= -github.com/keboola/go-client v0.5.2 h1:kVfajbEMHc73cU4oDweSldhzz8jveuu+y91uhZPww1Q= -github.com/keboola/go-client v0.5.2/go.mod h1:YPQenBSf8SQhXu8/KvpX/rcPgI7nE6p4iiI7sY5hL6U= +github.com/keboola/go-client v0.5.3 h1:/9BudTQmyGk1BXLx+xn5Fs7yVMsi9VjY4171ABVRXwA= +github.com/keboola/go-client v0.5.3/go.mod h1:YPQenBSf8SQhXu8/KvpX/rcPgI7nE6p4iiI7sY5hL6U= github.com/keboola/go-jsonnet v0.18.1-0.20220810085752-aee7595aa305 h1:hS/jX0HQzeGZWjMBSAs/IqrJTrTB5/BlYu7UcjFW9NU= github.com/keboola/go-jsonnet v0.18.1-0.20220810085752-aee7595aa305/go.mod h1:2YGGjrWbRtdL5/lAp1anatj8zA+y/fKT0lLWvTqk+/Q= github.com/keboola/go-utils v0.4.1 h1:70pOENI3hzVaYZpnL9Q+awxzK/DS521NR3tHG//Y2ZA= diff --git a/internal/pkg/cli/cmd/dbt/generate/profile.go b/internal/pkg/cli/cmd/dbt/generate/profile.go index 418580e537..18f9ec4b63 100644 --- a/internal/pkg/cli/cmd/dbt/generate/profile.go +++ b/internal/pkg/cli/cmd/dbt/generate/profile.go @@ -20,7 +20,7 @@ func ProfileCommand(p dependencies.Provider) *cobra.Command { } // Options - opts, err := d.Dialogs().AskDbtGenerateProfile(d) + opts, err := d.Dialogs().AskTargetName(d) if err != nil { return err } diff --git a/internal/pkg/cli/cmd/dbt/generate/sources.go b/internal/pkg/cli/cmd/dbt/generate/sources.go index 86e8edb339..a74589b28b 100644 --- a/internal/pkg/cli/cmd/dbt/generate/sources.go +++ b/internal/pkg/cli/cmd/dbt/generate/sources.go @@ -1,12 +1,11 @@ package generate import ( - "fmt" - "github.com/spf13/cobra" "github.com/keboola/keboola-as-code/internal/pkg/cli/dependencies" "github.com/keboola/keboola-as-code/internal/pkg/cli/helpmsg" + "github.com/keboola/keboola-as-code/pkg/lib/operation/dbt/generate/sources" ) func SourcesCommand(p dependencies.Provider) *cobra.Command { @@ -15,11 +14,23 @@ func SourcesCommand(p dependencies.Provider) *cobra.Command { Short: helpmsg.Read(`dbt/generate/sources/short`), Long: helpmsg.Read(`dbt/generate/sources/long`), RunE: func(cmd *cobra.Command, args []string) error { - return fmt.Errorf("not implemented") + d, err := p.DependenciesForRemoteCommand() + if err != nil { + return err + } + + // Options + opts, err := d.Dialogs().AskTargetName(d) + if err != nil { + return err + } + + return sources.Run(d.CommandCtx(), opts, d) }, } cmd.Flags().StringP("storage-api-host", "H", "", "storage API host, eg. \"connection.keboola.com\"") + cmd.Flags().StringP("target-name", "T", "", "target name of the profile") return cmd } diff --git a/internal/pkg/cli/dialog/dbt.go b/internal/pkg/cli/dialog/dbt.go index 8a47019ef3..8967a8b4c2 100644 --- a/internal/pkg/cli/dialog/dbt.go +++ b/internal/pkg/cli/dialog/dbt.go @@ -8,49 +8,32 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/cli/options" "github.com/keboola/keboola-as-code/internal/pkg/cli/prompt" - "github.com/keboola/keboola-as-code/internal/pkg/log" - "github.com/keboola/keboola-as-code/internal/pkg/model" - "github.com/keboola/keboola-as-code/pkg/lib/operation/dbt/generate/profile" ) -type dbtGenerateProfileDialogDeps interface { - Logger() log.Logger +type targetNameDialogDeps interface { Options() *options.Options - Components() *model.ComponentsMap } -type dbtGenerateProfileDialog struct { - *Dialogs - prompt prompt.Prompt - deps dbtGenerateProfileDialogDeps - out profile.Options +type TargetNameOptions struct { + Name string } -// AskDbtGenerateProfile - dialog for generating a dbt profile. -func (p *Dialogs) AskDbtGenerateProfile(deps dbtGenerateProfileDialogDeps) (profile.Options, error) { - return (&dbtGenerateProfileDialog{ - Dialogs: p, - prompt: p.Prompt, - deps: deps, - }).ask() -} - -func (d *dbtGenerateProfileDialog) ask() (profile.Options, error) { - // Target Name - if d.deps.Options().IsSet(`target-name`) { - d.out.TargetName = d.deps.Options().GetString(`target-name`) +func (p *Dialogs) AskTargetName(d targetNameDialogDeps) (TargetNameOptions, error) { + opts := TargetNameOptions{} + if d.Options().IsSet(`target-name`) { + opts.Name = d.Options().GetString(`target-name`) } else { - d.out.TargetName = d.askTargetName() + opts.Name = p.askTargetName() } - if err := validateTargetName(d.out.TargetName); err != nil { - return d.out, err + if err := validateTargetName(opts.Name); err != nil { + return opts, err } - return d.out, nil + return opts, nil } -func (d *dbtGenerateProfileDialog) askTargetName() string { - name, _ := d.prompt.Ask(&prompt.Question{ +func (p *Dialogs) askTargetName() string { + name, _ := p.Ask(&prompt.Question{ Label: `Target Name`, Description: "Please enter target name.\nAllowed characters: a-z, A-Z, 0-9, \"_\".", Validator: validateTargetName, diff --git a/pkg/lib/operation/dbt/generate/profile/operation.go b/pkg/lib/operation/dbt/generate/profile/operation.go index f9b006b051..93b60a4cc6 100644 --- a/pkg/lib/operation/dbt/generate/profile/operation.go +++ b/pkg/lib/operation/dbt/generate/profile/operation.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/otel/trace" "gopkg.in/yaml.v3" + "github.com/keboola/keboola-as-code/internal/pkg/cli/dialog" "github.com/keboola/keboola-as-code/internal/pkg/filesystem" "github.com/keboola/keboola-as-code/internal/pkg/log" "github.com/keboola/keboola-as-code/internal/pkg/telemetry" @@ -19,13 +20,9 @@ type dependencies interface { Tracer() trace.Tracer } -type Options struct { - TargetName string -} - const profilePath = "profiles.yml" -func Run(ctx context.Context, opts Options, d dependencies) (err error) { +func Run(ctx context.Context, opts dialog.TargetNameOptions, d dependencies) (err error) { ctx, span := d.Tracer().Start(ctx, "kac.lib.operation.dbt.generate.profile") defer telemetry.EndSpan(span, &err) @@ -47,11 +44,11 @@ func Run(ctx context.Context, opts Options, d dependencies) (err error) { } logger := d.Logger() - targetUpper := strings.ToUpper(opts.TargetName) + targetUpper := strings.ToUpper(opts.Name) profileDetails := map[string]interface{}{ - "target": opts.TargetName, + "target": opts.Name, "outputs": map[string]interface{}{ - opts.TargetName: map[string]interface{}{ + opts.Name: map[string]interface{}{ "account": fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_ACCOUNT\") }}", targetUpper), "database": fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_DATABASE\") }}", targetUpper), "password": fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_PASSWORD\") }}", targetUpper), diff --git a/pkg/lib/operation/dbt/generate/sources/operation.go b/pkg/lib/operation/dbt/generate/sources/operation.go new file mode 100644 index 0000000000..10d8a12371 --- /dev/null +++ b/pkg/lib/operation/dbt/generate/sources/operation.go @@ -0,0 +1,121 @@ +package sources + +import ( + "context" + "fmt" + "strings" + + "github.com/keboola/go-client/pkg/client" + "github.com/keboola/go-client/pkg/storageapi" + "go.opentelemetry.io/otel/trace" + "gopkg.in/yaml.v3" + + "github.com/keboola/keboola-as-code/internal/pkg/cli/dialog" + "github.com/keboola/keboola-as-code/internal/pkg/filesystem" + "github.com/keboola/keboola-as-code/internal/pkg/log" + "github.com/keboola/keboola-as-code/internal/pkg/telemetry" +) + +type dependencies interface { + Fs() filesystem.Fs + Logger() log.Logger + StorageApiClient() client.Sender + Tracer() trace.Tracer +} + +const sourcesPath = "models/_sources" + +func Run(ctx context.Context, opts dialog.TargetNameOptions, d dependencies) (err error) { + ctx, span := d.Tracer().Start(ctx, "kac.lib.operation.dbt.generate.sources") + defer telemetry.EndSpan(span, &err) + + // Check that we are in dbt directory + if !d.Fs().Exists(`dbt_project.yml`) { + return fmt.Errorf(`missing file "dbt_project.yml" in the current directory`) + } + + if !d.Fs().Exists(sourcesPath) { + err = d.Fs().Mkdir(sourcesPath) + if err != nil { + return err + } + } + + tablesList, err := storageapi.ListTablesRequest(storageapi.WithBuckets()).Send(ctx, d.StorageApiClient()) + if err != nil { + return err + } + tablesByBuckets := tablesByBucketsMap(*tablesList) + + for bucketID, tables := range tablesByBuckets { + sourcesDef := generateSourcesDefinition(opts.Name, bucketID, tables) + yamlEnc, err := yaml.Marshal(&sourcesDef) + if err != nil { + return err + } + err = d.Fs().WriteFile(filesystem.NewRawFile(fmt.Sprintf("%s/%s.yml", sourcesPath, bucketID), string(yamlEnc))) + if err != nil { + return err + } + } + + d.Logger().Infof(`Sources stored in "%s" directory.`, sourcesPath) + return nil +} + +func tablesByBucketsMap(tablesList []*storageapi.Table) map[storageapi.BucketID][]*storageapi.Table { + tablesByBuckets := make(map[storageapi.BucketID][]*storageapi.Table) + for _, table := range tablesList { + bucket, ok := tablesByBuckets[table.Bucket.ID] + if !ok { + bucket = make([]*storageapi.Table, 0) + } + bucket = append(bucket, table) + tablesByBuckets[table.Bucket.ID] = bucket + } + return tablesByBuckets +} + +func generateSourcesDefinition(targetName string, bucketID storageapi.BucketID, tablesList []*storageapi.Table) map[string]any { + sourceTables := make([]map[string]any, 0) + for _, table := range tablesList { + sourceTable := map[string]any{ + "name": table.Name, + "quoting": map[string]bool{ + "database": true, + "schema": true, + "identifier": true, + }, + } + if len(table.PrimaryKey) > 0 { + sourceColumns := make([]map[string]any, 0) + for _, primaryKey := range table.PrimaryKey { + sourceColumns = append(sourceColumns, map[string]any{ + "name": fmt.Sprintf(`"%s"`, primaryKey), + "tests": []string{"unique", "not_null"}, + }) + } + sourceTable["columns"] = sourceColumns + } + sourceTables = append(sourceTables, sourceTable) + } + + return map[string]any{ + "version": 2, + "sources": []map[string]any{ + { + "name": string(bucketID), + "freshness": map[string]any{ + "warn_after": map[string]any{ + "count": 1, + "period": "day", + }, + }, + "database": fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_DATABASE\") }}", strings.ToUpper(targetName)), + "schema": string(bucketID), + "loaded_at_field": `"_timestamp"`, + "tables": sourceTables, + }, + }, + } +} diff --git a/pkg/lib/operation/dbt/generate/sources/operation_test.go b/pkg/lib/operation/dbt/generate/sources/operation_test.go new file mode 100644 index 0000000000..41de17220e --- /dev/null +++ b/pkg/lib/operation/dbt/generate/sources/operation_test.go @@ -0,0 +1,207 @@ +package sources + +import ( + "testing" + + "github.com/keboola/go-client/pkg/storageapi" + "github.com/stretchr/testify/assert" +) + +func TestTablesByBucketsMap(t *testing.T) { + t.Parallel() + + mainBucket := &storageapi.Bucket{ + ID: "out.c-main", + Uri: "/uri", + Name: "main", + DisplayName: "main", + Stage: "out", + Description: "", + Created: storageapi.Time{}, + LastChangeDate: storageapi.Time{}, + IsReadOnly: false, + DataSizeBytes: 0, + RowsCount: 0, + } + secondBucket := &storageapi.Bucket{ + ID: "out.c-second", + Uri: "/uri", + Name: "second", + DisplayName: "second", + Stage: "out", + Description: "", + Created: storageapi.Time{}, + LastChangeDate: storageapi.Time{}, + IsReadOnly: false, + DataSizeBytes: 0, + RowsCount: 0, + } + mainTable1 := &storageapi.Table{ + ID: "out.c-main.products", + Uri: "/uri", + Name: "products", + DisplayName: "Products", + PrimaryKey: nil, + Created: storageapi.Time{}, + LastImportDate: storageapi.Time{}, + LastChangeDate: storageapi.Time{}, + RowsCount: 0, + DataSizeBytes: 0, + Columns: nil, + Metadata: nil, + ColumnMetadata: nil, + Bucket: mainBucket, + } + mainTable2 := &storageapi.Table{ + ID: "out.c-main.categories", + Uri: "/uri", + Name: "categories", + DisplayName: "Categories", + PrimaryKey: nil, + Created: storageapi.Time{}, + LastImportDate: storageapi.Time{}, + LastChangeDate: storageapi.Time{}, + RowsCount: 0, + DataSizeBytes: 0, + Columns: nil, + Metadata: nil, + ColumnMetadata: nil, + Bucket: mainBucket, + } + secTable1 := &storageapi.Table{ + ID: "out.c-second.products", + Uri: "/uri", + Name: "products", + DisplayName: "Products", + PrimaryKey: nil, + Created: storageapi.Time{}, + LastImportDate: storageapi.Time{}, + LastChangeDate: storageapi.Time{}, + RowsCount: 0, + DataSizeBytes: 0, + Columns: nil, + Metadata: nil, + ColumnMetadata: nil, + Bucket: secondBucket, + } + secTable2 := &storageapi.Table{ + ID: "out.c-second.third", + Uri: "/uri", + Name: "third", + DisplayName: "Third", + PrimaryKey: nil, + Created: storageapi.Time{}, + LastImportDate: storageapi.Time{}, + LastChangeDate: storageapi.Time{}, + RowsCount: 0, + DataSizeBytes: 0, + Columns: nil, + Metadata: nil, + ColumnMetadata: nil, + Bucket: secondBucket, + } + in := []*storageapi.Table{mainTable1, secTable1, mainTable2, secTable2} + + res := tablesByBucketsMap(in) + assert.Equal(t, map[storageapi.BucketID][]*storageapi.Table{ + "out.c-main": {mainTable1, mainTable2}, + "out.c-second": {secTable1, secTable2}, + }, res) +} + +func TestGenerateSourcesDefinition(t *testing.T) { + t.Parallel() + + mainBucket := &storageapi.Bucket{ + ID: "out.c-main", + Uri: "/uri", + Name: "main", + DisplayName: "main", + Stage: "out", + Description: "", + Created: storageapi.Time{}, + LastChangeDate: storageapi.Time{}, + IsReadOnly: false, + DataSizeBytes: 0, + RowsCount: 0, + } + mainTable1 := &storageapi.Table{ + ID: "out.c-main.products", + Uri: "/uri", + Name: "products", + DisplayName: "Products", + PrimaryKey: []string{"primary1", "primary2"}, + Created: storageapi.Time{}, + LastImportDate: storageapi.Time{}, + LastChangeDate: storageapi.Time{}, + RowsCount: 0, + DataSizeBytes: 0, + Columns: nil, + Metadata: nil, + ColumnMetadata: nil, + Bucket: mainBucket, + } + mainTable2 := &storageapi.Table{ + ID: "out.c-main.categories", + Uri: "/uri", + Name: "categories", + DisplayName: "Categories", + PrimaryKey: nil, + Created: storageapi.Time{}, + LastImportDate: storageapi.Time{}, + LastChangeDate: storageapi.Time{}, + RowsCount: 0, + DataSizeBytes: 0, + Columns: nil, + Metadata: nil, + ColumnMetadata: nil, + Bucket: mainBucket, + } + + res := generateSourcesDefinition("target1", "out.c-main", []*storageapi.Table{mainTable1, mainTable2}) + assert.Equal(t, map[string]any{ + "version": 2, + "sources": []map[string]any{ + { + "name": "out.c-main", + "freshness": map[string]any{ + "warn_after": map[string]any{ + "count": 1, + "period": "day", + }, + }, + "database": "{{ env_var(\"DBT_KBC_TARGET1_DATABASE\") }}", + "schema": "out.c-main", + "loaded_at_field": `"_timestamp"`, + "tables": []map[string]any{ + { + "name": "products", + "quoting": map[string]bool{ + "database": true, + "schema": true, + "identifier": true, + }, + "columns": []map[string]any{ + { + "name": `"primary1"`, + "tests": []string{"unique", "not_null"}, + }, + { + "name": `"primary2"`, + "tests": []string{"unique", "not_null"}, + }, + }, + }, + { + "name": "categories", + "quoting": map[string]bool{ + "database": true, + "schema": true, + "identifier": true, + }, + }, + }, + }, + }, + }, res) +} diff --git a/test/cli/help/dbt-generate-sources/expected-stdout b/test/cli/help/dbt-generate-sources/expected-stdout index 70af5dfc1b..58062bd9b9 100644 --- a/test/cli/help/dbt-generate-sources/expected-stdout +++ b/test/cli/help/dbt-generate-sources/expected-stdout @@ -5,6 +5,7 @@ Usage: Flags: -H, --storage-api-host string storage API host, eg. "connection.keboola.com" + -T, --target-name string target name of the profile Global Flags: -h, --help print help for command @@ -12,4 +13,4 @@ Global Flags: -t, --storage-api-token string storage API token from your project -v, --verbose print details --verbose-api log each API request and response - -d, --working-dir string use other working directory \ No newline at end of file + -d, --working-dir string use other working directory