Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customization of C* cluster and session creation. #14

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import smartthings.migration.MigrationParameters;
import smartthings.migration.MigrationRunner;

@DependsOn(CassandraService.class)
@DependsOn(Session.class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this works as long as the provided implementation extends @Service. It would be good to test that part more though.

public class CassandraMigrationService implements Service {

private Logger logger = LoggerFactory.getLogger(CassandraMigrationService.class);
Expand All @@ -25,7 +25,7 @@ public CassandraMigrationService(CassandraModule.Config config) {
@Override
public void onStart(StartEvent event) throws Exception {
if (config.autoMigrate) {
Session session = event.getRegistry().get(CassandraService.class).getSession();
Session session = event.getRegistry().get(Session.class);
logger.info("Auto Migrating Cassandra");
MigrationRunner migrationRunner = new MigrationRunner();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package smartthings.ratpack.cassandra

import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Host
import com.datastax.driver.core.Session
import org.cassandraunit.CassandraCQLUnit
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet
import org.junit.Rule
Expand All @@ -19,12 +21,15 @@ class MigrationIntegrationSpec extends Specification {
CassandraModule.Config config = new CassandraModule.Config(keyspace: 'test', migrationFile: 'changelog.txt', seeds: seeds, autoMigrate: true)
CassandraMigrationService service = new CassandraMigrationService(config)
StartEvent mockStartEvent = Mock()
CassandraModule cassandraModule = new CassandraModule()
Cluster cluster = cassandraModule.cluster(config)

and:
CassandraService cassandraService = new CassandraService(config)
CassandraService cassandraService = new CassandraService(new DefaultSession(cluster, config))

and:
def registry = new SimpleMutableRegistry()
registry.add(Session, cassandraService.session)
registry.add(CassandraService, cassandraService)

and:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package smartthings.ratpack.cassandra;

import com.datastax.driver.core.CloseFuture;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map;
import java.util.Optional;
import ratpack.exec.Promise;
import ratpack.service.Service;
import ratpack.service.StartEvent;
import ratpack.service.StopEvent;

/**
* Abstract class that transparently provides the role of both an
* injectable {@link Session} and fills the contract of a Ratpack {@link Service}.
*
* This is an abstract class to allow users to extend it for different reasons including
* the ability to provide different types for DI resolution.
*/
public abstract class AbstractSession implements RatpackSession, Session, Service {

private final Cluster cluster;
private final Optional<String> keyspace;
private Session delegate;

AbstractSession(Cluster cluster) {
this.cluster = cluster;
this.keyspace = Optional.empty();
}

AbstractSession(Cluster cluster, String keyspace) {
this.cluster = cluster;
this.keyspace = Optional.of(keyspace);
}

@Override
public String getLoggedKeyspace() {
return getDelegate().getLoggedKeyspace();
}

@Override
public Session init() {
return getDelegate().init();
}

@Override
public ListenableFuture<Session> initAsync() {
return getDelegate().initAsync();
}

@Override
public ResultSet execute(String s) {
return getDelegate().execute(s);
}

@Override
public ResultSet execute(String s, Object... objects) {
return getDelegate().execute(s, objects);
}

@Override
public ResultSet execute(String s, Map<String, Object> map) {
return getDelegate().execute(s, map);
}

@Override
public ResultSet execute(Statement statement) {
return getDelegate().execute(statement);
}

@Override
public ResultSetFuture executeAsync(String s) {
return getDelegate().executeAsync(s);
}

@Override
public ResultSetFuture executeAsync(String s, Object... objects) {
return getDelegate().executeAsync(s, objects);
}

@Override
public ResultSetFuture executeAsync(String s, Map<String, Object> map) {
return getDelegate().executeAsync(s, map);
}

@Override
public ResultSetFuture executeAsync(Statement statement) {
return getDelegate().executeAsync(statement);
}

@Override
public PreparedStatement prepare(String s) {
return getDelegate().prepare(s);
}

@Override
public PreparedStatement prepare(RegularStatement regularStatement) {
return getDelegate().prepare(regularStatement);
}

@Override
public ListenableFuture<PreparedStatement> prepareAsync(String s) {
return getDelegate().prepareAsync(s);
}

@Override
public
ListenableFuture<PreparedStatement> prepareAsync(RegularStatement regularStatement) {
return getDelegate().prepareAsync(regularStatement);
}

@Override
public Promise<ResultSet> executePromise(String query) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(query);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<ResultSet> executePromise(String query, Object... values) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(query, values);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<ResultSet> executePromise(String query, Map<String, Object> values) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(query, values);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<ResultSet> executePromise(Statement statement) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(statement);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<PreparedStatement> preparePromise(String query) {
return Promise.async(upstream -> {
ListenableFuture<PreparedStatement> f = getDelegate().prepareAsync(query);
upstream.accept(f);
});
}

@Override
public Promise<PreparedStatement> preparePromise(RegularStatement statement) {
return Promise.async(upstream -> {
ListenableFuture<PreparedStatement> f = getDelegate().prepareAsync(statement);
upstream.accept(f);
});
}

@Override
public CloseFuture closeAsync() {
return getDelegate().closeAsync();
}

@Override
public void close() {
getDelegate().close();
}

@Override
public boolean isClosed() {
return getDelegate().isClosed();
}

@Override
public Cluster getCluster() {
return cluster;
}

@Override
public State getState() {
return getDelegate().getState();
}

@Override
public void onStart(StartEvent event) throws Exception {
delegate = getDelegate();
}

@Override
public void onStop(StopEvent event) throws Exception {
if (getDelegate() != null && !getDelegate().isClosed()) {
getDelegate().closeAsync();
}
}

protected final Session getDelegate() {
if (delegate == null) {
synchronized (this) {
if (delegate == null) {
delegate = createDelegate();
}
}
}
return delegate;
}

protected Session createDelegate() {
return (keyspace.isPresent()) ? cluster.connect(keyspace.get()) : cluster.connect();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package smartthings.ratpack.cassandra;

import com.datastax.driver.core.Session;
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -8,15 +9,15 @@
import ratpack.registry.Registry;

public class CassandraHealthCheck implements HealthCheck {
private final CassandraService cassandraService;
private final Session session;
private final String validationQuery;

Logger logger = LoggerFactory.getLogger(CassandraHealthCheck.class);

@Inject
public CassandraHealthCheck(CassandraModule.Config cassandraConfig, CassandraService cassandraService) {
public CassandraHealthCheck(CassandraModule.Config cassandraConfig, Session session) {
this.validationQuery = cassandraConfig.getValidationQuery();
this.cassandraService = cassandraService;
this.session = session;
}

@Override
Expand All @@ -28,7 +29,7 @@ public String getName() {
public Promise<Result> check(Registry registry) throws Exception {
return Promise.async(upstream -> {
try {
cassandraService.getSession().execute(validationQuery);
session.execute(validationQuery);
upstream.success(Result.healthy());
} catch (Exception ex) {
logger.error("Cassandra connection is unhealthy", ex);
Expand Down
Loading