diff --git a/addons/keda/keda.go b/addons/keda/keda.go index 1c7ff2b56d..a8a0edf488 100644 --- a/addons/keda/keda.go +++ b/addons/keda/keda.go @@ -312,7 +312,7 @@ func (t *kedaTrait) getTopControllerReference(e *trait.Environment) *v1.ObjectRe func (t *kedaTrait) populateTriggersFromKamelets(e *trait.Environment) error { kameletURIs := make(map[string][]string) - _, err := e.ConsumeMeta(func(meta metadata.IntegrationMetadata) bool { + _, err := e.ConsumeMeta(false, func(meta metadata.IntegrationMetadata) bool { for _, kameletURI := range meta.FromURIs { if kameletStr := source.ExtractKamelet(kameletURI); kameletStr != "" && camelv1.ValidKameletName(kameletStr) { kamelet := kameletStr diff --git a/addons/master/master.go b/addons/master/master.go index 242c7ee40b..42fc9d6453 100644 --- a/addons/master/master.go +++ b/addons/master/master.go @@ -95,7 +95,7 @@ func (t *masterTrait) Configure(e *trait.Environment) (bool, *trait.TraitConditi return ptr.Deref(t.Enabled, false), nil, nil } - enabled, err := e.ConsumeMeta(func(meta metadata.IntegrationMetadata) bool { + enabled, err := e.ConsumeMeta(false, func(meta metadata.IntegrationMetadata) bool { found := false loop: for _, endpoint := range meta.FromURIs { diff --git a/e2e/common/misc/cron_test.go b/e2e/common/misc/cron_test.go deleted file mode 100644 index c86f35c00e..0000000000 --- a/e2e/common/misc/cron_test.go +++ /dev/null @@ -1,96 +0,0 @@ -//go:build integration -// +build integration - -// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration" - -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package common - -import ( - "context" - "testing" - - . "github.com/onsi/gomega" - - corev1 "k8s.io/api/core/v1" - - . "github.com/apache/camel-k/v2/e2e/support" - v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" - - "github.com/stretchr/testify/assert" -) - -func TestRunCronExample(t *testing.T) { - t.Parallel() - - WithNewTestNamespace(t, func(ctx context.Context, g *WithT, ns string) { - t.Run("cron-yaml", func(t *testing.T) { - name := RandomizedSuffixName("cron-yaml") - g.Expect(KamelRun(t, ctx, ns, "files/cron-yaml.yaml", "--name", name).Execute()).To(Succeed()) - g.Eventually(IntegrationCronJob(t, ctx, ns, name)).ShouldNot(BeNil()) - g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady)).Should(Equal(corev1.ConditionTrue)) - g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutMedium).Should(ContainSubstring("Magicstring!")) - g.Expect(Kamel(t, ctx, "delete", name, "-n", ns).Execute()).To(Succeed()) - }) - - t.Run("cron-timer", func(t *testing.T) { - name := RandomizedSuffixName("cron-timer") - g.Expect(KamelRun(t, ctx, ns, "files/cron-timer.yaml", "--name", name).Execute()).To(Succeed()) - g.Eventually(IntegrationCronJob(t, ctx, ns, name), TestTimeoutLong).ShouldNot(BeNil()) - g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady)).Should(Equal(corev1.ConditionTrue)) - g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutMedium).Should(ContainSubstring("Magicstring!")) - g.Expect(Kamel(t, ctx, "delete", name, "-n", ns).Execute()).To(Succeed()) - }) - - t.Run("cron-fallback", func(t *testing.T) { - name := RandomizedSuffixName("cron-fallback") - g.Expect(KamelRun(t, ctx, ns, "files/cron-fallback.yaml", "--name", name).Execute()).To(Succeed()) - g.Eventually(IntegrationPodPhase(t, ctx, ns, name)).Should(Equal(corev1.PodRunning)) - g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady)).Should(Equal(corev1.ConditionTrue)) - g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutMedium).Should(ContainSubstring("Magicstring!")) - g.Expect(Kamel(t, ctx, "delete", name, "-n", ns).Execute()).To(Succeed()) - }) - - t.Run("cron-quartz", func(t *testing.T) { - name := RandomizedSuffixName("cron-quartz") - g.Expect(KamelRun(t, ctx, ns, "files/cron-quartz.yaml", "--name", name).Execute()).To(Succeed()) - g.Eventually(IntegrationPodPhase(t, ctx, ns, name)).Should(Equal(corev1.PodRunning)) - g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady)).Should(Equal(corev1.ConditionTrue)) - g.Eventually(IntegrationLogs(t, ctx, ns, name), TestTimeoutMedium).Should(ContainSubstring("Magicstring!")) - g.Expect(Kamel(t, ctx, "delete", name, "-n", ns).Execute()).To(Succeed()) - }) - - t.Run("cron-trait-yaml", func(t *testing.T) { - name := RandomizedSuffixName("cron-trait-yaml") - g.Expect(KamelRun(t, ctx, ns, "files/cron-trait-yaml.yaml", "--name", name, "-t", "cron.enabled=true", "-t", "cron.schedule=0/2 * * * *").Execute()).To(Succeed()) - g.Eventually(IntegrationConditionStatus(t, ctx, ns, name, v1.IntegrationConditionReady)).Should(Equal(corev1.ConditionTrue)) - g.Eventually(IntegrationCronJob(t, ctx, ns, name)).ShouldNot(BeNil()) - - // Verify that `-t cron.schedule` overrides the schedule in the yaml - // - // kubectl get cronjobs -n test-de619ae2-eddc-4bac-86a6-53d80be030ea - // NAME SCHEDULE SUSPEND ACTIVE LAST SCHEDULE AGE - // cron-trait-yaml 0/2 * * * * False 0 38s - - cronJob := IntegrationCronJob(t, ctx, ns, name)() - assert.Equal(t, "0/2 * * * *", cronJob.Spec.Schedule) - g.Expect(Kamel(t, ctx, "delete", name, "-n", ns).Execute()).To(Succeed()) - }) - }) -} diff --git a/e2e/common/misc/files/cron-trait-yaml.yaml b/e2e/common/misc/files/cron-trait-yaml.yaml deleted file mode 100644 index 1016d624f0..0000000000 --- a/e2e/common/misc/files/cron-trait-yaml.yaml +++ /dev/null @@ -1,37 +0,0 @@ -# --------------------------------------------------------------------------- -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# --------------------------------------------------------------------------- - -- from: - uri: "cron:tab" - parameters: - # Every minute - schedule: "* * * * *" - steps: - - setHeader: - name: "m" - constant: "string!" - - setBody: - simple: "Magic${header.m}" - # Simulate a job workload - - delay: - expression: - constant: 20000 - asyncDelayed: false - - to: - uri: "log:info" - parameters: - show-all: "false" diff --git a/e2e/common/traits/cron_test.go b/e2e/common/traits/cron_test.go new file mode 100644 index 0000000000..dd5af649e5 --- /dev/null +++ b/e2e/common/traits/cron_test.go @@ -0,0 +1,110 @@ +//go:build integration +// +build integration + +// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration" + +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import ( + "context" + "testing" + + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/utils/ptr" + + . "github.com/apache/camel-k/v2/e2e/support" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" +) + +func TestRunCronExample(t *testing.T) { + t.Parallel() + WithNewTestNamespace(t, func(ctx context.Context, g *WithT, ns string) { + + t.Run("cron-timer", func(t *testing.T) { + g.Expect(KamelRun(t, ctx, ns, "files/cron-timer.yaml").Execute()).To(Succeed()) + g.Eventually(IntegrationCronJob(t, ctx, ns, "cron-timer"), TestTimeoutLong).ShouldNot(BeNil()) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "cron-timer", v1.IntegrationConditionReady), TestTimeoutMedium).Should( + Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "cron-timer", v1.IntegrationConditionCronJobAvailable), TestTimeoutMedium).Should( + Equal(corev1.ConditionTrue)) + // As it's a cron, we expect it's triggered, executed and turned off + g.Eventually(IntegrationStatusReplicas(t, ctx, ns, "cron-timer"), TestTimeoutMedium).Should(Equal(ptr.To(int32(1)))) + g.Eventually(IntegrationLogs(t, ctx, ns, "cron-timer")).Should(ContainSubstring("Magicstring!")) + g.Eventually(IntegrationStatusReplicas(t, ctx, ns, "cron-timer")).Should(Equal(ptr.To(int32(0)))) + g.Eventually(DeleteIntegrations(t, ctx, ns)).Should(Equal(0)) + }) + + t.Run("cron-java", func(t *testing.T) { + g.Expect(KamelRun(t, ctx, ns, "files/CronJava.java").Execute()).To(Succeed()) + g.Eventually(IntegrationCronJob(t, ctx, ns, "cron-java"), TestTimeoutLong).ShouldNot(BeNil()) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "cron-java", v1.IntegrationConditionReady), TestTimeoutMedium).Should( + Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "cron-java", v1.IntegrationConditionCronJobAvailable), TestTimeoutMedium).Should( + Equal(corev1.ConditionTrue)) + // As it's a cron, we expect it's triggered, executed and turned off + g.Eventually(IntegrationStatusReplicas(t, ctx, ns, "cron-java"), TestTimeoutMedium).Should(Equal(ptr.To(int32(1)))) + g.Eventually(IntegrationLogs(t, ctx, ns, "cron-java")).Should(ContainSubstring("Magicstring!")) + g.Eventually(IntegrationStatusReplicas(t, ctx, ns, "cron-java")).Should(Equal(ptr.To(int32(0)))) + g.Eventually(DeleteIntegrations(t, ctx, ns)).Should(Equal(0)) + }) + + t.Run("cron-tab", func(t *testing.T) { + g.Expect(KamelRun(t, ctx, ns, "files/cron-tab.yaml").Execute()).To(Succeed()) + g.Eventually(IntegrationCronJob(t, ctx, ns, "cron-tab"), TestTimeoutLong).ShouldNot(BeNil()) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "cron-tab", v1.IntegrationConditionReady), TestTimeoutMedium).Should( + Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "cron-tab", v1.IntegrationConditionCronJobAvailable), TestTimeoutMedium).Should( + Equal(corev1.ConditionTrue)) + // As it's a cron, we expect it's triggered, executed and turned off + g.Eventually(IntegrationStatusReplicas(t, ctx, ns, "cron-tab"), TestTimeoutMedium).Should(Equal(ptr.To(int32(1)))) + g.Eventually(IntegrationLogs(t, ctx, ns, "cron-tab")).Should(ContainSubstring("Magicstring!")) + g.Eventually(IntegrationStatusReplicas(t, ctx, ns, "cron-tab")).Should(Equal(ptr.To(int32(0)))) + g.Eventually(DeleteIntegrations(t, ctx, ns)).Should(Equal(0)) + }) + + t.Run("cron-quartz", func(t *testing.T) { + g.Expect(KamelRun(t, ctx, ns, "files/cron-quartz.yaml").Execute()).To(Succeed()) + g.Eventually(IntegrationCronJob(t, ctx, ns, "cron-quartz"), TestTimeoutLong).ShouldNot(BeNil()) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "cron-quartz", v1.IntegrationConditionReady), TestTimeoutShort).Should( + Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "cron-quartz", v1.IntegrationConditionCronJobAvailable), TestTimeoutMedium).Should( + Equal(corev1.ConditionTrue)) + // As it's a cron, we expect it's triggered, executed and turned off + g.Eventually(IntegrationStatusReplicas(t, ctx, ns, "cron-quartz"), TestTimeoutMedium).Should(Equal(ptr.To(int32(1)))) + g.Eventually(IntegrationLogs(t, ctx, ns, "cron-quartz")).Should(ContainSubstring("Magicstring!")) + g.Eventually(IntegrationStatusReplicas(t, ctx, ns, "cron-quartz")).Should(Equal(ptr.To(int32(0)))) + g.Eventually(DeleteIntegrations(t, ctx, ns)).Should(Equal(0)) + }) + + t.Run("cron-fallback", func(t *testing.T) { + g.Expect(KamelRun(t, ctx, ns, "files/cron-fallback.yaml").Execute()).To(Succeed()) + g.Eventually(IntegrationCronJob(t, ctx, ns, "cron-fallback"), TestTimeoutLong).Should(BeNil()) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "cron-fallback", v1.IntegrationConditionReady), TestTimeoutShort).Should( + Equal(corev1.ConditionTrue)) + g.Eventually(IntegrationConditionStatus(t, ctx, ns, "cron-fallback", v1.IntegrationConditionCronJobAvailable), TestTimeoutMedium).Should( + Equal(corev1.ConditionFalse)) + g.Eventually(IntegrationPodPhase(t, ctx, ns, "cron-fallback")).Should(Equal(corev1.PodRunning)) + g.Eventually(IntegrationLogs(t, ctx, ns, "cron-fallback"), TestTimeoutShort).Should(ContainSubstring("Magicstring!")) + g.Eventually(DeleteIntegrations(t, ctx, ns)).Should(Equal(0)) + }) + + }) +} diff --git a/e2e/common/traits/files/CronJava.java b/e2e/common/traits/files/CronJava.java new file mode 100644 index 0000000000..c325f2693d --- /dev/null +++ b/e2e/common/traits/files/CronJava.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + import org.apache.camel.builder.RouteBuilder; + + public class CronJava extends RouteBuilder { + @Override + public void configure() throws Exception { + from("timer:java?period=60000") + .setHeader("m").constant("string!") + .setBody().simple("Magic${header.m}") + .log("${body}"); + } + } + diff --git a/e2e/common/misc/files/cron-fallback.yaml b/e2e/common/traits/files/cron-fallback.yaml similarity index 100% rename from e2e/common/misc/files/cron-fallback.yaml rename to e2e/common/traits/files/cron-fallback.yaml diff --git a/e2e/common/misc/files/cron-quartz.yaml b/e2e/common/traits/files/cron-quartz.yaml similarity index 97% rename from e2e/common/misc/files/cron-quartz.yaml rename to e2e/common/traits/files/cron-quartz.yaml index c80da789c3..cfa8db8252 100644 --- a/e2e/common/misc/files/cron-quartz.yaml +++ b/e2e/common/traits/files/cron-quartz.yaml @@ -20,7 +20,7 @@ from: uri: "quartz:trigger" parameters: - cron: "0/1 * * * * ?" + cron: "0 */1 * * * ?" steps: - setHeader: name: "m" diff --git a/e2e/common/misc/files/cron-yaml.yaml b/e2e/common/traits/files/cron-tab.yaml similarity index 100% rename from e2e/common/misc/files/cron-yaml.yaml rename to e2e/common/traits/files/cron-tab.yaml diff --git a/e2e/common/misc/files/cron-timer.yaml b/e2e/common/traits/files/cron-timer.yaml similarity index 100% rename from e2e/common/misc/files/cron-timer.yaml rename to e2e/common/traits/files/cron-timer.yaml diff --git a/pkg/apis/camel/v1/common_types.go b/pkg/apis/camel/v1/common_types.go index 561c896701..b5d7cb8a2b 100644 --- a/pkg/apis/camel/v1/common_types.go +++ b/pkg/apis/camel/v1/common_types.go @@ -448,6 +448,7 @@ type SourceSpec struct { Loader string `json:"loader,omitempty"` // Interceptors are optional identifiers the org.apache.camel.k.RoutesLoader // uses to pre/post process sources + // Deprecated: no longer in use. Interceptors []string `json:"interceptors,omitempty"` // Type defines the kind of source described by this object Type SourceType `json:"type,omitempty"` diff --git a/pkg/apis/camel/v1/integration_types_support.go b/pkg/apis/camel/v1/integration_types_support.go index 9683fef31b..c54a4e8867 100644 --- a/pkg/apis/camel/v1/integration_types_support.go +++ b/pkg/apis/camel/v1/integration_types_support.go @@ -18,28 +18,34 @@ limitations under the License. package v1 import ( + "bytes" + "encoding/json" "fmt" + "io" "regexp" "strings" + yaml2 "gopkg.in/yaml.v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/yaml" ) -// IntegrationLabel is used to tag k8s object created by a given Integration. -const IntegrationLabel = "camel.apache.org/integration" - -// IntegrationGenerationLabel is used to check on outdated integration resources that can be removed by garbage collection. -const IntegrationGenerationLabel = "camel.apache.org/generation" - -// IntegrationSyntheticLabel is used to tag k8s synthetic Integrations. -const IntegrationSyntheticLabel = "camel.apache.org/is-synthetic" - -// IntegrationImportedKindLabel specifies from what kind of resource an Integration was imported. -const IntegrationImportedKindLabel = "camel.apache.org/imported-from-kind" - -// IntegrationImportedNameLabel specifies from what resource an Integration was imported. -const IntegrationImportedNameLabel = "camel.apache.org/imported-from-name" +const ( + // IntegrationLabel is used to tag k8s object created by a given Integration. + IntegrationLabel = "camel.apache.org/integration" + // IntegrationGenerationLabel is used to check on outdated integration resources that can be removed by garbage collection. + IntegrationGenerationLabel = "camel.apache.org/generation" + // IntegrationSyntheticLabel is used to tag k8s synthetic Integrations. + IntegrationSyntheticLabel = "camel.apache.org/is-synthetic" + // IntegrationImportedKindLabel specifies from what kind of resource an Integration was imported. + IntegrationImportedKindLabel = "camel.apache.org/imported-from-kind" + // IntegrationImportedNameLabel specifies from what resource an Integration was imported. + IntegrationImportedNameLabel = "camel.apache.org/imported-from-name" + + // IntegrationFlowEmbeddedSourceName --. + IntegrationFlowEmbeddedSourceName = "camel-k-embedded-flow.yaml" +) func NewIntegration(namespace string, name string) Integration { return Integration{ @@ -74,18 +80,64 @@ func (in *Integration) Initialize() { } } -// Sources return a new slice containing all the sources associated to the integration. +// AllSources returns a new slice containing all the sources associated to the Integration. +// It merges any generated source, giving priority to this if the same +// source exist both in spec and status. func (in *Integration) AllSources() []SourceSpec { - sources := make([]SourceSpec, 0, len(in.Spec.Sources)+len(in.Status.GeneratedSources)) - sources = append(sources, in.Spec.Sources...) + var sources []SourceSpec sources = append(sources, in.Status.GeneratedSources...) + for _, src := range in.Spec.Sources { + if len(in.Status.GeneratedSources) == 0 { + sources = append(sources, src) + } else { + for _, genSrc := range in.Status.GeneratedSources { + if src.Name != genSrc.Name { + sources = append(sources, src) + } + } + } + } + + return sources +} + +// OriginalSources return a new slice containing only the original sources provided within the Integration. +// It checks if the spec source was transformed and available in the status, and return the latter in such a case. +func (in *Integration) OriginalSources() []SourceSpec { + var sources []SourceSpec + for _, src := range in.Spec.Sources { + found := false + loop: + for _, genSrc := range in.Status.GeneratedSources { + if src.Name == genSrc.Name { + sources = append(sources, genSrc) + found = true + break loop + } + } + if !found { + sources = append(sources, src) + } + } return sources } -func (in *Integration) UserDefinedSources() []SourceSpec { - sources := make([]SourceSpec, 0, len(in.Spec.Sources)) +// OriginalSourcesOnly return a new slice containing only the original sources provided within the Integration spec +// including the embedded yaml flow if it exists. +func (in *Integration) OriginalSourcesOnly() []SourceSpec { + var sources []SourceSpec sources = append(sources, in.Spec.Sources...) + if len(in.Spec.Flows) > 0 { + content, _ := ToYamlDSL(in.Spec.Flows) + sources = append(sources, SourceSpec{ + DataSpec: DataSpec{ + Name: IntegrationFlowEmbeddedSourceName, + Content: string(content), + }, + }) + } + return sources } @@ -418,3 +470,48 @@ func (c IntegrationCondition) GetReason() string { func (c IntegrationCondition) GetMessage() string { return c.Message } + +// FromYamlDSLString creates a slice of flows from a Camel YAML DSL string. +func FromYamlDSLString(flowsString string) ([]Flow, error) { + return FromYamlDSL(bytes.NewReader([]byte(flowsString))) +} + +// FromYamlDSL creates a slice of flows from a Camel YAML DSL stream. +func FromYamlDSL(reader io.Reader) ([]Flow, error) { + buffered, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + var flows []Flow + // Using the Kubernetes decoder to turn them into JSON before unmarshal. + // This avoids having map[interface{}]interface{} objects which are not JSON compatible. + jsonData, err := yaml.ToJSON(buffered) + if err != nil { + return nil, err + } + + if err = json.Unmarshal(jsonData, &flows); err != nil { + return nil, err + } + return flows, err +} + +// ToYamlDSL converts a flow into its Camel YAML DSL equivalent. +func ToYamlDSL(flows []Flow) ([]byte, error) { + data, err := json.Marshal(&flows) + if err != nil { + return nil, err + } + jsondata := make([]map[string]interface{}, 0) + d := json.NewDecoder(bytes.NewReader(data)) + d.UseNumber() + if err := d.Decode(&jsondata); err != nil { + return nil, fmt.Errorf("error unmarshalling json: %w", err) + } + yamldata, err := yaml2.Marshal(&jsondata) + if err != nil { + return nil, fmt.Errorf("error marshalling to yaml: %w", err) + } + + return yamldata, nil +} diff --git a/pkg/apis/camel/v1/integration_types_support_test.go b/pkg/apis/camel/v1/integration_types_support_test.go index fda60c1b29..1481339e02 100644 --- a/pkg/apis/camel/v1/integration_types_support_test.go +++ b/pkg/apis/camel/v1/integration_types_support_test.go @@ -18,11 +18,14 @@ limitations under the License. package v1 import ( + "bytes" + "encoding/json" "fmt" "testing" "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAllLanguages(t *testing.T) { @@ -125,3 +128,32 @@ func TestManagedBuild(t *testing.T) { } assert.True(t, integration.IsManagedBuild()) } + +func TestReadWriteYaml(t *testing.T) { + // yaml in conventional form as marshalled by the go runtime + yaml := `- from: + parameters: + period: 3600001 + steps: + - to: log:info + uri: timer:tick +` + + yamlReader := bytes.NewReader([]byte(yaml)) + flows, err := FromYamlDSL(yamlReader) + require.NoError(t, err) + assert.NotNil(t, flows) + assert.Len(t, flows, 1) + + flow := map[string]interface{}{} + err = json.Unmarshal(flows[0].RawMessage, &flow) + require.NoError(t, err) + + assert.NotNil(t, flow["from"]) + assert.Nil(t, flow["xx"]) + + data, err := ToYamlDSL(flows) + require.NoError(t, err) + assert.NotNil(t, data) + assert.Equal(t, yaml, string(data)) +} diff --git a/pkg/apis/camel/v1/trait/cron.go b/pkg/apis/camel/v1/trait/cron.go index 71169b8714..f65dc261e2 100644 --- a/pkg/apis/camel/v1/trait/cron.go +++ b/pkg/apis/camel/v1/trait/cron.go @@ -34,7 +34,7 @@ package trait // // - `cron`, `quartz`: when the cron expression does not contain seconds (or the "seconds" part is set to 0). E.g. // -// `cron:tab?schedule=0/2${plus}*{plus}*{plus}*{plus}?` or `quartz:trigger?cron=0{plus}0/2{plus}*{plus}*{plus}*{plus}?`. +// `cron:tab?schedule=0/2 * * * ?` or `quartz:trigger?cron=0 0/2 * * * ?`. // // +camel-k:trait=cron. type CronTrait struct { @@ -45,9 +45,6 @@ type CronTrait struct { // The timezone that the CronJob will run on TimeZone *string `property:"timeZone" json:"timeZone,omitempty"` // A comma separated list of the Camel components that need to be customized in order for them to work when the schedule is triggered externally by Kubernetes. - // A specific customizer is activated for each specified component. E.g. for the `timer` component, the `cron-timer` customizer is - // activated (it's present in the `org.apache.camel.k:camel-k-cron` library). - // // Supported components are currently: `cron`, `timer` and `quartz`. Components string `property:"components" json:"components,omitempty"` // Use the default Camel implementation of the `cron` endpoint (`quartz`) instead of trying to materialize the integration diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 8f458b33de..b4667868d9 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -55,7 +55,6 @@ import ( "github.com/apache/camel-k/v2/pkg/trait" "github.com/apache/camel-k/v2/pkg/util" "github.com/apache/camel-k/v2/pkg/util/camel" - "github.com/apache/camel-k/v2/pkg/util/dsl" "github.com/apache/camel-k/v2/pkg/util/kubernetes" k8slog "github.com/apache/camel-k/v2/pkg/util/kubernetes/log" "github.com/apache/camel-k/v2/pkg/util/property" @@ -673,7 +672,7 @@ func (o *runCmdOptions) resolveSources(cmd *cobra.Command, sources []string, it for _, source := range resolvedSources { if o.UseFlows && !o.Compression && source.IsYaml() { - flows, err := dsl.FromYamlDSLString(source.Content) + flows, err := v1.FromYamlDSLString(source.Content) if err != nil { return err } diff --git a/pkg/controller/integration/kits.go b/pkg/controller/integration/kits.go index 341f795e3e..21add8eeb1 100644 --- a/pkg/controller/integration/kits.go +++ b/pkg/controller/integration/kits.go @@ -202,10 +202,10 @@ func kitMatches(c client.Client, kit *v1.IntegrationKit, target *v1.IntegrationK } func hasMatchingSourcesForNative(it *v1.Integration, kit *v1.IntegrationKit) bool { - if len(it.UserDefinedSources()) != len(kit.Spec.Sources) { + if len(it.OriginalSources()) != len(kit.Spec.Sources) { return false } - for _, itSource := range it.UserDefinedSources() { + for _, itSource := range it.OriginalSources() { found := false for _, ikSource := range kit.Spec.Sources { if itSource.Content == ikSource.Content { diff --git a/pkg/controller/pipe/integration_test.go b/pkg/controller/pipe/integration_test.go index 3b334f7866..d41a232f07 100644 --- a/pkg/controller/pipe/integration_test.go +++ b/pkg/controller/pipe/integration_test.go @@ -22,7 +22,6 @@ import ( "testing" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" - "github.com/apache/camel-k/v2/pkg/util/dsl" "github.com/apache/camel-k/v2/pkg/util/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -50,7 +49,7 @@ func TestCreateIntegrationForPipe(t *testing.T) { assert.Equal(t, "camel.apache.org/v1", it.OwnerReferences[0].APIVersion) assert.Equal(t, "Pipe", it.OwnerReferences[0].Kind) assert.Equal(t, "my-pipe", it.OwnerReferences[0].Name) - dsl, err := dsl.ToYamlDSL(it.Spec.Flows) + dsl, err := v1.ToYamlDSL(it.Spec.Flows) require.NoError(t, err) assert.Equal(t, expectedNominalRoute(), string(dsl)) } @@ -75,7 +74,7 @@ func TestCreateIntegrationForPipeWithSinkErrorHandler(t *testing.T) { assert.Equal(t, "#class:org.apache.camel.builder.DeadLetterChannelBuilder", it.Spec.GetConfigurationProperty("camel.beans.defaultErrorHandler")) assert.Equal(t, "someUri", it.Spec.GetConfigurationProperty("camel.beans.defaultErrorHandler.deadLetterUri")) assert.Equal(t, "defaultErrorHandler", it.Spec.GetConfigurationProperty(v1.ErrorHandlerRefName)) - dsl, err := dsl.ToYamlDSL(it.Spec.Flows) + dsl, err := v1.ToYamlDSL(it.Spec.Flows) require.NoError(t, err) assert.Equal(t, expectedNominalRoute(), string(dsl)) } @@ -100,7 +99,7 @@ func TestCreateIntegrationForPipeWithLogErrorHandler(t *testing.T) { assert.Equal(t, "#class:org.apache.camel.builder.DefaultErrorHandlerBuilder", it.Spec.GetConfigurationProperty("camel.beans.defaultErrorHandler")) assert.Equal(t, "true", it.Spec.GetConfigurationProperty("camel.beans.defaultErrorHandler.showHeaders")) assert.Equal(t, "defaultErrorHandler", it.Spec.GetConfigurationProperty(v1.ErrorHandlerRefName)) - dsl, err := dsl.ToYamlDSL(it.Spec.Flows) + dsl, err := v1.ToYamlDSL(it.Spec.Flows) require.NoError(t, err) assert.Equal(t, expectedNominalRoute(), string(dsl)) } @@ -117,7 +116,7 @@ func TestCreateIntegrationForPipeDataType(t *testing.T) { } it, err := CreateIntegrationFor(context.TODO(), client, &pipe) require.NoError(t, err) - dsl, err := dsl.ToYamlDSL(it.Spec.Flows) + dsl, err := v1.ToYamlDSL(it.Spec.Flows) require.NoError(t, err) assert.Equal(t, expectedNominalRouteWithDataType("data-type-action"), string(dsl)) } @@ -136,7 +135,7 @@ func TestCreateIntegrationForPipeDataTypeOverridden(t *testing.T) { pipe.Annotations[v1.KameletDataTypeLabel] = newDataTypeKameletAction it, err := CreateIntegrationFor(context.TODO(), client, &pipe) require.NoError(t, err) - dsl, err := dsl.ToYamlDSL(it.Spec.Flows) + dsl, err := v1.ToYamlDSL(it.Spec.Flows) require.NoError(t, err) assert.Equal(t, expectedNominalRouteWithDataType(newDataTypeKameletAction), string(dsl)) } diff --git a/pkg/trait/cron.go b/pkg/trait/cron.go index 80da757783..841709bf62 100644 --- a/pkg/trait/cron.go +++ b/pkg/trait/cron.go @@ -32,6 +32,7 @@ import ( traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" "github.com/apache/camel-k/v2/pkg/metadata" "github.com/apache/camel-k/v2/pkg/util" + "github.com/apache/camel-k/v2/pkg/util/source" "github.com/apache/camel-k/v2/pkg/util/uri" ) @@ -44,6 +45,8 @@ const ( defaultCronBackoffLimit = int32(2) genericCronComponent = "cron" genericCronComponentFallbackScheme = "quartz" + + overriddenFromURI = "timer:camel-k-overridden-cron?delay=0&period=1&repeatCount=1" ) type cronTrait struct { @@ -177,9 +180,9 @@ func (t *cronTrait) autoConfigure(e *Environment) error { } func (t *cronTrait) Apply(e *Environment) error { + //nolint: nestif if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) { util.StringSliceUniqueAdd(&e.Integration.Status.Capabilities, v1.CapabilityCron) - if ptr.Deref(t.Fallback, false) { fallbackArtifact := e.CamelCatalog.GetArtifactByScheme(genericCronComponentFallbackScheme) if fallbackArtifact == nil { @@ -187,17 +190,26 @@ func (t *cronTrait) Apply(e *Environment) error { } util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, fallbackArtifact.GetDependencyID()) util.StringSliceUniqueConcat(&e.Integration.Status.Dependencies, fallbackArtifact.GetConsumerDependencyIDs(genericCronComponentFallbackScheme)) + + return nil + } + // Will change the "from" URI in order to execute the task just once + if err := t.changeSourcesCronURI(e); err != nil { + return err + } + cronComponentArtifact := e.CamelCatalog.GetArtifactByScheme("timer") + if cronComponentArtifact == nil { + return fmt.Errorf("no timer artifact has been found in camel catalog") } + util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, cronComponentArtifact.GetDependencyID()) } - if !ptr.Deref(t.Fallback, false) && e.IntegrationInRunningPhases() { + if e.IntegrationInRunningPhases() && !ptr.Deref(t.Fallback, false) { if e.ApplicationProperties == nil { e.ApplicationProperties = make(map[string]string) } - - e.ApplicationProperties["camel.main.duration-max-idle-seconds"] = "5" - e.ApplicationProperties["loader.interceptor.cron.overridable-components"] = t.Components - e.Interceptors = append(e.Interceptors, "cron") + // Will instruct the context to stop as soon as the first message is done + e.ApplicationProperties["camel.main.durationMaxMessages"] = "1" cronJob := t.getCronJobFor(e) e.Resources.Add(cronJob) @@ -206,7 +218,12 @@ func (t *cronTrait) Apply(e *Environment) error { v1.IntegrationConditionCronJobAvailable, corev1.ConditionTrue, v1.IntegrationConditionCronJobAvailableReason, - fmt.Sprintf("CronJob name is %s", cronJob.Name)) + fmt.Sprintf( + "CronJob name is %s. Notice that the routes \"from\" parameter was changed to "+ + "\"%s\" in order to be able to trigger the Camel application as a CronJob.", + cronJob.Name, + overriddenFromURI, + )) } return nil @@ -341,7 +358,7 @@ func (t *cronTrait) getGlobalCron(e *Environment) (*cronInfo, error) { func (t *cronTrait) getSourcesFromURIs(e *Environment) ([]string, error) { var fromUris []string - _, err := e.ConsumeMeta(func(meta metadata.IntegrationMetadata) bool { + _, err := e.ConsumeMeta(true, func(meta metadata.IntegrationMetadata) bool { fromUris = meta.FromURIs return true }) @@ -493,3 +510,20 @@ func checkedStringToUint64(str string) uint64 { } return res } + +// changeSourcesCronURI is in charge to change the value of the from route with a component that executes +// the workload just once. +func (t *cronTrait) changeSourcesCronURI(e *Environment) error { + for _, src := range e.Integration.AllSources() { + dslInspector := source.InspectorForLanguage(e.CamelCatalog, src.InferLanguage()) + replaced, err := dslInspector.ReplaceFromURI(&src, overriddenFromURI) + if replaced { + // replace generated source + e.Integration.Status.AddOrReplaceGeneratedSources(src) + } else if err != nil { + return fmt.Errorf("wasn't able to replace cron uri trigger in source %s", src.Name) + } + } + + return nil +} diff --git a/pkg/trait/cron_test.go b/pkg/trait/cron_test.go index e09eaeff21..f797a126d9 100644 --- a/pkg/trait/cron_test.go +++ b/pkg/trait/cron_test.go @@ -18,6 +18,7 @@ limitations under the License. package trait import ( + "fmt" "strings" "testing" @@ -28,6 +29,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" "k8s.io/utils/ptr" v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" @@ -296,7 +298,7 @@ func TestCronDeps(t *testing.T) { assert.NotNil(t, ct) assert.Nil(t, ct.Fallback) assert.True(t, util.StringSliceExists(environment.Integration.Status.Capabilities, v1.CapabilityCron)) - assert.Contains(t, environment.Integration.Status.Dependencies, "mvn:org.apache.camel.k:camel-k-cron") + assert.Contains(t, environment.Integration.Status.Dependencies, "camel:timer") } func TestCronMultipleScheduleFallback(t *testing.T) { @@ -459,7 +461,7 @@ func TestCronDepsFallback(t *testing.T) { assert.NotNil(t, ct.Fallback) assert.True(t, util.StringSliceExists(environment.Integration.Status.Capabilities, v1.CapabilityCron)) assert.Contains(t, environment.Integration.Status.Dependencies, "camel:quartz") - assert.Contains(t, environment.Integration.Status.Dependencies, "mvn:org.apache.camel.k:camel-k-cron") + assert.NotContains(t, environment.Integration.Status.Dependencies, "camel:timer") } func TestCronWithActiveDeadline(t *testing.T) { @@ -540,7 +542,6 @@ func TestCronWithActiveDeadline(t *testing.T) { ct, _ := environment.GetTrait("cron").(*cronTrait) assert.NotNil(t, ct) assert.Nil(t, ct.Fallback) - assert.Contains(t, environment.Interceptors, "cron") cronJob := environment.Resources.GetCronJob(func(job *batchv1.CronJob) bool { return true }) assert.NotNil(t, cronJob) @@ -630,7 +631,6 @@ func TestCronWithBackoffLimit(t *testing.T) { ct, _ := environment.GetTrait("cron").(*cronTrait) assert.NotNil(t, ct) assert.Nil(t, ct.Fallback) - assert.Contains(t, environment.Interceptors, "cron") cronJob := environment.Resources.GetCronJob(func(job *batchv1.CronJob) bool { return true }) assert.NotNil(t, cronJob) @@ -724,7 +724,6 @@ func TestCronWithTimeZone(t *testing.T) { ct, _ := environment.GetTrait("cron").(*cronTrait) assert.NotNil(t, ct) assert.Nil(t, ct.Fallback) - assert.Contains(t, environment.Interceptors, "cron") cronJob := environment.Resources.GetCronJob(func(job *batchv1.CronJob) bool { return true }) assert.NotNil(t, cronJob) @@ -801,3 +800,87 @@ func TestCronAuto(t *testing.T) { Components: "cron", }, traits.Cron) } + +func TestCronRuntimeTriggerReplacement(t *testing.T) { + catalog, err := camel.DefaultCatalog() + assert.Nil(t, err) + + client, _ := test.NewFakeClient() + traitCatalog := NewCatalog(nil) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: client, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseInitialization, + }, + Spec: v1.IntegrationSpec{ + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "routes.java", + Content: `from("quartz:trigger?cron=0 0/1 * * * ?").to("log:test")`, + }, + }, + }, + Traits: v1.Traits{ + Cron: &traitv1.CronTrait{ + BackoffLimit: pointer.Int32(5), + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Build: v1.IntegrationPlatformBuildSpec{ + RuntimeVersion: catalog.Runtime.Version, + }, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: kubernetes.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + c, err := NewFakeClient("ns") + assert.Nil(t, err) + + tc := NewCatalog(c) + _, _, err = tc.apply(&environment) + assert.Nil(t, err) + + // Integration Initialization + assert.NotEmpty(t, environment.Integration.Status.GeneratedSources) + assert.Equal(t, + fmt.Sprintf("from(\"%s\").to(\"log:test\")", overriddenFromURI), + environment.Integration.Status.GeneratedSources[0].Content, + ) + + // Integration Deployment + environment.Integration.Status.Phase = v1.IntegrationPhaseDeploying + _, _, err = tc.apply(&environment) + assert.Nil(t, err) + + cronJob := environment.Resources.GetCronJob(func(job *batchv1.CronJob) bool { return true }) + assert.NotNil(t, cronJob) + assert.Equal(t, "1", environment.ApplicationProperties["camel.main.durationMaxMessages"]) + assert.Equal(t, + fmt.Sprintf("from(\"%s\").to(\"log:test\")", overriddenFromURI), + environment.Integration.Status.GeneratedSources[0].Content, + ) +} diff --git a/pkg/trait/dependencies.go b/pkg/trait/dependencies.go index 55b1559dee..216bec009d 100644 --- a/pkg/trait/dependencies.go +++ b/pkg/trait/dependencies.go @@ -69,6 +69,7 @@ func (t *dependenciesTrait) Apply(e *Environment) error { } _, err := e.consumeSourcesMeta( + false, func(sources []v1.SourceSpec) bool { for _, s := range sources { // Add source-related language dependencies diff --git a/pkg/trait/init.go b/pkg/trait/init.go index 4a2306764d..9a1f0e46dc 100644 --- a/pkg/trait/init.go +++ b/pkg/trait/init.go @@ -24,15 +24,12 @@ import ( v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" traitv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" "github.com/apache/camel-k/v2/pkg/util" - "github.com/apache/camel-k/v2/pkg/util/dsl" "k8s.io/utils/ptr" ) const ( initTraitID = "init" initTraitOrder = 1 - - flowsInternalSourceName = "camel-k-embedded-flow.yaml" ) type initTrait struct { @@ -57,13 +54,13 @@ func (t *initTrait) Configure(e *Environment) (bool, *TraitCondition, error) { func (t *initTrait) Apply(e *Environment) error { // Flows need to be turned into a generated source if len(e.Integration.Spec.Flows) > 0 { - content, err := dsl.ToYamlDSL(e.Integration.Spec.Flows) + content, err := v1.ToYamlDSL(e.Integration.Spec.Flows) if err != nil { return err } e.Integration.Status.AddOrReplaceGeneratedSources(v1.SourceSpec{ DataSpec: v1.DataSpec{ - Name: flowsInternalSourceName, + Name: v1.IntegrationFlowEmbeddedSourceName, Content: string(content), }, }) diff --git a/pkg/trait/kamelets.go b/pkg/trait/kamelets.go index 328b432664..f587b5109a 100644 --- a/pkg/trait/kamelets.go +++ b/pkg/trait/kamelets.go @@ -76,7 +76,7 @@ func (t *kameletsTrait) Configure(e *Environment) (bool, *TraitCondition, error) } if ptr.Deref(t.Auto, true) { var kamelets []string - _, err := e.ConsumeMeta(func(meta metadata.IntegrationMetadata) bool { + _, err := e.ConsumeMeta(false, func(meta metadata.IntegrationMetadata) bool { util.StringSliceUniqueConcat(&kamelets, meta.Kamelets) return true }) diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go index 19439453d3..a456ae1ad1 100644 --- a/pkg/trait/knative.go +++ b/pkg/trait/knative.go @@ -80,7 +80,7 @@ func (t *knativeTrait) Configure(e *Environment) (bool, *TraitCondition, error) return false, nil, nil } - _, err := e.ConsumeMeta(func(meta metadata.IntegrationMetadata) bool { + _, err := e.ConsumeMeta(false, func(meta metadata.IntegrationMetadata) bool { if len(t.ChannelSources) == 0 { t.ChannelSources = filterMetaItems(meta, knativeapi.CamelServiceTypeChannel, "from") } diff --git a/pkg/trait/knative_service.go b/pkg/trait/knative_service.go index 0a53459923..dd7c8236f9 100644 --- a/pkg/trait/knative_service.go +++ b/pkg/trait/knative_service.go @@ -151,7 +151,7 @@ func (t *knativeServiceTrait) SelectControllerStrategy(e *Environment) (*Control return &controllerStrategy, nil } - enabled, err := e.ConsumeMeta(func(meta metadata.IntegrationMetadata) bool { + enabled, err := e.ConsumeMeta(false, func(meta metadata.IntegrationMetadata) bool { return meta.ExposesHTTPServices || meta.PassiveEndpoints }) if err != nil { diff --git a/pkg/trait/quarkus.go b/pkg/trait/quarkus.go index 50327b22c5..9c16382604 100644 --- a/pkg/trait/quarkus.go +++ b/pkg/trait/quarkus.go @@ -507,7 +507,7 @@ func sourcesRequiredAtBuildTime(e *Environment, source v1.SourceSpec) bool { // Propagates the user defined sources that are required at build time for native compilation. func propagateSourcesRequiredAtBuildTime(e *Environment) []v1.SourceSpec { array := make([]v1.SourceSpec, 0) - for _, source := range e.Integration.UserDefinedSources() { + for _, source := range e.Integration.OriginalSources() { if sourcesRequiredAtBuildTime(e, source) { array = append(array, source) } diff --git a/pkg/trait/resolver.go b/pkg/trait/resolver.go index e750429866..05762e0466 100644 --- a/pkg/trait/resolver.go +++ b/pkg/trait/resolver.go @@ -87,13 +87,21 @@ func resolveIntegrationSources( context context.Context, client controller.Reader, integration *v1.Integration, + originalSourcesOnly bool, resources *kubernetes.Collection) ([]v1.SourceSpec, error) { if integration == nil { return nil, nil } - return resolveSources(integration.AllSources(), func(name string) (*corev1.ConfigMap, error) { + var sources []v1.SourceSpec + if originalSourcesOnly { + sources = integration.OriginalSourcesOnly() + } else { + sources = integration.AllSources() + } + + return resolveSources(sources, func(name string) (*corev1.ConfigMap, error) { // the config map could be part of the resources created // by traits cm := resources.GetConfigMap(func(m *corev1.ConfigMap) bool { diff --git a/pkg/trait/service.go b/pkg/trait/service.go index b114115f63..9cb3e85224 100644 --- a/pkg/trait/service.go +++ b/pkg/trait/service.go @@ -72,7 +72,7 @@ func (t *serviceTrait) Configure(e *Environment) (bool, *TraitCondition, error) } if ptr.Deref(t.Auto, true) { - exposeHTTPServices, err := e.ConsumeMeta(func(meta metadata.IntegrationMetadata) bool { + exposeHTTPServices, err := e.ConsumeMeta(false, func(meta metadata.IntegrationMetadata) bool { return meta.ExposesHTTPServices }) var condition *TraitCondition diff --git a/pkg/trait/trait_types.go b/pkg/trait/trait_types.go index 5b6ac50b96..2daff66310 100644 --- a/pkg/trait/trait_types.go +++ b/pkg/trait/trait_types.go @@ -233,7 +233,6 @@ type Environment struct { ExecutedTraits []Trait EnvVars []corev1.EnvVar ApplicationProperties map[string]string - Interceptors []string } // ControllerStrategy is used to determine the kind of controller that needs to be created for the integration. @@ -490,16 +489,6 @@ func (e *Environment) addSourcesProperties() { e.ApplicationProperties[fmt.Sprintf("camel.k.sources[%d].compressed", idx)] = boolean.TrueString } - interceptors := make([]string, 0, len(s.Interceptors)) - if s.Interceptors != nil { - interceptors = append(interceptors, s.Interceptors...) - } - if e.Interceptors != nil { - interceptors = append(interceptors, e.Interceptors...) - } - for intID, interceptor := range interceptors { - e.ApplicationProperties[fmt.Sprintf("camel.k.sources[%d].interceptors[%d]", idx, intID)] = interceptor - } idx++ } } @@ -759,18 +748,21 @@ func CapabilityPropertyKey(camelPropertyKey string, vars map[string]string) stri // ConsumeMeta is used to consume metadata information coming from Integration sources. If no sources available, // would return false. When consuming from meta you should make sure that the configuration is stored in the // status traits by setting each trait configuration when in "auto" mode. -func (e *Environment) ConsumeMeta(consumeMeta func(metadata.IntegrationMetadata) bool) (bool, error) { - return e.consumeSourcesMeta(nil, consumeMeta) +// originalSourcesOnly flag indicates if you want to use only the sources provided originally to the Integration, otherwise +// it will consume all sources, also the one autogenerated by the operator. +func (e *Environment) ConsumeMeta(originalSourcesOnly bool, consumeMeta func(metadata.IntegrationMetadata) bool) (bool, error) { + return e.consumeSourcesMeta(originalSourcesOnly, nil, consumeMeta) } // consumeSourcesMeta is used to consume both sources and metadata information coming from Integration sources. // If no sources available would return false. func (e *Environment) consumeSourcesMeta( + originalSourcesOnly bool, consumeSources func(sources []v1.SourceSpec) bool, consumeMeta func(metadata.IntegrationMetadata) bool) (bool, error) { var sources []v1.SourceSpec var err error - if sources, err = resolveIntegrationSources(e.Ctx, e.Client, e.Integration, e.Resources); err != nil { + if sources, err = resolveIntegrationSources(e.Ctx, e.Client, e.Integration, originalSourcesOnly, e.Resources); err != nil { return false, err } if len(sources) < 1 { diff --git a/pkg/util/digest/digest.go b/pkg/util/digest/digest.go index 9630467db8..16a6054d90 100644 --- a/pkg/util/digest/digest.go +++ b/pkg/util/digest/digest.go @@ -34,7 +34,6 @@ import ( v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util" "github.com/apache/camel-k/v2/pkg/util/defaults" - "github.com/apache/camel-k/v2/pkg/util/dsl" "fmt" ) @@ -88,7 +87,7 @@ func ComputeForIntegration(integration *v1.Integration, configmapVersions []stri // Integration flows if len(integration.Spec.Flows) > 0 { - flows, err := dsl.ToYamlDSL(integration.Spec.Flows) + flows, err := v1.ToYamlDSL(integration.Spec.Flows) if err != nil { return "", err } @@ -310,12 +309,6 @@ func ComputeForSource(s v1.SourceSpec) (string, error) { if _, err := hash.Write([]byte(s.Loader)); err != nil { return "", err } - for _, i := range s.Interceptors { - if _, err := hash.Write([]byte(i)); err != nil { - return "", err - } - } - if _, err := hash.Write([]byte(strconv.FormatBool(s.Compression))); err != nil { return "", err } diff --git a/pkg/util/dsl/flow.go b/pkg/util/dsl/flow.go deleted file mode 100644 index 32569e30ad..0000000000 --- a/pkg/util/dsl/flow.go +++ /dev/null @@ -1,76 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dsl - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - - yaml2 "gopkg.in/yaml.v2" - - "k8s.io/apimachinery/pkg/util/yaml" - - v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" -) - -// FromYamlDSLString creates a slice of flows from a Camel YAML DSL string. -func FromYamlDSLString(flowsString string) ([]v1.Flow, error) { - return FromYamlDSL(bytes.NewReader([]byte(flowsString))) -} - -// FromYamlDSL creates a slice of flows from a Camel YAML DSL stream. -func FromYamlDSL(reader io.Reader) ([]v1.Flow, error) { - buffered, err := io.ReadAll(reader) - if err != nil { - return nil, err - } - var flows []v1.Flow - // Using the Kubernetes decoder to turn them into JSON before unmarshal. - // This avoids having map[interface{}]interface{} objects which are not JSON compatible. - jsonData, err := yaml.ToJSON(buffered) - if err != nil { - return nil, err - } - - if err = json.Unmarshal(jsonData, &flows); err != nil { - return nil, err - } - return flows, err -} - -// ToYamlDSL converts a flow into its Camel YAML DSL equivalent. -func ToYamlDSL(flows []v1.Flow) ([]byte, error) { - data, err := json.Marshal(&flows) - if err != nil { - return nil, err - } - jsondata := make([]map[string]interface{}, 0) - d := json.NewDecoder(bytes.NewReader(data)) - d.UseNumber() - if err := d.Decode(&jsondata); err != nil { - return nil, fmt.Errorf("error unmarshalling json: %w", err) - } - yamldata, err := yaml2.Marshal(&jsondata) - if err != nil { - return nil, fmt.Errorf("error marshalling to yaml: %w", err) - } - - return yamldata, nil -} diff --git a/pkg/util/dsl/flow_test.go b/pkg/util/dsl/flow_test.go deleted file mode 100644 index 4a2b62642e..0000000000 --- a/pkg/util/dsl/flow_test.go +++ /dev/null @@ -1,56 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dsl - -import ( - "bytes" - "encoding/json" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestReadWriteYaml(t *testing.T) { - // yaml in conventional form as marshalled by the go runtime - yaml := `- from: - parameters: - period: 3600001 - steps: - - to: log:info - uri: timer:tick -` - - yamlReader := bytes.NewReader([]byte(yaml)) - flows, err := FromYamlDSL(yamlReader) - require.NoError(t, err) - assert.NotNil(t, flows) - assert.Len(t, flows, 1) - - flow := map[string]interface{}{} - err = json.Unmarshal(flows[0].RawMessage, &flow) - require.NoError(t, err) - - assert.NotNil(t, flow["from"]) - assert.Nil(t, flow["xx"]) - - data, err := ToYamlDSL(flows) - require.NoError(t, err) - assert.NotNil(t, data) - assert.Equal(t, yaml, string(data)) -} diff --git a/pkg/util/source/inspector.go b/pkg/util/source/inspector.go index d13bcd68e3..04c646b95d 100644 --- a/pkg/util/source/inspector.go +++ b/pkg/util/source/inspector.go @@ -164,7 +164,10 @@ var ( // Inspector is the common interface for language specific inspector implementations. type Inspector interface { + // Extract scan the source spec for metadata. Extract(spec v1.SourceSpec, metadata *Metadata) error + // ReplaceFromURI parses the source content and replace the `from` URI configuration with the a new URI. + ReplaceFromURI(source *v1.SourceSpec, newFromURI string) (bool, error) } // InspectorForLanguage is the factory function to return a new inspector for the given language @@ -225,6 +228,10 @@ func (i baseInspector) Extract(v1.SourceSpec, *Metadata) error { return nil } +func (i baseInspector) ReplaceFromURI(source *v1.SourceSpec, newFromURI string) (bool, error) { + return false, nil +} + func (i *baseInspector) extract(source v1.SourceSpec, meta *Metadata, from, to, kameletEips []string, hasRest bool) error { meta.FromURIs = append(meta.FromURIs, from...) diff --git a/pkg/util/source/inspector_groovy.go b/pkg/util/source/inspector_groovy.go index 63c272e15e..57ecba6c7a 100644 --- a/pkg/util/source/inspector_groovy.go +++ b/pkg/util/source/inspector_groovy.go @@ -18,14 +18,18 @@ limitations under the License. package source import ( + "strings" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util" ) +// GroovyInspector inspects Groovy DSL spec. type GroovyInspector struct { baseInspector } +// Extract extracts all metadata from source spec. func (i GroovyInspector) Extract(source v1.SourceSpec, meta *Metadata) error { from := util.FindAllDistinctStringSubmatch( source.Content, @@ -54,3 +58,28 @@ func (i GroovyInspector) Extract(source v1.SourceSpec, meta *Metadata) error { return i.extract(source, meta, from, to, kameletEips, hasRest) } + +// ReplaceFromURI parses the source content and replace the `from` URI configuration with the a new URI. Returns true if it applies a replacement. +func (i GroovyInspector) ReplaceFromURI(source *v1.SourceSpec, newFromURI string) (bool, error) { + froms := util.FindAllDistinctStringSubmatch( + source.Content, + singleQuotedFrom, + doubleQuotedFrom, + singleQuotedFromF, + doubleQuotedFromF, + ) + newContent := source.Content + if froms == nil { + return false, nil + } + for _, from := range froms { + newContent = strings.ReplaceAll(newContent, from, newFromURI) + } + replaced := newContent != source.Content + + if replaced { + source.Content = newContent + } + + return replaced, nil +} diff --git a/pkg/util/source/inspector_groovy_test.go b/pkg/util/source/inspector_groovy_test.go index 010b72a62a..c89e1c5dc0 100644 --- a/pkg/util/source/inspector_groovy_test.go +++ b/pkg/util/source/inspector_groovy_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util/camel" ) @@ -170,3 +171,35 @@ func TestGroovyDataFormat(t *testing.T) { }) } } + +func TestGroovyReplaceURI(t *testing.T) { + inspector := newTestGroovyInspector(t) + + sourceSpec := &v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: "test.groovy", + Content: "from('quartz:trigger?cron=0 0/1 * * * ?').to('log:info')", + }, + } + replaced, err := inspector.ReplaceFromURI( + sourceSpec, + "direct:newURI?hello=world", + ) + assert.Nil(t, err) + assert.True(t, replaced) + assert.Equal(t, "from('direct:newURI?hello=world').to('log:info')", sourceSpec.Content) + + sourceSpec = &v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: "test.groovy", + Content: "from(\"quartz:trigger?cron=0 0/1 * * * ?\").to(\"log:info\")", + }, + } + replaced, err = inspector.ReplaceFromURI( + sourceSpec, + "direct:newURI?hello=world", + ) + assert.True(t, replaced) + assert.Nil(t, err) + assert.Equal(t, "from(\"direct:newURI?hello=world\").to(\"log:info\")", sourceSpec.Content) +} diff --git a/pkg/util/source/inspector_java_script.go b/pkg/util/source/inspector_java_script.go index 14eee461d3..c54b18881d 100644 --- a/pkg/util/source/inspector_java_script.go +++ b/pkg/util/source/inspector_java_script.go @@ -18,14 +18,18 @@ limitations under the License. package source import ( + "strings" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util" ) +// JavaScriptInspector inspects Javascript DSL spec. type JavaScriptInspector struct { baseInspector } +// Extract extracts all metadata from source spec. func (i JavaScriptInspector) Extract(source v1.SourceSpec, meta *Metadata) error { from := util.FindAllDistinctStringSubmatch( source.Content, @@ -54,3 +58,28 @@ func (i JavaScriptInspector) Extract(source v1.SourceSpec, meta *Metadata) error return i.extract(source, meta, from, to, kameletEips, hasRest) } + +// ReplaceFromURI parses the source content and replace the `from` URI configuration with the a new URI. Returns true if it applies a replacement. +func (i JavaScriptInspector) ReplaceFromURI(source *v1.SourceSpec, newFromURI string) (bool, error) { + froms := util.FindAllDistinctStringSubmatch( + source.Content, + singleQuotedFrom, + doubleQuotedFrom, + singleQuotedFromF, + doubleQuotedFromF, + ) + newContent := source.Content + if froms == nil { + return false, nil + } + for _, from := range froms { + newContent = strings.ReplaceAll(newContent, from, newFromURI) + } + replaced := newContent != source.Content + + if replaced { + source.Content = newContent + } + + return replaced, nil +} diff --git a/pkg/util/source/inspector_java_script_test.go b/pkg/util/source/inspector_java_script_test.go index 6a8f6d3dac..8c860acfb0 100644 --- a/pkg/util/source/inspector_java_script_test.go +++ b/pkg/util/source/inspector_java_script_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util/camel" "github.com/stretchr/testify/assert" @@ -152,3 +153,35 @@ func TestJavaScriptDataFormat(t *testing.T) { }) } } + +func TestJavascriptReplaceURI(t *testing.T) { + inspector := newTestJavaScriptInspector(t) + + sourceSpec := &v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: "test.js", + Content: "from('quartz:trigger?cron=0 0/1 * * * ?').to('log:info')", + }, + } + replaced, err := inspector.ReplaceFromURI( + sourceSpec, + "direct:newURI?hello=world", + ) + assert.Nil(t, err) + assert.True(t, replaced) + assert.Equal(t, "from('direct:newURI?hello=world').to('log:info')", sourceSpec.Content) + + sourceSpec = &v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: "test.js", + Content: "from(\"quartz:trigger?cron=0 0/1 * * * ?\").to(\"log:info\")", + }, + } + replaced, err = inspector.ReplaceFromURI( + sourceSpec, + "direct:newURI?hello=world", + ) + assert.Nil(t, err) + assert.True(t, replaced) + assert.Equal(t, "from(\"direct:newURI?hello=world\").to(\"log:info\")", sourceSpec.Content) +} diff --git a/pkg/util/source/inspector_java_source.go b/pkg/util/source/inspector_java_source.go index c1bc46c2e7..6486bdcefc 100644 --- a/pkg/util/source/inspector_java_source.go +++ b/pkg/util/source/inspector_java_source.go @@ -18,14 +18,18 @@ limitations under the License. package source import ( + "strings" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util" ) +// JavaScriptInspector inspects Java DSL spec. type JavaSourceInspector struct { baseInspector } +// Extract extracts all metadata from source spec. func (i JavaSourceInspector) Extract(source v1.SourceSpec, meta *Metadata) error { from := util.FindAllDistinctStringSubmatch( source.Content, @@ -48,3 +52,25 @@ func (i JavaSourceInspector) Extract(source v1.SourceSpec, meta *Metadata) error return i.extract(source, meta, from, to, kameletEips, hasRest) } + +// ReplaceFromURI parses the source content and replace the `from` URI configuration with the a new URI. Returns true if it applies a replacement. +func (i JavaSourceInspector) ReplaceFromURI(source *v1.SourceSpec, newFromURI string) (bool, error) { + froms := util.FindAllDistinctStringSubmatch( + source.Content, + doubleQuotedFrom, + doubleQuotedFromF, + ) + newContent := source.Content + if froms == nil { + return false, nil + } + for _, from := range froms { + newContent = strings.ReplaceAll(newContent, from, newFromURI) + } + replaced := newContent != source.Content + if replaced { + source.Content = newContent + } + + return replaced, nil +} diff --git a/pkg/util/source/inspector_java_source_test.go b/pkg/util/source/inspector_java_source_test.go index ba78b032b8..71e60e2dbd 100644 --- a/pkg/util/source/inspector_java_source_test.go +++ b/pkg/util/source/inspector_java_source_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util/camel" "github.com/stretchr/testify/assert" @@ -152,3 +153,21 @@ func TestJavaSourceDataFormat(t *testing.T) { }) } } + +func TestJavaReplaceURI(t *testing.T) { + inspector := newTestJavaSourceInspector(t) + + sourceSpec := &v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: "test.java", + Content: "from(\"quartz:trigger?cron=0 0/1 * * * ?\").to(\"log:info\")", + }, + } + replaced, err := inspector.ReplaceFromURI( + sourceSpec, + "direct:newURI?hello=world", + ) + assert.Nil(t, err) + assert.True(t, replaced) + assert.Equal(t, "from(\"direct:newURI?hello=world\").to(\"log:info\")", sourceSpec.Content) +} diff --git a/pkg/util/source/inspector_kotlin.go b/pkg/util/source/inspector_kotlin.go index b7bf57a3df..bdf57afe7c 100644 --- a/pkg/util/source/inspector_kotlin.go +++ b/pkg/util/source/inspector_kotlin.go @@ -18,14 +18,18 @@ limitations under the License. package source import ( + "strings" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util" ) +// KotlinInspector inspects Kotlin DSL spec. type KotlinInspector struct { baseInspector } +// Extract extracts all metadata from source spec. func (i KotlinInspector) Extract(source v1.SourceSpec, meta *Metadata) error { from := util.FindAllDistinctStringSubmatch( source.Content, @@ -48,3 +52,26 @@ func (i KotlinInspector) Extract(source v1.SourceSpec, meta *Metadata) error { return i.extract(source, meta, from, to, kameletEips, hasRest) } + +// ReplaceFromURI parses the source content and replace the `from` URI configuration with the a new URI. Returns true if it applies a replacement. +func (i KotlinInspector) ReplaceFromURI(source *v1.SourceSpec, newFromURI string) (bool, error) { + froms := util.FindAllDistinctStringSubmatch( + source.Content, + doubleQuotedFrom, + doubleQuotedFromF, + ) + newContent := source.Content + if froms == nil { + return false, nil + } + for _, from := range froms { + newContent = strings.ReplaceAll(newContent, from, newFromURI) + } + replaced := newContent != source.Content + + if replaced { + source.Content = newContent + } + + return replaced, nil +} diff --git a/pkg/util/source/inspector_kotlin_test.go b/pkg/util/source/inspector_kotlin_test.go index 5d176f9aa6..3e5b9c3b09 100644 --- a/pkg/util/source/inspector_kotlin_test.go +++ b/pkg/util/source/inspector_kotlin_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util/camel" "github.com/stretchr/testify/assert" @@ -152,3 +153,21 @@ func TestKotlinDataFormat(t *testing.T) { }) } } + +func TestKotlinReplaceURI(t *testing.T) { + inspector := newTestKotlinInspector(t) + + sourceSpec := &v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: "test.java", + Content: "from(\"quartz:trigger?cron=0 0/1 * * * ?\").to(\"log:info\")", + }, + } + replaced, err := inspector.ReplaceFromURI( + sourceSpec, + "direct:newURI?hello=world", + ) + assert.Nil(t, err) + assert.True(t, replaced) + assert.Equal(t, "from(\"direct:newURI?hello=world\").to(\"log:info\")", sourceSpec.Content) +} diff --git a/pkg/util/source/inspector_xml.go b/pkg/util/source/inspector_xml.go index 3f484ba507..1a1954879a 100644 --- a/pkg/util/source/inspector_xml.go +++ b/pkg/util/source/inspector_xml.go @@ -24,18 +24,22 @@ import ( v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" ) -// XMLInspector --. +const ( + language = "language" + URI = "uri" +) + +// XMLInspector inspects XML DSL spec. type XMLInspector struct { baseInspector } -// Extract --. -// -//nolint:goconst,nestif +// Extract extracts all metadata from source spec. func (i XMLInspector) Extract(source v1.SourceSpec, meta *Metadata) error { content := strings.NewReader(source.Content) decoder := xml.NewDecoder(content) + //nolint: nestif for { // Read tokens from the XML document in a stream. t, _ := decoder.Token() @@ -60,9 +64,9 @@ func (i XMLInspector) Extract(source v1.SourceSpec, meta *Metadata) error { if dfDep := i.catalog.GetArtifactByDataFormat(dataFormatID); dfDep != nil { meta.AddDependency(dfDep.GetDependencyID()) } - case "language": + case language: for _, a := range se.Attr { - if a.Name.Local == "language" { + if a.Name.Local == language { if dependency, ok := i.catalog.GetLanguageDependency(a.Value); ok { meta.AddDependency(dependency) } @@ -70,13 +74,13 @@ func (i XMLInspector) Extract(source v1.SourceSpec, meta *Metadata) error { } case "from", "fromF": for _, a := range se.Attr { - if a.Name.Local == "uri" { + if a.Name.Local == URI { meta.FromURIs = append(meta.FromURIs, a.Value) } } case "to", "toD", "toF", "wireTap": for _, a := range se.Attr { - if a.Name.Local == "uri" { + if a.Name.Local == URI { meta.ToURIs = append(meta.ToURIs, a.Value) } } @@ -107,3 +111,25 @@ func (i XMLInspector) Extract(source v1.SourceSpec, meta *Metadata) error { return nil } + +// ReplaceFromURI parses the source content and replace the `from` URI configuration with the a new URI. Returns true if it applies a replacement. +func (i XMLInspector) ReplaceFromURI(source *v1.SourceSpec, newFromURI string) (bool, error) { + metadata := NewMetadata() + if err := i.Extract(*source, &metadata); err != nil { + return false, err + } + newContent := source.Content + if metadata.FromURIs == nil { + return false, nil + } + for _, from := range metadata.FromURIs { + newContent = strings.ReplaceAll(newContent, from, newFromURI) + } + replaced := newContent != source.Content + + if replaced { + source.Content = newContent + } + + return replaced, nil +} diff --git a/pkg/util/source/inspector_xml_test.go b/pkg/util/source/inspector_xml_test.go index db04fdcfda..8e68c3bcf0 100644 --- a/pkg/util/source/inspector_xml_test.go +++ b/pkg/util/source/inspector_xml_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" "github.com/apache/camel-k/v2/pkg/util/camel" "github.com/stretchr/testify/assert" @@ -186,3 +187,21 @@ func TestXMLDataFormat(t *testing.T) { }) } } + +func TestXMLReplaceURI(t *testing.T) { + inspector := newTestXMLInspector(t) + + sourceSpec := &v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: "test.xml", + Content: xmlJSONEip, + }, + } + replaced, err := inspector.ReplaceFromURI( + sourceSpec, + "direct:newURI?hello=world", + ) + assert.Nil(t, err) + assert.True(t, replaced) + assert.Contains(t, sourceSpec.Content, "") +} diff --git a/pkg/util/source/inspector_yaml.go b/pkg/util/source/inspector_yaml.go index 1e208c0d2b..1990076358 100644 --- a/pkg/util/source/inspector_yaml.go +++ b/pkg/util/source/inspector_yaml.go @@ -27,12 +27,12 @@ import ( "github.com/apache/camel-k/v2/pkg/util/uri" ) -// YAMLInspector --. +// YAMLInspector inspects YAML DSL spec. type YAMLInspector struct { baseInspector } -// Extract --. +// Extract extracts all metadata from source spec. func (i YAMLInspector) Extract(source v1.SourceSpec, meta *Metadata) error { definitions := make([]map[string]interface{}, 0) @@ -200,3 +200,46 @@ func (i YAMLInspector) parseStepsParam(steps []interface{}, meta *Metadata) erro } return nil } + +// ReplaceFromURI parses the source content and replace the `from` URI configuration with the a new URI. Returns true if it applies a replacement. +func (i YAMLInspector) ReplaceFromURI(source *v1.SourceSpec, newFromURI string) (bool, error) { + definitions := make([]map[string]interface{}, 0) + + if err := yaml2.Unmarshal([]byte(source.Content), &definitions); err != nil { + return false, err + } + + // We expect the from in .route.from or .from location + for _, routeRaw := range definitions { + var from map[interface{}]interface{} + var fromOk bool + route, routeOk := routeRaw["route"].(map[interface{}]interface{}) + if routeOk { + from, fromOk = route["from"].(map[interface{}]interface{}) + if !fromOk { + return false, nil + } + } + if from == nil { + from, fromOk = routeRaw["from"].(map[interface{}]interface{}) + if !fromOk { + return false, nil + } + } + delete(from, "parameters") + from["uri"] = newFromURI + } + + newContentRaw, err := yaml2.Marshal(definitions) + if err != nil { + return false, err + } + + newContent := string(newContentRaw) + if newContent != source.Content { + source.Content = newContent + return true, nil + } + + return false, nil +} diff --git a/pkg/util/source/inspector_yaml_test.go b/pkg/util/source/inspector_yaml_test.go index 81cc915580..d1eee14629 100644 --- a/pkg/util/source/inspector_yaml_test.go +++ b/pkg/util/source/inspector_yaml_test.go @@ -643,3 +643,89 @@ func TestYAMLRouteWithUnknownScheme(t *testing.T) { }) } } + +const yamlRouteCronReplacement = ` +- route: + id: route1 + from: + uri: "cron:tab" + parameters: + schedule: "* * * * ?" + steps: + - setBody: + constant: "Hello Yaml !!!" + - transform: + simple: "${body.toUpperCase()}" + - to: "{{url}}" +` + +const yamlFromCronReplacement = ` +- from: + uri: "cron:tab" + parameters: + schedule: "* * * * ?" + steps: + - setBody: + constant: "Hello Yaml !!!" + - transform: + simple: "${body.toUpperCase()}" + - to: "{{url}}" +` + +const expectedYamlFromCronReplacement = `from: + steps: + - setBody: + constant: Hello Yaml !!! + - transform: + simple: ${body.toUpperCase()} + - to: '{{url}}' + uri: direct:newURI?hello=world +` + +const expectedYamlRouteCronReplacement = `from: + steps: + - setBody: + constant: Hello Yaml !!! + - transform: + simple: ${body.toUpperCase()} + - to: '{{url}}' + uri: direct:newURI?hello=world +` + +func TestYAMLFromReplaceURI(t *testing.T) { + inspector := newTestYAMLInspector(t) + + sourceSpec := &v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: "test.yaml", + Content: yamlFromCronReplacement, + }, + } + replaced, err := inspector.ReplaceFromURI( + sourceSpec, + "direct:newURI?hello=world", + ) + assert.Nil(t, err) + assert.True(t, replaced) + // Assert changed uri and removed parameters + assert.Contains(t, sourceSpec.Content, expectedYamlFromCronReplacement) +} + +func TestYAMLRouteReplaceURI(t *testing.T) { + inspector := newTestYAMLInspector(t) + + sourceSpec := &v1.SourceSpec{ + DataSpec: v1.DataSpec{ + Name: "test.yaml", + Content: yamlRouteCronReplacement, + }, + } + replaced, err := inspector.ReplaceFromURI( + sourceSpec, + "direct:newURI?hello=world", + ) + assert.Nil(t, err) + assert.True(t, replaced) + // Assert changed uri and removed parameters + assert.Contains(t, sourceSpec.Content, expectedYamlRouteCronReplacement) +}