Skip to content

Commit

Permalink
Add runtime support for Knative sinkbinding apache#365
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Jun 21, 2020
1 parent 2bd5ef4 commit 178d158
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@
*/
package org.apache.camel.k.loader.knative;

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.k.Source;
import org.apache.camel.k.SourceLoader;
import org.apache.camel.k.annotation.LoaderInterceptor;
import org.apache.camel.k.support.RuntimeSupport;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.ToDefinition;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,16 +54,22 @@ public Optional<RoutesBuilder> builder() {
return RuntimeSupport.afterConfigure(result.builder(), builder -> {
final CamelContext camelContext = builder.getContext();
final List<RouteDefinition> definitions = builder.getRouteCollection().getRoutes();

if (definitions.size() == 1) {
final String sink = camelContext.resolvePropertyPlaceholders("{{env:KNATIVE_SINK:sink}}");
final String uri = String.format("knative://endpoint/%s", sink);
final String sinkName = camelContext.resolvePropertyPlaceholders("{{knative.sink:sink}}");
final String sinkUri = String.format("knative://endpoint/%s", sinkName);
final RouteDefinition definition = definitions.get(0);

LOGGER.info("Add sink:{} to route:{}", uri, definition.getId());
createSyntheticDefinition(camelContext, sinkName).ifPresent(serviceDefinition -> {
// publish the synthetic service definition
camelContext.getRegistry().bind(sinkName, serviceDefinition);
});

LOGGER.info("Add sink:{} to route:{}", sinkUri, definition.getId());

// assuming that route is linear like there's no content based routing
// or ant other EIP that would branch the flow
definition.getOutputs().add(new ToDefinition(uri));
definition.getOutputs().add(new ToDefinition(sinkUri));
} else {
LOGGER.warn("Cannot determine route to enrich. the knative enpoint need to explicitly be defined");
}
Expand All @@ -68,4 +82,43 @@ public Optional<Object> configuration() {
}
};
}

private static Optional<KnativeEnvironment.KnativeServiceDefinition> createSyntheticDefinition(
CamelContext camelContext,
String sinkName) {

final String kSinkUrl = camelContext.resolvePropertyPlaceholders("{{k.sink:}}");
final String kCeOverride = camelContext.resolvePropertyPlaceholders("{{k.ce.overrides:}}");

if (ObjectHelper.isNotEmpty(kSinkUrl)) {
// create a synthetic service definition to target the K_SINK url
var serviceBuilder = KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, sinkName)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink)
.withMeta(Knative.SERVICE_META_URL, kSinkUrl);

if (ObjectHelper.isNotEmpty(kCeOverride)) {
try (Reader reader = new StringReader(kCeOverride)) {
// assume K_CE_OVERRIDES is defined as simple key/val json
var overrides = Knative.MAPPER.readValue(
reader,
new TypeReference<HashMap<String, String>>() {});

for (var entry: overrides.entrySet()) {
// generate proper ce-override meta-data for the service
// definition
serviceBuilder.withMeta(
Knative.KNATIVE_CE_OVERRIDE_PREFIX + entry.getKey(),
entry.getValue()
);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

return Optional.of(serviceBuilder.build());
}

return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Stream;

import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.knative.KnativeComponent;
import org.apache.camel.component.knative.KnativeConstants;
import org.apache.camel.component.knative.spi.CloudEvent;
import org.apache.camel.component.knative.spi.CloudEvents;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.mock.MockEndpoint;
Expand Down Expand Up @@ -74,11 +80,8 @@ public void testWrapLoader(String uri) throws Exception {
KnativeEnvironment.endpoint(Knative.EndpointKind.sink, "sink", "localhost", runtime.port)
));


CamelContext context = runtime.getCamelContext();
context.disableJMX();
context.setStreamCaching(true);
context.addComponent("knative", component);
context.addComponent(KnativeConstants.SCHEME, component);

Source source = Sources.fromURI(uri);
SourceLoader loader = RoutesConfigurer.load(runtime, source);
Expand Down Expand Up @@ -112,11 +115,82 @@ public void configure() throws Exception {
mock.expectedMessageCount(1);
mock.expectedBodiesReceived(data);

context.createProducerTemplate().sendBodyAndHeader(
"direct:start",
"",
"MyHeader",
data);
context.createFluentProducerTemplate()
.to("direct:start")
.withHeader("MyHeader", data)
.send();

mock.assertIsSatisfied();
} finally {
context.stop();
}
}

@ParameterizedTest
@MethodSource("parameters")
public void testWrapLoaderWithSyntheticServiceDefinition(String uri) throws Exception {
LOGGER.info("uri: {}", uri);

final String data = UUID.randomUUID().toString();
final TestRuntime runtime = new TestRuntime();
final String typeHeaderKey = CloudEvents.v1_0.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http();
final String typeHeaderVal = UUID.randomUUID().toString();
final String url = String.format("http://localhost:%d", runtime.port);

KnativeComponent component = new KnativeComponent();
component.setEnvironment(new KnativeEnvironment(Collections.emptyList()));

Properties properties = new Properties();
properties.put("knative.sink", "mySynk");
properties.put("k.sink", String.format("http://localhost:%d", runtime.port));
properties.put("k.ce.overrides", Knative.MAPPER.writeValueAsString(Map.of(typeHeaderKey, typeHeaderVal)));

CamelContext context = runtime.getCamelContext();
context.getPropertiesComponent().setInitialProperties(properties);
context.addComponent(KnativeConstants.SCHEME, component);

Source source = Sources.fromURI(uri);
SourceLoader loader = RoutesConfigurer.load(runtime, source);

assertThat(loader.getSupportedLanguages()).contains(source.getLanguage());
assertThat(runtime.builders).hasSize(1);

try {
context.addRoutes(runtime.builders.get(0));
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
fromF("platform-http:/")
.routeId("http")
.to("mock:result");
}
});
context.start();

var definitions = context.adapt(ModelCamelContext.class).getRouteDefinitions();
var services = context.getRegistry().findByType(KnativeEnvironment.KnativeServiceDefinition.class);

assertThat(definitions).hasSize(2);
assertThat(definitions).first().satisfies(d -> {
assertThat(d.getOutputs()).last().hasFieldOrPropertyWithValue(
"endpointUri",
"knative://endpoint/mySynk"
);
});

assertThat(services).hasSize(1);
assertThat(services).first().hasFieldOrPropertyWithValue("name", "mySynk");
assertThat(services).first().hasFieldOrPropertyWithValue("url", url);

MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class);
mock.expectedMessageCount(1);
mock.expectedBodiesReceived(data);
mock.expectedHeaderReceived(typeHeaderKey, typeHeaderVal);

context.createFluentProducerTemplate()
.to("direct:start")
.withHeader("MyHeader", data)
.send();

mock.assertIsSatisfied();
} finally {
Expand All @@ -131,6 +205,9 @@ static class TestRuntime implements Runtime {

public TestRuntime() {
this.camelContext = new DefaultCamelContext();
this.camelContext.disableJMX();
this.camelContext.setStreamCaching(true);

this.builders = new ArrayList<>();
this.port = AvailablePortFinder.getNextAvailable();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
* limitations under the License.
*/
from('direct:start')
.setBody().simple('${header[MyHeader]}')
.setBody().header('MyHeader')
.to('log:knative')
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ public class MyRoutes extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:start")
.setBody().simple("${header[MyHeader]}")
.setBody().header("MyHeader")
.to("log:knative");
}

@BindToRegistry("my-bean")
public static String myBean() {
return "my-bean-string";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
* limitations under the License.
*/
from('direct:start')
.setBody().simple('${header[MyHeader]}')
.setBody().header('MyHeader')
.to('log:knative');
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
* limitations under the License.
*/
from("direct:start")
.setBody().simple("\${header[MyHeader]}")
.setBody().header("MyHeader")
.to("log:knative")

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<route>
<from uri="direct:start"/>
<setBody>
<simple>${header[MyHeader]}</simple>
<header>MyHeader</header>
</setBody>
<to uri="log:knative"/>
</route>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -263,11 +265,48 @@ public KnativeServiceDefinition(
);
}

@Override
public String getHost() {
String urlAsString = getUrl();
if (urlAsString != null) {
try {
return new URL(urlAsString).getHost();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}

return super.getHost();
}

@Override
public int getPort() {
String urlAsString = getUrl();
if (urlAsString != null) {
try {
return new URL(urlAsString).getPort();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}

return super.getPort();
}

public Knative.Type getType() {
return Knative.Type.valueOf(getMetadata().get(Knative.KNATIVE_TYPE));
}

public String getPath() {
String urlAsString = getUrl();
if (urlAsString != null) {
try {
return new URL(urlAsString).getPath();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}

return getMetadata(Knative.SERVICE_META_PATH);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ KnativeEnvironment.KnativeServiceDefinition lookupServiceDefinition(Knative.Endp
//
KnativeEnvironment.KnativeServiceDefinition service = lookupServiceDefinition(serviceName, endpointKind)
.or(() -> lookupServiceDefinition("default", endpointKind))
.orElseThrow(() -> new IllegalArgumentException(String.format("Unable to find a service definition for %s/%s/%s", type, serviceName, endpointKind)));
.orElseThrow(() -> new IllegalArgumentException(String.format("Unable to find a service definition for %s/%s/%s", type, endpointKind, serviceName)));

final Map<String, String> metadata = new HashMap<>(service.getMetadata());

Expand Down

0 comments on commit 178d158

Please sign in to comment.