Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customization of C* cluster and session creation. #14

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ task wrapper(type: Wrapper) {

subprojects {


group = "smartthings"
version = rootProject.file('version.txt').text.trim()

Expand All @@ -37,7 +36,6 @@ subprojects {
apply plugin: 'com.github.ben-manes.versions'
apply from: rootProject.file('gradle/publishing.gradle')


sourceCompatibility = "1.8"
targetCompatibility = "1.8"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import smartthings.migration.MigrationParameters;
import smartthings.migration.MigrationRunner;

@DependsOn(CassandraService.class)
@DependsOn(Session.class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this works as long as the provided implementation extends @Service. It would be good to test that part more though.

public class CassandraMigrationService implements Service {

private Logger logger = LoggerFactory.getLogger(CassandraMigrationService.class);
Expand All @@ -25,7 +25,7 @@ public CassandraMigrationService(CassandraModule.Config config) {
@Override
public void onStart(StartEvent event) throws Exception {
if (config.autoMigrate) {
Session session = event.getRegistry().get(CassandraService.class).getSession();
Session session = event.getRegistry().get(Session.class);
logger.info("Auto Migrating Cassandra");
MigrationRunner migrationRunner = new MigrationRunner();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package smartthings.ratpack.cassandra

import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Host
import com.datastax.driver.core.Session
import org.cassandraunit.CassandraCQLUnit
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet
import org.junit.Rule
Expand All @@ -19,12 +21,15 @@ class MigrationIntegrationSpec extends Specification {
CassandraModule.Config config = new CassandraModule.Config(keyspace: 'test', migrationFile: 'changelog.txt', seeds: seeds, autoMigrate: true)
CassandraMigrationService service = new CassandraMigrationService(config)
StartEvent mockStartEvent = Mock()
CassandraModule cassandraModule = new CassandraModule()
Cluster cluster = cassandraModule.cluster(config)

and:
CassandraService cassandraService = new CassandraService(config)
CassandraService cassandraService = new CassandraService(new DefaultSession(cluster, config))

and:
def registry = new SimpleMutableRegistry()
registry.add(Session, cassandraService.session)
registry.add(CassandraService, cassandraService)

and:
Expand Down
12 changes: 12 additions & 0 deletions ratpack-cassandra-tracing/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
dependencies {
compile "io.ratpack:ratpack-core:${ratpackVersion}"
compile "io.ratpack:ratpack-guice:${ratpackVersion}"

compile 'com.datastax.cassandra:cassandra-driver-core:3.1.0'
compile 'smartthings.brave:smartthings-brave-cassandra-latencytracker:0.1.14'
compile project(':ratpack-cassandra')

testCompile "io.ratpack:ratpack-groovy-test:${ratpackVersion}"
//Note cassandra unit pulls in netty-all which may need changed if the version conflicts with Ratpacks
testCompile 'org.cassandraunit:cassandra-unit:3.0.0.1'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package smartthings.ratpack.cassandra.zipkin;

import com.datastax.driver.core.Session;
import com.google.inject.Scopes;
import smartthings.ratpack.cassandra.CassandraHealthCheck;
import smartthings.ratpack.cassandra.CassandraModule;
import smartthings.ratpack.cassandra.CassandraService;
import smartthings.ratpack.cassandra.RatpackSession;

public class CassandraTracingModule extends CassandraModule {

@Override
protected void configure() {
bind(TracedSession.class).in(Scopes.SINGLETON);
bind(Session.class).to(TracedSession.class);
bind(RatpackSession.class).to(TracedSession.class);
bind(CassandraService.class).in(Scopes.SINGLETON);
bind(CassandraHealthCheck.class).in(Scopes.SINGLETON);
}

public static class Config extends CassandraModule.Config {
String serviceName;

public String getServiceName() {
return serviceName;
}

public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package smartthings.ratpack.cassandra.zipkin;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.github.kristofa.brave.Brave;
import com.google.inject.Inject;
import smartthings.ratpack.cassandra.AbstractSession;

public final class TracedSession extends AbstractSession {

private final Brave brave;
private final Cluster cluster;
private final String keyspace;
private final CassandraTracingModule.Config config;

@Inject
public TracedSession(Cluster cluster, Brave brave, CassandraTracingModule.Config config) {
super(cluster, config.getKeyspace());
this.brave = brave;
this.cluster = cluster;
this.keyspace = config.getKeyspace();
this.config = config;
}

protected Session createDelegate() {
Session session = (keyspace != null && !keyspace.equals("")) ? cluster.connect(keyspace) : cluster.connect();

return smartthings.brave.cassandra.TracedSession.create(session, brave, config.getServiceName());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@llinder, the resultset callback works properly with Ratpack when we use this smartthings.brave.cassandra.TracedSession? (https://github.com/SmartThingsOSS/smartthings-brave/blob/master/brave-cassandra-latencytracker/src/main/java/smartthings/brave/cassandra/TracedSession.java#L122).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the stacktrace Brian is seeing:

SEVERE: RuntimeException while executing runnable smartthings.brave.cassandra.TracedSession$TracedResultSetFuture$$Lambda$1@30335d5c with executor MoreExecutors.directExecutor()
ratpack.exec.UnmanagedThreadException: Operation attempted on non Ratpack managed thread 'cluster1-nio-worker-2'
    at ratpack.exec.internal.DefaultExecution.require(DefaultExecution.java:104)
    at ratpack.exec.Execution.current(Execution.java:81)
    at ratpack.zipkin.internal.RatpackServerClientLocalSpanState.setCurrentServerSpan(RatpackServerClientLocalSpanState.java:104)
    at com.github.kristofa.brave.ServerSpanThreadBinder.setCurrentSpan(ServerSpanThreadBinder.java:52)
    at smartthings.brave.cassandra.TracedSession$TracedResultSetFuture.lambda$addListener$0(TracedSession.java:404)
    at smartthings.brave.cassandra.TracedSession$TracedResultSetFuture$$Lambda$1.run(Unknown Source)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:613)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:174)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:174)
    at com.datastax.driver.core.RequestHandler.access$2600(RequestHandler.java:43)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:793)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:496)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1012)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:935)

From a really quick look, my guess is that the traced version has brave internals operating in a non-ratpack way. I think the problem might be here:

https://github.com/SmartThingsOSS/ratpack-cassandra/blob/master/ratpack-cassandra/src/main/java/smartthings/ratpack/cassandra/CassandraService.java#L105

I wonder if changing to a Blocking.get would solve for both (Traced and Non-Traced).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is that the current span is being injected into the Ratpack registry from a thread spawned by the Cassandra driver.

What we will need to figure out is how to call ServerSpanThreadBinder.setCurrentSpan(span) from a Ratpack managed thread.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to consider using a different trace context. I can't think of a good reason it needs to interact with the Ratpack lifecycle at that level. Even a thread local context might be suitable for our needs.

I will try to do some investigation work later today.

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package smartthings.ratpack.cassandra;

import com.datastax.driver.core.CloseFuture;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map;
import ratpack.exec.Promise;
import ratpack.service.Service;
import ratpack.service.StartEvent;
import ratpack.service.StopEvent;

/**
* Abstract class that transparently provides the role of both an
* injectable {@link Session} and fills the contract of a Ratpack {@link Service}.
*
* This is an abstract class to allow users to extend it for different reasons including
* the ability to provide different types for DI resolution.
*/
public abstract class AbstractSession implements RatpackSession, Session, Service {

private final Cluster cluster;
private final String keyspace;
private Session delegate;

public AbstractSession(Cluster cluster) {
this.cluster = cluster;
this.keyspace = null;
}

public AbstractSession(Cluster cluster, String keyspace) {
this.cluster = cluster;
this.keyspace = keyspace;
}

@Override
public String getLoggedKeyspace() {
return getDelegate().getLoggedKeyspace();
}

@Override
public Session init() {
return getDelegate().init();
}

@Override
public ListenableFuture<Session> initAsync() {
return getDelegate().initAsync();
}

@Override
public ResultSet execute(String s) {
return getDelegate().execute(s);
}

@Override
public ResultSet execute(String s, Object... objects) {
return getDelegate().execute(s, objects);
}

@Override
public ResultSet execute(String s, Map<String, Object> map) {
return getDelegate().execute(s, map);
}

@Override
public ResultSet execute(Statement statement) {
return getDelegate().execute(statement);
}

@Override
public ResultSetFuture executeAsync(String s) {
return getDelegate().executeAsync(s);
}

@Override
public ResultSetFuture executeAsync(String s, Object... objects) {
return getDelegate().executeAsync(s, objects);
}

@Override
public ResultSetFuture executeAsync(String s, Map<String, Object> map) {
return getDelegate().executeAsync(s, map);
}

@Override
public ResultSetFuture executeAsync(Statement statement) {
return getDelegate().executeAsync(statement);
}

@Override
public PreparedStatement prepare(String s) {
return getDelegate().prepare(s);
}

@Override
public PreparedStatement prepare(RegularStatement regularStatement) {
return getDelegate().prepare(regularStatement);
}

@Override
public ListenableFuture<PreparedStatement> prepareAsync(String s) {
return getDelegate().prepareAsync(s);
}

@Override
public
ListenableFuture<PreparedStatement> prepareAsync(RegularStatement regularStatement) {
return getDelegate().prepareAsync(regularStatement);
}

@Override
public Promise<ResultSet> executePromise(String query) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(query);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<ResultSet> executePromise(String query, Object... values) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(query, values);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<ResultSet> executePromise(String query, Map<String, Object> values) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(query, values);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<ResultSet> executePromise(Statement statement) {
return Promise.async(upstream -> {
ResultSetFuture resultSetFuture = getDelegate().executeAsync(statement);
upstream.accept(resultSetFuture);
});
}

@Override
public Promise<PreparedStatement> preparePromise(String query) {
return Promise.async(upstream -> {
ListenableFuture<PreparedStatement> f = getDelegate().prepareAsync(query);
upstream.accept(f);
});
}

@Override
public Promise<PreparedStatement> preparePromise(RegularStatement statement) {
return Promise.async(upstream -> {
ListenableFuture<PreparedStatement> f = getDelegate().prepareAsync(statement);
upstream.accept(f);
});
}

@Override
public CloseFuture closeAsync() {
return getDelegate().closeAsync();
}

@Override
public void close() {
getDelegate().close();
}

@Override
public boolean isClosed() {
return getDelegate().isClosed();
}

@Override
public Cluster getCluster() {
return cluster;
}

@Override
public State getState() {
return getDelegate().getState();
}

@Override
public void onStart(StartEvent event) throws Exception {
delegate = getDelegate();
}

@Override
public void onStop(StopEvent event) throws Exception {
if (getDelegate() != null && !getDelegate().isClosed()) {
getDelegate().closeAsync();
}
}

protected final Session getDelegate() {
if (delegate == null) {
synchronized (this) {
if (delegate == null) {
delegate = createDelegate();
}
}
}
return delegate;
}

protected Session createDelegate() {
return (keyspace != null && !keyspace.equals("")) ? cluster.connect(keyspace) : cluster.connect();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package smartthings.ratpack.cassandra;

import com.datastax.driver.core.Session;
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -8,15 +9,15 @@
import ratpack.registry.Registry;

public class CassandraHealthCheck implements HealthCheck {
private final CassandraService cassandraService;
private final Session session;
private final String validationQuery;

Logger logger = LoggerFactory.getLogger(CassandraHealthCheck.class);

@Inject
public CassandraHealthCheck(CassandraModule.Config cassandraConfig, CassandraService cassandraService) {
public CassandraHealthCheck(CassandraModule.Config cassandraConfig, Session session) {
this.validationQuery = cassandraConfig.getValidationQuery();
this.cassandraService = cassandraService;
this.session = session;
}

@Override
Expand All @@ -28,7 +29,7 @@ public String getName() {
public Promise<Result> check(Registry registry) throws Exception {
return Promise.async(upstream -> {
try {
cassandraService.getSession().execute(validationQuery);
session.execute(validationQuery);
upstream.success(Result.healthy());
} catch (Exception ex) {
logger.error("Cassandra connection is unhealthy", ex);
Expand Down
Loading