diff --git a/cmd/meroxa/root/flink/deploy.go b/cmd/meroxa/root/flink/deploy.go index b53f36470..d54512aab 100644 --- a/cmd/meroxa/root/flink/deploy.go +++ b/cmd/meroxa/root/flink/deploy.go @@ -29,6 +29,7 @@ import ( "github.com/meroxa/cli/log" "github.com/meroxa/cli/utils" "github.com/meroxa/meroxa-go/pkg/meroxa" + "github.com/meroxa/turbine-core/pkg/ir" ) type deployFlinkJobClient interface { @@ -104,18 +105,18 @@ func (d *Deploy) Execute(ctx context.Context) error { return fmt.Errorf("please provide a JAR file to the --jar flag") } + name := d.args.Name + if name == "" { + return fmt.Errorf("the name of your Flink Job be provided as an argument") + } + secrets := utils.StringSliceToStringMap(d.flags.Secrets) spec, err := flink.GetIRSpec(ctx, jarPath, secrets, d.logger) if err != nil { - fmt.Printf("failed to extract IR spec: %v\n", err) + d.logger.Warnf(ctx, "failed to extract IR spec: %v\n", err) // non-blocking } - name := d.args.Name - if name == "" { - return fmt.Errorf("the name of your Flink Job be provided as an argument") - } - filename := filepath.Base(jarPath) d.logger.StartSpinner("\t", "Fetching Meroxa Platform source...") source, err := d.client.CreateSourceV2(ctx, &meroxa.CreateSourceInputV2{Filename: filename}) @@ -126,17 +127,37 @@ func (d *Deploy) Execute(ctx context.Context) error { } d.logger.StopSpinnerWithStatus("Platform source fetched", log.Successful) + // Logging happens inside UploadFile err = turbine.UploadFile(ctx, d.logger, jarPath, source.PutUrl) if err != nil { return err } - d.logger.StartSpinner("\t", "Creating Flink job...") input := &meroxa.CreateFlinkJobInput{Name: name, JarURL: source.GetUrl} + err = d.addIntegrations(ctx, spec, input) + if err != nil { + return err + } + + d.logger.StartSpinner("\t", "Creating Flink job...") + fj, err := d.client.CreateFlinkJob(ctx, input) + if err != nil { + d.logger.Errorf(ctx, "\t 𐄂 Unable to create Flink job") + d.logger.StopSpinnerWithStatus("\t", log.Failed) + return err + } + + d.logger.StopSpinnerWithStatus("Flink job created", log.Successful) + d.logger.JSON(ctx, fj) + return nil +} + +func (d *Deploy) addIntegrations(ctx context.Context, spec *ir.DeploymentSpec, input *meroxa.CreateFlinkJobInput) error { + d.logger.StartSpinner("\t", "Checking Meroxa integrations...") + successMsg := "Finished checking Meroxa integrations" if spec != nil { - d.logger.StartSpinner("\t", "Adding Meroxa integrations to request...") var bytes []byte - bytes, err = json.Marshal(spec) + bytes, err := json.Marshal(spec) if err != nil { d.logger.Errorf(ctx, "\t 𐄂 Unable to add Meroxa integrations to request") d.logger.StopSpinnerWithStatus("\t", log.Failed) @@ -148,19 +169,10 @@ func (d *Deploy) Execute(ctx context.Context) error { d.logger.StopSpinnerWithStatus("\t", log.Failed) return unmarshalErr } - + successMsg = "Added Meroxa integrations to request" input.Spec = inputSpec input.SpecVersion = spec.Definition.Metadata.SpecVersion } - - fj, err := d.client.CreateFlinkJob(ctx, input) - if err != nil { - d.logger.Errorf(ctx, "\t 𐄂 Unable to create Flink job") - d.logger.StopSpinnerWithStatus("\t", log.Failed) - return err - } - - d.logger.StopSpinnerWithStatus("Flink job created", log.Successful) - d.logger.JSON(ctx, fj) + d.logger.StopSpinnerWithStatus(successMsg, log.Successful) return nil }