Skip to content

Commit

Permalink
Generate dbt sources
Browse files Browse the repository at this point in the history
  • Loading branch information
JakubMatejka committed Sep 26, 2022
1 parent 39a7283 commit febc5bd
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/jarcoal/httpmock v1.2.0
github.com/joho/godotenv v1.3.0
github.com/jpillora/longestcommon v0.0.0-20161227235612-adb9d91ee629
github.com/keboola/go-client v0.5.2
github.com/keboola/go-client v0.5.3
github.com/keboola/go-utils v0.4.1
github.com/kylelemons/godebug v1.1.0
github.com/nhatthm/aferocopy v1.2.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/keboola/go-client v0.5.1 h1:7MJoDLmOIVyXYVj0RdzAF+RwZ36WUyh8jTt/RLPBqnc=
github.com/keboola/go-client v0.5.1/go.mod h1:YPQenBSf8SQhXu8/KvpX/rcPgI7nE6p4iiI7sY5hL6U=
github.com/keboola/go-client v0.5.2 h1:kVfajbEMHc73cU4oDweSldhzz8jveuu+y91uhZPww1Q=
github.com/keboola/go-client v0.5.2/go.mod h1:YPQenBSf8SQhXu8/KvpX/rcPgI7nE6p4iiI7sY5hL6U=
github.com/keboola/go-client v0.5.3 h1:/9BudTQmyGk1BXLx+xn5Fs7yVMsi9VjY4171ABVRXwA=
github.com/keboola/go-client v0.5.3/go.mod h1:YPQenBSf8SQhXu8/KvpX/rcPgI7nE6p4iiI7sY5hL6U=
github.com/keboola/go-jsonnet v0.18.1-0.20220810085752-aee7595aa305 h1:hS/jX0HQzeGZWjMBSAs/IqrJTrTB5/BlYu7UcjFW9NU=
github.com/keboola/go-jsonnet v0.18.1-0.20220810085752-aee7595aa305/go.mod h1:2YGGjrWbRtdL5/lAp1anatj8zA+y/fKT0lLWvTqk+/Q=
github.com/keboola/go-utils v0.4.1 h1:70pOENI3hzVaYZpnL9Q+awxzK/DS521NR3tHG//Y2ZA=
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/cli/cmd/dbt/generate/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func ProfileCommand(p dependencies.Provider) *cobra.Command {
}

// Options
opts, err := d.Dialogs().AskDbtGenerateProfile(d)
opts, err := d.Dialogs().AskTargetName(d)
if err != nil {
return err
}
Expand Down
17 changes: 14 additions & 3 deletions internal/pkg/cli/cmd/dbt/generate/sources.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package generate

import (
"fmt"

"github.com/spf13/cobra"

"github.com/keboola/keboola-as-code/internal/pkg/cli/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/cli/helpmsg"
"github.com/keboola/keboola-as-code/pkg/lib/operation/dbt/generate/sources"
)

func SourcesCommand(p dependencies.Provider) *cobra.Command {
Expand All @@ -15,11 +14,23 @@ func SourcesCommand(p dependencies.Provider) *cobra.Command {
Short: helpmsg.Read(`dbt/generate/sources/short`),
Long: helpmsg.Read(`dbt/generate/sources/long`),
RunE: func(cmd *cobra.Command, args []string) error {
return fmt.Errorf("not implemented")
d, err := p.DependenciesForRemoteCommand()
if err != nil {
return err
}

// Options
opts, err := d.Dialogs().AskTargetName(d)
if err != nil {
return err
}

return sources.Run(d.CommandCtx(), opts, d)
},
}

cmd.Flags().StringP("storage-api-host", "H", "", "storage API host, eg. \"connection.keboola.com\"")
cmd.Flags().StringP("target-name", "T", "", "target name of the profile")

return cmd
}
43 changes: 13 additions & 30 deletions internal/pkg/cli/dialog/dbt.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,32 @@ import (

"github.com/keboola/keboola-as-code/internal/pkg/cli/options"
"github.com/keboola/keboola-as-code/internal/pkg/cli/prompt"
"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/model"
"github.com/keboola/keboola-as-code/pkg/lib/operation/dbt/generate/profile"
)

type dbtGenerateProfileDialogDeps interface {
Logger() log.Logger
type targetNameDialogDeps interface {
Options() *options.Options
Components() *model.ComponentsMap
}

type dbtGenerateProfileDialog struct {
*Dialogs
prompt prompt.Prompt
deps dbtGenerateProfileDialogDeps
out profile.Options
type TargetNameOptions struct {
Name string
}

// AskDbtGenerateProfile - dialog for generating a dbt profile.
func (p *Dialogs) AskDbtGenerateProfile(deps dbtGenerateProfileDialogDeps) (profile.Options, error) {
return (&dbtGenerateProfileDialog{
Dialogs: p,
prompt: p.Prompt,
deps: deps,
}).ask()
}

func (d *dbtGenerateProfileDialog) ask() (profile.Options, error) {
// Target Name
if d.deps.Options().IsSet(`target-name`) {
d.out.TargetName = d.deps.Options().GetString(`target-name`)
func (p *Dialogs) AskTargetName(d targetNameDialogDeps) (TargetNameOptions, error) {
opts := TargetNameOptions{}
if d.Options().IsSet(`target-name`) {
opts.Name = d.Options().GetString(`target-name`)
} else {
d.out.TargetName = d.askTargetName()
opts.Name = p.askTargetName()
}
if err := validateTargetName(d.out.TargetName); err != nil {
return d.out, err
if err := validateTargetName(opts.Name); err != nil {
return opts, err
}

return d.out, nil
return opts, nil
}

func (d *dbtGenerateProfileDialog) askTargetName() string {
name, _ := d.prompt.Ask(&prompt.Question{
func (p *Dialogs) askTargetName() string {
name, _ := p.Ask(&prompt.Question{
Label: `Target Name`,
Description: "Please enter target name.\nAllowed characters: a-z, A-Z, 0-9, \"_\".",
Validator: validateTargetName,
Expand Down
13 changes: 5 additions & 8 deletions pkg/lib/operation/dbt/generate/profile/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.opentelemetry.io/otel/trace"
"gopkg.in/yaml.v3"

"github.com/keboola/keboola-as-code/internal/pkg/cli/dialog"
"github.com/keboola/keboola-as-code/internal/pkg/filesystem"
"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
Expand All @@ -19,13 +20,9 @@ type dependencies interface {
Tracer() trace.Tracer
}

type Options struct {
TargetName string
}

const profilePath = "profiles.yml"

func Run(ctx context.Context, opts Options, d dependencies) (err error) {
func Run(ctx context.Context, opts dialog.TargetNameOptions, d dependencies) (err error) {
ctx, span := d.Tracer().Start(ctx, "kac.lib.operation.dbt.generate.profile")
defer telemetry.EndSpan(span, &err)

Expand All @@ -47,11 +44,11 @@ func Run(ctx context.Context, opts Options, d dependencies) (err error) {
}

logger := d.Logger()
targetUpper := strings.ToUpper(opts.TargetName)
targetUpper := strings.ToUpper(opts.Name)
profileDetails := map[string]interface{}{
"target": opts.TargetName,
"target": opts.Name,
"outputs": map[string]interface{}{
opts.TargetName: map[string]interface{}{
opts.Name: map[string]interface{}{
"account": fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_ACCOUNT\") }}", targetUpper),
"database": fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_DATABASE\") }}", targetUpper),
"password": fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_PASSWORD\") }}", targetUpper),
Expand Down
121 changes: 121 additions & 0 deletions pkg/lib/operation/dbt/generate/sources/operation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package sources

import (
"context"
"fmt"
"strings"

"github.com/keboola/go-client/pkg/client"
"github.com/keboola/go-client/pkg/storageapi"
"go.opentelemetry.io/otel/trace"
"gopkg.in/yaml.v3"

"github.com/keboola/keboola-as-code/internal/pkg/cli/dialog"
"github.com/keboola/keboola-as-code/internal/pkg/filesystem"
"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
)

// dependencies lists the services the Run operation needs from its caller.
type dependencies interface {
// Fs is the filesystem Run checks for "dbt_project.yml" and writes the generated files to.
Fs() filesystem.Fs
// Logger receives the final info message of the operation.
Logger() log.Logger
// StorageApiClient is the sender used for the list-tables Storage API request.
StorageApiClient() client.Sender
// Tracer starts the telemetry span that wraps the whole operation.
Tracer() trace.Tracer
}

// sourcesPath is the directory the generated "<bucketID>.yml" source files are written to;
// Run creates it when it does not exist yet.
const sourcesPath = "models/_sources"

// Run generates dbt source definitions, one YAML file per Storage bucket.
//
// Steps:
//  1. Verify that "dbt_project.yml" exists in the current directory.
//  2. Make sure the sourcesPath directory exists.
//  3. List all tables (including their buckets) via the Storage API.
//  4. Group the tables by bucket and write "<sourcesPath>/<bucketID>.yml" for each bucket.
//
// opts.Name is the dbt target name; it is used to build the name of the
// environment variable referenced in the "database" field of every source.
//
// The first failing step aborts the operation. The returned error wraps the
// underlying one (errors.Is / errors.As keep working) and names the step,
// and where relevant the bucket or file, that failed.
func Run(ctx context.Context, opts dialog.TargetNameOptions, d dependencies) (err error) {
	ctx, span := d.Tracer().Start(ctx, "kac.lib.operation.dbt.generate.sources")
	defer telemetry.EndSpan(span, &err)

	fs := d.Fs()

	// Check that we are in dbt directory
	if !fs.Exists(`dbt_project.yml`) {
		return fmt.Errorf(`missing file "dbt_project.yml" in the current directory`)
	}

	if !fs.Exists(sourcesPath) {
		if err = fs.Mkdir(sourcesPath); err != nil {
			return fmt.Errorf(`cannot create directory "%s": %w`, sourcesPath, err)
		}
	}

	tablesList, err := storageapi.ListTablesRequest(storageapi.WithBuckets()).Send(ctx, d.StorageApiClient())
	if err != nil {
		return fmt.Errorf("cannot list tables: %w", err)
	}

	// Map iteration order is random, so the files are written in no particular order.
	for bucketID, tables := range tablesByBucketsMap(*tablesList) {
		filePath := fmt.Sprintf("%s/%s.yml", sourcesPath, bucketID)
		sourcesDef := generateSourcesDefinition(opts.Name, bucketID, tables)

		// Distinct error names: do not shadow the named result used by the deferred EndSpan.
		yamlEnc, encErr := yaml.Marshal(&sourcesDef)
		if encErr != nil {
			return fmt.Errorf(`cannot encode sources of the bucket "%s": %w`, bucketID, encErr)
		}
		if writeErr := fs.WriteFile(filesystem.NewRawFile(filePath, string(yamlEnc))); writeErr != nil {
			return fmt.Errorf(`cannot write file "%s": %w`, filePath, writeErr)
		}
	}

	d.Logger().Infof(`Sources stored in "%s" directory.`, sourcesPath)
	return nil
}

// tablesByBucketsMap groups the given tables by the ID of their bucket.
// Within each group the tables keep the relative order they had in tablesList.
//
// NOTE(review): table.Bucket.ID is read for every table; this presumably relies on
// the list request being made with storageapi.WithBuckets() - confirm against the client.
func tablesByBucketsMap(tablesList []*storageapi.Table) map[storageapi.BucketID][]*storageapi.Table {
	grouped := make(map[storageapi.BucketID][]*storageapi.Table)
	for _, t := range tablesList {
		id := t.Bucket.ID
		// A missing key yields a nil slice; append allocates it on first use.
		grouped[id] = append(grouped[id], t)
	}
	return grouped
}

// generateSourcesDefinition builds the content of one dbt sources file for a single bucket.
//
// The result is a map ready for YAML encoding with "version" 2 and exactly one source:
//   - "name" and "schema" are the bucket ID,
//   - "database" references the env var DBT_KBC_<TARGETNAME>_DATABASE (target name upper-cased),
//   - "freshness" warns after 1 day and "loaded_at_field" is the quoted "_timestamp" column,
//   - "tables" contains one entry per table, with quoting enabled for database, schema and identifier.
//
// A table with a primary key additionally gets a "columns" list: one entry per primary key
// column (name wrapped in double quotes) with the "unique" and "not_null" tests.
//
// NOTE(review): every primary key column gets its own "unique" test; for a composite
// primary key the individual columns may not be unique - confirm this is intended.
func generateSourcesDefinition(targetName string, bucketID storageapi.BucketID, tablesList []*storageapi.Table) map[string]any {
	tables := make([]map[string]any, 0)
	for i := range tablesList {
		t := tablesList[i]

		quoting := map[string]bool{
			"database":   true,
			"schema":     true,
			"identifier": true,
		}
		tableDef := map[string]any{
			"name":    t.Name,
			"quoting": quoting,
		}

		// The "columns" key is present only for tables that have a primary key.
		if len(t.PrimaryKey) > 0 {
			columns := make([]map[string]any, 0)
			for _, column := range t.PrimaryKey {
				columnDef := map[string]any{
					"name":  fmt.Sprintf(`"%s"`, column),
					"tests": []string{"unique", "not_null"},
				}
				columns = append(columns, columnDef)
			}
			tableDef["columns"] = columns
		}

		tables = append(tables, tableDef)
	}

	bucketName := string(bucketID)
	database := fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_DATABASE\") }}", strings.ToUpper(targetName))
	freshness := map[string]any{
		"warn_after": map[string]any{
			"count":  1,
			"period": "day",
		},
	}

	source := map[string]any{
		"name":            bucketName,
		"freshness":       freshness,
		"database":        database,
		"schema":          bucketName,
		"loaded_at_field": `"_timestamp"`,
		"tables":          tables,
	}

	return map[string]any{
		"version": 2,
		"sources": []map[string]any{source},
	}
}
Loading

0 comments on commit febc5bd

Please sign in to comment.