diff --git a/e2e/cron_test.go b/e2e/cron_test.go index a64936ad76..857f7f9ac3 100644 --- a/e2e/cron_test.go +++ b/e2e/cron_test.go @@ -58,5 +58,14 @@ func TestRunCronExample(t *testing.T) { Eventually(integrationLogs(ns, "cron-fallback"), testTimeoutShort).Should(ContainSubstring("Magicstring!")) Expect(kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil()) }) + + t.Run("cron-fallback-quarkus", func(t *testing.T) { + RegisterTestingT(t) + + Expect(kamel("run", "-n", ns, "files/cron.groovy").Execute()).Should(BeNil()) + Eventually(integrationPodPhase(ns, "cron"), testTimeoutMedium).Should(Equal(v1.PodRunning)) + Eventually(integrationLogs(ns, "cron"), testTimeoutShort).Should(ContainSubstring("Magicstring!")) + Expect(kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil()) + }) }) } diff --git a/pkg/trait/cron.go b/pkg/trait/cron.go index 44454388b6..6b6ac85506 100644 --- a/pkg/trait/cron.go +++ b/pkg/trait/cron.go @@ -20,6 +20,7 @@ package trait import ( "fmt" "regexp" + "sort" "strconv" "strings" @@ -31,7 +32,6 @@ import ( v1 "github.com/apache/camel-k/pkg/apis/camel/v1" "github.com/apache/camel-k/pkg/metadata" "github.com/apache/camel-k/pkg/util" - "github.com/apache/camel-k/pkg/util/envvar" "github.com/apache/camel-k/pkg/util/kubernetes" "github.com/apache/camel-k/pkg/util/uri" ) @@ -125,7 +125,18 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) { return false, nil } - if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization) && !e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) { + if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization, v1.IntegrationPhaseDeploying) { + return false, nil + } + + if _, ok := e.CamelCatalog.Runtime.Capabilities["cron"]; !ok { + e.Integration.Status.SetCondition( + v1.IntegrationConditionCronJobAvailable, + corev1.ConditionFalse, + v1.IntegrationConditionCronJobNotAvailableReason, + "the runtime provider %s does not declare 'cron' capability", + ) + return false, nil } @@ -156,7 +167,14 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) { t.ConcurrencyPolicy = string(v1beta1.ForbidConcurrent) } - if t.Schedule == "" && t.Components == "" && t.Fallback == nil { + hasQuarkus := false + + qt := e.GetTrait("quarkus") + if qt != nil { + hasQuarkus = qt.(*quarkusTrait).Enabled != nil && *(qt.(*quarkusTrait).Enabled) + } + + if (hasQuarkus || (t.Schedule == "" && t.Components == "")) && t.Fallback == nil { // If there's at least a `cron` endpoint, add a fallback implementation fromURIs, err := t.getSourcesFromURIs(e) if err != nil { @@ -169,10 +187,8 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) { break } } - } } - dt := e.Catalog.GetTrait("deployer") if dt != nil { t.deployer = *dt.(*deployerTrait) @@ -180,7 +196,7 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) { // Fallback strategy can be implemented in any other controller if t.Fallback != nil && *t.Fallback { - if e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) { + if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) { e.Integration.Status.SetCondition( v1.IntegrationConditionCronJobAvailable, corev1.ConditionFalse, @@ -202,7 +218,7 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) { return false, err } if strategy != ControllerStrategyCronJob { - if e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) { + if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) { e.Integration.Status.SetCondition( v1.IntegrationConditionCronJobAvailable, corev1.ConditionFalse, @@ -217,30 +233,43 @@ func (t *cronTrait) Configure(e *Environment) (bool, error) { } func (t *cronTrait) Apply(e *Environment) error { - if t.Fallback != nil && *t.Fallback { - if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) { + if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) { + if capability, ok := e.CamelCatalog.Runtime.Capabilities["cron"]; ok { + for _, dependency := range capability.Dependencies { + util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, fmt.Sprintf("mvn:%s/%s", dependency.GroupID, dependency.ArtifactID)) + } + + // sort the dependencies to get always the same list if they don't change + sort.Strings(e.Integration.Status.Dependencies) + } + + if t.Fallback != nil && *t.Fallback { util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, genericCronComponentFallback) } - } else { - if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) { - util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-runtime-cron") - } else if e.InPhase(v1.IntegrationKitPhaseReady, v1.IntegrationPhaseDeploying) { - cronJob := t.getCronJobFor(e) - maps := e.ComputeConfigMaps() + } - e.Resources.AddAll(maps) - e.Resources.Add(cronJob) + if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) { + if e.ApplicationProperties == nil { + e.ApplicationProperties = make(map[string]string) + } - e.Integration.Status.SetCondition( - v1.IntegrationConditionCronJobAvailable, - corev1.ConditionTrue, - v1.IntegrationConditionCronJobAvailableReason, - fmt.Sprintf("CronJob name is %s", cronJob.Name), - ) + e.ApplicationProperties["camel.main.duration-max-messages"] = "1" + e.ApplicationProperties["loader.interceptor.cron.overridable-components"] = t.Components + e.Interceptors = append(e.Interceptors, "cron") - envvar.SetVal(&e.EnvVars, "CAMEL_K_CRON_OVERRIDE", t.Components) - } + cronJob := t.getCronJobFor(e) + maps := e.ComputeConfigMaps() + + e.Resources.AddAll(maps) + e.Resources.Add(cronJob) + + e.Integration.Status.SetCondition( + v1.IntegrationConditionCronJobAvailable, + corev1.ConditionTrue, + v1.IntegrationConditionCronJobAvailableReason, + fmt.Sprintf("CronJob name is %s", cronJob.Name)) } + return nil } diff --git a/pkg/trait/cron_test.go b/pkg/trait/cron_test.go index f36791b24b..c077fe2553 100644 --- a/pkg/trait/cron_test.go +++ b/pkg/trait/cron_test.go @@ -18,10 +18,21 @@ limitations under the License. package trait import ( + "context" + "fmt" "strings" "testing" - "github.com/magiconair/properties/assert" + "github.com/apache/camel-k/pkg/util" + + v1 "github.com/apache/camel-k/pkg/apis/camel/v1" + "github.com/apache/camel-k/pkg/util/camel" + k8sutils "github.com/apache/camel-k/pkg/util/kubernetes" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + passert "github.com/magiconair/properties/assert" + "github.com/stretchr/testify/assert" ) func TestCronFromURI(t *testing.T) { @@ -198,13 +209,324 @@ func TestCronFromURI(t *testing.T) { if res != nil { gotCron = res.schedule } - assert.Equal(t, gotCron, thetest.cron) + passert.Equal(t, gotCron, thetest.cron) gotComponents := "" if res != nil { gotComponents = strings.Join(res.components, ",") } - assert.Equal(t, gotComponents, thetest.components) + passert.Equal(t, gotComponents, thetest.components) }) } } + +func TestCronDeps(t *testing.T) { + catalog, err := camel.DefaultCatalog() + assert.Nil(t, err) + + traitCatalog := NewCatalog(context.TODO(), nil) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseInitialization, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "routes.java", + Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`, + }, + Language: v1.LanguageJavaSource, + }, + }, + Resources: []v1.ResourceSpec{}, + Traits: map[string]v1.TraitSpec{}, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.IntegrationPlatformRegistrySpec{Address: "registry"}, + }, + Profile: v1.TraitProfileKnative, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + c, err := NewFakeClient("ns") + assert.Nil(t, err) + + tc := NewCatalog(context.TODO(), c) + + err = tc.apply(&environment) + + assert.Nil(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + + ct := environment.GetTrait("cron").(*cronTrait) + assert.NotNil(t, ct) + assert.Nil(t, ct.Fallback) + + capability, ok := environment.CamelCatalog.Runtime.Capabilities["cron"] + assert.True(t, ok) + + for _, dependency := range capability.Dependencies { + assert.True(t, util.StringSliceExists(environment.Integration.Status.Dependencies, fmt.Sprintf("mvn:%s/%s", dependency.GroupID, dependency.ArtifactID))) + } +} + +func TestCronDepsFallback(t *testing.T) { + catalog, err := camel.DefaultCatalog() + assert.Nil(t, err) + + traitCatalog := NewCatalog(context.TODO(), nil) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseInitialization, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "routes.java", + Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`, + }, + Language: v1.LanguageJavaSource, + }, + }, + Resources: []v1.ResourceSpec{}, + Traits: map[string]v1.TraitSpec{ + "cron": { + Configuration: map[string]string{ + "fallback": "true", + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.IntegrationPlatformRegistrySpec{Address: "registry"}, + }, + Profile: v1.TraitProfileKnative, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + c, err := NewFakeClient("ns") + assert.Nil(t, err) + + tc := NewCatalog(context.TODO(), c) + + err = tc.apply(&environment) + + assert.Nil(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + + ct := environment.GetTrait("cron").(*cronTrait) + assert.NotNil(t, ct) + assert.NotNil(t, ct.Fallback) + + capability, ok := environment.CamelCatalog.Runtime.Capabilities["cron"] + assert.True(t, ok) + + for _, dependency := range capability.Dependencies { + assert.True(t, util.StringSliceExists(environment.Integration.Status.Dependencies, fmt.Sprintf("mvn:%s/%s", dependency.GroupID, dependency.ArtifactID))) + } + + assert.True(t, util.StringSliceExists(environment.Integration.Status.Dependencies, genericCronComponentFallback)) +} + +func TestCronWithMain(t *testing.T) { + catalog, err := camel.DefaultCatalog() + assert.Nil(t, err) + + traitCatalog := NewCatalog(context.TODO(), nil) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "routes.java", + Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`, + }, + Language: v1.LanguageJavaSource, + }, + }, + Resources: []v1.ResourceSpec{}, + Traits: map[string]v1.TraitSpec{ + "quarkus": { + Configuration: map[string]string{ + "enabled": "false", + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.IntegrationPlatformRegistrySpec{Address: "registry"}, + }, + Profile: v1.TraitProfileKnative, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + c, err := NewFakeClient("ns") + assert.Nil(t, err) + + tc := NewCatalog(context.TODO(), c) + + err = tc.apply(&environment) + + assert.Nil(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.Nil(t, environment.GetTrait("quarkus")) + + ct := environment.GetTrait("cron").(*cronTrait) + assert.NotNil(t, ct) + assert.Nil(t, ct.Fallback) + assert.True(t, util.StringSliceExists(environment.Interceptors, "cron")) +} + +func TestCronWithQuarkus(t *testing.T) { + catalog, err := camel.DefaultCatalog() + assert.Nil(t, err) + + traitCatalog := NewCatalog(context.TODO(), nil) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "routes.java", + Content: `from("cron:tab?schedule=0 0/2 * * ?").to("log:test")`, + }, + Language: v1.LanguageJavaSource, + }, + }, + Resources: []v1.ResourceSpec{}, + Traits: map[string]v1.TraitSpec{ + "quarkus": { + Configuration: map[string]string{ + "enabled": "true", + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.IntegrationPlatformRegistrySpec{Address: "registry"}, + }, + Profile: v1.TraitProfileKnative, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + c, err := NewFakeClient("ns") + assert.Nil(t, err) + + tc := NewCatalog(context.TODO(), c) + + err = tc.apply(&environment) + + assert.Nil(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("quarkus")) + + ct := environment.GetTrait("cron").(*cronTrait) + assert.NotNil(t, ct) + assert.NotNil(t, ct.Fallback) + assert.True(t, *ct.Fallback) + assert.True(t, util.StringSliceExists(environment.Interceptors, "cron")) +} diff --git a/pkg/trait/trait_types.go b/pkg/trait/trait_types.go index c3a50e3a43..e6d751e341 100644 --- a/pkg/trait/trait_types.go +++ b/pkg/trait/trait_types.go @@ -209,6 +209,7 @@ type Environment struct { ExecutedTraits []Trait EnvVars []corev1.EnvVar ApplicationProperties map[string]string + Interceptors []string } // ControllerStrategy is used to determine the kind of controller that needs to be created for the integration @@ -483,6 +484,7 @@ func (e *Environment) ComputeSourcesURI() []string { srcName := strings.TrimPrefix(s.Name, "/") src := path.Join(root, srcName) src = "file:" + src + interceptors := make([]string, 0, len(s.Interceptors)) params := make([]string, 0) if s.InferLanguage() != "" { @@ -494,8 +496,15 @@ func (e *Environment) ComputeSourcesURI() []string { if s.Compression { params = append(params, "compression=true") } + if s.Interceptors != nil { - params = append(params, "interceptors="+strings.Join(s.Interceptors, ",")) + interceptors = append(interceptors, s.Interceptors...) + } + if e.Interceptors != nil { + interceptors = append(interceptors, e.Interceptors...) + } + if len(interceptors) > 0 { + params = append(params, "interceptors="+strings.Join(interceptors, ",")) } if len(params) > 0 {