Skip to content

Commit

Permalink
[Issue-2179] Add JDBC binding to Cassandra module
Browse files Browse the repository at this point in the history
Many enterprise environments require a JDBC connection to Cassandra for
integration into legacy systems. This patch changes the parent of the
CassandraContainer to JdbcDatabaseContainer and optionally allows enables
the 'native api' (thrift-over-rcp) used by many JDBC driver implementations.

A new test was added - testCassandraJdbcQuery().

== Status

This code isn't ready for merge - it's being offered for review. It
loosely corresponds to my work on Friday creating a local fork that
integrates into our existing environment where we were already using
JDBC connections to Cassandra servers. That fork works so I know this
fork is very close to working. Of particular note is the log line

```
INFO  [main] 2019-12-14 21:31:33,520 ThriftServer.java:116 - Binding thrift service to /0.0.0.0:9160
```

since that indicates that the server is listening to the RPC port
required by the JDBC driver.

== Remaining Work testcontainers#1

The connection attempt fails with

```
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:32862 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1:32862] Channel has been closed))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:268)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:107)
    at com.datastax.driver.core.Cluster$Manager.negotiateProtocolVersionAndConnect(Cluster.java:1813)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1726)
    at com.datastax.driver.core.Cluster.init(Cluster.java:214)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:387)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:338)
    at com.github.adejanovski.cassandra.jdbc.SessionHolder.createSession(SessionHolder.java:201)
    ... 36 more
```

however I know that the same JDBC driver works elsewhere. It may be something as simple as a missing datastax jar.

== Remaining Work testcontainers#2

The test includes calls to `cassandraContainer.setWaitStrategy()`. Neither seems to have any effect.
In the other fork I only saw a wait strategy used was when I explicitly set the waitStrategy
value in the constructor.
  • Loading branch information
beargiles-snaplogic committed Dec 14, 2019
1 parent ef96100 commit 3c6ec94
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 6 deletions.
2 changes: 1 addition & 1 deletion modules/cassandra/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
description = "TestContainers :: Cassandra"

dependencies {
compile project(":database-commons")
compile project(":jdbc")
compile "com.datastax.cassandra:cassandra-driver-core:3.7.1"
}
94 changes: 94 additions & 0 deletions modules/cassandra/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>org.testcontainers</artifactId>
<groupId>cassandra-2179</groupId>
<version>pr-2179</version>
<packaging>jar</packaging>

<name>cassandra</name>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<fork>true</fork>
<meminitial>256m</meminitial>
<maxmem>1024m</maxmem>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>

<properties>
<testcontainers.version>1.12.3</testcontainers.version>
<datastax.version>3.8.0</datastax.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>${testcontainers.version}</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>

<!-- Dependencies -->
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>database-commons</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<dependency>
<groupId>org.rnorth.duct-tape</groupId>
<artifactId>duct-tape</artifactId>
<version>1.0.7</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${datastax.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>${datastax.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>${datastax.version}</version>
</dependency>

<dependency>
<groupId>com.github.adejanovski</groupId>
<artifactId>cassandra-jdbc-wrapper</artifactId>
<version>3.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,53 @@
*
* @author Eugeny Karpov
*/
public class CassandraContainer<SELF extends CassandraContainer<SELF>> extends GenericContainer<SELF> {
public class CassandraContainer<SELF extends CassandraContainer<SELF>> extends JdbcDatabaseContainer<SELF> {

public static final String IMAGE = "cassandra";
public static final String NAME = "cassandra";
public static final Integer CQL_PORT = 9042;
public static final Integer THRIFT_PORT = 9160;
public static final Integer JMX_PORT = 7199;
private static final String CONTAINER_CONFIG_LOCATION = "/etc/cassandra";
private static final String USERNAME = "cassandra";
private static final String PASSWORD = "cassandra";
private static final String DEFAULT_DRIVER_CLASSNAME = "com.github.adejanovski.cassandra.jdbc.CassandraDriver";

public static final String IMAGE = "cassandra";
public static final String DEFAULT_TAG = "3.11.2";

private String configLocation;
private String initScriptPath;
private boolean enableNativeAPI;
private boolean enableJmxReporting;
private String driverClassname = DEFAULT_DRIVER_CLASSNAME;

public CassandraContainer() {
this(IMAGE + ":3.11.2");
this(IMAGE + ":" + DEFAULT_TAG);
}

public CassandraContainer(String dockerImageName) {
super(dockerImageName);
addExposedPort(CQL_PORT);
setStartupAttempts(3);
this.enableNativeAPI = false;
this.enableJmxReporting = false;
}

@Override
protected Integer getLivenessCheckPort() {
return getMappedPort(CQL_PORT);
}

@Override
protected void configure() {
optionallyMapResourceParameterAsVolume(CONTAINER_CONFIG_LOCATION, configLocation);
if (enableNativeAPI) {
addExposedPort(THRIFT_PORT);
addEnv("CASSANDRA_START_RPC", "true");
}
if (enableJmxReporting) {
addExposedPort(JMX_PORT);
}
}

@Override
Expand All @@ -58,7 +79,7 @@ protected void containerIsStarted(InspectContainerResponse containerInfo) {
/**
* Load init script content and apply it to the database if initScriptPath is set
*/
private void runInitScriptIfRequired() {
protected void runInitScriptIfRequired() {
if (initScriptPath != null) {
try {
URL resource = Thread.currentThread().getContextClassLoader().getResource(initScriptPath);
Expand Down Expand Up @@ -118,6 +139,24 @@ public SELF withInitScript(String initScriptPath) {
return self();
}

/**
* Initialize Cassandra with Native support (via thrift over RPC) with default driver classname.
* This is usually required for JDBC access.
*/
public SELF withNativeAPI(boolean enableNativeAPI) {
return withNativeAPI(enableNativeAPI, DEFAULT_DRIVER_CLASSNAME);
}

/**
* Initialize Cassandra with Native support (via thrift over RPC) using specified driver classname.
* This is usually required for JDBC access.
*/
public SELF withNativeAPI(boolean enableNativeAPI, String driverClassname) {
this.enableNativeAPI = enableNativeAPI;
this.driverClassname = driverClassname;
return self();
}

/**
* Initialize Cassandra client with JMX reporting enabled or disabled
*/
Expand Down Expand Up @@ -150,6 +189,64 @@ public String getPassword() {
return PASSWORD;
}

/**
* Get recommended driver classname.
*/
@Override
public String getDriverClassName() {
return driverClassname;
}

/**
* Get JDBC URL
*
* Returns appropriate JDBC URL if JDBC support is enabled. Otherwise throws UnsupportedOperationException.
* If a keyspace is used it should be appended to the URL.
*/
@Override
public String getJdbcUrl() {
if (enableNativeAPI) {
return "jdbc:cassandra://" + getContainerIpAddress() + ":" + getMappedPort(THRIFT_PORT);
}
throw new UnsupportedOperationException();
}

/**
* Gets SQL query that can be used to verify the server is up. This query returns
* the server release version.
*/
@Override
protected String getTestQueryString() {
return "SELECT release_version FROM system.local";
}

/**
* Get mapped JQL port.
*
* @return
*/
public int getCqlPort() {
return getMappedPort(CQL_PORT);
}

/**
* Get mapped Native (thrift-over-RPC) port. This is used by most JDBC driver implementations.
*
* @return thrift port, or -1 if NativeAPI is not enabled
*/
public int getNativePort() {
return enableNativeAPI ? getMappedPort(THRIFT_PORT) : -1;
}

/**
* Get mapped JMX port.
*
* @return jmx port, or -1 if JMX is not enabled
*/
public int getJmxPort() {
return enableJmxReporting ? getMappedPort(JMX_PORT) : -1;
}

/**
* Get configured Cluster
*
Expand All @@ -173,7 +270,15 @@ public static Cluster getCluster(ContainerState containerState) {
return getCluster(containerState, false);
}

private DatabaseDelegate getDatabaseDelegate() {
/**
* Close down server. This has no effect.
*/
@Override
public void close() {
// no-op
}

protected DatabaseDelegate getDatabaseDelegate() {
return new CassandraDatabaseDelegate(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.testcontainers.containers;

import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.JdbcDatabaseContainerProvider;

/**
* Implementation of JdbcDatabaseContainerProvider for Cassandra.
*/
public class CassandraContainerProvider extends JdbcDatabaseContainerProvider {
@Override
public boolean supports(String databaseType) {
return databaseType.equals(CassandraContainer.NAME);
}

@Override
public JdbcDatabaseContainer newInstance() {
return newInstance(CassandraContainer.DEFAULT_TAG);
}

@Override
public JdbcDatabaseContainer newInstance(String tag) {
return new CassandraContainer(CassandraContainer.IMAGE + ":" + tag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.datastax.driver.core.exceptions.DriverException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.testcontainers.containers.CassandraContainer;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.delegate.AbstractDatabaseDelegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,19 @@
import com.datastax.driver.core.Session;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;

import static org.junit.Assert.*;

import static java.time.temporal.ChronoUnit.SECONDS;

/**
* @author Eugeny Karpov
*/
Expand Down Expand Up @@ -106,6 +115,23 @@ public void testCassandraGetCluster() {
}
}

@Test
public void testCassandraJdbcQuery() throws SQLException {
try (CassandraContainer cassandraContainer = new CassandraContainer<>().withNativeAPI(true)) {
cassandraContainer.withLogConsumer(obj -> System.out.println(((OutputFrame) obj).getUtf8String().trim()));
// this doesn't seem to have any effect
//cassandraContainer.setWaitStrategy(new HostPortWaitStrategy().withStartupTimeout(Duration.of(90, SECONDS)));
cassandraContainer.setWaitStrategy(new CassandraQueryWaitStrategy());
cassandraContainer.start();
try (Connection conn = cassandraContainer.createConnection("");
Statement stmt = conn.createStatement();
java.sql.ResultSet rs = stmt.executeQuery("SELECT release_version FROM system.local")) {
assertTrue("no records found", rs.next());
assertEquals("Result set has no release_version", cassandraContainer.DEFAULT_TAG, rs.getString("release_version"));
}
}
}

private void testInitScript(CassandraContainer cassandraContainer) {
ResultSet resultSet = performQuery(cassandraContainer, "SELECT * FROM keySpaceTest.catalog_category");
assertTrue("Query was not applied", resultSet.wasApplied());
Expand Down

0 comments on commit 3c6ec94

Please sign in to comment.