Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesnetherton committed Nov 15, 2024
1 parent 7c1583d commit b83a77c
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-quarkus-maintenance-branch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
- name: Tar Maven Repo
shell: bash
run: |
tar -czf ${{ runner.temp }}/maven-repo.tgz -C ~ build-data .m2/repository
tar -czf ${{ runner.temp }}/maven-repo.tgz -C ~ .m2/repository
ls -lh ${{ runner.temp }}/maven-repo.tgz
df -h /
- name: Persist Maven Repo
Expand Down
13 changes: 13 additions & 0 deletions integration-tests/cassandraql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,23 @@
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.camel.kamelets</groupId>
<artifactId>camel-kamelets-catalog</artifactId>
<version>4.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-kamelet</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-jackson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-cassandraql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public class CassandraqlProducers {

@Named
public CqlSession customCqlSession(
@ConfigProperty(name = "quarkus.cassandra.auth.username") String username,
@ConfigProperty(name = "quarkus.cassandra.auth.password") String password,
@ConfigProperty(name = "quarkus.cassandra.contact-points") String dbUrl) {
@ConfigProperty(name = "cassandra.auth.username") String username,
@ConfigProperty(name = "cassandra.auth.password") String password,
@ConfigProperty(name = "cassandra.contact-points") String dbUrl) {
String[] urlParts = dbUrl.split(":");
CqlSessionBuilder sessionBuilder = CqlSession.builder();
sessionBuilder.addContactPoint(new InetSocketAddress(urlParts[0], Integer.parseInt(urlParts[1])));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,52 @@
*/
package org.apache.camel.quarkus.component.cassandraql.it;

import com.datastax.oss.quarkus.runtime.api.session.QuarkusCqlSession;
import java.net.URI;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.cassandra.CassandraAggregationRepository;
import org.apache.camel.processor.aggregate.cassandra.NamedCassandraAggregationRepository;
import org.apache.camel.processor.idempotent.cassandra.NamedCassandraIdempotentRepository;
import org.apache.camel.spi.AggregationRepository;
import org.eclipse.microprofile.config.inject.ConfigProperty;

@ApplicationScoped
public class CassandraqlRoutes extends RouteBuilder {
public static final String KEYSPACE = "test";

@ConfigProperty(name = "quarkus.cassandra.contact-points")
@ConfigProperty(name = "cassandra.contact-points")
String dbUrl;

@ConfigProperty(name = "quarkus.cassandra.auth.username")
@ConfigProperty(name = "cassandra.auth.username")
String userName;

@ConfigProperty(name = "quarkus.cassandra.auth.password")
@ConfigProperty(name = "cassandra.auth.password")
String password;

@Inject
@BindToRegistry("quarkusCqlSession")
QuarkusCqlSession session;
// @Inject
// @BindToRegistry("quarkusCqlSession")
// QuarkusCqlSession session;

@Override
public void configure() throws Exception {
URI uri = URI.create("http://" + dbUrl);

fromF("kamelet:cassandra-source?keyspace=%s&connectionPort=%s&connectionHost=%s&query=select * from employee&password=%s&username=%s",
KEYSPACE, uri.getPort(), uri.getHost(), userName, password)
.log("===============> ${body}");

from("direct:create")
.toF("cql://%s/%s?username=%s&password=%s&cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)", dbUrl,
KEYSPACE, userName, password);

from("direct:createIdempotent")
.idempotentConsumer(simple("${body[0]}"), new NamedCassandraIdempotentRepository(session, "ID"))
.toF("cql://%s/%s?username=%s&password=%s&cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)", dbUrl,
KEYSPACE, userName, password);
// from("direct:createIdempotent")
// .idempotentConsumer(simple("${body[0]}"), new NamedCassandraIdempotentRepository(session, "ID"))
// .toF("cql://%s/%s?username=%s&password=%s&cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)", dbUrl,
// KEYSPACE, userName, password);

from("direct:createCustomSession")
.toF("cql:bean:customCqlSession?cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)");

from("direct:createQuarkusSession")
.to("cql:bean:quarkusCqlSession?cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)");
// from("direct:createQuarkusSession")
// .to("cql:bean:quarkusCqlSession?cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)");

from("direct:createCustomLoadBalancingPolicy")
.toF("cql://%s/%s?username=%s&password=%s&loadBalancingPolicyClass=%s&cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)",
Expand All @@ -83,12 +84,12 @@ public void configure() throws Exception {
.autoStartup(false)
.to("seda:employees");

from("direct:aggregate")
.aggregate(simple("${body.id}"), createAggregationStrategy())
.completionSize(3)
.completionTimeout(5000)
.aggregationRepository(createAggregationRepository())
.to("seda:employees");
// from("direct:aggregate")
// .aggregate(simple("${body.id}"), createAggregationStrategy())
// .completionSize(3)
// .completionTimeout(5000)
// .aggregationRepository(createAggregationRepository())
// .to("seda:employees");

from("direct:readWithCustomStrategy")
.toF("cql://%s/%s?username=%s&password=%s&resultSetConversionStrategy=#customResultSetConversionStrategy&cql=SELECT * FROM employee WHERE id = ?",
Expand Down Expand Up @@ -118,9 +119,9 @@ private AggregationStrategy createAggregationStrategy() {
};
}

private AggregationRepository createAggregationRepository() {
CassandraAggregationRepository repository = new NamedCassandraAggregationRepository();
repository.setSession(session);
return repository;
}
// private AggregationRepository createAggregationRepository() {
// CassandraAggregationRepository repository = new NamedCassandraAggregationRepository();
// repository.setSession(session);
// return repository;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@

# Required for camel-cassandra AggregationStrategy
quarkus.camel.native.reflection.serialization-enabled = true
quarkus.camel.kamelet.identifiers=cassandra-source
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down Expand Up @@ -92,6 +93,7 @@ public void customSession() {
}
}

@Disabled
@Test
public void quarkusCassandraSession() {
try {
Expand All @@ -110,6 +112,7 @@ public void quarkusCassandraSession() {
}
}

@Disabled
@Test
public void idempotent() {
try {
Expand All @@ -123,6 +126,7 @@ public void idempotent() {
}
}

@Disabled
@Test
public void aggregate() {
Stream.of("foo", "bar", "cheese").forEach(name -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ public Map<String, String> start() {
// Note: The cassandra component does not depend on any of these being set.
// They're added to test the component with the (optional) QuarkusCqlSession
// produced by the Quarkus extension
"quarkus.cassandra.contact-points", cassandraUrl,
"quarkus.cassandra.local-datacenter", "datacenter1",
"quarkus.cassandra.auth.username", container.getUsername(),
"quarkus.cassandra.auth.password", container.getPassword(),
"quarkus.cassandra.keyspace", CassandraqlRoutes.KEYSPACE);
"cassandra.contact-points", cassandraUrl,
"cassandra.local-datacenter", "datacenter1",
"cassandra.auth.username", container.getUsername(),
"cassandra.auth.password", container.getPassword(),
"cassandra.keyspace", CassandraqlRoutes.KEYSPACE);

} catch (Exception e) {
LOGGER.error("Container does not start", e);
Expand Down

0 comments on commit b83a77c

Please sign in to comment.