Skip to content

Commit

Permalink
Fix apache#48: add a default type to cloudevents
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Apr 19, 2019
1 parent 373cda1 commit d0522df
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class KnativeConfiguration implements Cloneable {
@UriParam(defaultValue = "0.2", enums = "0.1,0.2")
private String cloudEventsSpecVersion = "0.2";

@UriParam(defaultValue = "org.apache.camel.event")
private String cloudEventsType = "org.apache.camel.event";

public KnativeConfiguration() {
}

Expand All @@ -53,6 +56,10 @@ public boolean isJsonSerializationEnabled() {
return jsonSerializationEnabled;
}


/**
* Enables automatic serialization to JSON of the produced events.
*/
public void setJsonSerializationEnabled(boolean jsonSerializationEnabled) {
this.jsonSerializationEnabled = jsonSerializationEnabled;
}
Expand All @@ -61,10 +68,24 @@ public String getCloudEventsSpecVersion() {
return cloudEventsSpecVersion;
}

/**
* Set the version of the cloudevents spec.
*/
public void setCloudEventsSpecVersion(String cloudEventsSpecVersion) {
this.cloudEventsSpecVersion = cloudEventsSpecVersion;
}

public String getCloudEventsType() {
return cloudEventsType;
}

/**
* Set the event-type information of the produced events.
*/
public void setCloudEventsType(String cloudEventsType) {
this.cloudEventsType = cloudEventsType;
}

// ************************
//
// Cloneable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ public String getName() {
return name;
}

public KnativeConfiguration getConfiguration() {
return configuration;
}

public KnativeEnvironment.KnativeServiceDefinition getService() {
return service;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ private V01() {
String uri = endpoint.getEndpointUri();

return exchange -> {
final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
if (eventType == null) {
eventType = endpoint.getConfiguration().getCloudEventsType();
}
final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ private V02() {
String uri = endpoint.getEndpointUri();

return exchange -> {
final String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
if (eventType == null) {
eventType = endpoint.getConfiguration().getCloudEventsType();
}
final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void testInvokeEndpoint() throws Exception {
port,
KnativeSupport.mapOf(
Knative.SERVICE_META_PATH, "/a/path",
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.custom-event",
Knative.CONTENT_TYPE, "text/plain"
))
));
Expand All @@ -98,7 +98,7 @@ public void configure() throws Exception {
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedMessageCount(1);
mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.custom-event");
mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
Expand All @@ -115,6 +115,94 @@ public void configure() throws Exception {
mock.assertIsSatisfied();
}

@Test
void testProduceDefaultEventType() throws Exception {
final int port1 = AvailablePortFinder.getNextAvailable();
final int port2 = AvailablePortFinder.getNextAvailable();

KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
new KnativeEnvironment.KnativeServiceDefinition(
Knative.Type.endpoint,
Knative.Protocol.http,
"myEndpoint",
"localhost",
port1,
KnativeSupport.mapOf(
Knative.SERVICE_META_PATH, "/",
Knative.CONTENT_TYPE, "text/plain"
)),
new KnativeEnvironment.KnativeServiceDefinition(
Knative.Type.endpoint,
Knative.Protocol.http,
"myEndpoint2",
"localhost",
port2,
KnativeSupport.mapOf(
Knative.SERVICE_META_PATH, "/",
Knative.CONTENT_TYPE, "text/plain"
))
));

KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
component.setCloudEventsSpecVersion(CloudEventsProcessors.v01.getVersion());
component.setEnvironment(env);

context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:source")
.to("knative:endpoint/myEndpoint");

from("direct:source2")
.to("knative:endpoint/myEndpoint2?cloudEventsType=my.type");

fromF("netty4-http:http://localhost:%d/", port1)
.to("mock:ce");

fromF("netty4-http:http://localhost:%d/", port2)
.to("mock:ce2");
}
});

context.start();

MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedMessageCount(1);
mock.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
mock.expectedHeaderReceived("CE-EventType", "org.apache.camel.event");
mock.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
mock.expectedBodiesReceived("test");

MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
mock2.expectedMessageCount(1);
mock2.expectedHeaderReceived("CE-CloudEventsVersion", CloudEventsProcessors.v01.getVersion());
mock2.expectedHeaderReceived("CE-EventType", "my.type");
mock2.expectedHeaderReceived("CE-Source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type");
mock2.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventTime"));
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("CE-EventID"));
mock2.expectedBodiesReceived("test2");

context.createProducerTemplate().send(
"direct:source",
e -> {
e.getIn().setBody("test");
}
);
context.createProducerTemplate().send(
"direct:source2",
e -> {
e.getIn().setBody("test2");
}
);

mock.assertIsSatisfied();
mock2.assertIsSatisfied();
}

@Test
void testConsumeStructuredContent() throws Exception {
final int port = AvailablePortFinder.getNextAvailable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void testInvokeEndpoint() throws Exception {
port,
KnativeSupport.mapOf(
Knative.SERVICE_META_PATH, "/a/path",
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.custom-event",
Knative.CONTENT_TYPE, "text/plain"
))
));
Expand All @@ -98,7 +98,7 @@ public void configure() throws Exception {
MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedMessageCount(1);
mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
mock.expectedHeaderReceived("ce-type", "org.apache.camel.custom-event");
mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
Expand All @@ -115,6 +115,94 @@ public void configure() throws Exception {
mock.assertIsSatisfied();
}

@Test
void testProduceDefaultEventType() throws Exception {
final int port1 = AvailablePortFinder.getNextAvailable();
final int port2 = AvailablePortFinder.getNextAvailable();

KnativeEnvironment env = new KnativeEnvironment(Arrays.asList(
new KnativeEnvironment.KnativeServiceDefinition(
Knative.Type.endpoint,
Knative.Protocol.http,
"myEndpoint",
"localhost",
port1,
KnativeSupport.mapOf(
Knative.SERVICE_META_PATH, "/",
Knative.CONTENT_TYPE, "text/plain"
)),
new KnativeEnvironment.KnativeServiceDefinition(
Knative.Type.endpoint,
Knative.Protocol.http,
"myEndpoint2",
"localhost",
port2,
KnativeSupport.mapOf(
Knative.SERVICE_META_PATH, "/",
Knative.CONTENT_TYPE, "text/plain"
))
));

KnativeComponent component = context.getComponent("knative", KnativeComponent.class);
component.setCloudEventsSpecVersion(CloudEventsProcessors.v02.getVersion());
component.setEnvironment(env);

context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:source")
.to("knative:endpoint/myEndpoint");

from("direct:source2")
.to("knative:endpoint/myEndpoint2?cloudEventsType=my.type");

fromF("netty4-http:http://localhost:%d/", port1)
.to("mock:ce");

fromF("netty4-http:http://localhost:%d/", port2)
.to("mock:ce2");
}
});

context.start();

MockEndpoint mock = context.getEndpoint("mock:ce", MockEndpoint.class);
mock.expectedMessageCount(1);
mock.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
mock.expectedHeaderReceived("ce-type", "org.apache.camel.event");
mock.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint");
mock.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
mock.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
mock.expectedBodiesReceived("test");

MockEndpoint mock2 = context.getEndpoint("mock:ce2", MockEndpoint.class);
mock2.expectedMessageCount(1);
mock2.expectedHeaderReceived("ce-specversion", CloudEventsProcessors.v02.getVersion());
mock2.expectedHeaderReceived("ce-type", "my.type");
mock2.expectedHeaderReceived("ce-source", "knative://endpoint/myEndpoint2?cloudEventsType=my.type");
mock2.expectedHeaderReceived(Exchange.CONTENT_TYPE, "text/plain");
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-time"));
mock2.expectedMessagesMatches(e -> e.getIn().getHeaders().containsKey("ce-id"));
mock2.expectedBodiesReceived("test2");

context.createProducerTemplate().send(
"direct:source",
e -> {
e.getIn().setBody("test");
}
);
context.createProducerTemplate().send(
"direct:source2",
e -> {
e.getIn().setBody("test2");
}
);

mock.assertIsSatisfied();
mock2.assertIsSatisfied();
}

@Test
void testConsumeStructuredContent() throws Exception {
final int port = AvailablePortFinder.getNextAvailable();
Expand Down

0 comments on commit d0522df

Please sign in to comment.