Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate dbt sources #837

Merged
merged 2 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/jarcoal/httpmock v1.2.0
github.com/joho/godotenv v1.3.0
github.com/jpillora/longestcommon v0.0.0-20161227235612-adb9d91ee629
github.com/keboola/go-client v0.5.2
github.com/keboola/go-client v0.5.3
github.com/keboola/go-utils v0.4.1
github.com/kylelemons/godebug v1.1.0
github.com/nhatthm/aferocopy v1.2.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/keboola/go-client v0.5.1 h1:7MJoDLmOIVyXYVj0RdzAF+RwZ36WUyh8jTt/RLPBqnc=
github.com/keboola/go-client v0.5.1/go.mod h1:YPQenBSf8SQhXu8/KvpX/rcPgI7nE6p4iiI7sY5hL6U=
github.com/keboola/go-client v0.5.2 h1:kVfajbEMHc73cU4oDweSldhzz8jveuu+y91uhZPww1Q=
github.com/keboola/go-client v0.5.2/go.mod h1:YPQenBSf8SQhXu8/KvpX/rcPgI7nE6p4iiI7sY5hL6U=
github.com/keboola/go-client v0.5.3 h1:/9BudTQmyGk1BXLx+xn5Fs7yVMsi9VjY4171ABVRXwA=
github.com/keboola/go-client v0.5.3/go.mod h1:YPQenBSf8SQhXu8/KvpX/rcPgI7nE6p4iiI7sY5hL6U=
github.com/keboola/go-jsonnet v0.18.1-0.20220810085752-aee7595aa305 h1:hS/jX0HQzeGZWjMBSAs/IqrJTrTB5/BlYu7UcjFW9NU=
github.com/keboola/go-jsonnet v0.18.1-0.20220810085752-aee7595aa305/go.mod h1:2YGGjrWbRtdL5/lAp1anatj8zA+y/fKT0lLWvTqk+/Q=
github.com/keboola/go-utils v0.4.1 h1:70pOENI3hzVaYZpnL9Q+awxzK/DS521NR3tHG//Y2ZA=
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/cli/cmd/dbt/generate/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func ProfileCommand(p dependencies.Provider) *cobra.Command {
}

// Options
opts, err := d.Dialogs().AskDbtGenerateProfile(d)
opts, err := d.Dialogs().AskTargetName(d)
JakubMatejka marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
Expand Down
17 changes: 14 additions & 3 deletions internal/pkg/cli/cmd/dbt/generate/sources.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package generate

import (
"fmt"

"github.com/spf13/cobra"

"github.com/keboola/keboola-as-code/internal/pkg/cli/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/cli/helpmsg"
"github.com/keboola/keboola-as-code/pkg/lib/operation/dbt/generate/sources"
)

func SourcesCommand(p dependencies.Provider) *cobra.Command {
Expand All @@ -15,11 +14,23 @@ func SourcesCommand(p dependencies.Provider) *cobra.Command {
Short: helpmsg.Read(`dbt/generate/sources/short`),
Long: helpmsg.Read(`dbt/generate/sources/long`),
RunE: func(cmd *cobra.Command, args []string) error {
return fmt.Errorf("not implemented")
d, err := p.DependenciesForRemoteCommand()
if err != nil {
return err
}

// Options
opts, err := d.Dialogs().AskTargetName(d)
if err != nil {
return err
}

return sources.Run(d.CommandCtx(), opts, d)
},
}

cmd.Flags().StringP("storage-api-host", "H", "", "storage API host, eg. \"connection.keboola.com\"")
cmd.Flags().StringP("target-name", "T", "", "target name of the profile")

return cmd
}
43 changes: 13 additions & 30 deletions internal/pkg/cli/dialog/dbt.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,32 @@ import (

"github.com/keboola/keboola-as-code/internal/pkg/cli/options"
"github.com/keboola/keboola-as-code/internal/pkg/cli/prompt"
"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/model"
"github.com/keboola/keboola-as-code/pkg/lib/operation/dbt/generate/profile"
)

type dbtGenerateProfileDialogDeps interface {
Logger() log.Logger
type targetNameDialogDeps interface {
Options() *options.Options
Components() *model.ComponentsMap
}

type dbtGenerateProfileDialog struct {
*Dialogs
prompt prompt.Prompt
deps dbtGenerateProfileDialogDeps
out profile.Options
type TargetNameOptions struct {
Name string
}

// AskDbtGenerateProfile - dialog for generating a dbt profile.
func (p *Dialogs) AskDbtGenerateProfile(deps dbtGenerateProfileDialogDeps) (profile.Options, error) {
return (&dbtGenerateProfileDialog{
Dialogs: p,
prompt: p.Prompt,
deps: deps,
}).ask()
}

func (d *dbtGenerateProfileDialog) ask() (profile.Options, error) {
// Target Name
if d.deps.Options().IsSet(`target-name`) {
d.out.TargetName = d.deps.Options().GetString(`target-name`)
func (p *Dialogs) AskTargetName(d targetNameDialogDeps) (TargetNameOptions, error) {
opts := TargetNameOptions{}
if d.Options().IsSet(`target-name`) {
opts.Name = d.Options().GetString(`target-name`)
} else {
d.out.TargetName = d.askTargetName()
opts.Name = p.askTargetName()
}
if err := validateTargetName(d.out.TargetName); err != nil {
return d.out, err
if err := validateTargetName(opts.Name); err != nil {
return opts, err
}

return d.out, nil
return opts, nil
}

func (d *dbtGenerateProfileDialog) askTargetName() string {
name, _ := d.prompt.Ask(&prompt.Question{
func (p *Dialogs) askTargetName() string {
name, _ := p.Ask(&prompt.Question{
Label: `Target Name`,
Description: "Please enter target name.\nAllowed characters: a-z, A-Z, 0-9, \"_\".",
Validator: validateTargetName,
Expand Down
13 changes: 5 additions & 8 deletions pkg/lib/operation/dbt/generate/profile/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.opentelemetry.io/otel/trace"
"gopkg.in/yaml.v3"

"github.com/keboola/keboola-as-code/internal/pkg/cli/dialog"
"github.com/keboola/keboola-as-code/internal/pkg/filesystem"
"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
Expand All @@ -19,13 +20,9 @@ type dependencies interface {
Tracer() trace.Tracer
}

type Options struct {
TargetName string
}

const profilePath = "profiles.yml"

func Run(ctx context.Context, opts Options, d dependencies) (err error) {
func Run(ctx context.Context, opts dialog.TargetNameOptions, d dependencies) (err error) {
ctx, span := d.Tracer().Start(ctx, "kac.lib.operation.dbt.generate.profile")
defer telemetry.EndSpan(span, &err)

Expand All @@ -47,11 +44,11 @@ func Run(ctx context.Context, opts Options, d dependencies) (err error) {
}

logger := d.Logger()
targetUpper := strings.ToUpper(opts.TargetName)
targetUpper := strings.ToUpper(opts.Name)
profileDetails := map[string]interface{}{
"target": opts.TargetName,
"target": opts.Name,
"outputs": map[string]interface{}{
opts.TargetName: map[string]interface{}{
opts.Name: map[string]interface{}{
"account": fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_ACCOUNT\") }}", targetUpper),
"database": fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_DATABASE\") }}", targetUpper),
"password": fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_PASSWORD\") }}", targetUpper),
Expand Down
158 changes: 158 additions & 0 deletions pkg/lib/operation/dbt/generate/sources/operation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package sources

import (
"context"
"fmt"
"strings"

"github.com/keboola/go-client/pkg/client"
"github.com/keboola/go-client/pkg/storageapi"
"go.opentelemetry.io/otel/trace"
"gopkg.in/yaml.v3"

"github.com/keboola/keboola-as-code/internal/pkg/cli/dialog"
"github.com/keboola/keboola-as-code/internal/pkg/filesystem"
"github.com/keboola/keboola-as-code/internal/pkg/log"
"github.com/keboola/keboola-as-code/internal/pkg/telemetry"
)

// dependencies lists the services that Run obtains from its caller.
type dependencies interface {
// Fs is used by Run to look for "dbt_project.yml", to create the sources
// directory and to write the generated files.
Fs() filesystem.Fs
// Logger receives the final message naming the output directory.
Logger() log.Logger
// StorageApiClient is the sender used for the list-tables request.
StorageApiClient() client.Sender
// Tracer starts the telemetry span that wraps the whole operation.
Tracer() trace.Tracer
}

// sourcesPath is the directory in which Run stores the generated files, one
// "<bucketID>.yml" file per bucket; Run creates it when it does not exist.
const sourcesPath = "models/_sources"

// sourceFile is the root document written to each generated sources file.
//
// All fields of the YAML model types in this file are exported on purpose:
// gopkg.in/yaml.v3 skips unexported struct fields when marshalling, so with
// lower-case field names every generated file would contain only "{}".
type sourceFile struct {
	Version int      `yaml:"version"`
	Sources []source `yaml:"sources"`
}

// source is one entry of the "sources" list; generateSourcesDefinition emits
// exactly one per bucket.
type source struct {
	Name          string          `yaml:"name"`
	Freshness     sourceFreshness `yaml:"freshness"`
	Database      string          `yaml:"database"`
	Schema        string          `yaml:"schema"`
	LoadedAtField string          `yaml:"loaded_at_field"` //nolint:tagliatelle
	Tables        []sourceTable   `yaml:"tables"`
}

// sourceTable describes a single table of a source.
type sourceTable struct {
	Name    string              `yaml:"name"`
	Quoting sourceTableQuoting  `yaml:"quoting"`
	Columns []sourceTableColumn `yaml:"columns"`
}

// sourceTableColumn describes a column of a table together with its tests.
type sourceTableColumn struct {
	Name  string   `yaml:"name"`
	Tests []string `yaml:"tests"`
}

// sourceTableQuoting holds the three quoting switches of a table.
type sourceTableQuoting struct {
	Database   bool `yaml:"database"`
	Schema     bool `yaml:"schema"`
	Identifier bool `yaml:"identifier"`
}

// sourceFreshness wraps the "warn_after" freshness setting of a source.
type sourceFreshness struct {
	WarnAfter sourceFreshnessWarnAfter `yaml:"warn_after"` //nolint:tagliatelle
}

// sourceFreshnessWarnAfter is a count/period pair, e.g. 1 "day".
type sourceFreshnessWarnAfter struct {
	Count  int    `yaml:"count"`
	Period string `yaml:"period"`
}

// Run generates one sources YAML file per storage bucket.
//
// It fails when "dbt_project.yml" is missing from the filesystem root,
// creates the sourcesPath directory when needed, lists all tables (with their
// buckets) through the Storage API client, and writes
// "<sourcesPath>/<bucketID>.yml" for every bucket that has at least one table.
// opts.Name is the target name used to build the database env-var reference.
// The first error encountered is returned and also reported to the span.
func Run(ctx context.Context, opts dialog.TargetNameOptions, d dependencies) (err error) {
	ctx, span := d.Tracer().Start(ctx, "kac.lib.operation.dbt.generate.sources")
	defer telemetry.EndSpan(span, &err)

	fs := d.Fs()

	// Check that we are in dbt directory
	if !fs.Exists(`dbt_project.yml`) {
		return fmt.Errorf(`missing file "dbt_project.yml" in the current directory`)
	}

	if !fs.Exists(sourcesPath) {
		if err = fs.Mkdir(sourcesPath); err != nil {
			return err
		}
	}

	tablesList, err := storageapi.ListTablesRequest(storageapi.WithBuckets()).Send(ctx, d.StorageApiClient())
	if err != nil {
		return err
	}

	// Each bucket goes to its own file, so the random map order is harmless.
	for bucketID, tables := range tablesByBucketsMap(*tablesList) {
		sourcesDef := generateSourcesDefinition(opts.Name, bucketID, tables)

		// Assign to the named result instead of shadowing it with ":=".
		var yamlEnc []byte
		if yamlEnc, err = yaml.Marshal(&sourcesDef); err != nil {
			return err
		}

		filePath := fmt.Sprintf("%s/%s.yml", sourcesPath, bucketID)
		if err = fs.WriteFile(filesystem.NewRawFile(filePath, string(yamlEnc))); err != nil {
			return err
		}
	}

	d.Logger().Infof(`Sources stored in "%s" directory.`, sourcesPath)
	return nil
}

// tablesByBucketsMap groups tables by the ID of the bucket they belong to,
// keeping the input order inside each group.
func tablesByBucketsMap(tablesList []*storageapi.Table) map[storageapi.BucketID][]*storageapi.Table {
	tablesByBuckets := make(map[storageapi.BucketID][]*storageapi.Table)
	for _, table := range tablesList {
		// Appending to the nil slice of a missing key creates the entry.
		id := table.Bucket.ID
		tablesByBuckets[id] = append(tablesByBuckets[id], table)
	}
	return tablesByBuckets
}

// generateSourcesDefinition builds the sources document for a single bucket.
//
// The bucket ID is used as both source name and schema. The database is an
// env_var reference named after the upper-cased targetName. Every table gets
// all three quoting switches enabled, and each primary-key column (its name
// wrapped in double quotes) gets the "unique" and "not_null" tests. Tables
// without a primary key keep a nil column list.
func generateSourcesDefinition(targetName string, bucketID storageapi.BucketID, tablesList []*storageapi.Table) sourceFile {
	sourceTables := make([]sourceTable, 0, len(tablesList))
	for _, table := range tablesList {
		tbl := sourceTable{
			Name: table.Name,
			Quoting: sourceTableQuoting{
				Database:   true,
				Schema:     true,
				Identifier: true,
			},
		}
		if len(table.PrimaryKey) > 0 {
			tbl.Columns = make([]sourceTableColumn, 0, len(table.PrimaryKey))
			for _, primaryKey := range table.PrimaryKey {
				tbl.Columns = append(tbl.Columns, sourceTableColumn{
					Name:  fmt.Sprintf(`"%s"`, primaryKey),
					Tests: []string{"unique", "not_null"},
				})
			}
		}
		sourceTables = append(sourceTables, tbl)
	}

	return sourceFile{
		Version: 2,
		Sources: []source{
			{
				Name: string(bucketID),
				Freshness: sourceFreshness{
					WarnAfter: sourceFreshnessWarnAfter{
						Count:  1,
						Period: "day",
					},
				},
				Database:      fmt.Sprintf("{{ env_var(\"DBT_KBC_%s_DATABASE\") }}", strings.ToUpper(targetName)),
				Schema:        string(bucketID),
				LoadedAtField: `"_timestamp"`,
				Tables:        sourceTables,
			},
		},
	}
}
Loading