From a60c36ab3d49f4e1d57ceb99c8b10d5c796ef4d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Haris=20Osmanagi=C4=87?= Date: Wed, 6 Nov 2024 17:51:32 +0100 Subject: [PATCH] Basic `conduit init` and `conduit pipelines init` commands (#1927) --- .gitignore | 2 + cmd/cli/cli.go | 147 ++++++++ cmd/cli/conduit_init.go | 129 +++++++ cmd/cli/internal/yaml.go | 85 +++++ cmd/cli/pipeline.tmpl | 30 ++ cmd/cli/pipelines_init.go | 357 ++++++++++++++++++ cmd/conduit/main.go | 6 +- go.mod | 2 +- pkg/conduit/config.go | 17 +- pkg/conduit/entrypoint.go | 24 +- .../processor/builtin/impl/field/rename.go | 2 +- pkg/provisioning/service.go | 2 +- 12 files changed, 780 insertions(+), 23 deletions(-) create mode 100644 cmd/cli/cli.go create mode 100644 cmd/cli/conduit_init.go create mode 100644 cmd/cli/internal/yaml.go create mode 100644 cmd/cli/pipeline.tmpl create mode 100644 cmd/cli/pipelines_init.go diff --git a/.gitignore b/.gitignore index eb00ee4a3..8448637a5 100644 --- a/.gitignore +++ b/.gitignore @@ -97,3 +97,5 @@ pkg/plugin/processor/standalone/test/wasm_processors/*/processor.wasm # this one is needed for integration tests !pkg/provisioning/test/source-file.txt + +golangci-report.xml \ No newline at end of file diff --git a/cmd/cli/cli.go b/cmd/cli/cli.go new file mode 100644 index 000000000..7849903d3 --- /dev/null +++ b/cmd/cli/cli.go @@ -0,0 +1,147 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cli + +import ( + "fmt" + "os" + + "github.com/conduitio/conduit/pkg/conduit" + "github.com/spf13/cobra" +) + +var ( + initArgs InitArgs + pipelinesInitArgs PipelinesInitArgs +) + +type Instance struct { + rootCmd *cobra.Command +} + +// New creates a new CLI Instance. +func New() *Instance { + return &Instance{ + rootCmd: buildRootCmd(), + } +} + +func (i *Instance) Run() { + if err := i.rootCmd.Execute(); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +} + +func buildRootCmd() *cobra.Command { + cfg := conduit.DefaultConfig() + + cmd := &cobra.Command{ + Use: "conduit", + Short: "Conduit CLI", + Long: "Conduit CLI is a command-line that helps you interact with and manage Conduit.", + Version: conduit.Version(true), + Run: func(cmd *cobra.Command, args []string) { + e := &conduit.Entrypoint{} + e.Serve(cfg) + }, + } + cmd.CompletionOptions.DisableDefaultCmd = true + conduit.Flags(&cfg).VisitAll(cmd.Flags().AddGoFlag) + + // init + cmd.AddCommand(buildInitCmd()) + + // pipelines + cmd.AddGroup(&cobra.Group{ + ID: "pipelines", + Title: "Pipelines", + }) + cmd.AddCommand(buildPipelinesCmd()) + + return cmd +} + +func buildInitCmd() *cobra.Command { + initCmd := &cobra.Command{ + Use: "init", + Short: "Initialize Conduit with a configuration file and directories.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + return NewConduitInit(initArgs).Run() + }, + } + initCmd.Flags().StringVar( + &initArgs.Path, + "config.path", + "", + "path where Conduit will be initialized", + ) + + return initCmd +} + +func buildPipelinesCmd() *cobra.Command { + pipelinesCmd := &cobra.Command{ + Use: "pipelines", + Short: "Initialize and manage pipelines", + Args: cobra.NoArgs, + GroupID: "pipelines", + } + + pipelinesCmd.AddCommand(buildPipelinesInitCmd()) + + return pipelinesCmd +} + +func buildPipelinesInitCmd() *cobra.Command { + pipelinesInitCmd := &cobra.Command{ + Use: "init [pipeline-name]", + Short: "Initialize an example pipeline.", + Long: `Initialize a pipeline configuration file, with all of parameters for source and destination connectors +initialized and described. The source and destination connector can be chosen via flags. If no connectors are chosen, then +a simple and runnable generator-to-log pipeline is configured.`, + Args: cobra.MaximumNArgs(1), + Example: " conduit pipelines init awesome-pipeline-name --source postgres --destination kafka --path pipelines/pg-to-kafka.yaml", + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) > 0 { + pipelinesInitArgs.Name = args[0] + } + return NewPipelinesInit(pipelinesInitArgs).Run() + }, + } + + // Add flags to pipelines init command + pipelinesInitCmd.Flags().StringVar( + &pipelinesInitArgs.Source, + "source", + "", + "Source connector (any of the built-in connectors).", + ) + pipelinesInitCmd.Flags().StringVar( + &pipelinesInitArgs.Destination, + "destination", + "", + "Destination connector (any of the built-in connectors).", + ) + pipelinesInitCmd.Flags().StringVar( + &pipelinesInitArgs.Path, + "pipelines.path", + "./pipelines", + "Path where the pipeline will be saved.", + ) + + return pipelinesInitCmd +} diff --git a/cmd/cli/conduit_init.go b/cmd/cli/conduit_init.go new file mode 100644 index 000000000..e0d23ac2f --- /dev/null +++ b/cmd/cli/conduit_init.go @@ -0,0 +1,129 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cli + +import ( + "flag" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/conduitio/conduit/cmd/cli/internal" + "github.com/conduitio/conduit/pkg/conduit" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/yaml/v3" +) + +type InitArgs struct { + Path string +} + +type ConduitInit struct { + args InitArgs +} + +func NewConduitInit(args InitArgs) *ConduitInit { + return &ConduitInit{args: args} +} + +func (i *ConduitInit) Run() error { + err := i.createDirs() + if err != nil { + return err + } + + err = i.createConfigYAML() + if err != nil { + return fmt.Errorf("failed to create config YAML: %w", err) + } + + fmt.Println(` +Conduit has been initialized! + +To quickly create an example pipeline, run 'conduit pipelines init'. +To see how you can customize your first pipeline, run 'conduit pipelines init --help'.`) + + return nil +} + +func (i *ConduitInit) createConfigYAML() error { + cfgYAML := internal.NewYAMLTree() + i.conduitCfgFlags().VisitAll(func(f *flag.Flag) { + if i.isHiddenFlag(f.Name) { + return // hide flag from output + } + cfgYAML.Insert(f.Name, f.DefValue, f.Usage) + }) + + yamlData, err := yaml.Marshal(cfgYAML.Root) + if err != nil { + return cerrors.Errorf("error marshaling YAML: %w\n", err) + } + + path := filepath.Join(i.path(), "conduit.yaml") + err = os.WriteFile(path, yamlData, 0o600) + if err != nil { + return cerrors.Errorf("error writing conduit.yaml: %w", err) + } + fmt.Printf("Configuration file written to %v\n", path) + + return nil +} + +func (i *ConduitInit) createDirs() error { + dirs := []string{"processors", "connectors", "pipelines"} + + for _, dir := range dirs { + path := filepath.Join(i.path(), dir) + + // Attempt to create the directory, skipping if it already exists + if err := os.Mkdir(path, os.ModePerm); err != nil { + if os.IsExist(err) { + fmt.Printf("Directory '%s' already exists, skipping...\n", path) + continue + } + return fmt.Errorf("failed to create directory '%s': %w", path, err) + } + + fmt.Printf("Created directory: %s\n", path) + } + + return nil +} + +func (i *ConduitInit) isHiddenFlag(name string) bool { + return name == "dev" || + strings.HasPrefix(name, "dev.") || + conduit.DeprecatedFlags[name] +} + +func (i *ConduitInit) conduitCfgFlags() *flag.FlagSet { + cfg := conduit.DefaultConfigWithBasePath(i.path()) + return conduit.Flags(&cfg) +} + +func (i *ConduitInit) path() string { + if i.args.Path != "" { + return i.args.Path + } + + path, err := os.Getwd() + if err != nil { + panic(cerrors.Errorf("failed to get current working directory: %w", err)) + } + + return path +} diff --git a/cmd/cli/internal/yaml.go b/cmd/cli/internal/yaml.go new file mode 100644 index 000000000..28ef80f04 --- /dev/null +++ b/cmd/cli/internal/yaml.go @@ -0,0 +1,85 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "strings" + + "github.com/conduitio/yaml/v3" +) + +// YAMLTree represents a YAML document. +// It makes it possible to insert value nodes with comments. +type YAMLTree struct { + Root *yaml.Node +} + +func NewYAMLTree() *YAMLTree { + return &YAMLTree{ + Root: &yaml.Node{ + Kind: yaml.MappingNode, + }, + } +} + +// Insert adds a path with a value to the tree +func (t *YAMLTree) Insert(path, value, comment string) { + parts := strings.Split(path, ".") + current := t.Root + + // For each part of the path + for i, part := range parts { + // Create key node + keyNode := &yaml.Node{ + Kind: yaml.ScalarNode, + Value: part, + } + + // Find or create value node + var valueNode *yaml.Node + found := false + + // Look for existing key in current mapping + for i := 0; i < len(current.Content); i += 2 { + if current.Content[i].Value == part { + valueNode = current.Content[i+1] + found = true + break + } + } + + // If not found, create new node + if !found { + // If this is the last part, create scalar value node + if i == len(parts)-1 { + valueNode = &yaml.Node{ + Kind: yaml.ScalarNode, + Value: value, + } + keyNode.HeadComment = comment + } else { + // Otherwise create mapping node for nesting + valueNode = &yaml.Node{ + Kind: yaml.MappingNode, + } + } + // Add key-value pair to current node's content + current.Content = append(current.Content, keyNode, valueNode) + } + + // Move to next level + current = valueNode + } +} diff --git a/cmd/cli/pipeline.tmpl b/cmd/cli/pipeline.tmpl new file mode 100644 index 000000000..b2346c3ec --- /dev/null +++ b/cmd/cli/pipeline.tmpl @@ -0,0 +1,30 @@ +version: "2.2" +pipelines: + - id: example-pipeline + status: running + name: "{{ .Name }}" + connectors: + - id: example-source + type: source + plugin: "{{ .SourceSpec.Name }}" + {{ if gt (len .SourceSpec.Params) 0 -}} + settings: + {{- range $name, $param := .SourceSpec.Params }} + {{ formatParameterDescriptionYAML $param.Description }} + # Type: {{ $param.Type }} + # {{ formatParameterRequired $param }} + {{ $name }}: {{ formatParameterValueYAML $param.Default }} + {{- end }} + {{- end }} + - id: example-destination + type: destination + plugin: "{{ .DestinationSpec.Name }}" + {{ if gt (len .DestinationSpec.Params) 0 -}} + settings: + {{- range $name, $param := .DestinationSpec.Params }} + {{ formatParameterDescriptionYAML $param.Description }} + # Type: {{ $param.Type }} + # {{ formatParameterRequired $param }} + {{ $name }}: {{ formatParameterValueYAML $param.Default }} + {{- end }} + {{- end }} diff --git a/cmd/cli/pipelines_init.go b/cmd/cli/pipelines_init.go new file mode 100644 index 000000000..2094b162f --- /dev/null +++ b/cmd/cli/pipelines_init.go @@ -0,0 +1,357 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cli + +import ( + _ "embed" + "fmt" + "log" + "os" + "path/filepath" + "strings" + "text/template" + + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/plugin" + "github.com/conduitio/conduit/pkg/plugin/connector/builtin" +) + +//go:embed pipeline.tmpl +var pipelineCfgTmpl string + +var funcMap = template.FuncMap{ + "formatParameterValueTable": formatParameterValueTable, + "formatParameterValueYAML": formatParameterValueYAML, + "formatParameterDescriptionYAML": formatParameterDescriptionYAML, + "formatParameterRequired": formatParameterRequired, +} + +func formatParameterRequired(param config.Parameter) string { + for _, v := range param.Validations { + if v.Type() == config.ValidationTypeRequired { + return "Required" + } + } + + return "Optional" +} + +// formatParameterValue formats the value of a configuration parameter. +func formatParameterValueTable(value string) string { + switch { + case value == "": + return `` + case strings.Contains(value, "\n"): + // specifically used in the javascript processor + return fmt.Sprintf("\n```js\n%s\n```\n", value) + default: + return fmt.Sprintf("`%s`", value) + } +} + +func formatParameterDescriptionYAML(description string) string { + const ( + indentLen = 10 + prefix = "# " + lineLen = 80 + tmpNewLine = "〠" + ) + + // remove markdown new lines + description = strings.ReplaceAll(description, "\n\n", tmpNewLine) + description = strings.ReplaceAll(description, "\n", " ") + description = strings.ReplaceAll(description, tmpNewLine, "\n") + + formattedDescription := formatMultiline(description, strings.Repeat(" ", indentLen)+prefix, lineLen) + // remove first indent and last new line + formattedDescription = formattedDescription[indentLen : len(formattedDescription)-1] + return formattedDescription +} + +func formatMultiline( + input string, + prefix string, + maxLineLen int, +) string { + textLen := maxLineLen - len(prefix) + + // split the input into lines of length textLen + lines := strings.Split(input, "\n") + var formattedLines []string + for _, line := range lines { + if len(line) <= textLen { + formattedLines = append(formattedLines, line) + continue + } + + // split the line into multiple lines, don't break words + words := strings.Fields(line) + var formattedLine string + for _, word := range words { + if len(formattedLine)+len(word) > textLen { + formattedLines = append(formattedLines, formattedLine[1:]) + formattedLine = "" + } + formattedLine += " " + word + } + if formattedLine != "" { + formattedLines = append(formattedLines, formattedLine[1:]) + } + } + + // combine lines including indent and prefix + var formatted string + for _, line := range formattedLines { + formatted += prefix + line + "\n" + } + + return formatted +} + +func formatParameterValueYAML(value string) string { + switch { + case value == "": + return `""` + case strings.Contains(value, "\n"): + // specifically used in the javascript processor + formattedValue := formatMultiline(value, " ", 10000) + return fmt.Sprintf("|\n%s", formattedValue) + default: + return fmt.Sprintf(`'%s'`, value) + } +} + +const ( + defaultDestination = "file" + defaultSource = "generator" +) + +type pipelineTemplate struct { + Name string + SourceSpec connectorTemplate + DestinationSpec connectorTemplate +} + +type connectorTemplate struct { + Name string + Params config.Parameters +} + +type PipelinesInitArgs struct { + Name string + Source string + Destination string + Path string +} + +type PipelinesInit struct { + args PipelinesInitArgs +} + +func NewPipelinesInit(args PipelinesInitArgs) *PipelinesInit { + return &PipelinesInit{args: args} +} + +func (pi *PipelinesInit) Run() error { + var pipeline pipelineTemplate + // if no source/destination arguments are provided, + // we build a runnable example pipeline + if pi.args.Source == "" && pi.args.Destination == "" { + pipeline = pi.buildDemoPipeline() + } else { + p, err := pi.buildTemplatePipeline() + if err != nil { + return err + } + pipeline = p + } + + err := pi.write(pipeline) + if err != nil { + return cerrors.Errorf("could not write pipeline: %w", err) + } + + fmt.Printf(`Your pipeline has been initialized and created at %s. + +To run the pipeline, execute: + +conduit --pipelines.path %s`, + pi.configFilePath(), pi.configFilePath()) + + return nil +} + +func (pi *PipelinesInit) buildTemplatePipeline() (pipelineTemplate, error) { + srcParams, err := pi.getSourceParams() + if err != nil { + return pipelineTemplate{}, cerrors.Errorf("failed getting source params: %w", err) + } + + dstParams, err := pi.getDestinationParams() + if err != nil { + return pipelineTemplate{}, cerrors.Errorf("failed getting destination params: %w", err) + } + + return pipelineTemplate{ + Name: pi.pipelineName(), + SourceSpec: srcParams, + DestinationSpec: dstParams, + }, nil +} + +func (pi *PipelinesInit) buildDemoPipeline() pipelineTemplate { + srcParams, _ := pi.getSourceParams() + dstParams, _ := pi.getDestinationParams() + + return pipelineTemplate{ + Name: pi.pipelineName(), + SourceSpec: connectorTemplate{ + Name: defaultSource, + Params: map[string]config.Parameter{ + "format.type": { + Description: srcParams.Params["format.type"].Description, + Type: srcParams.Params["format.type"].Type, + Default: "structured", + Validations: srcParams.Params["format.type"].Validations, + }, + "format.options.scheduledDeparture": { + Description: "Generate field 'scheduledDeparture' of type 'time'", + Type: config.ParameterTypeString, + Default: "time", + }, + "format.options.airline": { + Description: "Generate field 'airline' of type string", + Type: config.ParameterTypeString, + Default: "string", + }, + "rate": { + Description: srcParams.Params["rate"].Description, + Type: srcParams.Params["rate"].Type, + Default: "1", + }, + }, + }, + DestinationSpec: connectorTemplate{ + Name: defaultDestination, + Params: map[string]config.Parameter{ + "path": { + Description: dstParams.Params["path"].Description, + Type: dstParams.Params["path"].Type, + Default: "./destination.txt", + }, + }, + }, + } +} + +func (pi *PipelinesInit) getOutput() *os.File { + output, err := os.OpenFile(pi.configFilePath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) + if err != nil { + log.Fatalf("error: failed to open %s: %v", pi.args.Path, err) + } + + return output +} + +func (pi *PipelinesInit) write(pipeline pipelineTemplate) error { + t, err := template.New("").Funcs(funcMap).Option("missingkey=zero").Parse(pipelineCfgTmpl) + if err != nil { + return cerrors.Errorf("failed parsing template: %w", err) + } + + output := pi.getOutput() + defer output.Close() + + err = t.Execute(output, pipeline) + if err != nil { + return cerrors.Errorf("failed executing template: %w", err) + } + + return nil +} + +func (pi *PipelinesInit) getSourceParams() (connectorTemplate, error) { + for _, conn := range builtin.DefaultBuiltinConnectors { + specs := conn.NewSpecification() + if specs.Name == pi.sourceConnector() || specs.Name == "builtin:"+pi.sourceConnector() { + if conn.NewSource == nil { + return connectorTemplate{}, cerrors.Errorf("plugin %v has no source", pi.sourceConnector()) + } + + return connectorTemplate{ + Name: specs.Name, + Params: conn.NewSource().Parameters(), + }, nil + } + } + + return connectorTemplate{}, cerrors.Errorf("%v: %w", pi.sourceConnector(), plugin.ErrPluginNotFound) +} + +func (pi *PipelinesInit) getDestinationParams() (connectorTemplate, error) { + for _, conn := range builtin.DefaultBuiltinConnectors { + specs := conn.NewSpecification() + if specs.Name == pi.destinationConnector() || specs.Name == "builtin:"+pi.destinationConnector() { + if conn.NewDestination == nil { + return connectorTemplate{}, cerrors.Errorf("plugin %v has no source", pi.destinationConnector()) + } + + return connectorTemplate{ + Name: specs.Name, + Params: conn.NewDestination().Parameters(), + }, nil + } + } + + return connectorTemplate{}, cerrors.Errorf("%v: %w", pi.destinationConnector(), plugin.ErrPluginNotFound) +} + +func (pi *PipelinesInit) configFilePath() string { + path := pi.args.Path + if path == "" { + path = "./pipelines" + } + + return filepath.Join(path, pi.configFileName()) +} + +func (pi *PipelinesInit) configFileName() string { + return fmt.Sprintf("pipeline-%s.yaml", pi.pipelineName()) +} + +func (pi *PipelinesInit) sourceConnector() string { + if pi.args.Source != "" { + return pi.args.Source + } + + return defaultSource +} + +func (pi *PipelinesInit) destinationConnector() string { + if pi.args.Destination != "" { + return pi.args.Destination + } + + return defaultDestination +} + +func (pi *PipelinesInit) pipelineName() string { + if pi.args.Name != "" { + return pi.args.Name + } + + return fmt.Sprintf("%s-to-%s", pi.sourceConnector(), pi.destinationConnector()) +} diff --git a/cmd/conduit/main.go b/cmd/conduit/main.go index 89d784a5d..a744d7f3e 100644 --- a/cmd/conduit/main.go +++ b/cmd/conduit/main.go @@ -14,8 +14,10 @@ package main -import "github.com/conduitio/conduit/pkg/conduit" +import ( + "github.com/conduitio/conduit/cmd/cli" +) func main() { - conduit.Serve(conduit.DefaultConfig()) + cli.New().Run() } diff --git a/go.mod b/go.mod index b46ab9279..05eff486e 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.60.1 github.com/rs/zerolog v1.33.0 + github.com/spf13/cobra v1.8.1 github.com/stealthrocket/wazergo v0.19.1 github.com/tetratelabs/wazero v1.8.1 github.com/twmb/franz-go/pkg/sr v1.2.0 @@ -315,7 +316,6 @@ require ( github.com/sourcegraph/go-diff v0.7.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.7.0 // indirect - github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.19.0 // indirect github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 6d15cffd0..2d33618ba 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -16,6 +16,7 @@ package conduit import ( "os" + "path/filepath" "time" "github.com/conduitio/conduit-commons/database" @@ -119,12 +120,16 @@ type Config struct { } func DefaultConfig() Config { + return DefaultConfigWithBasePath(".") +} + +func DefaultConfigWithBasePath(basePath string) Config { var cfg Config cfg.DB.Type = DBTypeBadger - cfg.DB.Badger.Path = "conduit.db" + cfg.DB.Badger.Path = filepath.Join(basePath, "conduit.db") cfg.DB.Postgres.Table = "conduit_kv_store" - cfg.DB.SQLite.Path = "conduit.db" + cfg.DB.SQLite.Path = filepath.Join(basePath, "conduit.db") cfg.DB.SQLite.Table = "conduit_kv_store" cfg.API.Enabled = true @@ -135,11 +140,11 @@ func DefaultConfig() Config { cfg.Log.Level = "info" cfg.Log.Format = "cli" - cfg.Connectors.Path = "./connectors" + cfg.Connectors.Path = filepath.Join(basePath, "connectors") - cfg.Processors.Path = "./processors" + cfg.Processors.Path = filepath.Join(basePath, "processors") - cfg.Pipelines.Path = "./pipelines" + cfg.Pipelines.Path = filepath.Join(basePath, "pipelines") cfg.Pipelines.ErrorRecovery.MinDelay = time.Second cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute cfg.Pipelines.ErrorRecovery.BackoffFactor = 2 @@ -263,7 +268,7 @@ func (c Config) Validate() error { } // check if folder exists _, err = os.Stat(c.Pipelines.Path) - if c.Pipelines.Path != "./pipelines" && os.IsNotExist(err) { + if c.Pipelines.Path != "pipelines" && os.IsNotExist(err) { return invalidConfigFieldErr("pipelines.path") } diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index e175b3374..f55a598af 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -27,10 +27,8 @@ import ( "github.com/peterbourgon/ff/v3/ffyaml" ) -// Serve is a shortcut for Entrypoint.Serve. -func Serve(cfg Config) { - e := &Entrypoint{} - e.Serve(cfg) +var DeprecatedFlags = map[string]bool{ + "pipelines.exit-on-error": true, } const ( @@ -38,6 +36,12 @@ const ( exitCodeInterrupt = 2 ) +// Serve is a shortcut for Entrypoint.Serve. +func Serve(cfg Config) { + e := &Entrypoint{} + e.Serve(cfg) +} + // Entrypoint provides methods related to the Conduit entrypoint (parsing // config, managing interrupt signals etc.). type Entrypoint struct{} @@ -52,8 +56,9 @@ type Entrypoint struct{} // - environment variables // - config file (lowest priority) func (e *Entrypoint) Serve(cfg Config) { - flags := e.Flags(&cfg) + flags := Flags(&cfg) e.ParseConfig(flags) + if cfg.Log.Format == "cli" { _, _ = fmt.Fprintf(os.Stdout, "%s\n", e.Splash()) } @@ -73,7 +78,7 @@ func (e *Entrypoint) Serve(cfg Config) { // Flags returns a flag set that, when parsed, stores the values in the provided // config struct. -func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { +func Flags(cfg *Config) *flag.FlagSet { // TODO extract flags from config struct rather than defining flags manually flags := flag.NewFlagSet("conduit", flag.ExitOnError) @@ -162,16 +167,11 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { flags.StringVar(&cfg.dev.memprofile, "dev.memprofile", "", "write memory profile to file") flags.StringVar(&cfg.dev.blockprofile, "dev.blockprofile", "", "write block profile to file") - // Deprecated flags that are hidden from help output - deprecatedFlags := map[string]bool{ - "pipelines.exit-on-error": true, - } - // show user or dev flags flags.Usage = func() { tmpFlags := flag.NewFlagSet("conduit", flag.ExitOnError) flags.VisitAll(func(f *flag.Flag) { - if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp || deprecatedFlags[f.Name] { + if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp || DeprecatedFlags[f.Name] { return // hide flag from output } // reset value to its default, to ensure default is shown correctly diff --git a/pkg/plugin/processor/builtin/impl/field/rename.go b/pkg/plugin/processor/builtin/impl/field/rename.go index ad9300624..7f67c8510 100644 --- a/pkg/plugin/processor/builtin/impl/field/rename.go +++ b/pkg/plugin/processor/builtin/impl/field/rename.go @@ -18,6 +18,7 @@ package field import ( "context" + "slices" "strings" "github.com/conduitio/conduit-commons/config" @@ -26,7 +27,6 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal" - "golang.org/x/exp/slices" ) type renameProcessor struct { diff --git a/pkg/provisioning/service.go b/pkg/provisioning/service.go index 70088c2a1..58158b2c6 100644 --- a/pkg/provisioning/service.go +++ b/pkg/provisioning/service.go @@ -19,6 +19,7 @@ import ( "io/fs" "os" "path/filepath" + "slices" "sort" "strings" @@ -29,7 +30,6 @@ import ( "github.com/conduitio/conduit/pkg/pipeline" "github.com/conduitio/conduit/pkg/provisioning/config" "github.com/conduitio/conduit/pkg/provisioning/config/yaml" - "golang.org/x/exp/slices" ) type Service struct {