From a5d4788b657c16ed8d03d168cda208d80e04b7f2 Mon Sep 17 00:00:00 2001 From: Luca Burgazzoli Date: Fri, 13 Nov 2020 16:31:01 +0100 Subject: [PATCH] knative: support configuring the knative environment using properties #506 --- .../knative/deployment/KnativeProcessor.java | 3 +- .../KnativeSinkBindingContextCustomizer.java | 47 +-- .../spi/KnativeEnvironmentConfigurer.java | 59 ++++ .../spi/KnativeResourceConfigurer.java | 128 ++++++++ ...l.component.knative.spi.KnativeEnvironment | 2 + ...amel.component.knative.spi.KnativeResource | 2 + .../camel/component/knative/spi/Knative.java | 18 +- .../knative/spi/KnativeConsumerFactory.java | 2 +- .../knative/spi/KnativeEnvironment.java | 123 +++---- .../knative/spi/KnativeProducerFactory.java | 2 +- .../knative/spi/KnativeResource.java | 306 ++++++++++++++++++ .../knative/spi/KnativeEnvironmentTest.java | 47 ++- .../src/test/resources/log4j2-test.xml | 2 +- .../knative/http/KnativeHttpConsumer.java | 38 ++- .../http/KnativeHttpConsumerFactory.java | 4 +- .../knative/http/KnativeHttpProducer.java | 10 +- .../http/KnativeHttpProducerFactory.java | 4 +- .../knative/http/KnativeHttpSupport.java | 64 ++-- .../knative/http/KnativeHttpTest.java | 115 ++++--- .../knative/http/KnativeHttpTestSupport.java | 20 +- .../test/KnativeEnvironmentSupport.java | 41 +-- .../camel/component/knative/knative.json | 8 +- .../component/knative/KnativeComponent.java | 11 +- .../knative/KnativeConfiguration.java | 12 +- .../component/knative/KnativeEndpoint.java | 146 +++++---- .../ce/AbstractCloudEventProcessor.java | 33 +- .../knative/ce/CloudEventProcessor.java | 6 +- .../knative/ce/CloudEventProcessors.java | 6 +- .../knative/KnativeTransportNoop.java | 6 +- .../it/KnativeSinkBindingApplication.java | 9 +- .../src/main/resources/application.properties | 2 +- .../source/KnativeSourceApplication.java | 2 +- .../src/main/resources/application.properties | 2 +- .../source/KnativeSourceApplication.java | 2 +- .../source/KnativeSourceApplication.java | 2 +- .../source/KnativeSourceApplication.java | 2 +- .../source/KnativeSourceApplication.java | 2 +- .../k/quarkus/it/KnativeApplication.java | 37 ++- .../camel/k/quarkus/it/KnativeTest.java | 25 +- .../camel/k/quarkus/it/Application.java | 2 +- 40 files changed, 941 insertions(+), 411 deletions(-) create mode 100644 components/camel-knative/camel-knative-api/src/generated/java/org/apache/camel/component/knative/spi/KnativeEnvironmentConfigurer.java create mode 100644 components/camel-knative/camel-knative-api/src/generated/java/org/apache/camel/component/knative/spi/KnativeResourceConfigurer.java create mode 100644 components/camel-knative/camel-knative-api/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.knative.spi.KnativeEnvironment create mode 100644 components/camel-knative/camel-knative-api/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.knative.spi.KnativeResource create mode 100644 components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeResource.java diff --git a/camel-k-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/KnativeProcessor.java b/camel-k-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/KnativeProcessor.java index 68f42d73a..4e3cccb3e 100644 --- a/camel-k-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/KnativeProcessor.java +++ b/camel-k-knative/deployment/src/main/java/org/apache/camel/k/quarkus/knative/deployment/KnativeProcessor.java @@ -26,6 +26,7 @@ import org.apache.camel.component.knative.KnativeComponent; import org.apache.camel.component.knative.KnativeConstants; import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.k.quarkus.knative.KnativeRecorder; import org.apache.camel.quarkus.core.deployment.spi.CamelRuntimeBeanBuildItem; import org.apache.camel.quarkus.core.deployment.spi.CamelServiceFilter; @@ -43,7 +44,7 @@ List unremovableBeans() { List reflectiveClasses() { return List.of( new ReflectiveClassBuildItem(true, false, KnativeEnvironment.class), - new ReflectiveClassBuildItem(true, false, KnativeEnvironment.KnativeResource.class) + new ReflectiveClassBuildItem(true, false, KnativeResource.class) ); } diff --git a/camel-k-knative/impl/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java b/camel-k-knative/impl/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java index f409956d2..8a42c5229 100644 --- a/camel-k-knative/impl/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java +++ b/camel-k-knative/impl/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java @@ -25,7 +25,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.camel.CamelContext; import org.apache.camel.component.knative.spi.Knative; -import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.k.ContextCustomizer; import org.apache.camel.k.annotation.Customizer; import org.apache.camel.spi.Configurer; @@ -34,13 +34,9 @@ @Configurer @Customizer("sinkbinding") public class KnativeSinkBindingContextCustomizer implements ContextCustomizer { - private String name; - private Knative.Type type; - private String kind; - private String apiVersion; @Override @@ -51,7 +47,7 @@ public void apply(CamelContext camelContext) { }); } - private Optional createSyntheticDefinition( + private Optional createSyntheticDefinition( CamelContext camelContext, String sinkName) { @@ -60,41 +56,28 @@ private Optional createSyntheticDefinition( if (ObjectHelper.isNotEmpty(kSinkUrl)) { // create a synthetic service definition to target the K_SINK url - var serviceBuilder = KnativeEnvironment.serviceBuilder(type, sinkName) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) - .withMeta(Knative.SERVICE_META_URL, kSinkUrl); - - if (ObjectHelper.isNotEmpty(kind)) { - serviceBuilder = serviceBuilder.withMeta(Knative.KNATIVE_KIND, kind); - } - - if (ObjectHelper.isNotEmpty(apiVersion)) { - serviceBuilder = serviceBuilder.withMeta(Knative.KNATIVE_API_VERSION, apiVersion); - } + KnativeResource resource = new KnativeResource(); + resource.setEndpointKind(Knative.EndpointKind.sink); + resource.setType(type); + resource.setName(name); + resource.setUrl(kSinkUrl); + resource.setObjectApiVersion(apiVersion); + resource.setObjectKind(kind); if (ObjectHelper.isNotEmpty(kCeOverride)) { try (Reader reader = new StringReader(kCeOverride)) { // assume K_CE_OVERRIDES is defined as simple key/val json - var overrides = Knative.MAPPER.readValue( - reader, - new TypeReference>() { - } - ); - - for (var entry : overrides.entrySet()) { - // generate proper ce-override meta-data for the service - // definition - serviceBuilder.withMeta( - Knative.KNATIVE_CE_OVERRIDE_PREFIX + entry.getKey(), - entry.getValue() - ); - } + Knative.MAPPER.readValue( + reader, + new TypeReference>() { + } + ).forEach(resource::addCeOverride); } catch (IOException e) { throw new RuntimeException(e); } } - return Optional.of(serviceBuilder.build()); + return Optional.of(resource); } return Optional.empty(); diff --git a/components/camel-knative/camel-knative-api/src/generated/java/org/apache/camel/component/knative/spi/KnativeEnvironmentConfigurer.java b/components/camel-knative/camel-knative-api/src/generated/java/org/apache/camel/component/knative/spi/KnativeEnvironmentConfigurer.java new file mode 100644 index 000000000..216453ff9 --- /dev/null +++ b/components/camel-knative/camel-knative-api/src/generated/java/org/apache/camel/component/knative/spi/KnativeEnvironmentConfigurer.java @@ -0,0 +1,59 @@ +/* Generated by camel build tools - do NOT edit this file! */ +package org.apache.camel.component.knative.spi; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.GeneratedPropertyConfigurer; +import org.apache.camel.spi.PropertyConfigurerGetter; +import org.apache.camel.util.CaseInsensitiveMap; +import org.apache.camel.component.knative.spi.KnativeEnvironment; + +/** + * Generated by camel build tools - do NOT edit this file! + */ +@SuppressWarnings("unchecked") +public class KnativeEnvironmentConfigurer extends org.apache.camel.support.component.PropertyConfigurerSupport implements GeneratedPropertyConfigurer, PropertyConfigurerGetter { + + private static final Map ALL_OPTIONS; + static { + Map map = new CaseInsensitiveMap(); + map.put("Resources", java.util.List.class); + ALL_OPTIONS = map; + } + + @Override + public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) { + org.apache.camel.component.knative.spi.KnativeEnvironment target = (org.apache.camel.component.knative.spi.KnativeEnvironment) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "resources": + case "Resources": target.setResources(property(camelContext, java.util.List.class, value)); return true; + default: return false; + } + } + + @Override + public Map getAllOptions(Object target) { + return ALL_OPTIONS; + } + + @Override + public Object getOptionValue(Object obj, String name, boolean ignoreCase) { + org.apache.camel.component.knative.spi.KnativeEnvironment target = (org.apache.camel.component.knative.spi.KnativeEnvironment) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "resources": + case "Resources": return target.getResources(); + default: return null; + } + } + + @Override + public Object getCollectionValueType(Object target, String name, boolean ignoreCase) { + switch (ignoreCase ? name.toLowerCase() : name) { + case "resources": + case "Resources": return org.apache.camel.component.knative.spi.KnativeResource.class; + default: return null; + } + } +} + diff --git a/components/camel-knative/camel-knative-api/src/generated/java/org/apache/camel/component/knative/spi/KnativeResourceConfigurer.java b/components/camel-knative/camel-knative-api/src/generated/java/org/apache/camel/component/knative/spi/KnativeResourceConfigurer.java new file mode 100644 index 000000000..cb0b84913 --- /dev/null +++ b/components/camel-knative/camel-knative-api/src/generated/java/org/apache/camel/component/knative/spi/KnativeResourceConfigurer.java @@ -0,0 +1,128 @@ +/* Generated by camel build tools - do NOT edit this file! */ +package org.apache.camel.component.knative.spi; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.GeneratedPropertyConfigurer; +import org.apache.camel.spi.PropertyConfigurerGetter; +import org.apache.camel.util.CaseInsensitiveMap; +import org.apache.camel.component.knative.spi.KnativeResource; + +/** + * Generated by camel build tools - do NOT edit this file! + */ +@SuppressWarnings("unchecked") +public class KnativeResourceConfigurer extends org.apache.camel.support.component.PropertyConfigurerSupport implements GeneratedPropertyConfigurer, PropertyConfigurerGetter { + + private static final Map ALL_OPTIONS; + static { + Map map = new CaseInsensitiveMap(); + map.put("CeOverrides", java.util.Map.class); + map.put("CloudEventType", java.lang.String.class); + map.put("ContentType", java.lang.String.class); + map.put("EndpointKind", org.apache.camel.component.knative.spi.Knative.EndpointKind.class); + map.put("Filters", java.util.Map.class); + map.put("Metadata", java.util.Map.class); + map.put("Name", java.lang.String.class); + map.put("ObjectApiVersion", java.lang.String.class); + map.put("ObjectKind", java.lang.String.class); + map.put("ObjectName", java.lang.String.class); + map.put("Path", java.lang.String.class); + map.put("Reply", java.lang.Boolean.class); + map.put("Type", org.apache.camel.component.knative.spi.Knative.Type.class); + map.put("Url", java.lang.String.class); + ALL_OPTIONS = map; + } + + @Override + public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) { + org.apache.camel.component.knative.spi.KnativeResource target = (org.apache.camel.component.knative.spi.KnativeResource) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "ceoverrides": + case "CeOverrides": target.setCeOverrides(property(camelContext, java.util.Map.class, value)); return true; + case "cloudeventtype": + case "CloudEventType": target.setCloudEventType(property(camelContext, java.lang.String.class, value)); return true; + case "contenttype": + case "ContentType": target.setContentType(property(camelContext, java.lang.String.class, value)); return true; + case "endpointkind": + case "EndpointKind": target.setEndpointKind(property(camelContext, org.apache.camel.component.knative.spi.Knative.EndpointKind.class, value)); return true; + case "filters": + case "Filters": target.setFilters(property(camelContext, java.util.Map.class, value)); return true; + case "metadata": + case "Metadata": target.setMetadata(property(camelContext, java.util.Map.class, value)); return true; + case "name": + case "Name": target.setName(property(camelContext, java.lang.String.class, value)); return true; + case "objectapiversion": + case "ObjectApiVersion": target.setObjectApiVersion(property(camelContext, java.lang.String.class, value)); return true; + case "objectkind": + case "ObjectKind": target.setObjectKind(property(camelContext, java.lang.String.class, value)); return true; + case "objectname": + case "ObjectName": target.setObjectName(property(camelContext, java.lang.String.class, value)); return true; + case "path": + case "Path": target.setPath(property(camelContext, java.lang.String.class, value)); return true; + case "reply": + case "Reply": target.setReply(property(camelContext, java.lang.Boolean.class, value)); return true; + case "type": + case "Type": target.setType(property(camelContext, org.apache.camel.component.knative.spi.Knative.Type.class, value)); return true; + case "url": + case "Url": target.setUrl(property(camelContext, java.lang.String.class, value)); return true; + default: return false; + } + } + + @Override + public Map getAllOptions(Object target) { + return ALL_OPTIONS; + } + + @Override + public Object getOptionValue(Object obj, String name, boolean ignoreCase) { + org.apache.camel.component.knative.spi.KnativeResource target = (org.apache.camel.component.knative.spi.KnativeResource) obj; + switch (ignoreCase ? name.toLowerCase() : name) { + case "ceoverrides": + case "CeOverrides": return target.getCeOverrides(); + case "cloudeventtype": + case "CloudEventType": return target.getCloudEventType(); + case "contenttype": + case "ContentType": return target.getContentType(); + case "endpointkind": + case "EndpointKind": return target.getEndpointKind(); + case "filters": + case "Filters": return target.getFilters(); + case "metadata": + case "Metadata": return target.getMetadata(); + case "name": + case "Name": return target.getName(); + case "objectapiversion": + case "ObjectApiVersion": return target.getObjectApiVersion(); + case "objectkind": + case "ObjectKind": return target.getObjectKind(); + case "objectname": + case "ObjectName": return target.getObjectName(); + case "path": + case "Path": return target.getPath(); + case "reply": + case "Reply": return target.getReply(); + case "type": + case "Type": return target.getType(); + case "url": + case "Url": return target.getUrl(); + default: return null; + } + } + + @Override + public Object getCollectionValueType(Object target, String name, boolean ignoreCase) { + switch (ignoreCase ? name.toLowerCase() : name) { + case "ceoverrides": + case "CeOverrides": return java.lang.String.class; + case "filters": + case "Filters": return java.lang.String.class; + case "metadata": + case "Metadata": return java.lang.String.class; + default: return null; + } + } +} + diff --git a/components/camel-knative/camel-knative-api/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.knative.spi.KnativeEnvironment b/components/camel-knative/camel-knative-api/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.knative.spi.KnativeEnvironment new file mode 100644 index 000000000..2e656649f --- /dev/null +++ b/components/camel-knative/camel-knative-api/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.knative.spi.KnativeEnvironment @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.knative.spi.KnativeEnvironmentConfigurer diff --git a/components/camel-knative/camel-knative-api/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.knative.spi.KnativeResource b/components/camel-knative/camel-knative-api/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.knative.spi.KnativeResource new file mode 100644 index 000000000..dc135138e --- /dev/null +++ b/components/camel-knative/camel-knative-api/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.component.knative.spi.KnativeResource @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.component.knative.spi.KnativeResourceConfigurer diff --git a/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java b/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java index 18248aff7..6831ad5de 100644 --- a/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java +++ b/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java @@ -22,25 +22,27 @@ public final class Knative { public static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new Jdk8Module()); + public static final String MIME_STRUCTURED_CONTENT_MODE = "application/cloudevents+json"; + public static final String MIME_BATCH_CONTENT_MODE = "application/cloudevents-batch+json"; + public static final String KNATIVE_TRANSPORT_RESOURCE_PATH = "META-INF/services/org/apache/camel/knative/transport/"; + public static final String KNATIVE_FILTER_PREFIX = "filter."; public static final String KNATIVE_CE_OVERRIDE_PREFIX = "ce.override."; public static final String KNATIVE_TYPE = "knative.type"; - public static final String KNATIVE_EVENT_TYPE = "knative.event.type"; - public static final String KNATIVE_KIND = "knative.kind"; - public static final String KNATIVE_NAME = "knative.name"; - public static final String KNATIVE_API_VERSION = "knative.apiVersion"; + public static final String KNATIVE_CLOUD_EVENT_TYPE = "knative.event.type"; public static final String KNATIVE_REPLY = "knative.reply"; public static final String CONTENT_TYPE = "content.type"; - public static final String MIME_STRUCTURED_CONTENT_MODE = "application/cloudevents+json"; - public static final String MIME_BATCH_CONTENT_MODE = "application/cloudevents-batch+json"; public static final String CAMEL_ENDPOINT_KIND = "camel.endpoint.kind"; - public static final String SERVICE_META_HOST = "service.host"; - public static final String SERVICE_META_ZONE = "service.zone"; public static final String SERVICE_META_PATH = "service.path"; public static final String SERVICE_META_URL = "service.url"; + public static final String KNATIVE_OBJECT_API_VERSION = "knative.apiVersion"; + public static final String KNATIVE_OBJECT_KIND = "knative.kind"; + public static final String KNATIVE_OBJECT_NAME = "knative.name"; + + private Knative() { } diff --git a/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeConsumerFactory.java b/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeConsumerFactory.java index 4ac824a6a..c0e63070e 100644 --- a/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeConsumerFactory.java +++ b/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeConsumerFactory.java @@ -33,5 +33,5 @@ public interface KnativeConsumerFactory extends Service { Consumer createConsumer( Endpoint endpoint, KnativeTransportConfiguration configuration, - KnativeEnvironment.KnativeResource service, Processor processor); + KnativeResource service, Processor processor); } diff --git a/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java b/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java index cebc93207..8d77da01a 100644 --- a/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java +++ b/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeEnvironment.java @@ -25,19 +25,21 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Optional; import java.util.stream.Stream; import com.fasterxml.jackson.annotation.JsonAlias; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.spi.Configurer; +import org.apache.camel.spi.GeneratedPropertyConfigurer; +import org.apache.camel.support.PropertyBindingSupport; import org.apache.camel.support.ResourceHelper; /* * Assuming it is loaded from a json for now */ +@Configurer public class KnativeEnvironment { private final List resources; @@ -55,6 +57,13 @@ public List getResources() { return resources; } + @JsonAlias("services") + @JsonProperty(value = "resources", required = true) + public void setResources(List resources) { + this.resources.clear(); + this.resources.addAll(resources); + } + public Stream stream() { return resources.stream(); } @@ -69,12 +78,32 @@ public Stream lookup(Knative.Type type, String name) { // // ************************ - public static KnativeEnvironment mandatoryLoadFromSerializedString(CamelContext context, String configuration) throws IOException { + public static KnativeEnvironment mandatoryLoadFromSerializedString(String configuration) throws IOException { try (Reader reader = new StringReader(configuration)) { return Knative.MAPPER.readValue(reader, KnativeEnvironment.class); } } + public static KnativeEnvironment mandatoryLoadFromProperties(CamelContext context, Map properties) { + KnativeEnvironment environment = new KnativeEnvironment(); + + GeneratedPropertyConfigurer configurer = context.adapt(ExtendedCamelContext.class) + .getConfigurerResolver() + .resolvePropertyConfigurer(KnativeEnvironment.class.getName(), context); + + PropertyBindingSupport.build() + .withIgnoreCase(true) + .withCamelContext(context) + .withTarget(environment) + .withProperties(properties) + .withRemoveParameters(true) + .withConfigurer(configurer) + .withMandatory(true) + .bind(); + + return environment; + } + public static KnativeEnvironment mandatoryLoadFromResource(CamelContext context, String path) throws IOException { try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(context, path)) { // @@ -126,6 +155,7 @@ public static KnativeServiceBuilder serviceBuilder(Knative.Type type, String nam public static final class KnativeServiceBuilder { private final Knative.Type type; private final String name; + private Knative.EndpointKind endpointKind; private String url; private Map metadata; @@ -139,6 +169,15 @@ public KnativeServiceBuilder withUrl(String url) { return this; } + public KnativeServiceBuilder withUrlf(String format, Object... args) { + return withUrl(String.format(format, args)); + } + + public KnativeServiceBuilder withEndpointKind(Knative.EndpointKind endpointKind) { + this.endpointKind = endpointKind; + return this; + } + public KnativeServiceBuilder withMeta(Map metadata) { if (metadata == null) { return this; @@ -173,75 +212,15 @@ public KnativeServiceBuilder withMeta(String key, Enum e) { } public KnativeResource build() { - return new KnativeResource(type, name, url, metadata); - } - } - - public static final class KnativeResource { - private final String name; - private final String url; - private final Map meta; - - @JsonCreator - public KnativeResource( - @JsonProperty(value = "type", required = true) Knative.Type type, - @JsonProperty(value = "name", required = true) String name, - @JsonProperty(value = "url", required = false) String url, - @JsonProperty(value = "metadata", required = false) Map metadata) { - - this.name = name; - this.url = url; - this.meta = KnativeSupport.mergeMaps( - metadata, - Map.of( - Knative.KNATIVE_TYPE, type.name()) - ); - } - - public String getName() { - return this.name; - } - - public Map getMetadata() { - return this.meta; - } + KnativeResource answer = new KnativeResource(); + answer.setType(type); + answer.setEndpointKind(endpointKind); + answer.setName(name); + answer.setUrl(url); + answer.setMetadata(metadata); - public Knative.Type getType() { - return Knative.Type.valueOf(getMetadata().get(Knative.KNATIVE_TYPE)); - } - - public String getPath() { - return getMetadata(Knative.SERVICE_META_PATH); - } - - public String getEventType() { - return getMetadata(Knative.KNATIVE_EVENT_TYPE); - } - - public String getUrl() { - return this.url != null ? this.url : getMetadata(Knative.SERVICE_META_URL); - } - - public String getMetadata(String key) { - return getMetadata().get(key); - } - - public Optional getOptionalMetadata(String key) { - return Optional.ofNullable(getMetadata(key)); - } - - public boolean matches(Knative.Type type, String name) { - return Objects.equals(type.name(), getMetadata(Knative.KNATIVE_TYPE)) - && Objects.equals(name, getName()); - } - - @Override - public String toString() { - return "KnativeResource{" + - "name='" + name + '\'' + - ", url='" + url + '\'' + - ", meta=" + meta + - '}'; + return answer; } } + } diff --git a/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeProducerFactory.java b/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeProducerFactory.java index fc503df78..6c3523b7d 100644 --- a/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeProducerFactory.java +++ b/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeProducerFactory.java @@ -32,5 +32,5 @@ public interface KnativeProducerFactory extends Service { Producer createProducer( Endpoint endpoint, KnativeTransportConfiguration configuration, - KnativeEnvironment.KnativeResource service); + KnativeResource service); } diff --git a/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeResource.java b/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeResource.java new file mode 100644 index 000000000..ace649d52 --- /dev/null +++ b/components/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/KnativeResource.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.knative.spi; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.camel.spi.Configurer; + +@Configurer +public final class KnativeResource { + private String name; + private String url; + private Knative.Type type; + private Knative.EndpointKind endpointKind; + private Boolean reply; + private String contentType; + private String cloudEventType; + private String path; + private String objectApiVersion; + private String objectKind; + private String objectName; + private Map metadata; + private Map ceOverrides; + private Map filters; + + public String getName() { + return this.name; + } + + @JsonProperty(required = true) + public void setName(String name) { + this.name = name; + } + + @JsonProperty(required = true) + public Knative.Type getType() { + Knative.Type answer = this.type; + if (answer == null) { + String stringValue = getMetadata(Knative.KNATIVE_TYPE); + if (stringValue != null) { + answer = Knative.Type.valueOf(stringValue); + } + } + + return answer; + } + + public void setType(Knative.Type type) { + this.type = type; + } + + public Knative.EndpointKind getEndpointKind() { + Knative.EndpointKind answer = this.endpointKind; + if (answer == null) { + String stringValue = getMetadata(Knative.CAMEL_ENDPOINT_KIND); + if (stringValue != null) { + answer = Knative.EndpointKind.valueOf(stringValue); + } + } + + return answer; + } + + public void setEndpointKind(Knative.EndpointKind endpointKind) { + this.endpointKind = endpointKind; + } + + public String getUrl() { + return this.url != null ? this.url : getMetadata(Knative.SERVICE_META_URL); + } + + public void setUrl(String url) { + this.url = url; + } + + public Map getMetadata() { + if (this.metadata == null) { + this.metadata = new HashMap<>(); + } + + return this.metadata; + } + + public void setMetadata(Map metadata) { + this.metadata = metadata; + } + + @JsonIgnore + public String getMetadata(String key) { + return this.metadata != null ? metadata.get(key) : null; + } + + public void setMetadata(String key, String value) { + if (this.metadata == null) { + this.metadata = new HashMap<>(); + } + + this.metadata.put(key, value); + } + + @JsonIgnore + public Optional getOptionalMetadata(String key) { + return Optional.ofNullable(getMetadata(key)); + } + + public String getCloudEventType() { + return this.cloudEventType != null + ? this.cloudEventType + : getMetadata(Knative.KNATIVE_CLOUD_EVENT_TYPE); + } + + public void setCloudEventType(String cloudEventType) { + this.cloudEventType = cloudEventType; + } + + public String getPath() { + return this.path != null + ? this.path + : getMetadata(Knative.SERVICE_META_PATH); + } + + public void setPath(String path) { + this.path = path; + } + + public String getObjectApiVersion() { + return this.objectApiVersion != null + ? this.objectApiVersion + : getMetadata(Knative.KNATIVE_OBJECT_API_VERSION); + } + + public void setObjectApiVersion(String objectApiVersion) { + this.objectApiVersion = objectApiVersion; + } + + public String getObjectKind() { + return this.objectKind != null + ? this.objectKind + : getMetadata(Knative.KNATIVE_OBJECT_KIND); + } + + public void setObjectKind(String objectKind) { + this.objectKind = objectKind; + } + + public String getObjectName() { + return this.objectName != null + ? this.objectName + : getMetadata(Knative.KNATIVE_OBJECT_NAME); + } + + public void setObjectName(String objectName) { + this.objectName = objectName; + } + + public Map getCeOverrides() { + Map answer = new HashMap<>(); + if (this.ceOverrides != null) { + answer.putAll(this.ceOverrides); + } + if (this.metadata != null) { + for (Map.Entry entry : this.metadata.entrySet()) { + if (entry.getKey().startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) { + final String key = entry.getKey().substring(Knative.KNATIVE_CE_OVERRIDE_PREFIX.length()); + final String val = entry.getValue(); + + answer.put(key, val); + } + } + } + + return answer; + } + + public void setCeOverrides(Map ceOverride) { + this.ceOverrides = ceOverride; + } + + public void addCeOverride(String key, String value) { + if (this.ceOverrides == null) { + this.ceOverrides = new HashMap<>(); + } + + this.ceOverrides.put(key, value); + this.ceOverrides.put(key, value); + } + + public Map getFilters() { + Map answer = new HashMap<>(); + if (this.filters != null) { + answer.putAll(this.filters); + } + if (this.metadata != null) { + for (Map.Entry entry : this.metadata.entrySet()) { + if (entry.getKey().startsWith(Knative.KNATIVE_FILTER_PREFIX)) { + final String key = entry.getKey().substring(Knative.KNATIVE_FILTER_PREFIX.length()); + final String val = entry.getValue(); + + answer.put(key, val); + } + } + } + + return answer; + } + + public void setFilters(Map filters) { + this.filters = filters; + } + + public void addFilter(String key, String value) { + if (this.filters == null) { + this.filters = new HashMap<>(); + } + + this.filters.put(key, value); + } + + public Boolean getReply() { + return this.reply != null + ? this.reply + : getOptionalMetadata(Knative.KNATIVE_REPLY).map(Boolean::parseBoolean).orElse(true); + } + + public void setReply(Boolean reply) { + this.reply = reply; + } + + public String getContentType() { + return this.contentType != null + ? this.contentType + : getMetadata(Knative.CONTENT_TYPE); + } + + public void setContentType(String contentType) { + this.contentType = contentType; + } + + public boolean matches(Knative.Type type, String name) { + if (type == null || name == null) { + return false; + } + + return Objects.equals(type, getType()) + && Objects.equals(name, getName()); + } + + @Override + public String toString() { + return "KnativeResource{" + + "name='" + name + '\'' + + ", url='" + url + '\'' + + ", metadata=" + metadata + + ", ceOverrides=" + ceOverrides + + ", filters=" + filters + + ", type=" + type + + ", endpointKind=" + endpointKind + + ", reply=" + reply + + ", contentType='" + contentType + '\'' + + '}'; + } + + public static KnativeResource from(KnativeResource resource) { + KnativeResource answer = new KnativeResource(); + + answer.name = resource.name; + answer.url = resource.url; + answer.type = resource.type; + answer.endpointKind = resource.endpointKind; + answer.reply = resource.reply; + answer.contentType = resource.contentType; + answer.cloudEventType = resource.cloudEventType; + answer.path = resource.path; + + if (resource.metadata != null) { + answer.metadata = new HashMap<>(resource.metadata); + } + if (resource.ceOverrides != null) { + answer.ceOverrides = new HashMap<>(resource.ceOverrides); + } + if (resource.filters != null) { + answer.filters = new HashMap<>(resource.filters); + } + + return answer; + } +} diff --git a/components/camel-knative/camel-knative-api/src/test/java/org/apache/camel/component/knative/spi/KnativeEnvironmentTest.java b/components/camel-knative/camel-knative-api/src/test/java/org/apache/camel/component/knative/spi/KnativeEnvironmentTest.java index eb219c735..4b60555ac 100644 --- a/components/camel-knative/camel-knative-api/src/test/java/org/apache/camel/component/knative/spi/KnativeEnvironmentTest.java +++ b/components/camel-knative/camel-knative-api/src/test/java/org/apache/camel/component/knative/spi/KnativeEnvironmentTest.java @@ -16,11 +16,17 @@ */ package org.apache.camel.component.knative.spi; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import org.apache.camel.CamelContext; import org.apache.camel.impl.DefaultCamelContext; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static org.apache.camel.util.CollectionHelper.mapOf; import static org.assertj.core.api.Assertions.assertThat; public class KnativeEnvironmentTest { @@ -31,12 +37,43 @@ public class KnativeEnvironmentTest { "{\"resources\":[{\"type\":\"endpoint\",\"name\":\"knative3\",\"metadata\":{\"camel.endpoint.kind\":\"source\",\"knative.apiVersion\":\"serving.knative.dev/v1\",\"knative.kind\":\"Service\",\"service.path\":\"/\"}}]}" }) public void testKnativeEnvironmentDeserializationFromString(String content) throws Exception { + KnativeEnvironment env = KnativeEnvironment.mandatoryLoadFromSerializedString(content); + List res = env.lookup(Knative.Type.endpoint, "knative3").collect(Collectors.toList()); + + assertThat(res).hasSize(1); + assertThat(res).first().satisfies(resource -> { + assertThat(resource.getName()).isEqualTo("knative3"); + assertThat(resource.getEndpointKind()).isEqualTo(Knative.EndpointKind.source); + assertThat(resource.getObjectApiVersion()).isEqualTo("serving.knative.dev/v1"); + assertThat(resource.getObjectKind()).isEqualTo("Service"); + assertThat(resource.getPath()).isEqualTo("/"); + assertThat(resource.getMetadata()).isNotEmpty(); + }); + } + + @Test + public void testKnativeEnvironmentDeserializationFromProperties() { + Map properties = mapOf( + "resources[0].name", "knative3", + "resources[0].type", "endpoint", + "resources[0].endpointKind", "source", + "resources[0].objectApiVersion", "serving.knative.dev/v1", + "resources[0].objectKind", "Service", + "resources[0].path", "/" + ); + CamelContext context = new DefaultCamelContext(); - KnativeEnvironment env = KnativeEnvironment.mandatoryLoadFromSerializedString(context, content); + KnativeEnvironment env = KnativeEnvironment.mandatoryLoadFromProperties(context, properties); + List res = env.lookup(Knative.Type.endpoint, "knative3").collect(Collectors.toList()); - assertThat(env.lookup(Knative.Type.endpoint, "knative3")) - .first() - .hasFieldOrPropertyWithValue("url", null) - .hasFieldOrProperty("metadata"); + assertThat(res).hasSize(1); + assertThat(res).first().satisfies(resource -> { + assertThat(resource.getName()).isEqualTo("knative3"); + assertThat(resource.getEndpointKind()).isEqualTo(Knative.EndpointKind.source); + assertThat(resource.getObjectApiVersion()).isEqualTo("serving.knative.dev/v1"); + assertThat(resource.getObjectKind()).isEqualTo("Service"); + assertThat(resource.getPath()).isEqualTo("/"); + assertThat(resource.getMetadata()).isEmpty(); + }); } } diff --git a/components/camel-knative/camel-knative-api/src/test/resources/log4j2-test.xml b/components/camel-knative/camel-knative-api/src/test/resources/log4j2-test.xml index 8c95e5457..0caeffa3b 100644 --- a/components/camel-knative/camel-knative-api/src/test/resources/log4j2-test.xml +++ b/components/camel-knative/camel-knative-api/src/test/resources/log4j2-test.xml @@ -30,7 +30,7 @@ - + diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java index bfc057d74..792363d5f 100644 --- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java +++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java @@ -39,12 +39,13 @@ import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.TypeConverter; -import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.MessageHelper; +import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public class KnativeHttpConsumer extends DefaultConsumer { private final KnativeTransportConfiguration configuration; private final Predicate filter; - private final KnativeEnvironment.KnativeResource serviceDefinition; + private final KnativeResource resource; private final Router router; private final HeaderFilterStrategy headerFilterStrategy; @@ -66,17 +67,17 @@ public class KnativeHttpConsumer extends DefaultConsumer { public KnativeHttpConsumer( KnativeTransportConfiguration configuration, Endpoint endpoint, - KnativeEnvironment.KnativeResource serviceDefinition, + KnativeResource resource, Router router, Processor processor) { super(endpoint, processor); this.configuration = configuration; - this.serviceDefinition = serviceDefinition; + this.resource = resource; this.router = router; this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy(); - this.filter = KnativeHttpSupport.createFilter(serviceDefinition); + this.filter = KnativeHttpSupport.createFilter(this.configuration.getCloudEvent(), resource); this.preallocateBodyBuffer = true; } @@ -107,7 +108,7 @@ public void setPreallocateBodyBuffer(boolean preallocateBodyBuffer) { @Override protected void doStart() throws Exception { if (route == null) { - String path = serviceDefinition.getPath(); + String path = resource.getPath(); if (ObjectHelper.isEmpty(path)) { path = "/"; } @@ -261,8 +262,8 @@ private Message toMessage(HttpServerRequest request, Exchange exchange) { Message message = exchange.getMessage(); String path = request.path(); - if (serviceDefinition.getPath() != null) { - String endpointPath = serviceDefinition.getPath(); + if (resource.getPath() != null) { + String endpointPath = resource.getPath(); String matchPath = path.toLowerCase(Locale.US); String match = endpointPath.toLowerCase(Locale.US); @@ -332,16 +333,21 @@ private Buffer computeResponseBody(Message message) throws NoTypeConversionAvail // we failed due an exception so print it as plain text StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); - exception.printStackTrace(pw); - // the body should then be the stacktrace - body = sw.toString().getBytes(StandardCharsets.UTF_8); - // force content type to be text/plain as that is what the stacktrace is - message.setHeader(Exchange.CONTENT_TYPE, "text/plain"); + try { + exception.printStackTrace(pw); - // and mark the exception as failure handled, as we handled it by returning - // it as the response - ExchangeHelper.setFailureHandled(message.getExchange()); + // the body should then be the stacktrace + body = sw.toString().getBytes(StandardCharsets.UTF_8); + // force content type to be text/plain as that is what the stacktrace is + message.setHeader(Exchange.CONTENT_TYPE, "text/plain"); + + // and mark the exception as failure handled, as we handled it by returning + // it as the response + ExchangeHelper.setFailureHandled(message.getExchange()); + } finally { + IOHelper.close(pw, sw); + } } return body != null diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerFactory.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerFactory.java index 9df388c3e..0b66b431d 100644 --- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerFactory.java +++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerFactory.java @@ -25,7 +25,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Processor; import org.apache.camel.component.knative.spi.KnativeConsumerFactory; -import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; import org.apache.camel.support.service.ServiceSupport; @@ -53,7 +53,7 @@ public CamelContext getCamelContext() { } @Override - public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeResource service, Processor processor) { + public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeResource service, Processor processor) { Objects.requireNonNull(this.router, "router"); return new KnativeHttpConsumer( diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java index 47f00eb12..62f7b54a4 100644 --- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java +++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java @@ -34,7 +34,7 @@ import org.apache.camel.InvalidPayloadException; import org.apache.camel.Message; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.MessageHelper; @@ -46,7 +46,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpProducer.class); - private final KnativeEnvironment.KnativeResource serviceDefinition; + private final KnativeResource serviceDefinition; private final Vertx vertx; private final WebClientOptions clientOptions; private final HeaderFilterStrategy headerFilterStrategy; @@ -57,7 +57,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { public KnativeHttpProducer( Endpoint endpoint, - KnativeEnvironment.KnativeResource serviceDefinition, + KnativeResource serviceDefinition, Vertx vertx, WebClientOptions clientOptions) { super(endpoint); @@ -171,7 +171,7 @@ protected void doStop() throws Exception { } } - private String getUrl(KnativeEnvironment.KnativeResource definition) { + private String getUrl(KnativeResource definition) { String url = definition.getUrl(); if (url == null) { throw new RuntimeCamelException("Unable to determine the `url` for definition: " + definition); @@ -192,7 +192,7 @@ private String getUrl(KnativeEnvironment.KnativeResource definition) { return getEndpoint().getCamelContext().resolvePropertyPlaceholders(url); } - private String getHost(KnativeEnvironment.KnativeResource definition) { + private String getHost(KnativeResource definition) { String url = getUrl(definition); try { diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducerFactory.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducerFactory.java index d4466b90e..441554557 100644 --- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducerFactory.java +++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducerFactory.java @@ -24,8 +24,8 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Endpoint; import org.apache.camel.Producer; -import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.component.knative.spi.KnativeProducerFactory; +import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; import org.apache.camel.support.service.ServiceSupport; @@ -62,7 +62,7 @@ public CamelContext getCamelContext() { } @Override - public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeResource service) { + public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeResource service) { Objects.requireNonNull(this.vertx, "vertx"); return new KnativeHttpProducer( diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java index cf68524db..f49c4f086 100644 --- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java +++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpSupport.java @@ -17,17 +17,16 @@ package org.apache.camel.component.knative.http; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Predicate; -import java.util.stream.Collectors; import io.vertx.core.http.HttpServerRequest; import org.apache.camel.Message; import org.apache.camel.component.knative.spi.CloudEvent; -import org.apache.camel.component.knative.spi.Knative; -import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; public final class KnativeHttpSupport { private KnativeHttpSupport() { @@ -51,40 +50,47 @@ public static void appendHeader(Map headers, String key, Object headers.put(key, value); } - public static Predicate createFilter(KnativeEnvironment.KnativeResource serviceDefinition) { - Map filters = serviceDefinition.getMetadata().entrySet().stream() - .filter(e -> e.getKey().startsWith(Knative.KNATIVE_FILTER_PREFIX)) - .collect(Collectors.toMap( - e -> e.getKey().substring(Knative.KNATIVE_FILTER_PREFIX.length()), - Map.Entry::getValue - )); + public static Predicate createFilter(CloudEvent cloudEvent, KnativeResource resource) { + final Map filters = new HashMap<>(); - return v -> { - if (filters.isEmpty()) { - return true; - } + for (Map.Entry entry: resource.getFilters().entrySet()) { + cloudEvent.attribute(entry.getKey()) + .map(CloudEvent.Attribute::http) + .ifPresentOrElse( + k -> filters.put(k, entry.getValue()), + () -> filters.put(entry.getKey(), entry.getValue()) + ); + } - for (Map.Entry entry : filters.entrySet()) { - final List values = v.headers().getAll(entry.getKey()); - if (values.isEmpty()) { - return false; + return new Predicate() { + @Override + public boolean test(HttpServerRequest request) { + if (filters.isEmpty()) { + return true; } - String val = values.get(values.size() - 1); - int idx = val.lastIndexOf(','); + for (Map.Entry entry : filters.entrySet()) { + final List values = request.headers().getAll(entry.getKey()); + if (values.isEmpty()) { + return false; + } - if (values.size() == 1 && idx != -1) { - val = val.substring(idx + 1); - val = val.trim(); - } + String val = values.get(values.size() - 1); + int idx = val.lastIndexOf(','); - boolean matches = Objects.equals(entry.getValue(), val) || val.matches(entry.getValue()); - if (!matches) { - return false; + if (values.size() == 1 && idx != -1) { + val = val.substring(idx + 1); + val = val.trim(); + } + + boolean matches = Objects.equals(entry.getValue(), val) || val.matches(entry.getValue()); + if (!matches) { + return false; + } } - } - return true; + return true; + } }; } diff --git a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index 3e45b8421..0a6509e2e 100644 --- a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -18,14 +18,10 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import io.restassured.RestAssured; import io.restassured.mapper.ObjectMapperType; @@ -42,7 +38,6 @@ import org.apache.camel.component.knative.spi.CloudEvent; import org.apache.camel.component.knative.spi.CloudEvents; import org.apache.camel.component.knative.spi.Knative; -import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.http.base.HttpOperationFailedException; import org.apache.camel.impl.DefaultCamelContext; @@ -131,7 +126,7 @@ void doTestKnativeSource(CloudEvent ce, String basePath, String path) throws Exc "myEndpoint", Map.of( Knative.SERVICE_META_PATH, ObjectHelper.supplyIfEmpty(path, () -> "/"), - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -213,7 +208,7 @@ void testInvokeEndpoint(CloudEvent ce) throws Exception { "myEndpoint", String.format("http://%s:%d/a/path", platformHttpHost, platformHttpPort), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -254,7 +249,7 @@ void testInvokeEndpointByUrl(CloudEvent ce) throws Exception { null, Map.of( Knative.SERVICE_META_URL, String.format("http://localhost:%d/a/path", platformHttpPort), - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -296,7 +291,7 @@ void testInvokeEndpointByUrlAndPath(CloudEvent ce) throws Exception { Map.of( Knative.SERVICE_META_PATH, "/with/subpath", Knative.SERVICE_META_URL, String.format("http://localhost:%d/a/path", platformHttpPort), - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -334,7 +329,7 @@ void testConsumeStructuredContent(CloudEvent ce) throws Exception { sourceEndpoint( "myEndpoint", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -448,7 +443,7 @@ void testConsumeContent(CloudEvent ce) throws Exception { sourceEndpoint( "myEndpoint", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -495,14 +490,14 @@ void testConsumeContentWithFilter(CloudEvent ce) throws Exception { sourceEndpoint( "ep1", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE1" )), sourceEndpoint( "ep2", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE2" )) @@ -578,14 +573,14 @@ void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception { sourceEndpoint( "ep1", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE[01234]" )), sourceEndpoint( "ep2", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + httpAttribute(ce, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE), "CE[56789]" )) @@ -731,7 +726,7 @@ void testReply(CloudEvent ce) throws Exception { sourceEndpoint( "from", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event.from", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event.from", Knative.CONTENT_TYPE, "text/plain" )), endpoint( @@ -739,7 +734,7 @@ void testReply(CloudEvent ce) throws Exception { "to", String.format("http://%s:%d", platformHttpHost, platformHttpPort), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event.to", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event.to", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -777,7 +772,7 @@ void testReplyCloudEventHeaders(CloudEvent ce) throws Exception { sourceEndpoint( "from", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), endpoint( @@ -785,7 +780,7 @@ void testReplyCloudEventHeaders(CloudEvent ce) throws Exception { "to", String.format("http://%s:%d", platformHttpHost, platformHttpPort), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -825,7 +820,7 @@ void testInvokeServiceWithoutUrl(CloudEvent ce) throws Exception { "test", null, Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) ) @@ -853,7 +848,7 @@ void testInvokeNotExistingEndpoint(CloudEvent ce) throws Exception { "test", String.format("http://%s:%d", platformHttpHost, platformHttpPort), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) ) @@ -882,7 +877,7 @@ void testRemoveConsumer(CloudEvent ce) throws Exception { sourceEndpoint( "ep1", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + "h", "h1" ) @@ -890,7 +885,7 @@ void testRemoveConsumer(CloudEvent ce) throws Exception { sourceEndpoint( "ep2", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + "h", "h2" ) @@ -933,7 +928,7 @@ void testAddConsumer(CloudEvent ce) throws Exception { sourceEndpoint( "ep1", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + "h", "h1" ) @@ -941,7 +936,7 @@ void testAddConsumer(CloudEvent ce) throws Exception { sourceEndpoint( "ep2", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_FILTER_PREFIX + "h", "h2" ) @@ -988,7 +983,7 @@ void testInvokeEndpointWithError(CloudEvent ce) throws Exception { "ep", String.format("http://%s:%d", platformHttpHost, platformHttpPort), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) ) @@ -1025,13 +1020,13 @@ void testEvents(CloudEvent ce) throws Exception { "default", String.format("http://%s:%d", platformHttpHost, platformHttpPort), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), sourceEvent( "default", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -1070,20 +1065,20 @@ void testEventsWithResourceRef(CloudEvent ce) throws Exception { "default", String.format("http://%s:%d", platformHttpHost, platformHttpPort), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_KIND, "MyObject", - Knative.KNATIVE_API_VERSION, "v1", - Knative.KNATIVE_NAME, "myName1" + Knative.KNATIVE_OBJECT_KIND, "MyObject", + Knative.KNATIVE_OBJECT_API_VERSION, "v1", + Knative.KNATIVE_OBJECT_NAME, "myName1" )), sourceEvent( "default", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_KIND, "MyOtherObject", - Knative.KNATIVE_API_VERSION, "v2", - Knative.KNATIVE_NAME, "myName2" + Knative.KNATIVE_OBJECT_KIND, "MyOtherObject", + Knative.KNATIVE_OBJECT_API_VERSION, "v2", + Knative.KNATIVE_OBJECT_NAME, "myName2" )) ); @@ -1132,20 +1127,20 @@ void testConsumeContentWithResourceRef(CloudEvent ce) throws Exception { sourceEndpoint( "myEndpoint", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_KIND, "MyObject", - Knative.KNATIVE_API_VERSION, "v1", - Knative.KNATIVE_NAME, "myName1" + Knative.KNATIVE_OBJECT_KIND, "MyObject", + Knative.KNATIVE_OBJECT_API_VERSION, "v1", + Knative.KNATIVE_OBJECT_NAME, "myName1" )), sourceEndpoint( "myEndpoint", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_KIND, "MyObject", - Knative.KNATIVE_API_VERSION, "v2", - Knative.KNATIVE_NAME, "myName2" + Knative.KNATIVE_OBJECT_KIND, "MyObject", + Knative.KNATIVE_OBJECT_API_VERSION, "v2", + Knative.KNATIVE_OBJECT_NAME, "myName2" )) ); @@ -1198,7 +1193,7 @@ void testWrongMethod(CloudEvent ce) throws Exception { sourceEndpoint( "myEndpoint", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -1230,7 +1225,7 @@ void testNoBody(CloudEvent ce) throws Exception { "myEndpoint", String.format("http://%s:%d", platformHttpHost, platformHttpPort), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -1264,7 +1259,7 @@ void testNoContent(CloudEvent ce) throws Exception { "messages", null, Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), channel( @@ -1272,7 +1267,7 @@ void testNoContent(CloudEvent ce) throws Exception { "messages", String.format("http://%s:%d", platformHttpHost, platformHttpPort), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )), channel( @@ -1280,7 +1275,7 @@ void testNoContent(CloudEvent ce) throws Exception { "words", String.format("http://%s:%d", server.getHost(), server.getPort()), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -1314,7 +1309,7 @@ void testNoReply(CloudEvent ce) throws Exception { sourceChannel( "channel", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); @@ -1350,7 +1345,7 @@ void testNoReplyMeta(CloudEvent ce) throws Exception { sourceChannel( "channel", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_REPLY, "false" )) @@ -1387,7 +1382,7 @@ void testNoReplyMetaOverride(CloudEvent ce) throws Exception { sourceChannel( "channel", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_REPLY, "true" )) @@ -1429,7 +1424,7 @@ void testHeaders(CloudEvent ce) throws Exception { "ep", String.format("http://%s:%d", server.getHost(), server.getPort()), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) ) @@ -1481,7 +1476,7 @@ void testHeadersInReply(CloudEvent ce) throws Exception { "ep", String.format("http://%s:%d", server.getHost(), server.getPort()), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) ) @@ -1526,7 +1521,7 @@ void testHeadersOverrideFromEnv(CloudEvent ce) throws Exception { "ep", String.format("http://%s:%d", server.getHost(), server.getPort()), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", Knative.KNATIVE_CE_OVERRIDE_PREFIX + typeHeaderKey, typeHeaderVal, Knative.KNATIVE_CE_OVERRIDE_PREFIX + sourceHeaderKey, sourceHeaderVal @@ -1572,7 +1567,7 @@ void testHeadersOverrideFromURI(CloudEvent ce) throws Exception { "ep", String.format("http://%s:%d", server.getHost(), server.getPort()), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) ) @@ -1618,7 +1613,7 @@ void testHeadersOverrideFromConf(CloudEvent ce) throws Exception { "ep", String.format("http://%s:%d", server.getHost(), server.getPort()), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) ) @@ -1663,7 +1658,7 @@ void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception "ep", String.format("http://%s:%d", server.getHost(), server.getPort()), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) ) @@ -1704,7 +1699,7 @@ void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws Exception { "ep", String.format("http://%s:%d", server.getHost(), server.getPort()), Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" ) ) @@ -1869,7 +1864,7 @@ void testSlowConsumer(CloudEvent ce) throws Exception { sourceEndpoint( "start", Map.of( - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_CLOUD_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain" )) ); diff --git a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java index b1e4d76ee..c81ec34e2 100644 --- a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java +++ b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java @@ -18,6 +18,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Consumer; @@ -27,6 +28,7 @@ import org.apache.camel.component.knative.KnativeComponent; import org.apache.camel.component.knative.spi.CloudEvent; import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; import org.apache.camel.component.platform.http.PlatformHttpComponent; import org.apache.camel.component.platform.http.PlatformHttpConstants; @@ -39,24 +41,32 @@ public final class KnativeHttpTestSupport { private KnativeHttpTestSupport() { } - public static KnativeComponent configureKnativeComponent(CamelContext context, CloudEvent ce, KnativeEnvironment.KnativeResource... definitions) { + public static KnativeComponent configureKnativeComponent(CamelContext context, CloudEvent ce, KnativeResource... definitions) { return configureKnativeComponent(context, ce, Arrays.asList(definitions)); } - public static KnativeComponent configureKnativeComponent(CamelContext context, CloudEvent ce, List definitions) { + public static KnativeComponent configureKnativeComponent(CamelContext context, CloudEvent ce, Map properties) { + return configureKnativeComponent(context, ce, KnativeEnvironment.mandatoryLoadFromProperties(context, properties)); + } + + public static KnativeComponent configureKnativeComponent(CamelContext context, CloudEvent ce, List definitions) { + return configureKnativeComponent(context, ce, new KnativeEnvironment(definitions)); + } + + public static KnativeComponent configureKnativeComponent(CamelContext context, CloudEvent ce, KnativeEnvironment environment) { KnativeComponent component = context.getComponent("knative", KnativeComponent.class); component.setCloudEventsSpecVersion(ce.version()); - component.setEnvironment(new KnativeEnvironment(definitions)); + component.setEnvironment(environment); component.setConsumerFactory(new KnativeHttpConsumerFactory() { @Override - public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeResource service, Processor processor) { + public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeResource service, Processor processor) { this.setRouter(VertxPlatformHttpRouter.lookup(context)); return super.createConsumer(endpoint, config, service, processor); } }); component.setProducerFactory(new KnativeHttpProducerFactory() { @Override - public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeEnvironment.KnativeResource service) { + public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeResource service) { this.setVertx(VertxPlatformHttpRouter.lookup(context).vertx()); return super.createProducer(endpoint, config, service); } diff --git a/components/camel-knative/camel-knative-test/src/main/java/org/apache/camel/component/knative/test/KnativeEnvironmentSupport.java b/components/camel-knative/camel-knative-test/src/main/java/org/apache/camel/component/knative/test/KnativeEnvironmentSupport.java index b00ccf1cd..0f6e72d30 100644 --- a/components/camel-knative/camel-knative-test/src/main/java/org/apache/camel/component/knative/test/KnativeEnvironmentSupport.java +++ b/components/camel-knative/camel-knative-test/src/main/java/org/apache/camel/component/knative/test/KnativeEnvironmentSupport.java @@ -20,80 +20,81 @@ import org.apache.camel.component.knative.spi.Knative; import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; public final class KnativeEnvironmentSupport { private KnativeEnvironmentSupport() { } - public static KnativeEnvironment.KnativeResource endpoint(Knative.EndpointKind endpointKind, String name, String url) { + public static KnativeResource endpoint(Knative.EndpointKind endpointKind, String name, String url) { return KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, name) .withUrl(url) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .withEndpointKind(endpointKind) .build(); } - public static KnativeEnvironment.KnativeResource endpoint(Knative.EndpointKind endpointKind, String name, String url, Map metadata) { + public static KnativeResource endpoint(Knative.EndpointKind endpointKind, String name, String url, Map metadata) { return KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, name) .withUrl(url) .withMeta(metadata) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .withEndpointKind(endpointKind) .build(); } - public static KnativeEnvironment.KnativeResource sourceEndpoint(String name, Map metadata) { + public static KnativeResource sourceEndpoint(String name, Map metadata) { return KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, name) .withMeta(metadata) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source.name()) + .withEndpointKind(Knative.EndpointKind.source) .build(); } - public static KnativeEnvironment.KnativeResource channel(Knative.EndpointKind endpointKind, String name, String url) { + public static KnativeResource channel(Knative.EndpointKind endpointKind, String name, String url) { return KnativeEnvironment.serviceBuilder(Knative.Type.channel, name) .withUrl(url) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .withEndpointKind(endpointKind) .build(); } - public static KnativeEnvironment.KnativeResource channel(Knative.EndpointKind endpointKind, String name, String url, Map metadata) { + public static KnativeResource channel(Knative.EndpointKind endpointKind, String name, String url, Map metadata) { return KnativeEnvironment.serviceBuilder(Knative.Type.channel, name) .withUrl(url) .withMeta(metadata) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .withEndpointKind(endpointKind) .build(); } - public static KnativeEnvironment.KnativeResource sourceChannel(String name, Map metadata) { + public static KnativeResource sourceChannel(String name, Map metadata) { return KnativeEnvironment.serviceBuilder(Knative.Type.channel, name) .withMeta(metadata) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source) + .withEndpointKind(Knative.EndpointKind.source) .build(); } - public static KnativeEnvironment.KnativeResource event(Knative.EndpointKind endpointKind, String name, String url) { + public static KnativeResource event(Knative.EndpointKind endpointKind, String name, String url) { return KnativeEnvironment.serviceBuilder(Knative.Type.event, name) .withUrl(url) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .withEndpointKind(endpointKind) .build(); } - public static KnativeEnvironment.KnativeResource sourceEvent(String name) { + public static KnativeResource sourceEvent(String name) { return KnativeEnvironment.serviceBuilder(Knative.Type.event, name) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source) + .withEndpointKind(Knative.EndpointKind.source) .build(); } - public static KnativeEnvironment.KnativeResource sourceEvent(String name, Map metadata) { + public static KnativeResource sourceEvent(String name, Map metadata) { return KnativeEnvironment.serviceBuilder(Knative.Type.event, name) .withMeta(metadata) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source) + .withEndpointKind(Knative.EndpointKind.source) .build(); } - public static KnativeEnvironment.KnativeResource event(Knative.EndpointKind endpointKind, String name, String url, Map metadata) { + public static KnativeResource event(Knative.EndpointKind endpointKind, String name, String url, Map metadata) { return KnativeEnvironment.serviceBuilder(Knative.Type.event, name) .withUrl(url) .withMeta(metadata) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind) + .withEndpointKind(endpointKind) .build(); } } diff --git a/components/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json b/components/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json index f4f9ee2f5..3dd09cbcb 100644 --- a/components/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json +++ b/components/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json @@ -22,14 +22,14 @@ "lenientProperties": false }, "componentProperties": { - "ceOverride": { "kind": "property", "displayName": "Ce Override", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "prefix": "ce.override.", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "CloudEvent headers to override" }, + "ceOverride": { "kind": "property", "displayName": "Ce Override", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "prefix": "ce.override.", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "CloudEvent headers to override" }, "cloudEventsSpecVersion": { "kind": "property", "displayName": "Cloud Events Spec Version", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "0.1", "0.2", "0.3", "1.0" ], "deprecated": false, "secret": false, "defaultValue": "1.0", "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Set the version of the cloudevents spec." }, "cloudEventsType": { "kind": "property", "displayName": "Cloud Events Type", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "org.apache.camel.event", "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Set the event-type information of the produced events." }, "configuration": { "kind": "property", "displayName": "Configuration", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.knative.KnativeConfiguration", "deprecated": false, "secret": false, "description": "Set the configuration." }, "consumerFactory": { "kind": "property", "displayName": "Consumer Factory", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.knative.spi.KnativeConsumerFactory", "deprecated": false, "secret": false, "description": "The protocol consumer factory." }, "environment": { "kind": "property", "displayName": "Environment", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.knative.spi.KnativeEnvironment", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "The environment" }, "environmentPath": { "kind": "property", "displayName": "Environment Path", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The path ot the environment definition" }, - "filters": { "kind": "property", "displayName": "Filters", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "prefix": "filter.", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Set the filters." }, + "filters": { "kind": "property", "displayName": "Filters", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "prefix": "filter.", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Set the filters." }, "producerFactory": { "kind": "property", "displayName": "Producer Factory", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.knative.spi.KnativeProducerFactory", "deprecated": false, "secret": false, "description": "The protocol producer factory." }, "transportOptions": { "kind": "property", "displayName": "Transport Options", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "prefix": "transport.", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Set the transport options." }, "typeId": { "kind": "property", "displayName": "Type Id", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "The name of the service to lookup from the KnativeEnvironment." }, @@ -45,11 +45,11 @@ "properties": { "type": { "kind": "path", "displayName": "Type", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.knative.spi.Knative.Type", "enum": [ "endpoint", "channel", "event" ], "deprecated": false, "secret": false, "description": "The Knative resource type" }, "typeId": { "kind": "path", "displayName": "Type Id", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "The identifier of the Knative resource" }, - "ceOverride": { "kind": "parameter", "displayName": "Ce Override", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "prefix": "ce.override.", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "CloudEvent headers to override" }, + "ceOverride": { "kind": "parameter", "displayName": "Ce Override", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "prefix": "ce.override.", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "CloudEvent headers to override" }, "cloudEventsSpecVersion": { "kind": "parameter", "displayName": "Cloud Events Spec Version", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "0.1", "0.2", "0.3", "1.0" ], "deprecated": false, "secret": false, "defaultValue": "1.0", "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Set the version of the cloudevents spec." }, "cloudEventsType": { "kind": "parameter", "displayName": "Cloud Events Type", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "defaultValue": "org.apache.camel.event", "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Set the event-type information of the produced events." }, "environment": { "kind": "parameter", "displayName": "Environment", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.knative.spi.KnativeEnvironment", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "The environment" }, - "filters": { "kind": "parameter", "displayName": "Filters", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "prefix": "filter.", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Set the filters." }, + "filters": { "kind": "parameter", "displayName": "Filters", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "prefix": "filter.", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Set the filters." }, "transportOptions": { "kind": "parameter", "displayName": "Transport Options", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map", "prefix": "transport.", "deprecated": false, "secret": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Set the transport options." }, "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored." }, "replyWithCloudEvent": { "kind": "parameter", "displayName": "Reply With Cloud Event", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.knative.KnativeConfiguration", "configurationField": "configuration", "description": "Transforms the reply into a cloud event that will be processed by the caller. When listening to events from a Knative Broker, if this flag is enabled, replies will be published to the same Broker where the request comes from (beware that if you don't change the type of the received message, you may create a loop and receive your same reply). When this flag is disabled, CloudEvent headers are removed from the reply." }, diff --git a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java index b2c51b73d..6583ed1a0 100644 --- a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java +++ b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeComponent.java @@ -219,6 +219,7 @@ protected void doStop() throws Exception { // // ************************ + @SuppressWarnings("unchecked") @Override protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { if (ObjectHelper.isEmpty(remaining)) { @@ -230,14 +231,14 @@ protected Endpoint createEndpoint(String uri, String remaining, Map transportOptions; @UriParam(prefix = "filter.") - private Map filters; + private Map filters; @UriParam(prefix = "ce.override.") - private Map ceOverride; + private Map ceOverride; @UriParam(label = "advanced") private String apiVersion; @UriParam(label = "advanced") @@ -141,25 +141,25 @@ public void addTransportOptions(String key, Object value) { this.transportOptions.put(key, value); } - public Map getFilters() { + public Map getFilters() { return filters; } /** * Set the filters. */ - public void setFilters(Map filters) { + public void setFilters(Map filters) { this.filters = new HashMap<>(filters); } - public Map getCeOverride() { + public Map getCeOverride() { return ceOverride; } /** * CloudEvent headers to override */ - public void setCeOverride(Map ceOverride) { + public void setCeOverride(Map ceOverride) { this.ceOverride = new HashMap<>(ceOverride); } diff --git a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index ac75eda39..ef668a1b9 100644 --- a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -16,7 +16,6 @@ */ package org.apache.camel.component.knative; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -30,8 +29,9 @@ import org.apache.camel.component.knative.ce.CloudEventProcessor; import org.apache.camel.component.knative.ce.CloudEventProcessors; import org.apache.camel.component.knative.spi.CloudEvent; +import org.apache.camel.component.knative.spi.CloudEvents; import org.apache.camel.component.knative.spi.Knative; -import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; import org.apache.camel.processor.Pipeline; import org.apache.camel.spi.UriEndpoint; @@ -51,7 +51,8 @@ title = "Knative", category = Category.CLOUD) public class KnativeEndpoint extends DefaultEndpoint { - private final CloudEventProcessor cloudEvent; + private final CloudEvent cloudEvent; + private final CloudEventProcessor cloudEventProcessor; @UriPath(description = "The Knative resource type") private final Knative.Type type; @@ -66,7 +67,8 @@ public KnativeEndpoint(String uri, KnativeComponent component, Knative.Type type this.type = type; this.typeId = name; this.configuration = configuration; - this.cloudEvent = CloudEventProcessors.fromSpecVersion(configuration.getCloudEventsSpecVersion()); + this.cloudEvent = CloudEvents.fromSpecVersion(configuration.getCloudEventsSpecVersion()); + this.cloudEventProcessor = CloudEventProcessors.fromSpecVersion(configuration.getCloudEventsSpecVersion()); } @Override @@ -76,8 +78,8 @@ public KnativeComponent getComponent() { @Override public Producer createProducer() throws Exception { - final KnativeEnvironment.KnativeResource service = lookupServiceDefinition(Knative.EndpointKind.sink); - final Processor ceProcessor = cloudEvent.producer(this, service); + final KnativeResource service = lookupServiceDefinition(Knative.EndpointKind.sink); + final Processor ceProcessor = cloudEventProcessor.producer(this, service); final Producer producer = getComponent().getProducerFactory().createProducer(this, createTransportConfiguration(service), service); PropertyBindingSupport.build() @@ -93,9 +95,9 @@ public Producer createProducer() throws Exception { @Override public Consumer createConsumer(Processor processor) throws Exception { - final KnativeEnvironment.KnativeResource service = lookupServiceDefinition(Knative.EndpointKind.source); - final Processor ceProcessor = cloudEvent.consumer(this, service); - final Processor replyProcessor = configuration.isReplyWithCloudEvent() ? cloudEvent.producer(this, service) : null; + final KnativeResource service = lookupServiceDefinition(Knative.EndpointKind.source); + final Processor ceProcessor = cloudEventProcessor.consumer(this, service); + final Processor replyProcessor = configuration.isReplyWithCloudEvent() ? cloudEventProcessor.producer(this, service) : null; final Processor pipeline = Pipeline.newInstance(getCamelContext(), ceProcessor, processor, replyProcessor); final Consumer consumer = getComponent().getConsumerFactory().createConsumer(this, createTransportConfiguration(service), service, pipeline); @@ -125,6 +127,10 @@ public String getTypeId() { return typeId; } + public CloudEvent getCloudEvent() { + return cloudEvent; + } + public KnativeConfiguration getConfiguration() { return configuration; } @@ -140,8 +146,8 @@ protected void doInit() throws Exception { } } - KnativeEnvironment.KnativeResource lookupServiceDefinition(Knative.EndpointKind endpointKind) { - String serviceName = configuration.getTypeId(); + KnativeResource lookupServiceDefinition(Knative.EndpointKind endpointKind) { + final String resourceName = configuration.getTypeId(); // // look-up service definition by service name first then if not found try to look it up by using @@ -149,99 +155,99 @@ KnativeEnvironment.KnativeResource lookupServiceDefinition(Knative.EndpointKind // the endpoint uri but for events it is not possible so default should always be there for events // unless the service name is define as an endpoint option. // - KnativeEnvironment.KnativeResource service = lookupServiceDefinition(serviceName, endpointKind) + KnativeResource resource = lookupServiceDefinition(resourceName, endpointKind) .or(() -> lookupServiceDefinition("default", endpointKind)) - .orElseThrow(() -> new IllegalArgumentException(String.format("Unable to find a service definition for %s/%s/%s", type, endpointKind, serviceName))); + .orElseThrow(() -> new IllegalArgumentException( + String.format("Unable to find a resource definition for %s/%s/%s", type, endpointKind, resourceName)) + ); - final Map metadata = new HashMap<>(service.getMetadata()); + // + // We need to create a new resource as we need to inject additional data from the component + // configuration. + // + KnativeResource answer = KnativeResource.from(resource); - for (Map.Entry entry : configuration.getFilters().entrySet()) { + // + // Set-up filters from config + // + for (Map.Entry entry : configuration.getFilters().entrySet()) { String key = entry.getKey(); - Object val = entry.getValue(); - - if (val instanceof String) { - if (!key.startsWith(Knative.KNATIVE_FILTER_PREFIX)) { - key = Knative.KNATIVE_FILTER_PREFIX + key; - } + String val = entry.getValue(); - metadata.put(key, (String) val); + if (key.startsWith(Knative.KNATIVE_FILTER_PREFIX)) { + key = key.substring(Knative.KNATIVE_FILTER_PREFIX.length()); } + + answer.addFilter(key, val); } - for (Map.Entry entry : configuration.getCeOverride().entrySet()) { + // + // Set-up overrides from config + // + for (Map.Entry entry : configuration.getCeOverride().entrySet()) { String key = entry.getKey(); - Object val = entry.getValue(); + String val = entry.getValue(); - if (val instanceof String) { - if (!key.startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) { - key = Knative.KNATIVE_CE_OVERRIDE_PREFIX + key; - } - - metadata.put(key, (String) val); + if (key.startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) { + key = key.substring(Knative.KNATIVE_CE_OVERRIDE_PREFIX.length()); } + + answer.addCeOverride(key, val); } - if (service.getType() == Knative.Type.event) { - metadata.put(Knative.KNATIVE_EVENT_TYPE, serviceName); - metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), serviceName); + // + // For event type endpoints se need to add an additional filter to filter out events received + // based on the given type. + // + if (resource.getType() == Knative.Type.event) { + answer.setCloudEventType(resourceName); + answer.addFilter(CloudEvent.CAMEL_CLOUD_EVENT_TYPE, resourceName); } - return new KnativeEnvironment.KnativeResource( - service.getType(), - service.getName(), - service.getUrl(), - metadata - ); + return answer; } - Optional lookupServiceDefinition(String name, Knative.EndpointKind endpointKind) { + Optional lookupServiceDefinition(String name, Knative.EndpointKind endpointKind) { return servicesDefinitions() .filter(definition -> definition.matches(this.type, name)) - .filter(serviceFilter(endpointKind)) + .filter(serviceFilter(this.configuration, endpointKind)) .findFirst(); } - private KnativeTransportConfiguration createTransportConfiguration(KnativeEnvironment.KnativeResource definition) { + private KnativeTransportConfiguration createTransportConfiguration(KnativeResource definition) { return new KnativeTransportConfiguration( - this.cloudEvent.cloudEvent(), + this.cloudEventProcessor.cloudEvent(), !this.configuration.isReplyWithCloudEvent(), - ObjectHelper.supplyIfEmpty( - this.configuration.getReply(), - () -> definition.getOptionalMetadata(Knative.KNATIVE_REPLY).map(Boolean::parseBoolean).orElse(true) - ) + ObjectHelper.supplyIfEmpty(this.configuration.getReply(), definition::getReply) ); } - private Stream servicesDefinitions() { + private Stream servicesDefinitions() { return Stream.concat( - getCamelContext().getRegistry().findByType(KnativeEnvironment.KnativeResource.class).stream(), + getCamelContext().getRegistry().findByType(KnativeResource.class).stream(), this.configuration.getEnvironment().stream() ); } - private Predicate serviceFilter(Knative.EndpointKind endpointKind) { - return s -> { - final String type = s.getMetadata(Knative.CAMEL_ENDPOINT_KIND); - if (!Objects.equals(endpointKind.name(), type)) { - return false; - } - - final String apiv = s.getMetadata(Knative.KNATIVE_API_VERSION); - if (configuration.getApiVersion() != null && !Objects.equals(apiv, configuration.getApiVersion())) { - return false; - } - - final String kind = s.getMetadata(Knative.KNATIVE_KIND); - if (configuration.getKind() != null && !Objects.equals(kind, configuration.getKind())) { - return false; - } + private static Predicate serviceFilter(KnativeConfiguration configuration, Knative.EndpointKind endpointKind) { + return new Predicate() { + @Override + public boolean test(KnativeResource resource) { + if (!Objects.equals(endpointKind, resource.getEndpointKind())) { + return false; + } + if (configuration.getApiVersion() != null && !Objects.equals(resource.getObjectApiVersion(), configuration.getApiVersion())) { + return false; + } + if (configuration.getKind() != null && !Objects.equals(resource.getObjectKind(), configuration.getKind())) { + return false; + } + if (configuration.getName() != null && !Objects.equals(resource.getObjectName(), configuration.getName())) { + return false; + } - final String name = s.getMetadata(Knative.KNATIVE_NAME); - if (configuration.getName() != null && !Objects.equals(name, configuration.getName())) { - return false; + return true; } - - return true; }; } } diff --git a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java index 61082a663..322f50b51 100644 --- a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java +++ b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java @@ -30,7 +30,7 @@ import org.apache.camel.component.knative.KnativeEndpoint; import org.apache.camel.component.knative.spi.CloudEvent; import org.apache.camel.component.knative.spi.Knative; -import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,17 +48,16 @@ public CloudEvent cloudEvent() { @SuppressWarnings("unchecked") @Override - public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeResource service) { + public Processor consumer(KnativeEndpoint endpoint, KnativeResource service) { return exchange -> { if (Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_BATCH_CONTENT_MODE)) { throw new UnsupportedOperationException("Batched CloudEvents are not yet supported"); } if (!Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE)) { - final CloudEvent ce = cloudEvent(); final Map headers = exchange.getIn().getHeaders(); - for (CloudEvent.Attribute attribute: ce.attributes()) { + for (CloudEvent.Attribute attribute: cloudEvent.attributes()) { Object val = headers.remove(attribute.http()); if (val != null) { headers.put(attribute.id(), val); @@ -75,12 +74,12 @@ public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeRe protected abstract void decodeStructuredContent(Exchange exchange, Map content); @Override - public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeResource service) { + public Processor producer(KnativeEndpoint endpoint, KnativeResource service) { final CloudEvent ce = cloudEvent(); final Logger logger = LoggerFactory.getLogger(getClass()); + final String contentType = service.getContentType(); return exchange -> { - final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); final Map headers = exchange.getMessage().getHeaders(); for (CloudEvent.Attribute attribute: ce.attributes()) { @@ -90,7 +89,9 @@ public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeRe } } - headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); + if (contentType != null) { + headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); + } // // in case of events, if the type of the event is defined as URI param so we need @@ -108,10 +109,11 @@ public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeRe headers.put(cloudEvent().mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), endpoint.getTypeId()); } else { setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TYPE, () -> { - return service.getMetadata().getOrDefault( - Knative.KNATIVE_EVENT_TYPE, - endpoint.getConfiguration().getCloudEventsType() - ); + String eventType = service.getCloudEventType(); + if (eventType == null) { + eventType = endpoint.getConfiguration().getCloudEventsType(); + } + return eventType; }); } @@ -125,14 +127,7 @@ public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeRe return eventTime; }); - for (Map.Entry entry: service.getMetadata().entrySet()) { - if (entry.getKey().startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) { - final String key = entry.getKey().substring(Knative.KNATIVE_CE_OVERRIDE_PREFIX.length()); - final String val = entry.getValue(); - - headers.put(key, val); - } - } + headers.putAll(service.getCeOverrides()); }; } diff --git a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessor.java b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessor.java index 4e392952d..860c29c64 100644 --- a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessor.java +++ b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessor.java @@ -19,10 +19,10 @@ import org.apache.camel.Processor; import org.apache.camel.component.knative.KnativeEndpoint; import org.apache.camel.component.knative.spi.CloudEvent; -import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; public interface CloudEventProcessor { CloudEvent cloudEvent(); - Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeResource service); - Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeResource service); + Processor consumer(KnativeEndpoint endpoint, KnativeResource service); + Processor producer(KnativeEndpoint endpoint, KnativeResource service); } diff --git a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java index 5d3b55824..fcc491404 100644 --- a/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java +++ b/components/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java @@ -26,7 +26,7 @@ import org.apache.camel.component.knative.KnativeEndpoint; import org.apache.camel.component.knative.spi.CloudEvent; import org.apache.camel.component.knative.spi.CloudEvents; -import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; import static org.apache.camel.util.ObjectHelper.ifNotEmpty; @@ -159,12 +159,12 @@ public CloudEvent cloudEvent() { } @Override - public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeResource service) { + public Processor consumer(KnativeEndpoint endpoint, KnativeResource service) { return instance.consumer(endpoint, service); } @Override - public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeResource service) { + public Processor producer(KnativeEndpoint endpoint, KnativeResource service) { return instance.producer(endpoint, service); } diff --git a/components/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeTransportNoop.java b/components/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeTransportNoop.java index cf46f16d9..64e616e3e 100644 --- a/components/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeTransportNoop.java +++ b/components/camel-knative/camel-knative/src/test/java/org/apache/camel/component/knative/KnativeTransportNoop.java @@ -22,8 +22,8 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.knative.spi.KnativeConsumerFactory; -import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.component.knative.spi.KnativeProducerFactory; +import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.DefaultProducer; @@ -38,7 +38,7 @@ public void stop() { } @Override - public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration configuration, KnativeEnvironment.KnativeResource service) { + public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration configuration, KnativeResource service) { return new DefaultProducer(endpoint) { @Override public void process(Exchange exchange) throws Exception { @@ -47,7 +47,7 @@ public void process(Exchange exchange) throws Exception { } @Override - public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration configuration, KnativeEnvironment.KnativeResource service, Processor processor) { + public Consumer createConsumer(Endpoint endpoint, KnativeTransportConfiguration configuration, KnativeResource service, Processor processor) { return new DefaultConsumer(endpoint, processor); } } diff --git a/itests/camel-k-itests-knative-sinkbinding/src/main/java/org/apache/camel/k/quarkus/it/KnativeSinkBindingApplication.java b/itests/camel-k-itests-knative-sinkbinding/src/main/java/org/apache/camel/k/quarkus/it/KnativeSinkBindingApplication.java index 1ba664cbb..9367adf7d 100644 --- a/itests/camel-k-itests-knative-sinkbinding/src/main/java/org/apache/camel/k/quarkus/it/KnativeSinkBindingApplication.java +++ b/itests/camel-k-itests-knative-sinkbinding/src/main/java/org/apache/camel/k/quarkus/it/KnativeSinkBindingApplication.java @@ -27,8 +27,7 @@ import javax.ws.rs.core.MediaType; import org.apache.camel.CamelContext; -import org.apache.camel.component.knative.spi.Knative; -import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.k.knative.customizer.KnativeSinkBindingContextCustomizer; @Path("/test") @@ -58,7 +57,7 @@ public JsonObject sinkbindingCustomizer() { @Path("/resource/{name}") @Produces(MediaType.APPLICATION_JSON) public JsonObject resource(@PathParam("name") String name) { - var resource = context.getRegistry().lookupByNameAndType(name, KnativeEnvironment.KnativeResource.class); + var resource = context.getRegistry().lookupByNameAndType(name, KnativeResource.class); if (resource == null) { return Json.createObjectBuilder().build(); } @@ -67,8 +66,8 @@ public JsonObject resource(@PathParam("name") String name) { .add("url", resource.getUrl()) .add("name", resource.getName()) .add("type", resource.getType().name()) - .add("apiVersion", resource.getMetadata(Knative.KNATIVE_API_VERSION)) - .add("kind", resource.getMetadata(Knative.KNATIVE_KIND)) + .add("apiVersion", resource.getObjectApiVersion()) + .add("kind", resource.getObjectKind()) .build(); } } diff --git a/itests/camel-k-itests-knative-sinkbinding/src/main/resources/application.properties b/itests/camel-k-itests-knative-sinkbinding/src/main/resources/application.properties index 3ce549334..5b8f41abc 100644 --- a/itests/camel-k-itests-knative-sinkbinding/src/main/resources/application.properties +++ b/itests/camel-k-itests-knative-sinkbinding/src/main/resources/application.properties @@ -18,6 +18,6 @@ # # Quarkus # -quarkus.log.console.enable = false +quarkus.log.console.enable = true quarkus.banner.enabled = false diff --git a/itests/camel-k-itests-knative-source-groovy/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java b/itests/camel-k-itests-knative-source-groovy/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java index c893a1d3d..57bfa050c 100644 --- a/itests/camel-k-itests-knative-source-groovy/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java +++ b/itests/camel-k-itests-knative-source-groovy/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java @@ -105,7 +105,7 @@ KnativeEnvironment environment( return KnativeEnvironment.on( KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, "sink") .withUrl("http://localhost:" + port + "/test/toUpper") - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) + .withEndpointKind(Knative.EndpointKind.sink) .build() ); } diff --git a/itests/camel-k-itests-knative-source-groovy/src/main/resources/application.properties b/itests/camel-k-itests-knative-source-groovy/src/main/resources/application.properties index cdec8cc8c..1e67872ea 100644 --- a/itests/camel-k-itests-knative-source-groovy/src/main/resources/application.properties +++ b/itests/camel-k-itests-knative-source-groovy/src/main/resources/application.properties @@ -18,7 +18,7 @@ # # Quarkus # -quarkus.log.console.enable = false +quarkus.log.console.enable = true quarkus.banner.enabled = false # diff --git a/itests/camel-k-itests-knative-source-java/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java b/itests/camel-k-itests-knative-source-java/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java index c893a1d3d..57bfa050c 100644 --- a/itests/camel-k-itests-knative-source-java/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java +++ b/itests/camel-k-itests-knative-source-java/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java @@ -105,7 +105,7 @@ KnativeEnvironment environment( return KnativeEnvironment.on( KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, "sink") .withUrl("http://localhost:" + port + "/test/toUpper") - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) + .withEndpointKind(Knative.EndpointKind.sink) .build() ); } diff --git a/itests/camel-k-itests-knative-source-js/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java b/itests/camel-k-itests-knative-source-js/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java index c893a1d3d..57bfa050c 100644 --- a/itests/camel-k-itests-knative-source-js/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java +++ b/itests/camel-k-itests-knative-source-js/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java @@ -105,7 +105,7 @@ KnativeEnvironment environment( return KnativeEnvironment.on( KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, "sink") .withUrl("http://localhost:" + port + "/test/toUpper") - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) + .withEndpointKind(Knative.EndpointKind.sink) .build() ); } diff --git a/itests/camel-k-itests-knative-source-xml/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java b/itests/camel-k-itests-knative-source-xml/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java index c893a1d3d..57bfa050c 100644 --- a/itests/camel-k-itests-knative-source-xml/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java +++ b/itests/camel-k-itests-knative-source-xml/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java @@ -105,7 +105,7 @@ KnativeEnvironment environment( return KnativeEnvironment.on( KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, "sink") .withUrl("http://localhost:" + port + "/test/toUpper") - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) + .withEndpointKind(Knative.EndpointKind.sink) .build() ); } diff --git a/itests/camel-k-itests-knative-source-yaml/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java b/itests/camel-k-itests-knative-source-yaml/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java index c893a1d3d..57bfa050c 100644 --- a/itests/camel-k-itests-knative-source-yaml/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java +++ b/itests/camel-k-itests-knative-source-yaml/src/main/java/org/apache/camel/k/quarkus/it/knative/source/KnativeSourceApplication.java @@ -105,7 +105,7 @@ KnativeEnvironment environment( return KnativeEnvironment.on( KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, "sink") .withUrl("http://localhost:" + port + "/test/toUpper") - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) + .withEndpointKind(Knative.EndpointKind.sink) .build() ); } diff --git a/itests/camel-k-itests-knative/src/main/java/org/apache/camel/k/quarkus/it/KnativeApplication.java b/itests/camel-k-itests-knative/src/main/java/org/apache/camel/k/quarkus/it/KnativeApplication.java index 514103551..cc4259222 100644 --- a/itests/camel-k-itests-knative/src/main/java/org/apache/camel/k/quarkus/it/KnativeApplication.java +++ b/itests/camel-k-itests-knative/src/main/java/org/apache/camel/k/quarkus/it/KnativeApplication.java @@ -16,13 +16,13 @@ */ package org.apache.camel.k.quarkus.it; -import java.util.Map; +import java.util.Objects; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.json.Json; import javax.json.JsonObject; -import javax.ws.rs.Consumes; +import javax.json.bind.JsonbBuilder; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -47,29 +47,32 @@ public class KnativeApplication { @Inject FluentProducerTemplate template; - @SuppressWarnings("unchecked") @GET @Path("/inspect") - @Consumes(MediaType.TEXT_PLAIN) @Produces(MediaType.APPLICATION_JSON) public JsonObject inspect() { - var endpoint = context.getEndpoint("knative:endpoint/from", KnativeEndpoint.class); - var envMeta = endpoint.getConfiguration().getEnvironment().lookup(Knative.Type.endpoint, "from") - .filter(entry -> Knative.EndpointKind.source.name().equals(entry.getMetadata().get(Knative.CAMEL_ENDPOINT_KIND))) - .findFirst() - .map(def -> Json.createObjectBuilder((Map)def.getMetadata())) - .orElseThrow(IllegalArgumentException::new); - return Json.createObjectBuilder() - .add("env-meta", envMeta) .add("component", Json.createObjectBuilder() .add("producer-factory", context.getComponent("knative", KnativeComponent.class).getProducerFactory().getClass().getName()) .add("consumer-factory", context.getComponent("knative", KnativeComponent.class).getConsumerFactory().getClass().getName()) - .add("", "") .build()) .build(); } + @GET + @Path("/inspectResource") + @Produces(MediaType.APPLICATION_JSON) + public String inspectResource() { + return context.getEndpoint("knative:endpoint/from", KnativeEndpoint.class) + .getConfiguration() + .getEnvironment() + .lookup(Knative.Type.endpoint, "from") + .filter(entry -> Objects.equals(Knative.EndpointKind.source, entry.getEndpointKind())) + .findFirst() + .map(def -> JsonbBuilder.create().toJson(def)) + .orElseThrow(IllegalArgumentException::new); + } + @POST @Path("/execute") @Produces(MediaType.TEXT_PLAIN) @@ -83,16 +86,16 @@ KnativeEnvironment environment( return KnativeEnvironment.on( KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, "process") - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source) + .withEndpointKind(Knative.EndpointKind.source) .withMeta(Knative.SERVICE_META_PATH, "/knative") .build(), KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, "from") - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source) + .withEndpointKind(Knative.EndpointKind.source) .withMeta(Knative.SERVICE_META_PATH, "/knative") - .withMeta(Knative.KNATIVE_EVENT_TYPE, "camel.k.evt") + .withMeta(Knative.KNATIVE_CLOUD_EVENT_TYPE, "camel.k.evt") .build(), KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, "process") - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) + .withEndpointKind(Knative.EndpointKind.sink) .withMeta(Knative.SERVICE_META_URL, String.format("http://localhost:%d/knative", port)) .build() ); diff --git a/itests/camel-k-itests-knative/src/test/java/org/apache/camel/k/quarkus/it/KnativeTest.java b/itests/camel-k-itests-knative/src/test/java/org/apache/camel/k/quarkus/it/KnativeTest.java index 07cbc49ac..acf7e89ff 100644 --- a/itests/camel-k-itests-knative/src/test/java/org/apache/camel/k/quarkus/it/KnativeTest.java +++ b/itests/camel-k-itests-knative/src/test/java/org/apache/camel/k/quarkus/it/KnativeTest.java @@ -30,6 +30,7 @@ import org.apache.camel.component.knative.spi.CloudEvent; import org.apache.camel.component.knative.spi.CloudEvents; import org.apache.camel.component.knative.spi.Knative; +import org.apache.camel.component.knative.spi.KnativeResource; import org.junit.jupiter.api.Test; import static io.restassured.RestAssured.given; @@ -38,11 +39,9 @@ @QuarkusTest public class KnativeTest { - @Test public void inspect() { JsonPath p = RestAssured.given() - .contentType(MediaType.TEXT_PLAIN) .accept(MediaType.APPLICATION_JSON) .get("/test/inspect") .then() @@ -51,18 +50,28 @@ public void inspect() { .body() .jsonPath(); - assertThat(p.getMap("env-meta", String.class, String.class)) - .containsEntry(Knative.KNATIVE_EVENT_TYPE, "camel.k.evt") - .containsEntry(Knative.SERVICE_META_PATH, "/knative") - .containsEntry("camel.endpoint.kind", "source"); - - assertThat(p.getString("component.consumer-factory")) .isEqualTo(KnativeHttpConsumerFactory.class.getName()); assertThat(p.getString("component.producer-factory")) .isEqualTo(KnativeHttpProducerFactory.class.getName()); } + @Test + public void inspectResource() { + KnativeResource p = RestAssured.given() + .accept(MediaType.APPLICATION_JSON) + .get("/test/inspectResource") + .then() + .statusCode(200) + .extract() + .body() + .as(KnativeResource.class); + + assertThat(p.getEndpointKind()).isEqualTo(Knative.EndpointKind.source); + assertThat(p.getMetadata()).containsEntry(Knative.KNATIVE_CLOUD_EVENT_TYPE, "camel.k.evt"); + assertThat(p.getMetadata()).containsEntry(Knative.SERVICE_META_PATH, "/knative"); + } + @Test public void invokeEndpoint() { final String payload = "test"; diff --git a/itests/camel-k-itests-runtime-yaml/src/main/java/org/apache/camel/k/quarkus/it/Application.java b/itests/camel-k-itests-runtime-yaml/src/main/java/org/apache/camel/k/quarkus/it/Application.java index 694f64ed8..72b98451e 100644 --- a/itests/camel-k-itests-runtime-yaml/src/main/java/org/apache/camel/k/quarkus/it/Application.java +++ b/itests/camel-k-itests-runtime-yaml/src/main/java/org/apache/camel/k/quarkus/it/Application.java @@ -31,7 +31,7 @@ KnativeEnvironment knativeEnvironment() { return KnativeEnvironment.on( KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, "sink") .withUrl("http://localhost:8080") - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) + .withEndpointKind(Knative.EndpointKind.sink) .build() ); }