Skip to content

Commit

Permalink
platform-http: split initialization and start phase
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed May 6, 2020
1 parent be83bcf commit 58d1bb0
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.security.KeyStore;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -52,6 +51,7 @@ public final class PlatformHttpServer extends ServiceSupport {
private final CamelContext context;
private final PlatformHttpServiceConfiguration configuration;
private final Vertx vertx;
private final Router router;
private final ExecutorService executor;

private HttpServer server;
Expand All @@ -60,27 +60,12 @@ public PlatformHttpServer(CamelContext context, PlatformHttpServiceConfiguration
this.context = context;
this.configuration = configuration;
this.vertx = vertx;
this.router = Router.router(vertx);
this.executor = executor;
}

@Override
protected void doStart() throws Exception {
startAsync().toCompletableFuture().join();
}

@Override
protected void doStop() throws Exception {
try {
if (server != null) {
stopAsync().toCompletableFuture().join();
}
} finally {
this.server = null;
}
}

private CompletionStage<Void> startAsync() {
final Router router = Router.router(vertx);
protected void doInit() throws Exception {
final Router subRouter = Router.router(vertx);

if (configuration.getCors().isEnabled()) {
Expand All @@ -107,8 +92,11 @@ private CompletionStage<Void> startAsync() {
}

server = vertx.createHttpServer(options);
}

return CompletableFuture.runAsync(
@Override
protected void doStart() throws Exception {
CompletableFuture.runAsync(
() -> {
CountDownLatch latch = new CountDownLatch(1);
server.requestHandler(router).listen(configuration.getBindPort(), configuration.getBindHost(), result -> {
Expand Down Expand Up @@ -136,41 +124,48 @@ private CompletionStage<Void> startAsync() {
}
},
executor
);
).toCompletableFuture().join();
}

protected CompletionStage<Void> stopAsync() {
return CompletableFuture.runAsync(
() -> {
CountDownLatch latch = new CountDownLatch(1);

// remove the platform-http component
context.removeComponent(PlatformHttpConstants.PLATFORM_HTTP_COMPONENT_NAME);

server.close(result -> {
try {
if (result.failed()) {
LOGGER.warn("Failed to close Vert.x HttpServer reason: {}",
result.cause().getMessage()
);

throw new RuntimeException(result.cause());
@Override
protected void doStop() throws Exception {
try {
if (server != null) {
CompletableFuture.runAsync(
() -> {
CountDownLatch latch = new CountDownLatch(1);

// remove the platform-http component
context.removeComponent(PlatformHttpConstants.PLATFORM_HTTP_COMPONENT_NAME);

server.close(result -> {
try {
if (result.failed()) {
LOGGER.warn("Failed to close Vert.x HttpServer reason: {}",
result.cause().getMessage()
);

throw new RuntimeException(result.cause());
}

LOGGER.info("Vert.x HttpServer stopped");
} finally {
latch.countDown();
}
});

try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

LOGGER.info("Vert.x HttpServer stopped");
} finally {
latch.countDown();
}
});

try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
},
executor
);
},
executor
).toCompletableFuture().join();
}
} finally {
this.server = null;
}
}

private Handler<RoutingContext> createBodyHandler() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public int getOrder() {
@Override
public void apply(CamelContext camelContext) {
endpoint = new PlatformHttpServiceEndpoint(camelContext, this);
endpoint.start();
//endpoint.init();

try {
camelContext.addService(endpoint, true, true);
camelContext.addService(endpoint);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public PlatformHttpServiceEndpoint(CamelContext context, PlatformHttpServiceConf
}

@Override
protected void doStart() throws Exception {
protected void doInit() throws Exception {
vertx = CamelContextHelper.findByType(context, Vertx.class);
executor = context.getExecutorServiceManager().newSingleThreadExecutor(this, "platform-http-service");

Expand All @@ -64,6 +64,11 @@ protected void doStart() throws Exception {
}

vertxHttpServer = new PlatformHttpServer(context, configuration, vertx, executor);
vertxHttpServer.init();
}

@Override
protected void doStart() throws Exception {
vertxHttpServer.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.camel.k.http;

import java.net.ConnectException;
import java.util.Arrays;

import io.vertx.core.http.HttpMethod;
Expand All @@ -41,6 +42,7 @@

import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.hamcrest.Matchers.equalTo;

public class PlatformHttpServiceCustomizerTest {
Expand All @@ -59,24 +61,30 @@ public void testPlatformHttpServiceCustomizer(String path) throws Exception {

httpService.apply(runtime.getCamelContext());

PlatformHttp.lookup(runtime.getCamelContext()).router().route(HttpMethod.GET, "/my/path")
.handler(routingContext -> {
JsonObject response = new JsonObject();
response.put("status", "UP");

routingContext.response()
.putHeader("content-type", "application/json")
.setStatusCode(200)
.end(Json.encodePrettily(response));
});

given()
.port(httpService.getBindPort())
.when()
.get(path + "/my/path")
.then()
.statusCode(200)
.body("status", equalTo("UP"));
try {
runtime.getCamelContext().start();

PlatformHttp.lookup(runtime.getCamelContext()).router().route(HttpMethod.GET, "/my/path")
.handler(routingContext -> {
JsonObject response = new JsonObject();
response.put("status", "UP");

routingContext.response()
.putHeader("content-type", "application/json")
.setStatusCode(200)
.end(Json.encodePrettily(response));
});

given()
.port(httpService.getBindPort())
.when()
.get(path + "/my/path")
.then()
.statusCode(200)
.body("status", equalTo("UP"));
} finally {
runtime.getCamelContext().stop();
}
}

@ParameterizedTest
Expand Down Expand Up @@ -152,6 +160,31 @@ public void configure() throws Exception {
}
}

@Test
public void testPlatformHttpServiceNotAvailableBeforeCamelContextStarts() throws Exception {
Runtime runtime = Runtime.on(new DefaultCamelContext());

var httpService = new PlatformHttpServiceContextCustomizer();
httpService.setBindPort(AvailablePortFinder.getNextAvailable());
httpService.apply(runtime.getCamelContext());

PlatformHttp.lookup(runtime.getCamelContext())
.router()
.route(HttpMethod.GET, "/my/path")
.handler(routingContext -> routingContext.response().setStatusCode(200).end());

assertThatExceptionOfType(ConnectException.class).isThrownBy(
() -> {
given()
.port(httpService.getBindPort())
.when()
.get("/my/path")
.then()
.extract();
}
);
}

@Test
public void testPlatformHttpComponentSSL() throws Exception {
KeyStoreParameters keystoreParameters = new KeyStoreParameters();
Expand Down

0 comments on commit 58d1bb0

Please sign in to comment.