Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In Cassandra3 storage, only use consistency level LOCAL_ONE if a local_dc has been defined, otherwise use ONE #1753

Merged
merged 4 commits into from
Oct 2, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ final class CassandraSpanStore implements GuavaSpanStore {
private final Function<Row, String> rowToServiceName;
private final Function<Row, Span> rowToSpan;
private final Function<List<Map<TraceIdUDT, Long>>, Map<TraceIdUDT, Long>> collapseTraceIdMaps;
private final int traceTtl;
private final int indexTtl;

CassandraSpanStore(Session session, int maxTraceCols, int indexFetchMultiplier,
Expand Down Expand Up @@ -212,7 +211,6 @@ final class CassandraSpanStore implements GuavaSpanStore {
};

KeyspaceMetadata md = Schema.getKeyspaceMetadata(session);
this.traceTtl = md.getTable(TABLE_TRACES).getOptions().getDefaultTimeToLive();
this.indexTtl = md.getTable(TABLE_TRACE_BY_SERVICE_SPAN).getOptions().getDefaultTimeToLive();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -14,10 +14,12 @@
package zipkin.storage.cassandra3;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.QueryLogger;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
Expand Down Expand Up @@ -123,6 +125,13 @@ static Cluster buildCluster(Cassandra3Storage cassandra) {
builder.withPoolingOptions(new PoolingOptions().setMaxConnectionsPerHost(
HostDistance.LOCAL, cassandra.maxConnections
));

builder.withQueryOptions(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only one I don't know just by reading it. Can you put a comment line of why we do this, or tell me and I'll add it? Thx!

Copy link
Member Author

@michaelsembwever michaelsembwever Oct 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if local_dc isn't defined then LOCAL_ONE sticks requests to the dc of the first seed host that the driver connects to.
ONE will at least in this situation connect to any dc, and then any latency awareness will bring it back to (eventually) using the local dc.

furthermore LOCAL_ONE can fail easily if you're not using 'NetworkTopologyStrategy` but that's a fix for another day.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment. (can you please squash the 4 commits!)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great info. ty

new QueryOptions()
.setConsistencyLevel(
null != cassandra.localDc ? ConsistencyLevel.LOCAL_ONE : ConsistencyLevel.ONE)
.setDefaultIdempotence(true));

if (cassandra.useSsl) {
builder = builder.withSSL();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -19,6 +19,7 @@
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.WriteType;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;

/** This class was copied from org.twitter.zipkin.storage.cassandra.ZipkinRetryPolicy */
Expand All @@ -30,36 +31,47 @@ private ZipkinRetryPolicy() {
}

@Override
public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl,
int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
return RetryDecision.retry(ConsistencyLevel.ONE);
public RetryDecision onReadTimeout(Statement stmt, ConsistencyLevel cl, int required,
int received, boolean retrieved, int retry) {

if (retry > 1) {
try {
Thread.sleep(100);
} catch (InterruptedException expected) {
}
}
return stmt.isIdempotent()
? retry < 10 ? RetryDecision.retry(cl) : RetryDecision.rethrow()
: DefaultRetryPolicy.INSTANCE.onReadTimeout(stmt, cl, required, received, retrieved, retry);
}

@Override
public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType,
int requiredAcks, int receivedAcks, int nbRetry) {
return RetryDecision.retry(ConsistencyLevel.ONE);
public RetryDecision onWriteTimeout(Statement stmt, ConsistencyLevel cl, WriteType type,
int required, int received, int retry) {

return stmt.isIdempotent()
? RetryDecision.retry(cl)
: DefaultRetryPolicy.INSTANCE.onWriteTimeout(stmt, cl, type, required, received, retry);
}

@Override
public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl, int requiredReplica,
int aliveReplica, int nbRetry) {
return RetryDecision.retry(ConsistencyLevel.ONE);
public RetryDecision onUnavailable(Statement stmt, ConsistencyLevel cl, int required,
int aliveReplica, int retry) {
return DefaultRetryPolicy.INSTANCE.onUnavailable(stmt, cl, required, aliveReplica,
retry == 1 ? 0 : retry);
}

@Override
public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl,
DriverException e, int nbRetry) {
return RetryDecision.tryNextHost(ConsistencyLevel.ONE);
public RetryDecision onRequestError(Statement stmt, ConsistencyLevel cl, DriverException ex,
int nbRetry) {
return DefaultRetryPolicy.INSTANCE.onRequestError(stmt, cl, ex, nbRetry);
}

@Override
public void init(Cluster cluster) {
// nothing to do
}

@Override
public void close() {
// nothing to do
}
}