From b10ca90cd7da3dfa1c736ef7c40211f39ef72d05 Mon Sep 17 00:00:00 2001 From: mck Date: Sun, 1 Oct 2017 22:31:32 +1100 Subject: [PATCH 1/4] In Cassandra3 storage, only use consistency level LOCAL_ONE if a local_dc has been defined, otherwise use ONE. Mark all writes as idempotent. Update the retry policy, honouring idempotence, consistency level, and applying appropriate back-pressure. --- .../cassandra3/CassandraSpanStore.java | 2 - .../cassandra3/DefaultSessionFactory.java | 11 ++- .../storage/cassandra3/ZipkinRetryPolicy.java | 77 +++++++++++-------- 3 files changed, 57 insertions(+), 33 deletions(-) diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java index 56d2605cc7e..6f1ad3c22cc 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/CassandraSpanStore.java @@ -97,7 +97,6 @@ final class CassandraSpanStore implements GuavaSpanStore { private final Function rowToServiceName; private final Function rowToSpan; private final Function>, Map> collapseTraceIdMaps; - private final int traceTtl; private final int indexTtl; CassandraSpanStore(Session session, int maxTraceCols, int indexFetchMultiplier, @@ -212,7 +211,6 @@ final class CassandraSpanStore implements GuavaSpanStore { }; KeyspaceMetadata md = Schema.getKeyspaceMetadata(session); - this.traceTtl = md.getTable(TABLE_TRACES).getOptions().getDefaultTimeToLive(); this.indexTtl = md.getTable(TABLE_TRACE_BY_SERVICE_SPAN).getOptions().getDefaultTimeToLive(); } diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java index 06bc8acbb04..9b22826ff6a 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,10 +14,12 @@ package zipkin.storage.cassandra3; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.HostDistance; import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.PoolingOptions; import com.datastax.driver.core.QueryLogger; +import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.Session; import com.datastax.driver.core.TypeCodec; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; @@ -123,6 +125,13 @@ static Cluster buildCluster(Cassandra3Storage cassandra) { builder.withPoolingOptions(new PoolingOptions().setMaxConnectionsPerHost( HostDistance.LOCAL, cassandra.maxConnections )); + + builder.withQueryOptions( + new QueryOptions() + .setConsistencyLevel( + null != cassandra.localDc ? ConsistencyLevel.LOCAL_ONE : ConsistencyLevel.ONE) + .setDefaultIdempotence(true)); + if (cassandra.useSsl) { builder = builder.withSSL(); } diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java index 17c29948d36..5860c28ba46 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,11 +14,11 @@ package zipkin.storage.cassandra3; -import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.Statement; import com.datastax.driver.core.WriteType; import com.datastax.driver.core.exceptions.DriverException; +import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.datastax.driver.core.policies.RetryPolicy; /** This class was copied from org.twitter.zipkin.storage.cassandra.ZipkinRetryPolicy */ @@ -29,37 +29,54 @@ final class ZipkinRetryPolicy implements RetryPolicy { private ZipkinRetryPolicy() { } - @Override - public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, - int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry) { - return RetryDecision.retry(ConsistencyLevel.ONE); - } + @Override + public RetryDecision onReadTimeout( + Statement stmt, + ConsistencyLevel cl, + int required, + int received, + boolean retrieved, + int retry) { - @Override - public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType, - int requiredAcks, int receivedAcks, int nbRetry) { - return RetryDecision.retry(ConsistencyLevel.ONE); - } + if (retry > 1) { + try { + Thread.sleep(100); + } catch (InterruptedException expected) { } + } + return stmt.isIdempotent() + ? retry < 10 ? RetryDecision.retry(cl) : RetryDecision.rethrow() + : DefaultRetryPolicy.INSTANCE.onReadTimeout(stmt, cl, required, received, retrieved, retry); + } - @Override - public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl, int requiredReplica, - int aliveReplica, int nbRetry) { - return RetryDecision.retry(ConsistencyLevel.ONE); - } + @Override + public RetryDecision onWriteTimeout( + Statement stmt, + ConsistencyLevel cl, + WriteType type, + int required, + int received, + int retry) { - @Override - public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl, - DriverException e, int nbRetry) { - return RetryDecision.tryNextHost(ConsistencyLevel.ONE); - } + return stmt.isIdempotent() + ? RetryDecision.retry(cl) + : DefaultRetryPolicy.INSTANCE.onWriteTimeout(stmt, cl, type, required, received, retry); + } - @Override - public void init(Cluster cluster) { - // nothing to do - } + @Override + public RetryDecision onUnavailable(Statement stmt, ConsistencyLevel cl, int required, int aliveReplica, int retry) { + return DefaultRetryPolicy.INSTANCE.onUnavailable(stmt, cl, required, aliveReplica, retry == 1 ? 0 : retry); + } - @Override - public void close() { - // nothing to do - } + @Override + public RetryDecision onRequestError(Statement stmt, ConsistencyLevel cl, DriverException ex, int nbRetry) { + return DefaultRetryPolicy.INSTANCE.onRequestError(stmt, cl, ex, nbRetry); + } + + @Override + public void init(com.datastax.driver.core.Cluster cluster) { + } + + @Override + public void close() { + } } From c673801ec7dda02782c7965b478adeffa777aae4 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 2 Oct 2017 09:59:53 +0800 Subject: [PATCH 2/4] formatting --- .../cassandra3/DefaultSessionFactory.java | 8 +- .../storage/cassandra3/ZipkinRetryPolicy.java | 76 +++++++++---------- 2 files changed, 39 insertions(+), 45 deletions(-) diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java index 9b22826ff6a..edc78fc2302 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java @@ -127,10 +127,10 @@ static Cluster buildCluster(Cassandra3Storage cassandra) { )); builder.withQueryOptions( - new QueryOptions() - .setConsistencyLevel( - null != cassandra.localDc ? ConsistencyLevel.LOCAL_ONE : ConsistencyLevel.ONE) - .setDefaultIdempotence(true)); + new QueryOptions() + .setConsistencyLevel( + null != cassandra.localDc ? ConsistencyLevel.LOCAL_ONE : ConsistencyLevel.ONE) + .setDefaultIdempotence(true)); if (cassandra.useSsl) { builder = builder.withSSL(); diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java index 5860c28ba46..3d53850f9b6 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java @@ -29,54 +29,48 @@ final class ZipkinRetryPolicy implements RetryPolicy { private ZipkinRetryPolicy() { } - @Override - public RetryDecision onReadTimeout( - Statement stmt, - ConsistencyLevel cl, - int required, - int received, - boolean retrieved, - int retry) { + @Override + public RetryDecision onReadTimeout(Statement stmt, ConsistencyLevel cl, int required, + int received, boolean retrieved, int retry) { - if (retry > 1) { - try { - Thread.sleep(100); - } catch (InterruptedException expected) { } + if (retry > 1) { + try { + Thread.sleep(100); + } catch (InterruptedException expected) { } - return stmt.isIdempotent() - ? retry < 10 ? RetryDecision.retry(cl) : RetryDecision.rethrow() - : DefaultRetryPolicy.INSTANCE.onReadTimeout(stmt, cl, required, received, retrieved, retry); } + return stmt.isIdempotent() + ? retry < 10 ? RetryDecision.retry(cl) : RetryDecision.rethrow() + : DefaultRetryPolicy.INSTANCE.onReadTimeout(stmt, cl, required, received, retrieved, retry); + } - @Override - public RetryDecision onWriteTimeout( - Statement stmt, - ConsistencyLevel cl, - WriteType type, - int required, - int received, - int retry) { + @Override + public RetryDecision onWriteTimeout(Statement stmt, ConsistencyLevel cl, WriteType type, + int required, int received, int retry) { - return stmt.isIdempotent() - ? RetryDecision.retry(cl) - : DefaultRetryPolicy.INSTANCE.onWriteTimeout(stmt, cl, type, required, received, retry); - } + return stmt.isIdempotent() + ? RetryDecision.retry(cl) + : DefaultRetryPolicy.INSTANCE.onWriteTimeout(stmt, cl, type, required, received, retry); + } - @Override - public RetryDecision onUnavailable(Statement stmt, ConsistencyLevel cl, int required, int aliveReplica, int retry) { - return DefaultRetryPolicy.INSTANCE.onUnavailable(stmt, cl, required, aliveReplica, retry == 1 ? 0 : retry); - } + @Override + public RetryDecision onUnavailable(Statement stmt, ConsistencyLevel cl, int required, + int aliveReplica, int retry) { + return DefaultRetryPolicy.INSTANCE.onUnavailable(stmt, cl, required, aliveReplica, + retry == 1 ? 0 : retry); + } - @Override - public RetryDecision onRequestError(Statement stmt, ConsistencyLevel cl, DriverException ex, int nbRetry) { - return DefaultRetryPolicy.INSTANCE.onRequestError(stmt, cl, ex, nbRetry); - } + @Override + public RetryDecision onRequestError(Statement stmt, ConsistencyLevel cl, DriverException ex, + int nbRetry) { + return DefaultRetryPolicy.INSTANCE.onRequestError(stmt, cl, ex, nbRetry); + } - @Override - public void init(com.datastax.driver.core.Cluster cluster) { - } + @Override + public void init(com.datastax.driver.core.Cluster cluster) { + } - @Override - public void close() { - } + @Override + public void close() { + } } From 6269fe0bda6c26910af5918db193baea780e2566 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 2 Oct 2017 10:01:33 +0800 Subject: [PATCH 3/4] fixed import --- .../main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java index 3d53850f9b6..c4a62064cab 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/ZipkinRetryPolicy.java @@ -14,6 +14,7 @@ package zipkin.storage.cassandra3; +import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.Statement; import com.datastax.driver.core.WriteType; @@ -67,7 +68,7 @@ public RetryDecision onRequestError(Statement stmt, ConsistencyLevel cl, DriverE } @Override - public void init(com.datastax.driver.core.Cluster cluster) { + public void init(Cluster cluster) { } @Override From 0ffe528581a01a53d54a671db3ab8b87c28d8998 Mon Sep 17 00:00:00 2001 From: mck Date: Mon, 2 Oct 2017 19:54:45 +1100 Subject: [PATCH 4/4] =?UTF-8?q?SQUASH=20ME=20=E2=80=93=C2=A0add=20comment?= =?UTF-8?q?=20per=20https://github.com/openzipkin/zipkin/pull/1753/files#r?= =?UTF-8?q?142049082?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/zipkin/storage/cassandra3/DefaultSessionFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java index edc78fc2302..974c524a291 100644 --- a/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java +++ b/zipkin-storage/cassandra3/src/main/java/zipkin/storage/cassandra3/DefaultSessionFactory.java @@ -128,8 +128,10 @@ static Cluster buildCluster(Cassandra3Storage cassandra) { builder.withQueryOptions( new QueryOptions() + // if local_dc isn't defined LOCAL_ONE incorrectly sticks to first seed host that connects .setConsistencyLevel( null != cassandra.localDc ? ConsistencyLevel.LOCAL_ONE : ConsistencyLevel.ONE) + // all zipkin cql writes are idempotent .setDefaultIdempotence(true)); if (cassandra.useSsl) {