Skip to content

Commit

Permalink
camel-knative: add support for URL in knative environment apache#369
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Jun 21, 2020
1 parent 273a254 commit 2bd5ef4
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public final class Knative {
public static final String SERVICE_META_HOST = "service.host";
public static final String SERVICE_META_ZONE = "service.zone";
public static final String SERVICE_META_PATH = "service.path";
public static final String SERVICE_META_URL = "service.url";

private Knative() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -33,8 +33,6 @@
import org.apache.camel.impl.cloud.DefaultServiceDefinition;
import org.apache.camel.support.ResourceHelper;

import static org.apache.camel.util.CollectionHelper.mapOf;

/*
* Assuming it is loaded from a json for now
*/
Expand Down Expand Up @@ -70,7 +68,6 @@ public static KnativeEnvironment mandatoryLoadFromSerializedString(CamelContext

public static KnativeEnvironment mandatoryLoadFromResource(CamelContext context, String path) throws Exception {
try (InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(context, path)) {

//
// read the knative environment from a file formatted as json, i.e. :
//
Expand Down Expand Up @@ -100,129 +97,151 @@ public static KnativeEnvironment mandatoryLoadFromResource(CamelContext context,
}

public static KnativeServiceDefinition endpoint(Knative.EndpointKind endpointKind, String name, String host, int port) {
return entry(
endpointKind,
Knative.Type.endpoint,
name,
host,
port,
Collections.emptyMap()
);
return serviceBuilder(Knative.Type.endpoint, name)
.withHost(host)
.withPort(port)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
.build();
}

public static KnativeServiceDefinition endpoint(Knative.EndpointKind endpointKind, String name, String host, int port, Map<String, String> metadata) {
return entry(
endpointKind,
Knative.Type.endpoint,
name,
host,
port,
metadata
);
return serviceBuilder(Knative.Type.endpoint, name)
.withHost(host)
.withPort(port)
.withMeta(metadata)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
.build();
}

public static KnativeServiceDefinition sourceEndpoint(String name, Map<String, String> metadata) {
return entry(
Knative.EndpointKind.source,
Knative.Type.endpoint,
name,
null,
-1,
metadata
);
return serviceBuilder(Knative.Type.endpoint, name)
.withMeta(metadata)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source.name())
.build();
}

public static KnativeServiceDefinition channel(Knative.EndpointKind endpointKind, String name, String host, int port) {
return entry(
endpointKind,
Knative.Type.channel,
name,
host,
port,
Collections.emptyMap()
);
return serviceBuilder(Knative.Type.channel, name)
.withHost(host)
.withPort(port)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
.build();
}

public static KnativeServiceDefinition channel(Knative.EndpointKind endpointKind, String name, String host, int port, Map<String, String> metadata) {
return entry(
endpointKind,
Knative.Type.channel,
name,
host,
port,
metadata
);
return serviceBuilder(Knative.Type.channel, name)
.withHost(host)
.withPort(port)
.withMeta(metadata)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
.build();
}

public static KnativeServiceDefinition event(Knative.EndpointKind endpointKind, String name, String host, int port) {
return entry(
endpointKind,
Knative.Type.event,
name,
host,
port,
Collections.emptyMap()
);
return serviceBuilder(Knative.Type.event, name)
.withHost(host)
.withPort(port)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
.build();
}

public static KnativeServiceDefinition sourceEvent(String name) {
return entry(
Knative.EndpointKind.source,
Knative.Type.event,
name,
null,
-1,
Collections.emptyMap()
);
return serviceBuilder(Knative.Type.event, name)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source)
.build();
}

public static KnativeServiceDefinition sourceEvent(String name, Map<String, String> metadata) {
return entry(
Knative.EndpointKind.source,
Knative.Type.event,
name,
null,
-1,
metadata
);
return serviceBuilder(Knative.Type.event, name)
.withMeta(metadata)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.source)
.build();
}

public static KnativeServiceDefinition event(Knative.EndpointKind endpointKind, String name, String host, int port, Map<String, String> metadata) {
return entry(
endpointKind,
Knative.Type.event,
name,
host,
port,
metadata
);
}

public static KnativeServiceDefinition entry(Knative.EndpointKind endpointKind, Knative.Type type, String name, String host, int port, Map<String, String> metadata) {
return new KnativeEnvironment.KnativeServiceDefinition(
type,
name,
host,
port,
KnativeSupport.mergeMaps(
metadata,
mapOf(
Knative.CAMEL_ENDPOINT_KIND, endpointKind.name()
)
)
);
return serviceBuilder(Knative.Type.event, name)
.withHost(host)
.withPort(port)
.withMeta(metadata)
.withMeta(Knative.CAMEL_ENDPOINT_KIND, endpointKind)
.build();
}

public static KnativeEnvironment on(KnativeServiceDefinition... definitions) {
return new KnativeEnvironment(Arrays.asList(definitions));
}

public static KnativeServiceBuilder serviceBuilder(Knative.Type type, String name) {
return new KnativeServiceBuilder(type, name);
}

// ************************
//
// Types
//
// ************************


public static final class KnativeServiceBuilder {
private final Knative.Type type;
private final String name;
private String host;
private Integer port;
private Map<String, String> metadata;

public KnativeServiceBuilder(Knative.Type type, String name) {
this.type = type;
this.name = name;
}

public KnativeServiceBuilder withHost(String host) {
this.host = host;
return this;
}

public KnativeServiceBuilder withPort(Integer port) {
this.port = port;
return this;
}

public KnativeServiceBuilder withMeta(Map<String, String> metadata) {
if (metadata == null) {
return this;
}
if (this.metadata == null) {
this.metadata = new HashMap<>();
}
this.metadata.putAll(metadata);
return this;
}

public KnativeServiceBuilder withMeta(String key, String value) {
if (key == null || value == null) {
return this;
}
if (this.metadata == null) {
this.metadata = new HashMap<>();
}
this.metadata.put(key, value);
return this;
}

public KnativeServiceBuilder withMeta(String key, Enum<?> e) {
if (key == null || e == null) {
return this;
}
if (this.metadata == null) {
this.metadata = new HashMap<>();
}
this.metadata.put(key, e.name());
return this;
}

public KnativeServiceDefinition build() {
return new KnativeServiceDefinition(type, name, host, port, metadata);
}
}

public static final class KnativeServiceDefinition extends DefaultServiceDefinition {
@JsonCreator
public KnativeServiceDefinition(
Expand All @@ -238,7 +257,7 @@ public KnativeServiceDefinition(
port == null ? -1 : port,
KnativeSupport.mergeMaps(
metadata,
mapOf(
Map.of(
Knative.KNATIVE_TYPE, type.name())
)
);
Expand All @@ -249,21 +268,25 @@ public Knative.Type getType() {
}

public String getPath() {
return getMetadata().get(Knative.SERVICE_META_PATH);
return getMetadata(Knative.SERVICE_META_PATH);
}

public String getPathOrDefault(String path) {
return getMetadata().getOrDefault(Knative.SERVICE_META_PATH, path);
}

public String getEventType() {
return getMetadata().get(Knative.KNATIVE_EVENT_TYPE);
return getMetadata(Knative.KNATIVE_EVENT_TYPE);
}

public int getPortOrDefault(int port) {
return getPort() != -1 ? getPort() : port;
}

public String getUrl() {
return getMetadata(Knative.SERVICE_META_URL);
}

public String getMetadata(String key) {
return getMetadata().get(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.component.knative.http;

import java.util.Map;
import java.util.function.Supplier;

import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
Expand All @@ -38,6 +39,7 @@
import org.apache.camel.support.MessageHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.function.Suppliers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,6 +51,7 @@ public class KnativeHttpProducer extends DefaultAsyncProducer {
private final Vertx vertx;
private final WebClientOptions clientOptions;
private final HeaderFilterStrategy headerFilterStrategy;
private final Supplier<String> uri;

private WebClient client;

Expand All @@ -65,6 +68,7 @@ public KnativeHttpProducer(
this.vertx = ObjectHelper.notNull(vertx, "vertx");
this.clientOptions = ObjectHelper.supplyIfEmpty(clientOptions, WebClientOptions::new);
this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy();
this.uri = Suppliers.memorize(() -> computeUrl(serviceDefinition));
}

@Override
Expand Down Expand Up @@ -111,10 +115,7 @@ public boolean process(Exchange exchange, AsyncCallback callback) {
return true;
}

final int port = serviceDefinition.getPortOrDefault(KnativeHttpTransport.DEFAULT_PORT);
final String path = serviceDefinition.getPathOrDefault(KnativeHttpTransport.DEFAULT_PATH);

client.post(port, serviceDefinition.getHost(), path)
client.postAbs(this.uri.get())
.putHeaders(headers)
.sendBuffer(Buffer.buffer(payload), response -> {
if (response.succeeded()) {
Expand All @@ -136,7 +137,7 @@ public boolean process(Exchange exchange, AsyncCallback callback) {
if (result.statusCode() < 200 || result.statusCode() >= 300) {
String exceptionMessage = String.format(
"HTTP operation failed invoking %s with statusCode: %d, statusMessage: %s",
URISupport.sanitizeUri(getURI()),
URISupport.sanitizeUri(this.uri.get()),
result.statusCode(),
result.statusMessage()
);
Expand All @@ -148,7 +149,7 @@ public boolean process(Exchange exchange, AsyncCallback callback) {

exchange.setMessage(answer);
} else if (response.failed()) {
String exceptionMessage = "HTTP operation failed invoking " + URISupport.sanitizeUri(getURI());
String exceptionMessage = "HTTP operation failed invoking " + URISupport.sanitizeUri(this.uri.get());
if (response.result() != null) {
exceptionMessage += " with statusCode: " + response.result().statusCode();
}
Expand Down Expand Up @@ -180,12 +181,20 @@ protected void doStop() throws Exception {
}
}

private String getURI() {
String p = ObjectHelper.supplyIfEmpty(serviceDefinition.getPath(), () -> KnativeHttpTransport.DEFAULT_PATH);
if (!p.startsWith("/")) {
p = "/" + p;
private static String computeUrl(KnativeEnvironment.KnativeServiceDefinition definition) {
String url = definition.getUrl();
if (url == null) {
int port = definition.getPortOrDefault(KnativeHttpTransport.DEFAULT_PORT);
String path = definition.getPathOrDefault(KnativeHttpTransport.DEFAULT_PATH);

if (!path.startsWith("/")) {
path = "/" + path;
}

url = String.format("http://%s:%d%s", definition.getHost(), port, path);
}

return String.format("http://%s:%d%s", serviceDefinition.getHost(), serviceDefinition.getPort(), p);
return url;
}

}
Loading

0 comments on commit 2bd5ef4

Please sign in to comment.