diff --git a/build.gradle b/build.gradle index 6d7b242..1a9417b 100644 --- a/build.gradle +++ b/build.gradle @@ -25,7 +25,6 @@ task wrapper(type: Wrapper) { subprojects { - group = "smartthings" version = rootProject.file('version.txt').text.trim() @@ -37,7 +36,6 @@ subprojects { apply plugin: 'com.github.ben-manes.versions' apply from: rootProject.file('gradle/publishing.gradle') - sourceCompatibility = "1.8" targetCompatibility = "1.8" diff --git a/ratpack-cassandra-migrate/src/main/java/smartthings/ratpack/cassandra/CassandraMigrationService.java b/ratpack-cassandra-migrate/src/main/java/smartthings/ratpack/cassandra/CassandraMigrationService.java index 0ac9bdd..91ef2fc 100644 --- a/ratpack-cassandra-migrate/src/main/java/smartthings/ratpack/cassandra/CassandraMigrationService.java +++ b/ratpack-cassandra-migrate/src/main/java/smartthings/ratpack/cassandra/CassandraMigrationService.java @@ -10,7 +10,7 @@ import smartthings.migration.MigrationParameters; import smartthings.migration.MigrationRunner; -@DependsOn(CassandraService.class) +@DependsOn(Session.class) public class CassandraMigrationService implements Service { private Logger logger = LoggerFactory.getLogger(CassandraMigrationService.class); @@ -25,7 +25,7 @@ public CassandraMigrationService(CassandraModule.Config config) { @Override public void onStart(StartEvent event) throws Exception { if (config.autoMigrate) { - Session session = event.getRegistry().get(CassandraService.class).getSession(); + Session session = event.getRegistry().get(Session.class); logger.info("Auto Migrating Cassandra"); MigrationRunner migrationRunner = new MigrationRunner(); diff --git a/ratpack-cassandra-migrate/src/test/groovy/smartthings/ratpack/cassandra/MigrationIntegrationSpec.groovy b/ratpack-cassandra-migrate/src/test/groovy/smartthings/ratpack/cassandra/MigrationIntegrationSpec.groovy index 56d1dd0..ade092e 100644 --- a/ratpack-cassandra-migrate/src/test/groovy/smartthings/ratpack/cassandra/MigrationIntegrationSpec.groovy +++ b/ratpack-cassandra-migrate/src/test/groovy/smartthings/ratpack/cassandra/MigrationIntegrationSpec.groovy @@ -1,6 +1,8 @@ package smartthings.ratpack.cassandra +import com.datastax.driver.core.Cluster import com.datastax.driver.core.Host +import com.datastax.driver.core.Session import org.cassandraunit.CassandraCQLUnit import org.cassandraunit.dataset.cql.ClassPathCQLDataSet import org.junit.Rule @@ -19,12 +21,15 @@ class MigrationIntegrationSpec extends Specification { CassandraModule.Config config = new CassandraModule.Config(keyspace: 'test', migrationFile: 'changelog.txt', seeds: seeds, autoMigrate: true) CassandraMigrationService service = new CassandraMigrationService(config) StartEvent mockStartEvent = Mock() + CassandraModule cassandraModule = new CassandraModule() + Cluster cluster = cassandraModule.cluster(config) and: - CassandraService cassandraService = new CassandraService(config) + CassandraService cassandraService = new CassandraService(new DefaultSession(cluster, config)) and: def registry = new SimpleMutableRegistry() + registry.add(Session, cassandraService.session) registry.add(CassandraService, cassandraService) and: diff --git a/ratpack-cassandra-tracing/build.gradle b/ratpack-cassandra-tracing/build.gradle new file mode 100644 index 0000000..1039f75 --- /dev/null +++ b/ratpack-cassandra-tracing/build.gradle @@ -0,0 +1,12 @@ +dependencies { + compile "io.ratpack:ratpack-core:${ratpackVersion}" + compile "io.ratpack:ratpack-guice:${ratpackVersion}" + + compile 'com.datastax.cassandra:cassandra-driver-core:3.1.0' + compile 'smartthings.brave:smartthings-brave-cassandra-latencytracker:0.1.14' + compile project(':ratpack-cassandra') + + testCompile "io.ratpack:ratpack-groovy-test:${ratpackVersion}" + //Note cassandra unit pulls in netty-all which may need changed if the version conflicts with Ratpacks + testCompile 'org.cassandraunit:cassandra-unit:3.0.0.1' +} diff --git a/ratpack-cassandra-tracing/src/main/java/smartthings/ratpack/cassandra/zipkin/CassandraTracingModule.java b/ratpack-cassandra-tracing/src/main/java/smartthings/ratpack/cassandra/zipkin/CassandraTracingModule.java new file mode 100644 index 0000000..5690ebe --- /dev/null +++ b/ratpack-cassandra-tracing/src/main/java/smartthings/ratpack/cassandra/zipkin/CassandraTracingModule.java @@ -0,0 +1,33 @@ +package smartthings.ratpack.cassandra.zipkin; + +import com.datastax.driver.core.Session; +import com.google.inject.Scopes; +import smartthings.ratpack.cassandra.CassandraHealthCheck; +import smartthings.ratpack.cassandra.CassandraModule; +import smartthings.ratpack.cassandra.CassandraService; +import smartthings.ratpack.cassandra.RatpackSession; + +public class CassandraTracingModule extends CassandraModule { + + @Override + protected void configure() { + bind(TracedSession.class).in(Scopes.SINGLETON); + bind(Session.class).to(TracedSession.class); + bind(RatpackSession.class).to(TracedSession.class); + bind(CassandraService.class).in(Scopes.SINGLETON); + bind(CassandraHealthCheck.class).in(Scopes.SINGLETON); + } + + public static class Config extends CassandraModule.Config { + String serviceName; + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + } + +} diff --git a/ratpack-cassandra-tracing/src/main/java/smartthings/ratpack/cassandra/zipkin/TracedSession.java b/ratpack-cassandra-tracing/src/main/java/smartthings/ratpack/cassandra/zipkin/TracedSession.java new file mode 100644 index 0000000..851580f --- /dev/null +++ b/ratpack-cassandra-tracing/src/main/java/smartthings/ratpack/cassandra/zipkin/TracedSession.java @@ -0,0 +1,31 @@ +package smartthings.ratpack.cassandra.zipkin; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.github.kristofa.brave.Brave; +import com.google.inject.Inject; +import smartthings.ratpack.cassandra.AbstractSession; + +public final class TracedSession extends AbstractSession { + + private final Brave brave; + private final Cluster cluster; + private final String keyspace; + private final CassandraTracingModule.Config config; + + @Inject + public TracedSession(Cluster cluster, Brave brave, CassandraTracingModule.Config config) { + super(cluster, config.getKeyspace()); + this.brave = brave; + this.cluster = cluster; + this.keyspace = config.getKeyspace(); + this.config = config; + } + + protected Session createDelegate() { + Session session = (keyspace != null && !keyspace.equals("")) ? cluster.connect(keyspace) : cluster.connect(); + + return smartthings.brave.cassandra.TracedSession.create(session, brave, config.getServiceName()); + } + +} diff --git a/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/AbstractSession.java b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/AbstractSession.java new file mode 100644 index 0000000..dc10359 --- /dev/null +++ b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/AbstractSession.java @@ -0,0 +1,216 @@ +package smartthings.ratpack.cassandra; + +import com.datastax.driver.core.CloseFuture; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.RegularStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Map; +import ratpack.exec.Promise; +import ratpack.service.Service; +import ratpack.service.StartEvent; +import ratpack.service.StopEvent; + +/** + * Abstract class that transparently provides the role of both an + * injectable {@link Session} and fills the contract of a Ratpack {@link Service}. + * + * This is an abstract class to allow users to extend it for different reasons including + * the ability to provide different types for DI resolution. + */ +public abstract class AbstractSession implements RatpackSession, Session, Service { + + private final Cluster cluster; + private final String keyspace; + private Session delegate; + + public AbstractSession(Cluster cluster) { + this.cluster = cluster; + this.keyspace = null; + } + + public AbstractSession(Cluster cluster, String keyspace) { + this.cluster = cluster; + this.keyspace = keyspace; + } + + @Override + public String getLoggedKeyspace() { + return getDelegate().getLoggedKeyspace(); + } + + @Override + public Session init() { + return getDelegate().init(); + } + + @Override + public ListenableFuture initAsync() { + return getDelegate().initAsync(); + } + + @Override + public ResultSet execute(String s) { + return getDelegate().execute(s); + } + + @Override + public ResultSet execute(String s, Object... objects) { + return getDelegate().execute(s, objects); + } + + @Override + public ResultSet execute(String s, Map map) { + return getDelegate().execute(s, map); + } + + @Override + public ResultSet execute(Statement statement) { + return getDelegate().execute(statement); + } + + @Override + public ResultSetFuture executeAsync(String s) { + return getDelegate().executeAsync(s); + } + + @Override + public ResultSetFuture executeAsync(String s, Object... objects) { + return getDelegate().executeAsync(s, objects); + } + + @Override + public ResultSetFuture executeAsync(String s, Map map) { + return getDelegate().executeAsync(s, map); + } + + @Override + public ResultSetFuture executeAsync(Statement statement) { + return getDelegate().executeAsync(statement); + } + + @Override + public PreparedStatement prepare(String s) { + return getDelegate().prepare(s); + } + + @Override + public PreparedStatement prepare(RegularStatement regularStatement) { + return getDelegate().prepare(regularStatement); + } + + @Override + public ListenableFuture prepareAsync(String s) { + return getDelegate().prepareAsync(s); + } + + @Override + public + ListenableFuture prepareAsync(RegularStatement regularStatement) { + return getDelegate().prepareAsync(regularStatement); + } + + @Override + public Promise executePromise(String query) { + return Promise.async(upstream -> { + ResultSetFuture resultSetFuture = getDelegate().executeAsync(query); + upstream.accept(resultSetFuture); + }); + } + + @Override + public Promise executePromise(String query, Object... values) { + return Promise.async(upstream -> { + ResultSetFuture resultSetFuture = getDelegate().executeAsync(query, values); + upstream.accept(resultSetFuture); + }); + } + + @Override + public Promise executePromise(String query, Map values) { + return Promise.async(upstream -> { + ResultSetFuture resultSetFuture = getDelegate().executeAsync(query, values); + upstream.accept(resultSetFuture); + }); + } + + @Override + public Promise executePromise(Statement statement) { + return Promise.async(upstream -> { + ResultSetFuture resultSetFuture = getDelegate().executeAsync(statement); + upstream.accept(resultSetFuture); + }); + } + + @Override + public Promise preparePromise(String query) { + return Promise.async(upstream -> { + ListenableFuture f = getDelegate().prepareAsync(query); + upstream.accept(f); + }); + } + + @Override + public Promise preparePromise(RegularStatement statement) { + return Promise.async(upstream -> { + ListenableFuture f = getDelegate().prepareAsync(statement); + upstream.accept(f); + }); + } + + @Override + public CloseFuture closeAsync() { + return getDelegate().closeAsync(); + } + + @Override + public void close() { + getDelegate().close(); + } + + @Override + public boolean isClosed() { + return getDelegate().isClosed(); + } + + @Override + public Cluster getCluster() { + return cluster; + } + + @Override + public State getState() { + return getDelegate().getState(); + } + + @Override + public void onStart(StartEvent event) throws Exception { + delegate = getDelegate(); + } + + @Override + public void onStop(StopEvent event) throws Exception { + if (getDelegate() != null && !getDelegate().isClosed()) { + getDelegate().closeAsync(); + } + } + + protected final Session getDelegate() { + if (delegate == null) { + synchronized (this) { + if (delegate == null) { + delegate = createDelegate(); + } + } + } + return delegate; + } + + protected Session createDelegate() { + return (keyspace != null && !keyspace.equals("")) ? cluster.connect(keyspace) : cluster.connect(); + } +} diff --git a/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraHealthCheck.java b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraHealthCheck.java index a377b08..7cae482 100644 --- a/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraHealthCheck.java +++ b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraHealthCheck.java @@ -1,5 +1,6 @@ package smartthings.ratpack.cassandra; +import com.datastax.driver.core.Session; import com.google.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -8,15 +9,15 @@ import ratpack.registry.Registry; public class CassandraHealthCheck implements HealthCheck { - private final CassandraService cassandraService; + private final Session session; private final String validationQuery; Logger logger = LoggerFactory.getLogger(CassandraHealthCheck.class); @Inject - public CassandraHealthCheck(CassandraModule.Config cassandraConfig, CassandraService cassandraService) { + public CassandraHealthCheck(CassandraModule.Config cassandraConfig, Session session) { this.validationQuery = cassandraConfig.getValidationQuery(); - this.cassandraService = cassandraService; + this.session = session; } @Override @@ -28,7 +29,7 @@ public String getName() { public Promise check(Registry registry) throws Exception { return Promise.async(upstream -> { try { - cassandraService.getSession().execute(validationQuery); + session.execute(validationQuery); upstream.success(Result.healthy()); } catch (Exception ex) { logger.error("Cassandra connection is unhealthy", ex); diff --git a/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraModule.java b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraModule.java index 71e945f..0f44826 100644 --- a/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraModule.java +++ b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraModule.java @@ -1,15 +1,35 @@ package smartthings.ratpack.cassandra; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.JdkSSLOptions; +import com.datastax.driver.core.PerHostPercentileTracker; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.EC2MultiRegionAddressTranslator; +import com.datastax.driver.core.policies.PercentileSpeculativeExecutionPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; +import com.google.inject.Provides; import com.google.inject.Scopes; -import ratpack.guice.ConfigurableModule; - +import java.io.FileInputStream; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.Arrays; import java.util.List; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import ratpack.guice.ConfigurableModule; /** * Supports Cassandra for Ratpack. */ public class CassandraModule extends ConfigurableModule { + private static final Logger logger = LoggerFactory.getLogger(CassandraModule.class); + public static class Config { public Config() { @@ -29,6 +49,8 @@ public Config() { String migrationFile = "/migrations/cql.changelog"; Boolean autoMigrate = false; + List cipherSuites = Arrays.asList("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"); + List seeds; public JKSConfig getTruststore() { @@ -139,8 +161,95 @@ public void setPassword(String password) { @Override protected void configure() { + bind(DefaultSession.class).in(Scopes.SINGLETON); + bind(Session.class).to(DefaultSession.class); + bind(RatpackSession.class).to(DefaultSession.class); bind(CassandraService.class).in(Scopes.SINGLETON); bind(CassandraHealthCheck.class).in(Scopes.SINGLETON); } + @Provides + public Cluster cluster(final Config config) { + return createCluster(config); + } + + /** + * Extension point for overriding {@link Cluster} creation. + * + * @param config + * @return + */ + protected Cluster createCluster(final Config config) { + //Set the highest tracking to just above the socket timeout for the read. + PerHostPercentileTracker + tracker = PerHostPercentileTracker.builder(SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS + 500).build(); + + DCAwareRoundRobinPolicy + dcAwareRoundRobinPolicy = DCAwareRoundRobinPolicy.builder().withUsedHostsPerRemoteDc(1).build(); + + Cluster.Builder builder = Cluster.builder() + .withLoadBalancingPolicy(new TokenAwarePolicy(dcAwareRoundRobinPolicy)) + .withSpeculativeExecutionPolicy(new PercentileSpeculativeExecutionPolicy(tracker, 0.99, 3)); + + if (config.getShareEventLoopGroup()) { + builder.withNettyOptions(new RatpackCassandraNettyOptions()); + } + + for (String seed : config.seeds) { + if (seed.contains(":")) { + String[] tokens = seed.split(":"); + builder.addContactPoint(tokens[0]).withPort(Integer.parseInt(tokens[1])); + } else { + builder.addContactPoint(seed); + } + } + + builder.withAddressTranslator(new EC2MultiRegionAddressTranslator()); + + if (config.truststore != null) { + try { + SSLContext sslContext = createSSLContext(config); + builder.withSSL( + JdkSSLOptions.builder() + .withSSLContext(sslContext) + .withCipherSuites(config.cipherSuites.toArray(new String[0])) + .build()); + } catch (Exception e) { + logger.error("Couldn't add SSL to the cluster builder.", e); + } + } + + if (config.user != null) { + builder.withCredentials(config.user, config.password); + } + + return builder.build(); + } + + /** + * Extension point for overriding {@link SSLContext} creation. + * + * @param config + * @return + */ + protected SSLContext createSSLContext(final Config config) throws Exception { + FileInputStream tsf = new FileInputStream(config.truststore.path); + FileInputStream ksf = new FileInputStream(config.keystore.path); + SSLContext ctx = SSLContext.getInstance("SSL"); + + KeyStore ts = KeyStore.getInstance("JKS"); + ts.load(tsf, config.truststore.password.toCharArray()); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ts); + + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(ksf, config.keystore.password.toCharArray()); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + + kmf.init(ks, config.keystore.password.toCharArray()); + + ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); + return ctx; + } + } diff --git a/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraService.java b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraService.java index 99e6b17..8ed6bc9 100644 --- a/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraService.java +++ b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraService.java @@ -1,104 +1,29 @@ package smartthings.ratpack.cassandra; -import com.datastax.driver.core.*; -import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; -import com.datastax.driver.core.policies.EC2MultiRegionAddressTranslator; -import com.datastax.driver.core.policies.PercentileSpeculativeExecutionPolicy; -import com.datastax.driver.core.policies.TokenAwarePolicy; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; import com.google.inject.Inject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import ratpack.exec.Promise; +import ratpack.service.DependsOn; import ratpack.service.Service; import ratpack.service.StartEvent; import ratpack.service.StopEvent; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; -import java.io.FileInputStream; -import java.security.KeyStore; -import java.security.SecureRandom; - +/** + * Deprecated. Should use Guice DI and refer to an instance of {@link Session} instead. + */ +@Deprecated() +@DependsOn(Session.class) public class CassandraService implements Service { - private Cluster cluster; private Session session; - private final String[] cipherSuites = new String[]{"TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"}; - private final CassandraModule.Config cassandraConfig; - - private Logger logger = LoggerFactory.getLogger(CassandraService.class); @Inject - public CassandraService(CassandraModule.Config cassandraConfig) { - this.cassandraConfig = cassandraConfig; - } - - private void connect() { - //Set the highest tracking to just above the socket timeout for the read. - PerHostPercentileTracker tracker = PerHostPercentileTracker.builder(SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS + 500).build(); - - DCAwareRoundRobinPolicy dcAwareRoundRobinPolicy = DCAwareRoundRobinPolicy.builder().withUsedHostsPerRemoteDc(1).build(); - - Cluster.Builder builder = Cluster.builder() - .withLoadBalancingPolicy(new TokenAwarePolicy(dcAwareRoundRobinPolicy)) - .withSpeculativeExecutionPolicy(new PercentileSpeculativeExecutionPolicy(tracker, 0.99, 3)); - - if (cassandraConfig.getShareEventLoopGroup()) { - builder.withNettyOptions(new RatpackCassandraNettyOptions()); - } - - for (String seed : cassandraConfig.seeds) { - if (seed.contains(":")) { - String[] tokens = seed.split(":"); - builder.addContactPoint(tokens[0]).withPort(Integer.parseInt(tokens[1])); - } else { - builder.addContactPoint(seed); - } - } - - builder.withAddressTranslator(new EC2MultiRegionAddressTranslator()); - - if (cassandraConfig.truststore != null) { - try { - SSLContext sslContext = getSSLContext(cassandraConfig.truststore.path, cassandraConfig.truststore.password, cassandraConfig.keystore.path, cassandraConfig.keystore.password); - builder.withSSL(JdkSSLOptions.builder().withSSLContext(sslContext).withCipherSuites(cipherSuites).build()); - } catch (Exception e) { - logger.error("Couldn't add SSL to the cluster builder.", e); - } - } - - if (cassandraConfig.user != null) { - builder.withCredentials(cassandraConfig.user, cassandraConfig.password); - } - - cluster = builder.build(); - - if (cassandraConfig.keyspace != null) { - session = cluster.connect(cassandraConfig.keyspace); - } else { - session = cluster.connect(); - } - } - - private static SSLContext getSSLContext(String truststorePath, String truststorePassword, String keystorePath, String keystorePassword) throws Exception { - FileInputStream tsf = new FileInputStream(truststorePath); - FileInputStream ksf = new FileInputStream(keystorePath); - SSLContext ctx = SSLContext.getInstance("SSL"); - - KeyStore ts = KeyStore.getInstance("JKS"); - ts.load(tsf, truststorePassword.toCharArray()); - TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - tmf.init(ts); - - KeyStore ks = KeyStore.getInstance("JKS"); - ks.load(ksf, keystorePassword.toCharArray()); - KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - - kmf.init(ks, keystorePassword.toCharArray()); - - ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); - return ctx; + public CassandraService(Session session) { + this.session = session; } public Promise execute(Statement statement) { @@ -109,8 +34,8 @@ public Promise execute(Statement statement) { } @Override - public void onStart(StartEvent event) throws Exception { - connect(); + public void onStart(StartEvent event) throws Exception { + session.init(); } @Override @@ -123,7 +48,7 @@ public Session getSession() { } public Cluster getCluster() { - return cluster; + return session.getCluster(); } } diff --git a/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/DefaultSession.java b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/DefaultSession.java new file mode 100644 index 0000000..6847bf5 --- /dev/null +++ b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/DefaultSession.java @@ -0,0 +1,18 @@ +package smartthings.ratpack.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; + +/** + * Default implementation of {@link AbstractSession} that is registered as the default + * implementation of {@link Session} with Guice + */ +public final class DefaultSession extends AbstractSession { + + @Inject + public DefaultSession(Cluster cluster, CassandraModule.Config config) { + super(cluster, config.keyspace); + } + +} diff --git a/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/RatpackSession.java b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/RatpackSession.java new file mode 100644 index 0000000..d4cfa18 --- /dev/null +++ b/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/RatpackSession.java @@ -0,0 +1,25 @@ +package smartthings.ratpack.cassandra; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.RegularStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import java.util.Map; +import ratpack.exec.Promise; + +public interface RatpackSession extends Session { + + Promise executePromise(String query); + + Promise executePromise(String query, Object... values); + + Promise executePromise(String query, Map values); + + Promise executePromise(Statement statement); + + Promise preparePromise(String query); + + Promise preparePromise(RegularStatement statement); + +} diff --git a/ratpack-cassandra/src/test/groovy/smartthings/ratpack/cassandra/CassandraServiceSpec.groovy b/ratpack-cassandra/src/test/groovy/smartthings/ratpack/cassandra/CassandraServiceSpec.groovy index 32eae01..18775c1 100644 --- a/ratpack-cassandra/src/test/groovy/smartthings/ratpack/cassandra/CassandraServiceSpec.groovy +++ b/ratpack-cassandra/src/test/groovy/smartthings/ratpack/cassandra/CassandraServiceSpec.groovy @@ -1,5 +1,6 @@ package smartthings.ratpack.cassandra +import com.datastax.driver.core.Cluster import com.datastax.driver.core.exceptions.NoHostAvailableException import org.cassandraunit.CassandraCQLUnit import org.cassandraunit.dataset.CQLDataSet @@ -41,9 +42,12 @@ class CassandraServiceSpec extends Specification { cassConfig.setSeeds([TEST_SEED]) CassandraService service + CassandraModule module = new CassandraModule() + Cluster cluster = module.cluster(cassConfig) + when: harness.run { - service = new CassandraService(cassConfig) + service = new CassandraService(new DefaultSession(cluster, cassConfig)) service.onStart(new StartEvent() { @Override Registry getRegistry() { @@ -68,9 +72,12 @@ class CassandraServiceSpec extends Specification { cassConfig.setSeeds(["localhost:1111"]) CassandraService service + CassandraModule module = new CassandraModule() + Cluster cluster = module.cluster(cassConfig) + when: harness.run { - service = new CassandraService(cassConfig) + service = new CassandraService(new DefaultSession(cluster, cassConfig)) service.onStart(new StartEvent() { @Override Registry getRegistry() { diff --git a/settings.gradle b/settings.gradle index 6faf914..12a2dbe 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,6 +1,7 @@ include 'ratpack-cassandra' include 'ratpack-cassandra-rx' include 'ratpack-cassandra-migrate' +include 'ratpack-cassandra-tracing' rootProject.name='ratpack-cassandra-parent'