Skip to content

Commit

Permalink
CamelCloudEventXXX not converted to CE header apache#177
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Oct 27, 2019
1 parent 2ac19c4 commit 5b60f45
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public enum CloudEvents implements CloudEvent {
Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_ID, "CE-EventID", "eventID"),
Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_TIME, "CE-EventTime", "eventTime"),
Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_SCHEMA_URL, "CE-SchemaURL", "schemaURL"),
Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE, "ContentType", "contentType"),
Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_CONTENT_TYPE, "Content-Type", "contentType"),
Attribute.simple(CloudEvent.CAMEL_CLOUD_EVENT_EXTENSIONS, "CE-Extensions", "extensions")
)
)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,8 +918,7 @@ void testEventsWithTypeAndVersion(CloudEvent ce) throws Exception {
public void configure() throws Exception {
from("direct:source")
.to("knative:event/myEvent?kind=MyObject&apiVersion=v1");

fromF("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2")
from("knative:event/myEvent?kind=MyOtherObject&apiVersion=v2")
.to("mock:ce");
}
});
Expand Down Expand Up @@ -1370,8 +1369,6 @@ void testHeadersOverrideFromURI(CloudEvent ce) throws Exception {
}
}



@ParameterizedTest
@MethodSource("provideCloudEventsImplementations")
void testHeadersOverrideFromConf(CloudEvent ce) throws Exception {
Expand Down Expand Up @@ -1434,5 +1431,114 @@ void testHeadersOverrideFromConf(CloudEvent ce) throws Exception {
server.stop();
}
}

@ParameterizedTest
@MethodSource("provideCloudEventsImplementations")
void testHeadersOverrideFromRouteWithCamelHeader(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"ep",
"localhost",
port,
KnativeSupport.mapOf(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();

Undertow server = Undertow.builder()
.addHttpListener(port, "localhost")
.setHandler(se -> {
exchange.set(se);
latch.countDown();
})
.build();

RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("myType")
.to("knative:endpoint/ep");
});

context.start();
try {
server.start();
template.sendBody("direct:start", "");

latch.await();

HeaderMap headers = exchange.get().getRequestHeaders();

assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("myType");
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}

@ParameterizedTest
@MethodSource("provideCloudEventsImplementations")
void testHeadersOverrideFromRouteWithCEHeader(CloudEvent ce) throws Exception {
configureKnativeComponent(
context,
ce,
endpoint(
Knative.EndpointKind.sink,
"ep",
"localhost",
port,
KnativeSupport.mapOf(
Knative.KNATIVE_EVENT_TYPE, "org.apache.camel.event",
Knative.CONTENT_TYPE, "text/plain"
)
)
);

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<HttpServerExchange> exchange = new AtomicReference<>();

Undertow server = Undertow.builder()
.addHttpListener(port, "localhost")
.setHandler(se -> {
exchange.set(se);
latch.countDown();
})
.build();

RouteBuilder.addRoutes(context, b -> {
b.from("direct:start")
.setHeader(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http()).constant("fromCEHeader")
.setHeader(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).constant("fromCamelHeader")
.to("knative:endpoint/ep");
});

context.start();
try {
server.start();
template.sendBody("direct:start", "");

latch.await();

HeaderMap headers = exchange.get().getRequestHeaders();

assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http())).isEqualTo(ce.version());
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http())).isEqualTo("fromCEHeader");
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http())).isNotNull();
assertThat(headers.getFirst(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http())).isEqualTo("knative://endpoint/ep");
assertThat(headers.getFirst(Exchange.CONTENT_TYPE)).isEqualTo("text/plain");
} finally {
server.stop();
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<!--
<AppenderRef ref="STDOUT"/>
-->
<AppenderRef ref="NONE"/>
<AppenderRef/>
</Root>
</Loggers>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
Expand Down Expand Up @@ -75,23 +76,31 @@ public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeSe
final CloudEvent ce = cloudEvent();

return exchange -> {
String eventType = service.getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
if (eventType == null) {
eventType = endpoint.getConfiguration().getCloudEventsType();
final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
final Map<String, Object> headers = exchange.getMessage().getHeaders();

for (CloudEvent.Attribute attribute: ce.attributes()) {
Object value = headers.get(attribute.id());
if (value != null) {
headers.putIfAbsent(attribute.http(), value);
}
}

final String contentType = service.getMetadata().get(Knative.CONTENT_TYPE);
final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);
final Map<String, Object> headers = exchange.getIn().getHeaders();

headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_ID).http(), exchange.getExchangeId());
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_SOURCE).http(), endpoint.getEndpointUri());
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_VERSION).http(), ce.version());
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(), eventType);
headers.putIfAbsent(ce.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TIME).http(), eventTime);
headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);

setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_ID, exchange::getExchangeId);
setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_SOURCE, endpoint::getEndpointUri);
setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_VERSION, ce::version);
setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TYPE, () -> {
return service.getMetadata().getOrDefault(Knative.KNATIVE_EVENT_TYPE, endpoint.getConfiguration().getCloudEventsType());
});
setCloudEventHeader(headers, CloudEvent.CAMEL_CLOUD_EVENT_TIME, () -> {
final ZonedDateTime created = exchange.getCreated().toInstant().atZone(ZoneId.systemDefault());
final String eventTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(created);

return eventTime;
});

for (Map.Entry<String, String> entry: service.getMetadata().entrySet()) {
if (entry.getKey().startsWith(Knative.KNATIVE_CE_OVERRIDE_PREFIX)) {
final String key = entry.getKey().substring(Knative.KNATIVE_CE_OVERRIDE_PREFIX.length());
Expand All @@ -102,4 +111,8 @@ public Processor producer(KnativeEndpoint endpoint, KnativeEnvironment.KnativeSe
}
};
}

protected void setCloudEventHeader(Map<String, Object> headers, String id, Supplier<Object> supplier) {
headers.putIfAbsent(cloudEvent().mandatoryAttribute(id).http(), supplier.get());
}
}

0 comments on commit 5b60f45

Please sign in to comment.