Skip to content

Commit

Permalink
In Cassandra3 storage, only use consistency level LOCAL_ONE if a loca…
Browse files Browse the repository at this point in the history
…l_dc has been defined, otherwise use ONE (#1753)

Mark all writes as idempotent.
Update the retry policy, honouring idempotence, consistency level, and applying appropriate back-pressure.
  • Loading branch information
michaelsembwever authored and adriancole committed Oct 2, 2017
1 parent 2e7bc7f commit 4420e3b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ final class CassandraSpanStore implements GuavaSpanStore {
private final Function<Row, String> rowToServiceName;
private final Function<Row, Span> rowToSpan;
private final Function<List<Map<TraceIdUDT, Long>>, Map<TraceIdUDT, Long>> collapseTraceIdMaps;
private final int traceTtl;
private final int indexTtl;

CassandraSpanStore(Session session, int maxTraceCols, int indexFetchMultiplier,
Expand Down Expand Up @@ -212,7 +211,6 @@ final class CassandraSpanStore implements GuavaSpanStore {
};

KeyspaceMetadata md = Schema.getKeyspaceMetadata(session);
this.traceTtl = md.getTable(TABLE_TRACES).getOptions().getDefaultTimeToLive();
this.indexTtl = md.getTable(TABLE_TRACE_BY_SERVICE_SPAN).getOptions().getDefaultTimeToLive();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -14,10 +14,12 @@
package zipkin.storage.cassandra3;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.QueryLogger;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
Expand Down Expand Up @@ -123,6 +125,15 @@ static Cluster buildCluster(Cassandra3Storage cassandra) {
builder.withPoolingOptions(new PoolingOptions().setMaxConnectionsPerHost(
HostDistance.LOCAL, cassandra.maxConnections
));

builder.withQueryOptions(
new QueryOptions()
// if local_dc isn't defined LOCAL_ONE incorrectly sticks to first seed host that connects
.setConsistencyLevel(
null != cassandra.localDc ? ConsistencyLevel.LOCAL_ONE : ConsistencyLevel.ONE)
// all zipkin cql writes are idempotent
.setDefaultIdempotence(true));

if (cassandra.useSsl) {
builder = builder.withSSL();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -19,6 +19,7 @@
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.WriteType;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;

/** This class was copied from org.twitter.zipkin.storage.cassandra.ZipkinRetryPolicy */
Expand All @@ -30,36 +31,47 @@ private ZipkinRetryPolicy() {
}

@Override
public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl,
int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) {
return RetryDecision.retry(ConsistencyLevel.ONE);
public RetryDecision onReadTimeout(Statement stmt, ConsistencyLevel cl, int required,
int received, boolean retrieved, int retry) {

if (retry > 1) {
try {
Thread.sleep(100);
} catch (InterruptedException expected) {
}
}
return stmt.isIdempotent()
? retry < 10 ? RetryDecision.retry(cl) : RetryDecision.rethrow()
: DefaultRetryPolicy.INSTANCE.onReadTimeout(stmt, cl, required, received, retrieved, retry);
}

@Override
public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType,
int requiredAcks, int receivedAcks, int nbRetry) {
return RetryDecision.retry(ConsistencyLevel.ONE);
public RetryDecision onWriteTimeout(Statement stmt, ConsistencyLevel cl, WriteType type,
int required, int received, int retry) {

return stmt.isIdempotent()
? RetryDecision.retry(cl)
: DefaultRetryPolicy.INSTANCE.onWriteTimeout(stmt, cl, type, required, received, retry);
}

@Override
public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl, int requiredReplica,
int aliveReplica, int nbRetry) {
return RetryDecision.retry(ConsistencyLevel.ONE);
public RetryDecision onUnavailable(Statement stmt, ConsistencyLevel cl, int required,
int aliveReplica, int retry) {
return DefaultRetryPolicy.INSTANCE.onUnavailable(stmt, cl, required, aliveReplica,
retry == 1 ? 0 : retry);
}

@Override
public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl,
DriverException e, int nbRetry) {
return RetryDecision.tryNextHost(ConsistencyLevel.ONE);
public RetryDecision onRequestError(Statement stmt, ConsistencyLevel cl, DriverException ex,
int nbRetry) {
return DefaultRetryPolicy.INSTANCE.onRequestError(stmt, cl, ex, nbRetry);
}

@Override
public void init(Cluster cluster) {
// nothing to do
}

@Override
public void close() {
// nothing to do
}
}

0 comments on commit 4420e3b

Please sign in to comment.