Skip to content

Commit

Permalink
Fix #2083: add steps and query-like parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Apr 7, 2021
1 parent 7935c33 commit f46a2b8
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 18 deletions.
68 changes: 59 additions & 9 deletions pkg/cmd/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,17 @@ func newCmdBind(rootCmdOptions *RootCmdOptions) (*cobra.Command, *bindCmdOptions

cmd.Flags().String("name", "", "Name for the binding")
cmd.Flags().StringP("output", "o", "", "Output format. One of: json|yaml")
cmd.Flags().StringArrayP("property", "p", nil, `Add a binding property in the form of "source.<key>=<value>" or "sink.<key>=<value>"`)
cmd.Flags().StringArrayP("property", "p", nil, `Add a binding property in the form of "source.<key>=<value>", "sink.<key>=<value>" or "step-<n>.<key>=<value>"`)
cmd.Flags().Bool("skip-checks", false, "Do not verify the binding for compliance with Kamelets and other Kubernetes resources")
cmd.Flags().StringArray("step", nil, `Add binding steps as Kubernetes resources, such as Kamelets. Endpoints are expected in the format "[[apigroup/]version:]kind:[namespace/]name" or plain Camel URIs.`)

return &cmd, &options
}

const (
sourceKey = "source"
sinkKey = "sink"
sourceKey = "source"
sinkKey = "sink"
stepKeyPrefix = "step-"
)

type bindCmdOptions struct {
Expand All @@ -74,6 +76,7 @@ type bindCmdOptions struct {
OutputFormat string `mapstructure:"output" yaml:",omitempty"`
Properties []string `mapstructure:"properties" yaml:",omitempty"`
SkipChecks bool `mapstructure:"skip-checks" yaml:",omitempty"`
Steps []string `mapstructure:"steps" yaml:",omitempty"`
}

func (o *bindCmdOptions) validate(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -105,6 +108,17 @@ func (o *bindCmdOptions) validate(cmd *cobra.Command, args []string) error {
if err := o.checkCompliance(cmd, sink); err != nil {
return err
}

for idx, stepDesc := range o.Steps {
stepKey := fmt.Sprintf("%s%d", stepKeyPrefix, idx)
step, err := o.decode(stepDesc, stepKey)
if err != nil {
return err
}
if err := o.checkCompliance(cmd, step); err != nil {
return err
}
}
}

return nil
Expand Down Expand Up @@ -134,6 +148,18 @@ func (o *bindCmdOptions) run(args []string) error {
},
}

if len(o.Steps) > 0 {
binding.Spec.Steps = make([]v1alpha1.Endpoint, 0)
for idx, stepDesc := range o.Steps {
stepKey := fmt.Sprintf("%s%d", stepKeyPrefix, idx)
step, err := o.decode(stepDesc, stepKey)
if err != nil {
return err
}
binding.Spec.Steps = append(binding.Spec.Steps, step)
}
}

switch o.OutputFormat {
case "":
// continue..
Expand Down Expand Up @@ -183,7 +209,8 @@ func (o *bindCmdOptions) run(args []string) error {
func (o *bindCmdOptions) decode(res string, key string) (v1alpha1.Endpoint, error) {
refConverter := reference.NewConverter(reference.KameletPrefix)
endpoint := v1alpha1.Endpoint{}
props, err := o.asEndpointProperties(o.getProperties(key))
explicitProps := o.getProperties(key)
props, err := o.asEndpointProperties(explicitProps)
if err != nil {
return endpoint, err
}
Expand All @@ -201,6 +228,26 @@ func (o *bindCmdOptions) decode(res string, key string) (v1alpha1.Endpoint, erro
if endpoint.Ref.Namespace == "" {
endpoint.Ref.Namespace = o.Namespace
}
embeddedProps, err := refConverter.PropertiesFromString(res)
if err != nil {
return endpoint, err
}
if len(embeddedProps) > 0 {
allProps := make(map[string]string)
for k, v := range explicitProps {
allProps[k] = v
}
for k, v := range embeddedProps {
allProps[k] = v
}

props, err := o.asEndpointProperties(allProps)
if err != nil {
return endpoint, err
}
endpoint.Properties = props
}

return endpoint, nil
}

Expand Down Expand Up @@ -254,14 +301,17 @@ func (o *bindCmdOptions) getProperties(refType string) map[string]string {
func (o *bindCmdOptions) parseProperty(prop string) (string, string, string, error) {
parts := strings.SplitN(prop, "=", 2)
if len(parts) != 2 {
return "", "", "", fmt.Errorf(`property %q does not follow format "[source|sink].<key>=<value>"`, prop)
return "", "", "", fmt.Errorf(`property %q does not follow format "[source|sink|step-<n>].<key>=<value>"`, prop)
}
keyParts := strings.SplitN(parts[0], ".", 2)
if len(keyParts) != 2 {
return "", "", "", fmt.Errorf(`property key %q does not follow format "[source|sink].<key>"`, parts[0])
return "", "", "", fmt.Errorf(`property key %q does not follow format "[source|sink|step-<n>].<key>"`, parts[0])
}
if keyParts[0] != sourceKey && keyParts[0] != sinkKey {
return "", "", "", fmt.Errorf(`property key %q does not start with "source." or "sink."`, parts[0])
isSource := keyParts[0] == sourceKey
isSink := keyParts[0] == sinkKey
isStep := strings.HasPrefix(keyParts[0], stepKeyPrefix)
if !isSource && !isSink && !isStep {
return "", "", "", fmt.Errorf(`property key %q does not start with "source.", "sink." or "step-<n>."`, parts[0])
}
return keyParts[0], keyParts[1], parts[1], nil
}
Expand All @@ -280,7 +330,7 @@ func (o *bindCmdOptions) checkCompliance(cmd *cobra.Command, endpoint v1alpha1.E
if err := c.Get(o.Context, key, &kamelet); err != nil {
if k8serrors.IsNotFound(err) {
// Kamelet may be in the operator namespace, but we currently don't have a way to determine it: we just warn
fmt.Fprintf(cmd.OutOrStderr(), "Warning: Kamelet %q not found in namespace %q\n", key.Name, key.Namespace)
fmt.Fprintf(cmd.ErrOrStderr(), "Warning: Kamelet %q not found in namespace %q\n", key.Name, key.Namespace)
return nil
}
return err
Expand Down
10 changes: 5 additions & 5 deletions pkg/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,24 +194,24 @@ func (command *RootCmdOptions) preRun(cmd *cobra.Command, _ []string) error {
// Furthermore, there can be any incompatibilities, as the install command deploys
// the operator version it's compatible with.
if cmd.Use != installCommand && cmd.Use != operatorCommand {
checkAndShowCompatibilityWarning(command.Context, c, command.Namespace)
checkAndShowCompatibilityWarning(cmd, command.Context, c, command.Namespace)
}
}

return nil
}

func checkAndShowCompatibilityWarning(ctx context.Context, c client.Client, namespace string) {
func checkAndShowCompatibilityWarning(cmd *cobra.Command, ctx context.Context, c client.Client, namespace string) {
operatorVersion, err := operatorVersion(ctx, c, namespace)
if err != nil {
if k8serrors.IsNotFound(err) {
fmt.Printf("No IntegrationPlatform resource in %s namespace\n", namespace)
fmt.Fprintf(cmd.ErrOrStderr(), "No IntegrationPlatform resource in %s namespace\n", namespace)
} else {
fmt.Printf("Unable to retrieve the operator version: %s\n", err.Error())
fmt.Fprintf(cmd.ErrOrStderr(), "Unable to retrieve the operator version: %s\n", err.Error())
}
} else {
if operatorVersion != "" && !compatibleVersions(operatorVersion, defaults.Version) {
fmt.Printf("You're using Camel K %s client with a %s cluster operator, it's recommended to use the same version to improve compatibility.\n\n", defaults.Version, operatorVersion)
fmt.Fprintf(cmd.ErrOrStderr(), "You're using Camel K %s client with a %s cluster operator, it's recommended to use the same version to improve compatibility.\n\n", defaults.Version, operatorVersion)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (o *runCmdOptions) syncIntegration(cmd *cobra.Command, c client.Client, sou
}
}()
} else {
fmt.Printf("Warning: the following URL will not be watched for changes: %s\n", s)
fmt.Fprintf(cmd.ErrOrStderr(), "Warning: the following URL will not be watched for changes: %s\n", s)
}
}

Expand Down
44 changes: 41 additions & 3 deletions pkg/util/reference/reference.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ limitations under the License.
package reference

import (
"errors"
"fmt"
"net/url"
"regexp"
"strings"
"unicode"

camelv1alpha1 "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
Expand All @@ -35,8 +37,9 @@ const (
)

var (
simpleNameRegexp = regexp.MustCompile(`^(?:(?P<namespace>[a-z0-9-.]+)/)?(?P<name>[a-z0-9-.]+)$`)
fullNameRegexp = regexp.MustCompile(`^(?:(?P<apiVersion>(?:[a-z0-9-.]+/)?(?:[a-z0-9-.]+)):)?(?P<kind>[A-Za-z0-9-.]+):(?:(?P<namespace>[a-z0-9-.]+)/)?(?P<name>[a-z0-9-.]+)$`)
simpleNameRegexp = regexp.MustCompile(`^(?:(?P<namespace>[a-z0-9-.]+)/)?(?P<name>[a-z0-9-.]+)(?:$|[?].*$)`)
fullNameRegexp = regexp.MustCompile(`^(?:(?P<apiVersion>(?:[a-z0-9-.]+/)?(?:[a-z0-9-.]+)):)?(?P<kind>[A-Za-z0-9-.]+):(?:(?P<namespace>[a-z0-9-.]+)/)?(?P<name>[a-z0-9-.]+)(?:$|[?].*$)`)
queryRegexp = regexp.MustCompile(`^[^?]*[?](?P<query>.*)$`)

templates = map[string]corev1.ObjectReference{
"kamelet": corev1.ObjectReference{
Expand Down Expand Up @@ -81,6 +84,41 @@ func (c *Converter) FromString(str string) (corev1.ObjectReference, error) {
return ref, nil
}

func (c *Converter) PropertiesFromString(str string) (map[string]string, error) {
if queryRegexp.MatchString(str) {
groupNames := queryRegexp.SubexpNames()
res := make(map[string]string)
var query string
for _, match := range queryRegexp.FindAllStringSubmatch(str, -1) {
for idx, text := range match {
groupName := groupNames[idx]
switch groupName {
case "query":
query = text
}
}
}
parts := strings.Split(query, "&")
for _, part := range parts {
kv := strings.SplitN(part, "=", 2)
if len(kv) != 2 {
return nil, fmt.Errorf("invalid key=value format for string %q", part)
}
k, errkey := url.QueryUnescape(kv[0])
if errkey != nil {
return nil, errors.Wrapf(errkey, "cannot unescape key %q", kv[0])
}
v, errval := url.QueryUnescape(kv[1])
if errval != nil {
return nil, errors.Wrapf(errval, "cannot unescape value %q", kv[1])
}
res[k] = v
}
return res, nil
}
return nil, nil
}

func (c *Converter) expandReference(ref *corev1.ObjectReference) {
if template, ok := templates[ref.Kind]; ok {
if template.Kind != "" {
Expand Down
36 changes: 36 additions & 0 deletions pkg/util/reference/reference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestExpressions(t *testing.T) {
error bool
ref corev1.ObjectReference
stringRef string
properties map[string]string
}{
{
name: "lowercase:source",
Expand Down Expand Up @@ -123,6 +124,37 @@ func TestExpressions(t *testing.T) {
},
stringRef: "postgres.org/v1alpha1:PostgreSQL:ns1/db",
},
{
name: "postgres.org/v1alpha1:PostgreSQL:ns1/db?user=user1&password=pwd2&host=192.168.2.2&special=%201&special2=a=1",
ref: corev1.ObjectReference{
APIVersion: "postgres.org/v1alpha1",
Kind: "PostgreSQL",
Namespace: "ns1",
Name: "db",
},
stringRef: "postgres.org/v1alpha1:PostgreSQL:ns1/db",
properties: map[string]string{
"user": "user1",
"password": "pwd2",
"host": "192.168.2.2",
"special": " 1",
"special2": "a=1",
},
},
{
name: "source?a=b&b=c&d=e",
ref: corev1.ObjectReference{
Kind: "Kamelet",
APIVersion: "camel.apache.org/v1alpha1",
Name: "source",
},
stringRef: "camel.apache.org/v1alpha1:Kamelet:source",
properties: map[string]string{
"a": "b",
"b": "c",
"d": "e",
},
},
}

for i, tc := range tests {
Expand All @@ -143,6 +175,10 @@ func TestExpressions(t *testing.T) {
asString, err2 := converter.ToString(ref)
assert.NoError(t, err2)

props, err3 := converter.PropertiesFromString(tc.name)
assert.NoError(t, err3)
assert.Equal(t, tc.properties, props)

assert.NoError(t, err)
assert.Equal(t, tc.ref, ref)
assert.Equal(t, tc.stringRef, asString)
Expand Down

0 comments on commit f46a2b8

Please sign in to comment.