diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java index f9c5ad982..df174947e 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java @@ -16,15 +16,73 @@ */ package org.apache.camel.component.knative.spi; +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; + public interface CloudEvent { + /** + * The CloudEvent spec version. + */ String version(); - Attributes attributes(); - interface Attributes { + /** + * List of supported attributes. + */ + Collection attributes(); + + /** + * Find attribute by id. + */ + default Optional attribute(String id) { + return attributes().stream() + .filter(a -> Objects.equals(id, a.id())) + .findFirst(); + } + + /** + * Mandatory find attribute by id. + */ + default Attribute mandatoryAttribute(String id) { + return attributes().stream() + .filter(a -> Objects.equals(id, a.id())) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Unable to find attribute with id: " + id)); + } + + interface Attribute { + /** + * The ID of the attributes, can be used to look it up. + */ String id(); - String source(); - String spec(); - String type(); - String time(); + + /** + * The name of the http header. + */ + String http(); + + /** + * The name of the json field. + */ + String json(); + + static Attribute simple(String id, String http, String json) { + return new Attribute() { + @Override + public String id() { + return id; + } + + @Override + public String http() { + return http; + } + + @Override + public String json() { + return json; + } + }; + } } } diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV01.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV01.java deleted file mode 100644 index 845104fcc..000000000 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV01.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.knative.spi; - -final class CloudEventV01 implements CloudEvent { - public static final String VERSION = "0.1"; - public static final Attributes ATTRIBUTES = new Attributes() { - @Override - public String id() { - return "CE-EventID"; - } - - @Override - public String source() { - return "CE-Source"; - } - - @Override - public String spec() { - return "CE-CloudEventsVersion"; - } - - @Override - public String type() { - return "CE-EventType"; - } - - @Override - public String time() { - return "CE-EventTime"; - } - }; - - @Override - public String version() { - return VERSION; - } - - @Override - public Attributes attributes() { - return ATTRIBUTES; - } -} diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV02.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV02.java deleted file mode 100644 index 7b28dcca4..000000000 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV02.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.knative.spi; - -final class CloudEventV02 implements CloudEvent { - public static final String VERSION = "0.2"; - public static final Attributes ATTRIBUTES = new Attributes() { - @Override - public String id() { - return "ce-id"; - } - - @Override - public String source() { - return "ce-source"; - } - - @Override - public String spec() { - return "ce-specversion"; - } - - @Override - public String type() { - return "ce-type"; - } - - @Override - public String time() { - return "ce-time"; - } - }; - - @Override - public String version() { - return VERSION; - } - - @Override - public Attributes attributes() { - return ATTRIBUTES; - } -} diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java index ff5542f20..7d7e7a990 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java @@ -16,11 +16,60 @@ */ package org.apache.camel.component.knative.spi; +import java.util.Arrays; +import java.util.Collection; import java.util.Objects; public enum CloudEvents implements CloudEvent { - V01(new CloudEventV01()), - V02(new CloudEventV02()); + // + // V0.1 - https://github.com/cloudevents/spec/blob/v0.1/spec.md + // + V01(new CloudEventImpl( + "0.1", + Arrays.asList( + Attribute.simple("type", "CE-EventType", "eventType"), + Attribute.simple("type.version", "CE-EventTypeVersion", "eventTypeVersion"), + Attribute.simple("version", "CE-CloudEventsVersion", "cloudEventsVersion"), + Attribute.simple("source", "CE-Source", "source"), + Attribute.simple("id", "CE-EventID", "eventID"), + Attribute.simple("time", "CE-EventTime", "eventTime"), + Attribute.simple("schema.url", "CE-SchemaURL", "schemaURL"), + Attribute.simple("content.type", "ContentType", "contentType"), + Attribute.simple("extensions", "CE-Extensions", "extensions") + ) + )), + // + // V0.2 - https://github.com/cloudevents/spec/blob/v0.2/spec.md + // + V02(new CloudEventImpl( + "0.2", + Arrays.asList( + Attribute.simple("type", "ce-type", "type"), + Attribute.simple("version", "ce-specversion", "specversion"), + Attribute.simple("source", "ce-source", "source"), + Attribute.simple("id", "ce-id", "id"), + Attribute.simple("time", "ce-time", "time"), + Attribute.simple("schema.url", "ce-schemaurl", "schemaurl"), + Attribute.simple("content.type", "Content-Type", "contenttype") + ) + )), + // + // V0.3 - https://github.com/cloudevents/spec/blob/v0.3/spec.md + // + V03(new CloudEventImpl( + "0.3", + Arrays.asList( + Attribute.simple("id", "ce-id", "id"), + Attribute.simple("source", "ce-source", "source"), + Attribute.simple("version", "ce-specversion", "specversion"), + Attribute.simple("type", "ce-type", "type"), + Attribute.simple("data.content.encoding", "ce-datacontentencoding", "datacontentencoding"), + Attribute.simple("data.content.type", "ce-datacontenttype", "datacontenttype"), + Attribute.simple("schema.url", "ce-schemaurl", "schemaurl"), + Attribute.simple("subject", "ce-subject", "subject"), + Attribute.simple("time", "ce-time", "time") + ) + )); private final CloudEvent instance; @@ -34,7 +83,7 @@ public String version() { } @Override - public Attributes attributes() { + public Collection attributes() { return instance.attributes(); } @@ -47,5 +96,25 @@ public static CloudEvent fromSpecVersion(String version) { throw new IllegalArgumentException("Unable to find an implementation fo CloudEvents spec: " + version); } + + private static class CloudEventImpl implements CloudEvent { + private final String version; + private final Collection attributes; + + public CloudEventImpl(String version, Collection attributes) { + this.version = version; + this.attributes = attributes; + } + + @Override + public String version() { + return version; + } + + @Override + public Collection attributes() { + return attributes; + } + } } diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java index 75aa80e8c..65c239368 100644 --- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java +++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java @@ -30,6 +30,7 @@ public final class Knative { public static final String KNATIVE_API_VERSION = "knative.apiVersion"; public static final String CONTENT_TYPE = "content.type"; public static final String MIME_STRUCTURED_CONTENT_MODE = "application/cloudevents+json"; + public static final String MIME_BATCH_CONTENT_MODE = "application/cloudevents-batch+json"; public static final String CAMEL_ENDPOINT_KIND = "camel.endpoint.kind"; public static final String SERVICE_META_HOST = "service.host"; diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index 9898f22e3..e13acafa6 100644 --- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -91,7 +91,8 @@ void testCreateComponent() { private static Stream provideCloudEventsImplementations() { return Stream.of( Arguments.of(CloudEvents.V01), - Arguments.of(CloudEvents.V02) + Arguments.of(CloudEvents.V02), + Arguments.of(CloudEvents.V03) ); } @@ -129,12 +130,12 @@ public void configure() throws Exception { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.attributes().source(), "knative://endpoint/myEndpoint"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "knative://endpoint/myEndpoint"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -176,12 +177,12 @@ public void configure() throws Exception { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.attributes().id(), "myEventID"); - mock.expectedHeaderReceived(ce.attributes().source(), "/somewhere"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -190,7 +191,7 @@ public void configure() throws Exception { e -> { e.getMessage().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE); - if (Objects.equals(ce.version(), ce.version())) { + if (Objects.equals(CloudEvents.V01.version(), ce.version())) { e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf( "cloudEventsVersion", ce.version(), "eventType", "org.apache.camel.event", @@ -200,8 +201,7 @@ public void configure() throws Exception { "contentType", "text/plain", "data", "test" ))); - } - if (Objects.equals(CloudEvents.V02.version(), ce.version())) { + } else if (Objects.equals(CloudEvents.V02.version(), ce.version())) { e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf( "specversion", ce.version(), "type", "org.apache.camel.event", @@ -211,6 +211,18 @@ public void configure() throws Exception { "contenttype", "text/plain", "data", "test" ))); + } else if (Objects.equals(CloudEvents.V03.version(), ce.version())) { + e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf( + "specversion", ce.version(), + "type", "org.apache.camel.event", + "id", "myEventID", + "time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()), + "source", "/somewhere", + "datacontenttype", "text/plain", + "data", "test" + ))); + } else { + throw new IllegalArgumentException("Unknown CloudEvent spec: " + ce.version()); } } ); @@ -252,12 +264,12 @@ public void configure() throws Exception { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.attributes().id(), "myEventID"); - mock.expectedHeaderReceived(ce.attributes().source(), "/somewhere"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -265,11 +277,11 @@ public void configure() throws Exception { "direct:source", e -> { e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain"); - e.getMessage().setHeader(ce.attributes().spec(), ce.version()); - e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.attributes().id(), "myEventID"); - e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.attributes().source(), "/somewhere"); + e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID"); + e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "/somewhere"); e.getMessage().setBody("test"); } ); @@ -289,7 +301,7 @@ void testConsumeContentWithFilter(CloudEvent ce) throws Exception { KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE1" + Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE1" )), KnativeEnvironment.endpoint( Knative.EndpointKind.source, @@ -299,7 +311,7 @@ void testConsumeContentWithFilter(CloudEvent ce) throws Exception { KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE2" + Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE2" )) ); @@ -331,41 +343,41 @@ public void configure() throws Exception { context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); - mock1.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock1.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event"); - mock1.expectedHeaderReceived(ce.attributes().id(), "myEventID1"); - mock1.expectedHeaderReceived(ce.attributes().source(), "CE1"); + mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE1"); mock1.expectedBodiesReceived("test"); mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); - mock2.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock2.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event"); - mock2.expectedHeaderReceived(ce.attributes().id(), "myEventID2"); - mock2.expectedHeaderReceived(ce.attributes().source(), "CE2"); + mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE2"); mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.attributes().spec(), ce.version()); - e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.attributes().id(), "myEventID1"); - e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.attributes().source(), "CE1"); + e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1"); + e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE1"); } ); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.attributes().spec(), ce.version()); - e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.attributes().id(), "myEventID2"); - e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.attributes().source(), "CE2"); + e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2"); + e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE2"); } ); @@ -385,7 +397,7 @@ void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception { KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE[01234]" + Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE[01234]" )), KnativeEnvironment.endpoint( Knative.EndpointKind.source, @@ -395,7 +407,7 @@ void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception { KnativeSupport.mapOf( Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", Knative.CONTENT_TYPE, "text/plain", - Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE[56789]" + Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE[56789]" )) ); @@ -427,41 +439,41 @@ public void configure() throws Exception { context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); - mock1.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock1.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event"); - mock1.expectedHeaderReceived(ce.attributes().id(), "myEventID1"); - mock1.expectedHeaderReceived(ce.attributes().source(), "CE0"); + mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE0"); mock1.expectedBodiesReceived("test"); mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); - mock2.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock2.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event"); - mock2.expectedHeaderReceived(ce.attributes().id(), "myEventID2"); - mock2.expectedHeaderReceived(ce.attributes().source(), "CE5"); + mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE5"); mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.attributes().spec(), ce.version()); - e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.attributes().id(), "myEventID1"); - e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.attributes().source(), "CE0"); + e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1"); + e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE0"); } ); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.attributes().spec(), ce.version()); - e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.attributes().id(), "myEventID2"); - e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.attributes().source(), "CE5"); + e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2"); + e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE5"); } ); @@ -508,41 +520,41 @@ public void configure() throws Exception { context.start(); MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class); - mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); - mock1.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock1.expectedHeaderReceived(ce.attributes().type(), "event1"); - mock1.expectedHeaderReceived(ce.attributes().id(), "myEventID1"); - mock1.expectedHeaderReceived(ce.attributes().source(), "CE1"); + mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "event1"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1"); + mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE1"); mock1.expectedBodiesReceived("test"); mock1.expectedMessageCount(1); MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); - mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); - mock2.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock2.expectedHeaderReceived(ce.attributes().type(), "event2"); - mock2.expectedHeaderReceived(ce.attributes().id(), "myEventID2"); - mock2.expectedHeaderReceived(ce.attributes().source(), "CE2"); + mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "event2"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2"); + mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE2"); mock2.expectedBodiesReceived("test"); mock2.expectedMessageCount(1); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.attributes().spec(), ce.version()); - e.getMessage().setHeader(ce.attributes().type(), "event1"); - e.getMessage().setHeader(ce.attributes().id(), "myEventID1"); - e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.attributes().source(), "CE1"); + e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "event1"); + e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1"); + e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE1"); } ); context.createProducerTemplate().send( "direct:source", e -> { - e.getMessage().setHeader(ce.attributes().spec(), ce.version()); - e.getMessage().setHeader(ce.attributes().type(), "event2"); - e.getMessage().setHeader(ce.attributes().id(), "myEventID2"); - e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.attributes().source(), "CE2"); + e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "event2"); + e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2"); + e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE2"); } ); @@ -862,11 +874,11 @@ public void configure() throws Exception { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock.expectedHeaderReceived(ce.attributes().type(), "myEvent"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "myEvent"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -922,11 +934,11 @@ public void configure() throws Exception { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock.expectedHeaderReceived(ce.attributes().type(), "myEvent"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "myEvent"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -980,12 +992,12 @@ public void configure() throws Exception { context.start(); MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); - mock.expectedHeaderReceived(ce.attributes().spec(), ce.version()); - mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event"); - mock.expectedHeaderReceived(ce.attributes().id(), "myEventID"); - mock.expectedHeaderReceived(ce.attributes().source(), "/somewhere"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version()); + mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID"); + mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); - mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time())); + mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id())); mock.expectedBodiesReceived("test"); mock.expectedMessageCount(1); @@ -993,11 +1005,11 @@ public void configure() throws Exception { "direct:source", e -> { e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain"); - e.getMessage().setHeader(ce.attributes().spec(), ce.version()); - e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event"); - e.getMessage().setHeader(ce.attributes().id(), "myEventID"); - e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); - e.getMessage().setHeader(ce.attributes().source(), "/somewhere"); + e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version()); + e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event"); + e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID"); + e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "/somewhere"); e.getMessage().setBody("test"); } ); diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java index 3d268de00..277f33c19 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java @@ -32,8 +32,8 @@ public class KnativeConfiguration implements Cloneable { private String serviceName; @UriParam(defaultValue = "false") private boolean jsonSerializationEnabled; - @UriParam(defaultValue = "0.2", enums = "0.1,0.2") - private String cloudEventsSpecVersion = CloudEvents.V02.version(); + @UriParam(defaultValue = "0.3", enums = "0.1,0.2,0.3") + private String cloudEventsSpecVersion = CloudEvents.V03.version(); @UriParam(defaultValue = "org.apache.camel.event") private String cloudEventsType = "org.apache.camel.event"; @UriParam(prefix = "transport.") diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index 09ae6f76e..087286d0e 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -81,7 +81,7 @@ public Producer createProducer() throws Exception { .withTarget(producer) .bind(); - return new KnativeProducer(this, ceProcessor, ceConverter, producer); + return new KnativeProducer(this, ceProcessor, ceConverter, e -> e.getMessage().removeHeader("Host"), producer); } @Override @@ -159,7 +159,7 @@ KnativeEnvironment.KnativeServiceDefinition lookupServiceDefinition(Knative.Endp if (service.get().getType() == Knative.Type.event) { metadata.put(Knative.KNATIVE_EVENT_TYPE, serviceName); - metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().attributes().type(), serviceName); + metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().mandatoryAttribute("type").id(), serviceName); } return new KnativeEnvironment.KnativeServiceDefinition( diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV02Processor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java similarity index 65% rename from camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV02Processor.java rename to camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java index f41c52cb9..5e0ca0d38 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV02Processor.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java @@ -24,22 +24,17 @@ import java.util.Objects; import org.apache.camel.Exchange; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.knative.KnativeEndpoint; import org.apache.camel.component.knative.spi.CloudEvent; -import org.apache.camel.component.knative.spi.CloudEvents; import org.apache.camel.component.knative.spi.Knative; import org.apache.camel.component.knative.spi.KnativeEnvironment; -import org.apache.commons.lang3.StringUtils; -import static org.apache.camel.util.ObjectHelper.ifNotEmpty; - -final class CloudEventV02Processor implements CloudEventProcessor { +abstract class AbstractCloudEventProcessor implements CloudEventProcessor { private final CloudEvent cloudEvent; - public CloudEventV02Processor() { - this.cloudEvent = CloudEvents.V02; + protected AbstractCloudEventProcessor(CloudEvent cloudEvent) { + this.cloudEvent = cloudEvent; } @Override @@ -47,35 +42,14 @@ public CloudEvent cloudEvent() { return cloudEvent; } - @Override - public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) { - return exchange -> { - String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); - if (eventType == null) { - eventType = endpoint.getConfiguration().getCloudEventsType(); - } - - final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); - final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault()); - final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created); - final Map headers = exchange.getIn().getHeaders(); - - headers.putIfAbsent(cloudEvent.attributes().id(), exchange.getExchangeId()); - headers.putIfAbsent(cloudEvent.attributes().source(), endpoint.getEndpointUri()); - headers.putIfAbsent(cloudEvent.attributes().spec(), cloudEvent.version()); - headers.putIfAbsent(cloudEvent.attributes().type(), eventType); - headers.putIfAbsent(cloudEvent.attributes().time(), eventTime); - headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); - - // Always remove host so it's always computed from the URL and not inherited from the exchange - headers.remove("Host"); - }; - } - @SuppressWarnings("unchecked") @Override public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) { return exchange -> { + if (Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_BATCH_CONTENT_MODE)) { + throw new UnsupportedOperationException("Batched CloudEvents are not yet supported"); + } + if (!Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE)) { // // The event is not in the form of Structured Content Mode @@ -91,25 +65,34 @@ public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeSe } try (InputStream is = exchange.getIn().getBody(InputStream.class)) { - final Message message = exchange.getIn(); - final Map ce = Knative.MAPPER.readValue(is, Map.class); + decodeStructuredContent(exchange, Knative.MAPPER.readValue(is, Map.class)); + } + }; + } - ifNotEmpty(ce.remove("contenttype"), val -> message.setHeader(Exchange.CONTENT_TYPE, val)); - ifNotEmpty(ce.remove("data"), val -> message.setBody(val)); + protected abstract void decodeStructuredContent(Exchange exchange, Map content); - // - // Map extensions to standard camel headers - // - ifNotEmpty(ce.remove("extensions"), val -> { - if (val instanceof Map) { - ((Map) val).forEach(message::setHeader); - } - }); + @Override + public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) { + final CloudEvent ce = cloudEvent(); - ce.forEach((key, val) -> { - message.setHeader("ce-" + StringUtils.lowerCase(key), val); - }); + return exchange -> { + String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); + if (eventType == null) { + eventType = endpoint.getConfiguration().getCloudEventsType(); } + + final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); + final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault()); + final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created); + final Map headers = exchange.getIn().getHeaders(); + + headers.putIfAbsent(ce.mandatoryAttribute("id").id(), exchange.getExchangeId()); + headers.putIfAbsent(ce.mandatoryAttribute("source").id(), endpoint.getEndpointUri()); + headers.putIfAbsent(ce.mandatoryAttribute("version").id(), ce.version()); + headers.putIfAbsent(ce.mandatoryAttribute("type").id(), eventType); + headers.putIfAbsent(ce.mandatoryAttribute("time").id(), eventTime); + headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); }; } } diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java index 75f165424..cf08243d5 100644 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java +++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java @@ -16,16 +16,108 @@ */ package org.apache.camel.component.knative.ce; +import java.util.Map; import java.util.Objects; +import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.knative.KnativeEndpoint; import org.apache.camel.component.knative.spi.CloudEvent; +import org.apache.camel.component.knative.spi.CloudEvents; import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.commons.lang3.StringUtils; + +import static org.apache.camel.util.ObjectHelper.ifNotEmpty; public enum CloudEventProcessors implements CloudEventProcessor { - V01(new CloudEventV01Processor()), - V02(new CloudEventV02Processor()); + V01(new AbstractCloudEventProcessor(CloudEvents.V01) { + @Override + protected void decodeStructuredContent(Exchange exchange, Map content) { + final CloudEvent ce = cloudEvent(); + final Message message = exchange.getIn(); + + // body + ifNotEmpty(content.remove("data"), message::setBody); + + ifNotEmpty(content.remove(ce.mandatoryAttribute("content.type").json()), val -> { + message.setHeader(Exchange.CONTENT_TYPE, val); + }); + + // + // Map extensions to standard camel headers + // + ifNotEmpty(content.remove("extensions"), val -> { + if (val instanceof Map) { + ((Map) val).forEach(message::setHeader); + } + }); + + for (CloudEvent.Attribute attribute: ce.attributes()) { + ifNotEmpty(content.remove(attribute.json()), val -> { + message.setHeader(attribute.id(), val); + }); + } + } + }), + V02(new AbstractCloudEventProcessor(CloudEvents.V02) { + @Override + protected void decodeStructuredContent(Exchange exchange, Map content) { + final CloudEvent ce = cloudEvent(); + final Message message = exchange.getIn(); + + // body + ifNotEmpty(content.remove("data"), message::setBody); + + ifNotEmpty(content.remove(ce.mandatoryAttribute("content.type").json()), val -> { + message.setHeader(Exchange.CONTENT_TYPE, val); + }); + + for (CloudEvent.Attribute attribute: ce.attributes()) { + ifNotEmpty(content.remove(attribute.json()), val -> { + message.setHeader(attribute.id(), val); + }); + } + + // + // Map every remaining field as it is (extensions). + // + content.forEach((key, val) -> { + message.setHeader(StringUtils.lowerCase(key), val); + }); + + } + }), + V03(new AbstractCloudEventProcessor(CloudEvents.V03) { + @Override + protected void decodeStructuredContent(Exchange exchange, Map content) { + final CloudEvent ce = cloudEvent(); + final Message message = exchange.getIn(); + + // body + ifNotEmpty(content.remove("data"), message::setBody); + + ifNotEmpty(content.remove(ce.mandatoryAttribute("data.content.type").json()), val -> { + message.setHeader(Exchange.CONTENT_TYPE, val); + }); + ifNotEmpty(content.remove(ce.mandatoryAttribute("data.content.encoding").json()), val -> { + message.setBody(val); + }); + + for (CloudEvent.Attribute attribute: ce.attributes()) { + ifNotEmpty(content.remove(attribute.json()), val -> { + message.setHeader(attribute.id(), val); + }); + } + + // + // Map every remaining field as it is (extensions). + // + content.forEach((key, val) -> { + message.setHeader(StringUtils.lowerCase(key), val); + }); + } + }); private final CloudEventProcessor instance; diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV01Processor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV01Processor.java deleted file mode 100644 index 39208357f..000000000 --- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV01Processor.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.knative.ce; - -import java.io.InputStream; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Map; -import java.util.Objects; - -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.Processor; -import org.apache.camel.component.knative.KnativeEndpoint; -import org.apache.camel.component.knative.spi.CloudEvent; -import org.apache.camel.component.knative.spi.CloudEvents; -import org.apache.camel.component.knative.spi.Knative; -import org.apache.camel.component.knative.spi.KnativeEnvironment; -import org.apache.commons.lang3.StringUtils; - -import static org.apache.camel.util.ObjectHelper.ifNotEmpty; - -final class CloudEventV01Processor implements CloudEventProcessor { - private final CloudEvent cloudEvent; - - public CloudEventV01Processor() { - this.cloudEvent = CloudEvents.V01; - } - - @Override - public CloudEvent cloudEvent() { - return cloudEvent; - } - - @Override - public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) { - return exchange -> { - String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); - if (eventType == null) { - eventType = endpoint.getConfiguration().getCloudEventsType(); - } - - final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); - final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault()); - final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created); - final Map headers = exchange.getIn().getHeaders(); - - headers.putIfAbsent(cloudEvent.attributes().id(), exchange.getExchangeId()); - headers.putIfAbsent(cloudEvent.attributes().source(), endpoint.getEndpointUri()); - headers.putIfAbsent(cloudEvent.attributes().spec(), cloudEvent.version()); - headers.putIfAbsent(cloudEvent.attributes().type(), eventType); - headers.putIfAbsent(cloudEvent.attributes().time(), eventTime); - headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); - - // Always remove host so it's always computed from the URL and not inherited from the exchange - headers.remove("Host"); - }; - } - - @SuppressWarnings("unchecked") - @Override - public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) { - return exchange -> { - if (!Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE)) { - // - // The event is not in the form of Structured Content Mode - // then leave it as it is. - // - // Note that this is true for http binding only. - // - // More info: - // - // https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode - // - return; - } - - try (InputStream is = exchange.getIn().getBody(InputStream.class)) { - final Message message = exchange.getIn(); - final Map ce = Knative.MAPPER.readValue(is, Map.class); - - ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val)); - ifNotEmpty(ce.remove("data"), val -> message.setBody(val)); - - // - // Map extensions to standard camel headers - // - ifNotEmpty(ce.remove("extensions"), val -> { - if (val instanceof Map) { - ((Map) val).forEach(message::setHeader); - } - }); - - ce.forEach((key, val) -> { - message.setHeader("CE-" + StringUtils.capitalize(key), val); - }); - } - }; - } -}