From d0522dfa33fdb49527469d7313bf70af71e1851b Mon Sep 17 00:00:00 2001 From: nferraro Date: Fri, 19 Apr 2019 11:47:47 +0200 Subject: [PATCH] Fix #48: add a default type to cloudevents --- .../knative/KnativeConfiguration.java | 21 +++++ .../component/knative/KnativeEndpoint.java | 4 + .../camel/component/knative/ce/V01.java | 5 +- .../camel/component/knative/ce/V02.java | 5 +- .../component/knative/CloudEventsV01Test.java | 92 ++++++++++++++++++- .../component/knative/CloudEventsV02Test.java | 92 ++++++++++++++++++- 6 files changed, 213 insertions(+), 6 deletions(-) diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java b/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java index 49564748a..c4c06ddc5 100644 --- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java +++ b/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeConfiguration.java @@ -29,6 +29,9 @@ public class KnativeConfiguration implements Cloneable { @UriParam(defaultValue = "0.2", enums = "0.1,0.2") private String cloudEventsSpecVersion = "0.2"; + @UriParam(defaultValue = "org.apache.camel.event") + private String cloudEventsType = "org.apache.camel.event"; + public KnativeConfiguration() { } @@ -53,6 +56,10 @@ public boolean isJsonSerializationEnabled() { return jsonSerializationEnabled; } + + /** + * Enables automatic serialization to JSON of the produced events. + */ public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) { this.jsonSerializationEnabled = jsonSerializationEnabled; } @@ -61,10 +68,24 @@ public String getCloudEventsSpecVersion() { return cloudEventsSpecVersion; } + /** + * Set the version of the cloudevents spec. + */ public void setCloudEventsSpecVersion(String cloudEventsSpecVersion) { this.cloudEventsSpecVersion = cloudEventsSpecVersion; } + public String getCloudEventsType() { + return cloudEventsType; + } + + /** + * Set the event-type information of the produced events. + */ + public void setCloudEventsType(String cloudEventsType) { + this.cloudEventsType = cloudEventsType; + } + // ************************ // // Cloneable diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index cd84a8de8..412136b8d 100644 --- a/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -134,6 +134,10 @@ public String getName() { return name; } + public KnativeConfiguration getConfiguration() { + return configuration; + } + public KnativeEnvironment.KnativeServiceDefinition getService() { return service; } diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java b/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java index 4505d8ecf..1b369fa88 100644 --- a/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java +++ b/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V01.java @@ -43,7 +43,10 @@ private V01() { String uri = endpoint.getEndpointUri(); return exchange -> { - final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); + String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); + if (eventType == null) { + eventType = endpoint.getConfiguration().getCloudEventsType(); + } final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault()); final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created); diff --git a/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java b/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java index 321c50298..5b124b6fe 100644 --- a/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java +++ b/camel-knative/src/main/java/org/apache/camel/component/knative/ce/V02.java @@ -43,7 +43,10 @@ private V02() { String uri = endpoint.getEndpointUri(); return exchange -> { - final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); + String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE); + if (eventType == null) { + eventType = endpoint.getConfiguration().getCloudEventsType(); + } final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE); final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault()); final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created); diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java index e4155d10b..f0c4d4aa3 100644 --- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java +++ b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV01Test.java @@ -73,7 +73,7 @@ void testInvokeEndpoint() throws Exception { port, KnativeSupport.mapOf( Knative.SERVICE_META_PATH, "/a/path", - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.custom-event", Knative.CONTENT_TYPE, "text/plain" )) )); @@ -98,7 +98,7 @@ public void configure() throws Exception { MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); mock.expectedMessageCount(1); mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); - mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); + mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.custom-event"); mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); @@ -115,6 +115,94 @@ public void configure() throws Exception { mock.assertIsSatisfied(); } + @Test + void testProduceDefaultEventType() throws Exception { + final int port1 = AvailablePortFinder.getNextAvailable(); + final int port2 = AvailablePortFinder.getNextAvailable(); + + KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "myEndpoint", + "localhost", + port1, + KnativeSupport.mapOf( + Knative.SERVICE_META_PATH, "/", + Knative.CONTENT_TYPE, "text/plain" + )), + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "myEndpoint2", + "localhost", + port2, + KnativeSupport.mapOf( + Knative.SERVICE_META_PATH, "/", + Knative.CONTENT_TYPE, "text/plain" + )) + )); + + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion()); + component.setEnvironment(env); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:source") + .to("knative:endpoint/myEndpoint"); + + from("direct:source2") + .to("knative:endpoint/myEndpoint2?cloudEventsType=my.type"); + + fromF("netty4-http:http://localhost:%d/", port1) + .to("mock:ce"); + + fromF("netty4-http:http://localhost:%d/", port2) + .to("mock:ce2"); + } + }); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); + mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event"); + mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID")); + mock.expectedBodiesReceived("test"); + + MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); + mock2.expectedMessageCount(1); + mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion()); + mock2.expectedHeaderReceived("CE-EventType", "my.type"); + mock2.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type"); + mock2.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime")); + mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID")); + mock2.expectedBodiesReceived("test2"); + + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setBody("test"); + } + ); + context.createProducerTemplate().send( + "direct:source2", + e -> { + e.getIn().setBody("test2"); + } + ); + + mock.assertIsSatisfied(); + mock2.assertIsSatisfied(); + } + @Test void testConsumeStructuredContent() throws Exception { final int port = AvailablePortFinder.getNextAvailable(); diff --git a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java index 71faad411..e727dc273 100644 --- a/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java +++ b/camel-knative/src/test/java/org/apache/camel/component/knative/CloudEventsV02Test.java @@ -73,7 +73,7 @@ void testInvokeEndpoint() throws Exception { port, KnativeSupport.mapOf( Knative.SERVICE_META_PATH, "/a/path", - Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event", + Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.custom-event", Knative.CONTENT_TYPE, "text/plain" )) )); @@ -98,7 +98,7 @@ public void configure() throws Exception { MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); mock.expectedMessageCount(1); mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); - mock.expectedHeaderReceived("ce-type", "org.apache.camel.event"); + mock.expectedHeaderReceived("ce-type", "org.apache.camel.custom-event"); mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint"); mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); @@ -115,6 +115,94 @@ public void configure() throws Exception { mock.assertIsSatisfied(); } + @Test + void testProduceDefaultEventType() throws Exception { + final int port1 = AvailablePortFinder.getNextAvailable(); + final int port2 = AvailablePortFinder.getNextAvailable(); + + KnativeEnvironment env = new KnativeEnvironment(Arrays.asList( + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "myEndpoint", + "localhost", + port1, + KnativeSupport.mapOf( + Knative.SERVICE_META_PATH, "/", + Knative.CONTENT_TYPE, "text/plain" + )), + new KnativeEnvironment.KnativeServiceDefinition( + Knative.Type.endpoint, + Knative.Protocol.http, + "myEndpoint2", + "localhost", + port2, + KnativeSupport.mapOf( + Knative.SERVICE_META_PATH, "/", + Knative.CONTENT_TYPE, "text/plain" + )) + )); + + KnativeComponent component = context.getComponent("knative", KnativeComponent.class); + component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion()); + component.setEnvironment(env); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:source") + .to("knative:endpoint/myEndpoint"); + + from("direct:source2") + .to("knative:endpoint/myEndpoint2?cloudEventsType=my.type"); + + fromF("netty4-http:http://localhost:%d/", port1) + .to("mock:ce"); + + fromF("netty4-http:http://localhost:%d/", port2) + .to("mock:ce2"); + } + }); + + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); + mock.expectedHeaderReceived("ce-type", "org.apache.camel.event"); + mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint"); + mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); + mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id")); + mock.expectedBodiesReceived("test"); + + MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class); + mock2.expectedMessageCount(1); + mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion()); + mock2.expectedHeaderReceived("ce-type", "my.type"); + mock2.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type"); + mock2.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain"); + mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time")); + mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id")); + mock2.expectedBodiesReceived("test2"); + + context.createProducerTemplate().send( + "direct:source", + e -> { + e.getIn().setBody("test"); + } + ); + context.createProducerTemplate().send( + "direct:source2", + e -> { + e.getIn().setBody("test2"); + } + ); + + mock.assertIsSatisfied(); + mock2.assertIsSatisfied(); + } + @Test void testConsumeStructuredContent() throws Exception { final int port = AvailablePortFinder.getNextAvailable();