diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java
index f9c5ad982..df174947e 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvent.java
@@ -16,15 +16,73 @@
  */
 package org.apache.camel.component.knative.spi;
 
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+
 public interface CloudEvent {
+    /**
+     * The CloudEvent spec version.
+     */
     String version();
-    Attributes attributes();
 
-    interface Attributes {
+    /**
+     * List of supported attributes.
+     */
+    Collection<Attribute> attributes();
+
+    /**
+     * Find attribute by id.
+     */
+    default Optional<Attribute> attribute(String id) {
+        return attributes().stream()
+            .filter(a -> Objects.equals(id, a.id()))
+            .findFirst();
+    }
+
+    /**
+     * Mandatory find attribute by id.
+     */
+    default Attribute mandatoryAttribute(String id) {
+        return attributes().stream()
+            .filter(a -> Objects.equals(id, a.id()))
+            .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("Unable to find attribute with id: " + id));
+    }
+
+    interface Attribute {
+        /**
+         * The ID of the attributes, can be used to look it up.
+         */
         String id();
-        String source();
-        String spec();
-        String type();
-        String time();
+
+        /**
+         * The name of the http header.
+         */
+        String http();
+
+        /**
+         * The name of the json field.
+         */
+        String json();
+
+        static Attribute simple(String id, String http, String json) {
+            return new Attribute() {
+                @Override
+                public String id() {
+                    return id;
+                }
+
+                @Override
+                public String http() {
+                    return http;
+                }
+
+                @Override
+                public String json() {
+                    return json;
+                }
+            };
+        }
     }
 }
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV01.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV01.java
deleted file mode 100644
index 845104fcc..000000000
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV01.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.spi;
-
-final class CloudEventV01 implements CloudEvent {
-    public static final String VERSION = "0.1";
-    public static final Attributes ATTRIBUTES = new Attributes() {
-        @Override
-        public String id() {
-            return "CE-EventID";
-        }
-
-        @Override
-        public String source() {
-            return "CE-Source";
-        }
-
-        @Override
-        public String spec() {
-            return "CE-CloudEventsVersion";
-        }
-
-        @Override
-        public String type() {
-            return "CE-EventType";
-        }
-
-        @Override
-        public String time() {
-            return "CE-EventTime";
-        }
-    };
-
-    @Override
-    public String version() {
-        return VERSION;
-    }
-
-    @Override
-    public Attributes attributes() {
-        return ATTRIBUTES;
-    }
-}
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV02.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV02.java
deleted file mode 100644
index 7b28dcca4..000000000
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEventV02.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.spi;
-
-final class CloudEventV02 implements CloudEvent {
-    public static final String VERSION = "0.2";
-    public static final Attributes ATTRIBUTES = new Attributes() {
-        @Override
-        public String id() {
-            return "ce-id";
-        }
-
-        @Override
-        public String source() {
-            return "ce-source";
-        }
-
-        @Override
-        public String spec() {
-            return "ce-specversion";
-        }
-
-        @Override
-        public String type() {
-            return "ce-type";
-        }
-
-        @Override
-        public String time() {
-            return "ce-time";
-        }
-    };
-
-    @Override
-    public String version() {
-        return VERSION;
-    }
-
-    @Override
-    public Attributes attributes() {
-        return ATTRIBUTES;
-    }
-}
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
index ff5542f20..7d7e7a990 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/CloudEvents.java
@@ -16,11 +16,60 @@
  */
 package org.apache.camel.component.knative.spi;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Objects;
 
 public enum CloudEvents implements CloudEvent {
-    V01(new CloudEventV01()),
-    V02(new CloudEventV02());
+    //
+    // V0.1 - https://github.com/cloudevents/spec/blob/v0.1/spec.md
+    //
+    V01(new CloudEventImpl(
+        "0.1",
+        Arrays.asList(
+            Attribute.simple("type", "CE-EventType", "eventType"),
+            Attribute.simple("type.version", "CE-EventTypeVersion", "eventTypeVersion"),
+            Attribute.simple("version", "CE-CloudEventsVersion", "cloudEventsVersion"),
+            Attribute.simple("source", "CE-Source", "source"),
+            Attribute.simple("id", "CE-EventID", "eventID"),
+            Attribute.simple("time", "CE-EventTime", "eventTime"),
+            Attribute.simple("schema.url", "CE-SchemaURL", "schemaURL"),
+            Attribute.simple("content.type", "ContentType", "contentType"),
+            Attribute.simple("extensions", "CE-Extensions", "extensions")
+        )
+    )),
+    //
+    // V0.2 - https://github.com/cloudevents/spec/blob/v0.2/spec.md
+    //
+    V02(new CloudEventImpl(
+        "0.2",
+        Arrays.asList(
+            Attribute.simple("type", "ce-type", "type"),
+            Attribute.simple("version", "ce-specversion", "specversion"),
+            Attribute.simple("source", "ce-source", "source"),
+            Attribute.simple("id", "ce-id", "id"),
+            Attribute.simple("time", "ce-time", "time"),
+            Attribute.simple("schema.url", "ce-schemaurl", "schemaurl"),
+            Attribute.simple("content.type", "Content-Type", "contenttype")
+        )
+    )),
+    //
+    // V0.3 - https://github.com/cloudevents/spec/blob/v0.3/spec.md
+    //
+    V03(new CloudEventImpl(
+        "0.3",
+        Arrays.asList(
+            Attribute.simple("id", "ce-id", "id"),
+            Attribute.simple("source", "ce-source", "source"),
+            Attribute.simple("version", "ce-specversion", "specversion"),
+            Attribute.simple("type", "ce-type", "type"),
+            Attribute.simple("data.content.encoding", "ce-datacontentencoding", "datacontentencoding"),
+            Attribute.simple("data.content.type", "ce-datacontenttype", "datacontenttype"),
+            Attribute.simple("schema.url", "ce-schemaurl", "schemaurl"),
+            Attribute.simple("subject", "ce-subject", "subject"),
+            Attribute.simple("time", "ce-time", "time")
+        )
+    ));
 
     private final CloudEvent instance;
 
@@ -34,7 +83,7 @@ public String version() {
     }
 
     @Override
-    public Attributes attributes() {
+    public Collection<Attribute> attributes() {
         return instance.attributes();
     }
 
@@ -47,5 +96,25 @@ public static CloudEvent fromSpecVersion(String version) {
 
         throw new IllegalArgumentException("Unable to find an implementation fo CloudEvents spec: " + version);
     }
+
+    private static class CloudEventImpl implements CloudEvent {
+        private final String version;
+        private final Collection<Attribute> attributes;
+
+        public CloudEventImpl(String version, Collection<Attribute> attributes) {
+            this.version = version;
+            this.attributes = attributes;
+        }
+
+        @Override
+        public String version() {
+            return version;
+        }
+
+        @Override
+        public Collection<Attribute> attributes() {
+            return attributes;
+        }
+    }
 }
 
diff --git a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
index 75aa80e8c..65c239368 100644
--- a/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
+++ b/camel-knative/camel-knative-api/src/main/java/org/apache/camel/component/knative/spi/Knative.java
@@ -30,6 +30,7 @@ public final class Knative {
     public static final String KNATIVE_API_VERSION = "knative.apiVersion";
     public static final String CONTENT_TYPE = "content.type";
     public static final String MIME_STRUCTURED_CONTENT_MODE = "application/cloudevents+json";
+    public static final String MIME_BATCH_CONTENT_MODE = "application/cloudevents-batch+json";
     public static final String CAMEL_ENDPOINT_KIND = "camel.endpoint.kind";
 
     public static final String SERVICE_META_HOST = "service.host";
diff --git a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
index 9898f22e3..e13acafa6 100644
--- a/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
+++ b/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java
@@ -91,7 +91,8 @@ void testCreateComponent() {
     private static Stream<Arguments> provideCloudEventsImplementations() {
         return Stream.of(
             Arguments.of(CloudEvents.V01),
-            Arguments.of(CloudEvents.V02)
+            Arguments.of(CloudEvents.V02),
+            Arguments.of(CloudEvents.V03)
         );
     }
 
@@ -129,12 +130,12 @@ public void configure() throws Exception {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
-        mock.expectedHeaderReceived(ce.attributes().source(), "knative://endpoint/myEndpoint");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "knative://endpoint/myEndpoint");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -176,12 +177,12 @@ public void configure() throws Exception {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
-        mock.expectedHeaderReceived(ce.attributes().id(), "myEventID");
-        mock.expectedHeaderReceived(ce.attributes().source(), "/somewhere");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -190,7 +191,7 @@ public void configure() throws Exception {
             e -> {
                 e.getMessage().setHeader(Exchange.CONTENT_TYPE, Knative.MIME_STRUCTURED_CONTENT_MODE);
 
-                if (Objects.equals(ce.version(), ce.version())) {
+                if (Objects.equals(CloudEvents.V01.version(), ce.version())) {
                     e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf(
                         "cloudEventsVersion", ce.version(),
                         "eventType", "org.apache.camel.event",
@@ -200,8 +201,7 @@ public void configure() throws Exception {
                         "contentType", "text/plain",
                         "data", "test"
                     )));
-                }
-                if (Objects.equals(CloudEvents.V02.version(), ce.version())) {
+                } else if (Objects.equals(CloudEvents.V02.version(), ce.version())) {
                     e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf(
                         "specversion", ce.version(),
                         "type", "org.apache.camel.event",
@@ -211,6 +211,18 @@ public void configure() throws Exception {
                         "contenttype", "text/plain",
                         "data", "test"
                     )));
+                } else if (Objects.equals(CloudEvents.V03.version(), ce.version())) {
+                    e.getMessage().setBody(new ObjectMapper().writeValueAsString(KnativeSupport.mapOf(
+                        "specversion", ce.version(),
+                        "type", "org.apache.camel.event",
+                        "id", "myEventID",
+                        "time", DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()),
+                        "source", "/somewhere",
+                        "datacontenttype", "text/plain",
+                        "data", "test"
+                    )));
+                } else {
+                    throw new IllegalArgumentException("Unknown CloudEvent spec: " + ce.version());
                 }
             }
         );
@@ -252,12 +264,12 @@ public void configure() throws Exception {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
-        mock.expectedHeaderReceived(ce.attributes().id(), "myEventID");
-        mock.expectedHeaderReceived(ce.attributes().source(), "/somewhere");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -265,11 +277,11 @@ public void configure() throws Exception {
             "direct:source",
             e -> {
                 e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain");
-                e.getMessage().setHeader(ce.attributes().spec(), ce.version());
-                e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.attributes().id(), "myEventID");
-                e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.attributes().source(), "/somewhere");
+                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID");
+                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "/somewhere");
                 e.getMessage().setBody("test");
             }
         );
@@ -289,7 +301,7 @@ void testConsumeContentWithFilter(CloudEvent ce) throws Exception {
                 KnativeSupport.mapOf(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE1"
+                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE1"
                 )),
             KnativeEnvironment.endpoint(
                 Knative.EndpointKind.source,
@@ -299,7 +311,7 @@ void testConsumeContentWithFilter(CloudEvent ce) throws Exception {
                 KnativeSupport.mapOf(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE2"
+                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE2"
                 ))
         );
 
@@ -331,41 +343,41 @@ public void configure() throws Exception {
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
-        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
-        mock1.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock1.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
-        mock1.expectedHeaderReceived(ce.attributes().id(), "myEventID1");
-        mock1.expectedHeaderReceived(ce.attributes().source(), "CE1");
+        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE1");
         mock1.expectedBodiesReceived("test");
         mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
-        mock2.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock2.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
-        mock2.expectedHeaderReceived(ce.attributes().id(), "myEventID2");
-        mock2.expectedHeaderReceived(ce.attributes().source(), "CE2");
+        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE2");
         mock2.expectedBodiesReceived("test");
         mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.attributes().spec(), ce.version());
-                e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.attributes().id(), "myEventID1");
-                e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.attributes().source(), "CE1");
+                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1");
+                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE1");
             }
         );
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.attributes().spec(), ce.version());
-                e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.attributes().id(), "myEventID2");
-                e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.attributes().source(), "CE2");
+                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2");
+                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE2");
             }
         );
 
@@ -385,7 +397,7 @@ void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception {
                 KnativeSupport.mapOf(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE[01234]"
+                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE[01234]"
                 )),
             KnativeEnvironment.endpoint(
                 Knative.EndpointKind.source,
@@ -395,7 +407,7 @@ void testConsumeContentWithRegExFilter(CloudEvent ce) throws Exception {
                 KnativeSupport.mapOf(
                     Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
                     Knative.CONTENT_TYPE, "text/plain",
-                    Knative.KNATIVE_FILTER_PREFIX + ce.attributes().source(), "CE[56789]"
+                    Knative.KNATIVE_FILTER_PREFIX + ce.mandatoryAttribute("source").id(), "CE[56789]"
                 ))
         );
 
@@ -427,41 +439,41 @@ public void configure() throws Exception {
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
-        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
-        mock1.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock1.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
-        mock1.expectedHeaderReceived(ce.attributes().id(), "myEventID1");
-        mock1.expectedHeaderReceived(ce.attributes().source(), "CE0");
+        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE0");
         mock1.expectedBodiesReceived("test");
         mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
-        mock2.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock2.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
-        mock2.expectedHeaderReceived(ce.attributes().id(), "myEventID2");
-        mock2.expectedHeaderReceived(ce.attributes().source(), "CE5");
+        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE5");
         mock2.expectedBodiesReceived("test");
         mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.attributes().spec(), ce.version());
-                e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.attributes().id(), "myEventID1");
-                e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.attributes().source(), "CE0");
+                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1");
+                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE0");
             }
         );
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.attributes().spec(), ce.version());
-                e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.attributes().id(), "myEventID2");
-                e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.attributes().source(), "CE5");
+                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2");
+                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE5");
             }
         );
 
@@ -508,41 +520,41 @@ public void configure() throws Exception {
         context.start();
 
         MockEndpoint mock1 = context.getEndpoint("mock:ce1", MockEndpoint.class);
-        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
-        mock1.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock1.expectedHeaderReceived(ce.attributes().type(), "event1");
-        mock1.expectedHeaderReceived(ce.attributes().id(), "myEventID1");
-        mock1.expectedHeaderReceived(ce.attributes().source(), "CE1");
+        mock1.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "event1");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID1");
+        mock1.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE1");
         mock1.expectedBodiesReceived("test");
         mock1.expectedMessageCount(1);
 
         MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
-        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
-        mock2.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock2.expectedHeaderReceived(ce.attributes().type(), "event2");
-        mock2.expectedHeaderReceived(ce.attributes().id(), "myEventID2");
-        mock2.expectedHeaderReceived(ce.attributes().source(), "CE2");
+        mock2.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "event2");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID2");
+        mock2.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "CE2");
         mock2.expectedBodiesReceived("test");
         mock2.expectedMessageCount(1);
 
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.attributes().spec(), ce.version());
-                e.getMessage().setHeader(ce.attributes().type(), "event1");
-                e.getMessage().setHeader(ce.attributes().id(), "myEventID1");
-                e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.attributes().source(), "CE1");
+                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "event1");
+                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID1");
+                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE1");
             }
         );
         context.createProducerTemplate().send(
             "direct:source",
             e -> {
-                e.getMessage().setHeader(ce.attributes().spec(), ce.version());
-                e.getMessage().setHeader(ce.attributes().type(), "event2");
-                e.getMessage().setHeader(ce.attributes().id(), "myEventID2");
-                e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.attributes().source(), "CE2");
+                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "event2");
+                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID2");
+                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "CE2");
             }
         );
 
@@ -862,11 +874,11 @@ public void configure() throws Exception {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock.expectedHeaderReceived(ce.attributes().type(), "myEvent");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "myEvent");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -922,11 +934,11 @@ public void configure() throws Exception {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock.expectedHeaderReceived(ce.attributes().type(), "myEvent");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "myEvent");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("id").id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -980,12 +992,12 @@ public void configure() throws Exception {
         context.start();
 
         MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
-        mock.expectedHeaderReceived(ce.attributes().spec(), ce.version());
-        mock.expectedHeaderReceived(ce.attributes().type(), "org.apache.camel.event");
-        mock.expectedHeaderReceived(ce.attributes().id(), "myEventID");
-        mock.expectedHeaderReceived(ce.attributes().source(), "/somewhere");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("version").id(), ce.version());
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("id").id(), "myEventID");
+        mock.expectedHeaderReceived(ce.mandatoryAttribute("source").id(), "/somewhere");
         mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
-        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.attributes().time()));
+        mock.expectedMessagesMatches(e -> e.getMessage().getHeaders().containsKey(ce.mandatoryAttribute("time").id()));
         mock.expectedBodiesReceived("test");
         mock.expectedMessageCount(1);
 
@@ -993,11 +1005,11 @@ public void configure() throws Exception {
             "direct:source",
             e -> {
                 e.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain");
-                e.getMessage().setHeader(ce.attributes().spec(), ce.version());
-                e.getMessage().setHeader(ce.attributes().type(), "org.apache.camel.event");
-                e.getMessage().setHeader(ce.attributes().id(), "myEventID");
-                e.getMessage().setHeader(ce.attributes().time(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
-                e.getMessage().setHeader(ce.attributes().source(), "/somewhere");
+                e.getMessage().setHeader(ce.mandatoryAttribute("version").id(), ce.version());
+                e.getMessage().setHeader(ce.mandatoryAttribute("type").id(), "org.apache.camel.event");
+                e.getMessage().setHeader(ce.mandatoryAttribute("id").id(), "myEventID");
+                e.getMessage().setHeader(ce.mandatoryAttribute("time").id(), DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now()));
+                e.getMessage().setHeader(ce.mandatoryAttribute("source").id(), "/somewhere");
                 e.getMessage().setBody("test");
             }
         );
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
index 3d268de00..277f33c19 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java
@@ -32,8 +32,8 @@ public class KnativeConfiguration implements Cloneable {
     private String serviceName;
     @UriParam(defaultValue = "false")
     private boolean jsonSerializationEnabled;
-    @UriParam(defaultValue = "0.2", enums = "0.1,0.2")
-    private String cloudEventsSpecVersion = CloudEvents.V02.version();
+    @UriParam(defaultValue = "0.3", enums = "0.1,0.2,0.3")
+    private String cloudEventsSpecVersion = CloudEvents.V03.version();
     @UriParam(defaultValue = "org.apache.camel.event")
     private String cloudEventsType = "org.apache.camel.event";
     @UriParam(prefix = "transport.")
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index 09ae6f76e..087286d0e 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -81,7 +81,7 @@ public Producer createProducer() throws Exception {
             .withTarget(producer)
             .bind();
 
-        return new KnativeProducer(this, ceProcessor, ceConverter, producer);
+        return new KnativeProducer(this, ceProcessor, ceConverter, e -> e.getMessage().removeHeader("Host"), producer);
     }
 
     @Override
@@ -159,7 +159,7 @@ KnativeEnvironment.KnativeServiceDefinition lookupServiceDefinition(Knative.Endp
 
         if (service.get().getType() == Knative.Type.event) {
             metadata.put(Knative.KNATIVE_EVENT_TYPE, serviceName);
-            metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().attributes().type(), serviceName);
+            metadata.put(Knative.KNATIVE_FILTER_PREFIX + cloudEvent.cloudEvent().mandatoryAttribute("type").id(), serviceName);
         }
 
         return new KnativeEnvironment.KnativeServiceDefinition(
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV02Processor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
similarity index 65%
rename from camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV02Processor.java
rename to camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
index f41c52cb9..5e0ca0d38 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV02Processor.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/AbstractCloudEventProcessor.java
@@ -24,22 +24,17 @@
 import java.util.Objects;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.knative.KnativeEndpoint;
 import org.apache.camel.component.knative.spi.CloudEvent;
-import org.apache.camel.component.knative.spi.CloudEvents;
 import org.apache.camel.component.knative.spi.Knative;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
-import org.apache.commons.lang3.StringUtils;
 
-import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
-
-final class CloudEventV02Processor implements CloudEventProcessor {
+abstract class AbstractCloudEventProcessor implements CloudEventProcessor {
     private final CloudEvent cloudEvent;
 
-    public CloudEventV02Processor() {
-        this.cloudEvent = CloudEvents.V02;
+    protected AbstractCloudEventProcessor(CloudEvent cloudEvent) {
+        this.cloudEvent = cloudEvent;
     }
 
     @Override
@@ -47,35 +42,14 @@ public CloudEvent cloudEvent() {
         return cloudEvent;
     }
 
-    @Override
-    public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
-        return exchange -> {
-            String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
-            if (eventType == null) {
-                eventType = endpoint.getConfiguration().getCloudEventsType();
-            }
-
-            final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
-            final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
-            final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
-            final Map<String, Object> headers = exchange.getIn().getHeaders();
-
-            headers.putIfAbsent(cloudEvent.attributes().id(), exchange.getExchangeId());
-            headers.putIfAbsent(cloudEvent.attributes().source(), endpoint.getEndpointUri());
-            headers.putIfAbsent(cloudEvent.attributes().spec(), cloudEvent.version());
-            headers.putIfAbsent(cloudEvent.attributes().type(), eventType);
-            headers.putIfAbsent(cloudEvent.attributes().time(), eventTime);
-            headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
-
-            // Always remove host so it's always computed from the URL and not inherited from the exchange
-            headers.remove("Host");
-        };
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
         return exchange -> {
+            if (Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_BATCH_CONTENT_MODE)) {
+                throw new UnsupportedOperationException("Batched CloudEvents are not yet supported");
+            }
+
             if (!Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE)) {
                 //
                 // The event is not in the form of Structured Content Mode
@@ -91,25 +65,34 @@ public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeSe
             }
 
             try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
-                final Message message = exchange.getIn();
-                final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class);
+                decodeStructuredContent(exchange, Knative.MAPPER.readValue(is, Map.class));
+            }
+        };
+    }
 
-                ifNotEmpty(ce.remove("contenttype"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
-                ifNotEmpty(ce.remove("data"), val -> message.setBody(val));
+    protected abstract void decodeStructuredContent(Exchange exchange, Map<String, Object> content);
 
-                //
-                // Map extensions to standard camel headers
-                //
-                ifNotEmpty(ce.remove("extensions"), val -> {
-                    if (val instanceof Map) {
-                        ((Map<String, Object>) val).forEach(message::setHeader);
-                    }
-                });
+    @Override
+    public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
+        final CloudEvent ce = cloudEvent();
 
-                ce.forEach((key, val) -> {
-                    message.setHeader("ce-" + StringUtils.lowerCase(key), val);
-                });
+        return exchange -> {
+            String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
+            if (eventType == null) {
+                eventType = endpoint.getConfiguration().getCloudEventsType();
             }
+
+            final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
+            final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
+            final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
+            final Map<String, Object> headers = exchange.getIn().getHeaders();
+
+            headers.putIfAbsent(ce.mandatoryAttribute("id").id(), exchange.getExchangeId());
+            headers.putIfAbsent(ce.mandatoryAttribute("source").id(), endpoint.getEndpointUri());
+            headers.putIfAbsent(ce.mandatoryAttribute("version").id(), ce.version());
+            headers.putIfAbsent(ce.mandatoryAttribute("type").id(), eventType);
+            headers.putIfAbsent(ce.mandatoryAttribute("time").id(), eventTime);
+            headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
         };
     }
 }
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java
index 75f165424..cf08243d5 100644
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java
+++ b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventProcessors.java
@@ -16,16 +16,108 @@
  */
 package org.apache.camel.component.knative.ce;
 
+import java.util.Map;
 import java.util.Objects;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.knative.KnativeEndpoint;
 import org.apache.camel.component.knative.spi.CloudEvent;
+import org.apache.camel.component.knative.spi.CloudEvents;
 import org.apache.camel.component.knative.spi.KnativeEnvironment;
+import org.apache.commons.lang3.StringUtils;
+
+import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
 
 public enum CloudEventProcessors implements CloudEventProcessor {
-    V01(new CloudEventV01Processor()),
-    V02(new CloudEventV02Processor());
+    V01(new AbstractCloudEventProcessor(CloudEvents.V01) {
+        @Override
+        protected void decodeStructuredContent(Exchange exchange, Map<String, Object> content) {
+            final CloudEvent ce = cloudEvent();
+            final Message message = exchange.getIn();
+
+            // body
+            ifNotEmpty(content.remove("data"), message::setBody);
+
+            ifNotEmpty(content.remove(ce.mandatoryAttribute("content.type").json()), val -> {
+                message.setHeader(Exchange.CONTENT_TYPE, val);
+            });
+
+            //
+            // Map extensions to standard camel headers
+            //
+            ifNotEmpty(content.remove("extensions"), val -> {
+                if (val instanceof Map) {
+                    ((Map<String, Object>) val).forEach(message::setHeader);
+                }
+            });
+
+            for (CloudEvent.Attribute attribute: ce.attributes()) {
+                ifNotEmpty(content.remove(attribute.json()), val -> {
+                    message.setHeader(attribute.id(), val);
+                });
+            }
+        }
+    }),
+    V02(new AbstractCloudEventProcessor(CloudEvents.V02) {
+        @Override
+        protected void decodeStructuredContent(Exchange exchange, Map<String, Object> content) {
+            final CloudEvent ce = cloudEvent();
+            final Message message = exchange.getIn();
+
+            // body
+            ifNotEmpty(content.remove("data"), message::setBody);
+
+            ifNotEmpty(content.remove(ce.mandatoryAttribute("content.type").json()), val -> {
+                message.setHeader(Exchange.CONTENT_TYPE, val);
+            });
+
+            for (CloudEvent.Attribute attribute: ce.attributes()) {
+                ifNotEmpty(content.remove(attribute.json()), val -> {
+                    message.setHeader(attribute.id(), val);
+                });
+            }
+
+            //
+            // Map every remaining field as it is (extensions).
+            //
+            content.forEach((key, val) -> {
+                message.setHeader(StringUtils.lowerCase(key), val);
+            });
+
+        }
+    }),
+    V03(new AbstractCloudEventProcessor(CloudEvents.V03) {
+        @Override
+        protected void decodeStructuredContent(Exchange exchange, Map<String, Object> content) {
+            final CloudEvent ce = cloudEvent();
+            final Message message = exchange.getIn();
+
+            // body
+            ifNotEmpty(content.remove("data"), message::setBody);
+
+            ifNotEmpty(content.remove(ce.mandatoryAttribute("data.content.type").json()), val -> {
+                message.setHeader(Exchange.CONTENT_TYPE, val);
+            });
+            ifNotEmpty(content.remove(ce.mandatoryAttribute("data.content.encoding").json()), val -> {
+                message.setBody(val);
+            });
+
+            for (CloudEvent.Attribute attribute: ce.attributes()) {
+                ifNotEmpty(content.remove(attribute.json()), val -> {
+                    message.setHeader(attribute.id(), val);
+                });
+            }
+
+            //
+            // Map every remaining field as it is (extensions).
+            //
+            content.forEach((key, val) -> {
+                message.setHeader(StringUtils.lowerCase(key), val);
+            });
+        }
+    });
 
     private final CloudEventProcessor instance;
 
diff --git a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV01Processor.java b/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV01Processor.java
deleted file mode 100644
index 39208357f..000000000
--- a/camel-knative/camel-knative/src/main/java/org/apache/camel/component/knative/ce/CloudEventV01Processor.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.knative.ce;
-
-import java.io.InputStream;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Map;
-import java.util.Objects;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
-import org.apache.camel.component.knative.KnativeEndpoint;
-import org.apache.camel.component.knative.spi.CloudEvent;
-import org.apache.camel.component.knative.spi.CloudEvents;
-import org.apache.camel.component.knative.spi.Knative;
-import org.apache.camel.component.knative.spi.KnativeEnvironment;
-import org.apache.commons.lang3.StringUtils;
-
-import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
-
-final class CloudEventV01Processor implements CloudEventProcessor {
-    private final CloudEvent cloudEvent;
-
-    public CloudEventV01Processor() {
-        this.cloudEvent = CloudEvents.V01;
-    }
-
-    @Override
-    public CloudEvent cloudEvent() {
-        return cloudEvent;
-    }
-
-    @Override
-    public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
-        return exchange -> {
-            String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
-            if (eventType == null) {
-                eventType = endpoint.getConfiguration().getCloudEventsType();
-            }
-
-            final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
-            final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
-            final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
-            final Map<String, Object> headers = exchange.getIn().getHeaders();
-
-            headers.putIfAbsent(cloudEvent.attributes().id(), exchange.getExchangeId());
-            headers.putIfAbsent(cloudEvent.attributes().source(), endpoint.getEndpointUri());
-            headers.putIfAbsent(cloudEvent.attributes().spec(), cloudEvent.version());
-            headers.putIfAbsent(cloudEvent.attributes().type(), eventType);
-            headers.putIfAbsent(cloudEvent.attributes().time(), eventTime);
-            headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
-
-            // Always remove host so it's always computed from the URL and not inherited from the exchange
-            headers.remove("Host");
-        };
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Processor consumer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeServiceDefinition service) {
-        return exchange -> {
-            if (!Objects.equals(exchange.getIn().getHeader(Exchange.CONTENT_TYPE), Knative.MIME_STRUCTURED_CONTENT_MODE)) {
-                //
-                // The event is not in the form of Structured Content Mode
-                // then leave it as it is.
-                //
-                // Note that this is true for http binding only.
-                //
-                // More info:
-                //
-                //   https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#32-structured-content-mode
-                //
-                return;
-            }
-
-            try (InputStream is = exchange.getIn().getBody(InputStream.class)) {
-                final Message message = exchange.getIn();
-                final Map<String, Object> ce = Knative.MAPPER.readValue(is, Map.class);
-
-                ifNotEmpty(ce.remove("contentType"), val -> message.setHeader(Exchange.CONTENT_TYPE, val));
-                ifNotEmpty(ce.remove("data"), val -> message.setBody(val));
-
-                //
-                // Map extensions to standard camel headers
-                //
-                ifNotEmpty(ce.remove("extensions"), val -> {
-                    if (val instanceof Map) {
-                        ((Map<String, Object>) val).forEach(message::setHeader);
-                    }
-                });
-
-                ce.forEach((key, val) -> {
-                    message.setHeader("CE-" + StringUtils.capitalize(key), val);
-                });
-            }
-        };
-    }
-}