Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1470: support inline YAML #1472

Merged
merged 3 commits into from
May 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,24 @@ metadata:
"name": "example"
},
"spec": {
"sources": [
"flows": [
{
"content": "from('timer:groovy?period=1000')\n .setBody().simple('Hello world from Camel K!')\n .to('log:info?showAll=false')",
"name": "example.groovy"
"from": {
"uri": "timer:yaml",
"parameters": {
"period": "1000"
},
"steps": [
{
"set-body": {
"constant": "Hello from Camel K"
}
},
{
"to": "log:info"
}
]
}
}
]
}
Expand Down
11 changes: 9 additions & 2 deletions e2e/knative/knative_platform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (

. "github.com/apache/camel-k/e2e/support"
"github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/util/flows"
"github.com/apache/camel-k/pkg/util/knative"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
)

Expand All @@ -52,8 +54,13 @@ func TestKnativePlatformTest(t *testing.T) {
Eventually(IntegrationProfile(ns, "yaml"), TestTimeoutShort).Should(Equal(v1.TraitProfile(string(cluster))))
// Change something in the integration to produce a redeploy
Expect(UpdateIntegration(ns, "yaml", func(it *v1.Integration) {
it.Spec.Profile = v1.TraitProfile("")
it.Spec.Sources[0].Content = strings.ReplaceAll(it.Spec.Sources[0].Content, "string!", "string!!!")
it.Spec.Profile = ""
content, err := flows.Marshal(it.Spec.Flows)
assert.NoError(t, err)
newData := strings.ReplaceAll(string(content), "string!", "string!!!")
newFlows, err := flows.UnmarshalString(newData)
assert.NoError(t, err)
it.Spec.Flows = newFlows
})).To(BeNil())
// Spec profile should be reset by "kamel run"
Eventually(IntegrationSpecProfile(ns, "yaml")).Should(Equal(v1.TraitProfile("")))
Expand Down
13 changes: 13 additions & 0 deletions pkg/apis/camel/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package v1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

// ConfigurationSpec --
Expand Down Expand Up @@ -126,3 +127,15 @@ type ResourceCondition interface {
GetReason() string
GetMessage() string
}

// Flow is an unstructured object representing a Camel Flow in YAML/JSON DSL
type Flow map[string]interface{}

// DeepCopy copies the receiver, creating a new Flow.
func (in *Flow) DeepCopy() *Flow {
if in == nil {
return nil
}
out := Flow(runtime.DeepCopyJSON(*in))
return &out
}
1 change: 1 addition & 0 deletions pkg/apis/camel/v1/integration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type IntegrationSpec struct {
Replicas *int32 `json:"replicas,omitempty"`
Sources []SourceSpec `json:"sources,omitempty"`
Flows []Flow `json:"flows,omitempty"`
Resources []ResourceSpec `json:"resources,omitempty"`
Kit string `json:"kit,omitempty"`
Dependencies []string `json:"dependencies,omitempty"`
Expand Down
25 changes: 25 additions & 0 deletions pkg/apis/camel/v1/integration_types_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (in *IntegrationSpec) AddResources(resources ...ResourceSpec) {
in.Resources = append(in.Resources, resources...)
}

// AddFlows --
func (in *IntegrationSpec) AddFlows(flows ...Flow) {
in.Flows = append(in.Flows, flows...)
}

// AddConfiguration --
func (in *IntegrationSpec) AddConfiguration(confType string, confValue string) {
in.Configuration = append(in.Configuration, ConfigurationSpec{
Expand Down Expand Up @@ -129,6 +134,26 @@ func (in *IntegrationStatus) AddOrReplaceGeneratedResources(resources ...Resourc
in.GeneratedResources = append(in.GeneratedResources, newResources...)
}

// AddOrReplaceGeneratedSources --
func (in *IntegrationStatus) AddOrReplaceGeneratedSources(sources ...SourceSpec) {
newSources := make([]SourceSpec, 0)
for _, source := range sources {
replaced := false
for i, r := range in.GeneratedSources {
if r.Name == source.Name {
in.GeneratedSources[i] = source
replaced = true
break
}
}
if !replaced {
newSources = append(newSources, source)
}
}

in.GeneratedSources = append(in.GeneratedSources, newSources...)
}

// Configurations --
func (in *IntegrationSpec) Configurations() []ConfigurationSpec {
if in == nil {
Expand Down
17 changes: 17 additions & 0 deletions pkg/apis/camel/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 18 additions & 7 deletions pkg/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/apache/camel-k/pkg/client"
"github.com/apache/camel-k/pkg/trait"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/flows"
"github.com/apache/camel-k/pkg/util/gzip"
"github.com/apache/camel-k/pkg/util/kubernetes"
k8slog "github.com/apache/camel-k/pkg/util/kubernetes/log"
Expand Down Expand Up @@ -82,6 +83,7 @@ func newCmdRun(rootCmdOptions *RootCmdOptions) (*cobra.Command, *runCmdOptions)
cmd.Flags().Bool("logs", false, "Print integration logs")
cmd.Flags().Bool("sync", false, "Synchronize the local source file with the cluster, republishing at each change")
cmd.Flags().Bool("dev", false, "Enable Dev mode (equivalent to \"-w --logs --sync\")")
cmd.Flags().Bool("use-flows", true, "Write yaml sources as Flow objects in the integration custom resource")
cmd.Flags().String("profile", "", "Trait profile used for deployment")
cmd.Flags().StringArrayP("trait", "t", nil, "Configure a trait. E.g. \"-t service.enabled=false\"")
cmd.Flags().StringArray("logging-level", nil, "Configure the logging level. e.g. \"--logging-level org.apache.camel=DEBUG\"")
Expand Down Expand Up @@ -110,6 +112,7 @@ type runCmdOptions struct {
Logs bool `mapstructure:"logs" yaml:",omitempty"`
Sync bool `mapstructure:"sync" yaml:",omitempty"`
Dev bool `mapstructure:"dev" yaml:",omitempty"`
UseFlows bool `mapstructure:"use-flows" yaml:",omitempty"`
Save bool `mapstructure:"save" yaml:",omitempty" kamel:"omitsave"`
IntegrationKit string `mapstructure:"kit" yaml:",omitempty"`
IntegrationName string `mapstructure:"name" yaml:",omitempty"`
Expand Down Expand Up @@ -476,13 +479,21 @@ func (o *runCmdOptions) updateIntegrationCode(c client.Client, sources []string)
return nil, err
}

integration.Spec.AddSources(v1.SourceSpec{
DataSpec: v1.DataSpec{
Name: path.Base(source),
Content: data,
Compression: o.Compression,
},
})
if o.UseFlows && (strings.HasSuffix(source, ".yaml") || strings.HasSuffix(source, ".yml")) {
flows, err := flows.UnmarshalString(data)
if err != nil {
return nil, err
}
integration.Spec.AddFlows(flows...)
} else {
integration.Spec.AddSources(v1.SourceSpec{
DataSpec: v1.DataSpec{
Name: path.Base(source),
Content: data,
Compression: o.Compression,
},
})
}
}

for _, resource := range o.Resources {
Expand Down
21 changes: 19 additions & 2 deletions pkg/trait/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import (
"sort"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/pkg/errors"

"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/flows"
"github.com/pkg/errors"
)

const flowsInternalSourceName = "camel-k-embedded-flow.yaml"

// Internal trait
type initTrait struct {
BaseTrait `property:",squash"`
Expand All @@ -48,6 +50,21 @@ func (t *initTrait) Configure(e *Environment) (bool, error) {

func (t *initTrait) Apply(e *Environment) error {
if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {

// Flows need to be turned into a generated source
if len(e.Integration.Spec.Flows) > 0 {
content, err := flows.Marshal(e.Integration.Spec.Flows)
if err != nil {
return err
}
e.Integration.Status.AddOrReplaceGeneratedSources(v1.SourceSpec{
DataSpec: v1.DataSpec{
Name: flowsInternalSourceName,
Content: string(content),
},
})
}

//
// Dependencies need to be recomputed in case of a trait declares a capability but as
// the dependencies trait runs earlier than some task such as the cron one, we need to
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/defaults"
"github.com/apache/camel-k/pkg/util/flows"
)

// ComputeForIntegration a digest of the fields that are relevant for the deployment
Expand Down Expand Up @@ -66,6 +67,17 @@ func ComputeForIntegration(integration *v1.Integration) (string, error) {
}
}

// Integration flows
if len(integration.Spec.Flows) > 0 {
flowData, err := flows.Marshal(integration.Spec.Flows)
if err != nil {
return "", err
}
if _, err := hash.Write(flowData); err != nil {
return "", err
}
}

// Integration dependencies
for _, item := range integration.Spec.Dependencies {
if _, err := hash.Write([]byte(item)); err != nil {
Expand Down
55 changes: 55 additions & 0 deletions pkg/util/flows/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flows

import (
"bytes"
"encoding/json"
"io"
"io/ioutil"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
yaml2 "gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/util/yaml"
)

// UnmarshalString reads flows contained in a string
func UnmarshalString(flowsString string) ([]v1.Flow, error) {
return Unmarshal(bytes.NewReader([]byte(flowsString)))
}

// Unmarshal flows from a stream
func Unmarshal(reader io.Reader) ([]v1.Flow, error) {
buffered, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}
var flows []v1.Flow
// Using the Kubernetes decoder to turn them into JSON before unmarshal.
// This avoids having map[interface{}]interface{} objects which are not JSON compatible.
jsonData, err := yaml.ToJSON(buffered)
if err = json.Unmarshal(jsonData, &flows); err != nil {
return nil, err
}
return flows, err
}

// Marshal flows as byte array
func Marshal(flows []v1.Flow) ([]byte, error) {
return yaml2.Marshal(flows)
}
45 changes: 45 additions & 0 deletions pkg/util/flows/io_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flows

import (
"bytes"
"github.com/stretchr/testify/assert"
"testing"
)

func TestReadWriteYaml(t *testing.T) {
// yaml in conventional form as marshalled by the go runtime
yaml := `- from:
steps:
- to: log:info
uri: timer:tick
`
yamlReader := bytes.NewReader([]byte(yaml))
flows, err := Unmarshal(yamlReader)
assert.NoError(t, err)
assert.NotNil(t, flows)
assert.Len(t, flows, 1)
assert.NotNil(t, flows[0]["from"])
assert.Nil(t, flows[0]["xx"])

clone, err := Marshal(flows)
assert.NoError(t, err)
assert.NotNil(t, clone)
assert.Equal(t, yaml, string(clone))
}