diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java index 56d2605cc7e..6f1ad3c22cc 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java @@ -97,7 +97,6 @@ final class CassandraSpanStore implements GuavaSpanStore { private final Function rowToServiceName; private final Function rowToSpan; private final Function>, Map> collapseTraceIdMaps; - private final int traceTtl; private final int indexTtl; CassandraSpanStore(Session session, int maxTraceCols, int indexFetchMultiplier, @@ -212,7 +211,6 @@ final class CassandraSpanStore implements GuavaSpanStore { }; KeyspaceMetadata md = Schema.getKeyspaceMetadata(session); - this.traceTtl = md.getTable(TABLE_TRACES).getOptions().getDefaultTimeToLive(); this.indexTtl = md.getTable(TABLE_TRACE_BY_SERVICE_SPAN).getOptions().getDefaultTimeToLive(); } diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java index 06bc8acbb04..974c524a291 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,10 +14,12 @@ package zipkin.storage.cassandra3; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.HostDistance; import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.PoolingOptions; import com.datastax.driver.core.QueryLogger; +import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.Session; import com.datastax.driver.core.TypeCodec; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; @@ -123,6 +125,15 @@ static Cluster buildCluster(Cassandra3Storage cassandra) { builder.withPoolingOptions(new PoolingOptions().setMaxConnectionsPerHost( HostDistance.LOCAL, cassandra.maxConnections )); + + builder.withQueryOptions( + new QueryOptions() + // if local_dc isn't defined LOCAL_ONE incorrectly sticks to first seed host that connects + .setConsistencyLevel( + null != cassandra.localDc ? ConsistencyLevel.LOCAL_ONE : ConsistencyLevel.ONE) + // all zipkin cql writes are idempotent + .setDefaultIdempotence(true)); + if (cassandra.useSsl) { builder = builder.withSSL(); } diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java index 17c29948d36..c4a62064cab 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -19,6 +19,7 @@ import com.datastax.driver.core.Statement; import com.datastax.driver.core.WriteType; import com.datastax.driver.core.exceptions.DriverException; +import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.datastax.driver.core.policies.RetryPolicy; /** This class was copied from org.twitter.zipkin.storage.cassandra.ZipkinRetryPolicy */ @@ -30,36 +31,47 @@ private ZipkinRetryPolicy() { } @Override - public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, - int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) { - return RetryDecision.retry(ConsistencyLevel.ONE); + public RetryDecision onReadTimeout(Statement stmt, ConsistencyLevel cl, int required, + int received, boolean retrieved, int retry) { + + if (retry > 1) { + try { + Thread.sleep(100); + } catch (InterruptedException expected) { + } + } + return stmt.isIdempotent() + ? retry < 10 ? RetryDecision.retry(cl) : RetryDecision.rethrow() + : DefaultRetryPolicy.INSTANCE.onReadTimeout(stmt, cl, required, received, retrieved, retry); } @Override - public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType, - int requiredAcks, int receivedAcks, int nbRetry) { - return RetryDecision.retry(ConsistencyLevel.ONE); + public RetryDecision onWriteTimeout(Statement stmt, ConsistencyLevel cl, WriteType type, + int required, int received, int retry) { + + return stmt.isIdempotent() + ? RetryDecision.retry(cl) + : DefaultRetryPolicy.INSTANCE.onWriteTimeout(stmt, cl, type, required, received, retry); } @Override - public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl, int requiredReplica, - int aliveReplica, int nbRetry) { - return RetryDecision.retry(ConsistencyLevel.ONE); + public RetryDecision onUnavailable(Statement stmt, ConsistencyLevel cl, int required, + int aliveReplica, int retry) { + return DefaultRetryPolicy.INSTANCE.onUnavailable(stmt, cl, required, aliveReplica, + retry == 1 ? 0 : retry); } @Override - public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl, - DriverException e, int nbRetry) { - return RetryDecision.tryNextHost(ConsistencyLevel.ONE); + public RetryDecision onRequestError(Statement stmt, ConsistencyLevel cl, DriverException ex, + int nbRetry) { + return DefaultRetryPolicy.INSTANCE.onRequestError(stmt, cl, ex, nbRetry); } @Override public void init(Cluster cluster) { - // nothing to do } @Override public void close() { - // nothing to do } }