Skip to content

Commit

Permalink
Sql aggregator does not work in native mode apache#2693
Browse files Browse the repository at this point in the history
  • Loading branch information
JiriOndrusek committed Jun 2, 2021
1 parent 5d2ef77 commit 8637533
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@
package org.apache.camel.quarkus.component.sql.deployment;

import java.sql.Types;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.LinkedHashMap;

import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveHierarchyBuildItem;
import org.apache.camel.quarkus.component.sql.CamelSqlConfig;
import org.apache.camel.support.DefaultExchangeHolder;
import org.jboss.jandex.DotName;
import org.jboss.jandex.Type;

class SqlProcessor {

Expand All @@ -35,19 +42,17 @@ FeatureBuildItem feature() {
}

@BuildStep
void registerForReflection(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
void registerForReflection3(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<ReflectiveHierarchyBuildItem> reflectiveHierarchy) {
reflectiveClass.produce(new ReflectiveClassBuildItem(false, true, Types.class));
}
reflectiveClass.produce(new ReflectiveClassBuildItem(false, true, true, true, DefaultExchangeHolder.class.getName()));
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, false, true, String.class.getName()));

@BuildStep
void sqlNativeImageResources(BuildProducer<NativeImageResourceBuildItem> nativeImage, CamelSqlConfig config) {
if (!config.scriptFiles.isPresent()) {
return;
}

config.scriptFiles.get()
.stream()
.map(scriptFile -> new NativeImageResourceBuildItem(scriptFile.replace("classpath:", "")))
.forEach(nativeImage::produce);
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, false, true, LinkedHashMap.class.getName(),
HashMap.class.getName(), Number.class.getName(), Integer.class.getName()));

DotName simpleName = DotName.createSimple(LinkedHashMap.class.getName());
reflectiveHierarchy.produce(new ReflectiveHierarchyBuildItem.Builder().type(Type.create(simpleName, Type.Kind.CLASS))
.serialization(true).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@
*/
package org.apache.camel.quarkus.component.sql.it;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import io.restassured.response.ValidatableResponse;
import io.restassured.specification.RequestSpecification;
import org.apache.camel.component.sql.SqlConstants;
import org.apache.camel.util.CollectionHelper;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.hamcrest.Matchers.is;
import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.*;

@QuarkusTest
class SqlTest {
Expand All @@ -30,9 +41,10 @@ class SqlTest {
public void testSqlComponent() {
// Create Camel species
RestAssured.given()
.contentType(ContentType.TEXT)
.body("Dromedarius")
.post("/sql/post")
.contentType(ContentType.JSON)
.queryParam("table", "camel")
.body(CollectionHelper.mapOf("species", "Dromedarius"))
.post("/sql/insert")
.then()
.statusCode(201);

Expand Down Expand Up @@ -66,4 +78,164 @@ public void testSqlStoredComponent() {
.statusCode(200)
.body(is("15"));
}

@Test
public void testConsumer() throws InterruptedException {
testConsumer(1, "consumerRoute");
}

@Test
public void testClasspathConsumer() throws InterruptedException {
testConsumer(2, "consumerClasspathRoute");
}

@Test
public void testFileConsumer() throws InterruptedException {
testConsumer(3, "consumerFileRoute");
}

private void testConsumer(int id, String routeId) throws InterruptedException {
route(routeId, "start", "Started");

Map project = CollectionHelper.mapOf("ID", id, "PROJECT", routeId, "LICENSE", "222", "PROCESSED", false);
Map updatedProject = CollectionHelper.mapOf("ID", id, "PROJECT", routeId, "LICENSE", "XXX", "PROCESSED", false);

postMapWithParam("/sql/insert",
"table", "projects",
project)
.statusCode(201);

//wait for the record to be caught
await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<Object>) RestAssured
.get("/sql/get/results/" + routeId).then().extract().as(List.class),
both(iterableWithSize(1)).and(contains(project)));

//update
postMapWithParam("/sql/update",
"table", "projects",
updatedProject)
.statusCode(201);

//wait for the record to be caught
await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<Object>) RestAssured
.get("/sql/get/results/" + routeId).then().extract().as(List.class),
both(iterableWithSize(1)).and(contains(updatedProject)));

route(routeId, "stop", "Stopped");
}

@Test
public void testTransacted() throws InterruptedException {

postMap("/sql/toDirect/transacted", CollectionHelper.mapOf(SqlConstants.SQL_QUERY,
"insert into projects values (5, 'Transacted', 'ASF', false)",
"rollback", false))
.statusCode(204);

postMap("/sql/toDirect/transacted", CollectionHelper.mapOf(SqlConstants.SQL_QUERY,
"select * from projects where project = 'Transacted'"))
.statusCode(200)
.body("size()", is(1));

postMap("/sql/toDirect/transacted", CollectionHelper.mapOf(SqlConstants.SQL_QUERY,
"insert into projects values (6, 'Transacted', 'ASF', false)",
"rollback", true))
.statusCode(200)
.body(is("java.lang.Exception:forced Exception"));

postMap("/sql/toDirect/transacted",
CollectionHelper.mapOf(SqlConstants.SQL_QUERY, "select * from projects where project = 'Transacted'"))
.statusCode(200)
.body("size()", is(1));
}

@Test
public void testDefaultErrorCode() throws InterruptedException {
postMap("/sql/toDirect/transacted", CollectionHelper.mapOf(SqlConstants.SQL_QUERY, "select * from NOT_EXIST order id"))
.statusCode(200)
.body(startsWith("org.springframework.jdbc.BadSqlGrammarException"));
}

@Test
public void testIdempotentRepository() {
// add value with key 1
postMapWithParam("/sql/toDirect/idempotent",
"body", "one",
CollectionHelper.mapOf("messageId", "1"))
.statusCode(200);

// add value with key 2
postMapWithParam("/sql/toDirect/idempotent",
"body", "two",
CollectionHelper.mapOf("messageId", "2"))
.statusCode(200);

// add same value with key 3
postMapWithParam("/sql/toDirect/idempotent",
"body", "three",
CollectionHelper.mapOf("messageId", "3"))
.statusCode(200);

// add another value with key 1 -- this one is supposed to be skipped
postMapWithParam("/sql/toDirect/idempotent",
"body", "four",
CollectionHelper.mapOf("messageId", "1"))
.statusCode(200);

// get all values from the result map
await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<? extends String>) RestAssured
.get("/sql/get/results/idempotentRoute").then().extract().as(List.class),
containsInAnyOrder("one", "two", "three"));
}

@Test
@Disabled //see https://github.com/apache/camel-quarkus/issues/2693
public void testAggregationRepository() {
postMapWithParam("/sql/toDirect/aggregation", "body", "A", CollectionHelper.mapOf("messageId", "123"))
.statusCode(200);

postMapWithParam("/sql/toDirect/aggregation", "body", "B", CollectionHelper.mapOf("messageId", "123"))
.statusCode(200);

postMapWithParam("/sql/toDirect/aggregation", "body", "C", CollectionHelper.mapOf("messageId", "123"))
.statusCode(200);

postMapWithParam("/sql/toDirect/aggregation", "body", "D", CollectionHelper.mapOf("messageId", "123"))
.statusCode(200);

// get all values from the result map
await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<? extends String>) RestAssured
.get("/sql/get/results/aggregationRoute").then().extract().as(List.class),
containsInAnyOrder("ABCD"));
}

private ValidatableResponse postMap(String toUrl, Map<String, String> body) {
return postMapWithParam(toUrl, null, null, body);
}

private ValidatableResponse postMapWithParam(String toUrl, String param, String paramValue, Map<String, String> body) {
RequestSpecification rs = RestAssured.given()
.contentType(ContentType.JSON);

if (param != null) {
rs = rs.queryParam(param, paramValue);
}

return rs.body(body)
.post(toUrl)
.then();
}

private void route(String routeId, String operation, String expectedOutput) {
RestAssured.given()
.get("/sql/route/" + routeId + "/" + operation)
.then().statusCode(204);

if (expectedOutput != null) {
await().atMost(5, TimeUnit.SECONDS).until(() -> RestAssured
.get("/sql/route/" + routeId + "/status")
.then()
.extract().asString(), equalTo(expectedOutput));
}
}
}

0 comments on commit 8637533

Please sign in to comment.