Skip to content

Commit

Permalink
Fixes Cassandra indexes that lost traces in the same millisecond (#1153)
Browse files Browse the repository at this point in the history
A schema bug resulted in Cassandra not indexing more than bucket count
(10) trace ids per millisecond+search input. This manifested as fewer
traces being retrieved by a UI search or API query than expected. For
example, if you had 1000 traces that happened on the same service in the
same millisecond, only 10 would be returned.

The indexes affected are `service_span_name_index`, `service_name_index`
and `annotations_index` and this was a schema-only change. Those with
existing zipkin installations should recreate these indexes to solve the
problem.

Fixes #1142
  • Loading branch information
adriancole authored Jun 29, 2016
1 parent 34b2bc9 commit 0d51d90
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 18 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
<guava.version>18.0</guava.version>
<junit.version>4.12</junit.version>
<mockito.version>1.10.19</mockito.version>
<hamcrest.version>1.3</hamcrest.version>
<assertj.version>3.3.0</assertj.version>
<okhttp.version>3.3.1</okhttp.version>

Expand Down
2 changes: 1 addition & 1 deletion zipkin-guava/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
6 changes: 6 additions & 0 deletions zipkin-storage/cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ public static final class Builder {
String username;
String password;
int maxTraceCols = 100000;
/**
* Used to avoid hot spots when writing indexes used to query by service name or annotation.
*
* <p>This controls the number of buckets (partitions) that writes to {@code service_name_index}
* and {@code annotations_index} are distributed across. This must be the same for all query
* servers, and has historically always been 10.
*
* <p>See https://github.com/openzipkin/zipkin/issues/623 for further explanation.
*/
int bucketCount = 10;
int spanTtl = (int) TimeUnit.DAYS.toSeconds(7);
int indexTtl = (int) TimeUnit.DAYS.toSeconds(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetEncoder;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -35,6 +36,7 @@
import zipkin.storage.QueryRequest;

import static zipkin.internal.Util.UTF_8;
import static zipkin.internal.Util.checkArgument;
import static zipkin.internal.Util.sortedList;

final class CassandraUtil {
Expand Down Expand Up @@ -100,6 +102,10 @@ static List<String> annotationKeys(Span span) {
}

static List<String> annotationKeys(QueryRequest request) {
if (request.annotations.isEmpty() && request.binaryAnnotations.isEmpty()) {
return Collections.emptyList();
}
checkArgument(request.serviceName != null, "serviceName needed with annotation query");
Set<String> annotationKeys = new LinkedHashSet<>();
for (String a : request.annotations) {
annotationKeys.add(request.serviceName + ":" + a);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
CREATE KEYSPACE IF NOT EXISTS zipkin WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

CREATE TABLE IF NOT EXISTS zipkin.service_span_name_index (
service_span_name text,
ts timestamp,
trace_id bigint,
PRIMARY KEY (service_span_name, ts)
service_span_name text, // Endpoint.serviceName + "." + Span.name
ts timestamp, // start timestamp of the span, truncated to millisecond precision
trace_id bigint, // trace ID. Included as a clustering column to avoid clashes (however unlikely)
PRIMARY KEY (service_span_name, ts, trace_id)
)
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_window_size_seconds': '86400'}
AND default_time_to_live = 259200;

CREATE TABLE IF NOT EXISTS zipkin.service_name_index (
service_name text,
bucket int,
ts timestamp,
trace_id bigint,
PRIMARY KEY ((service_name, bucket), ts)
service_name text, // Endpoint.serviceName
bucket int, // avoids hot spots by distributing writes across each bucket, usually 0-9
ts timestamp, // start timestamp of the span, truncated to millisecond precision
trace_id bigint, // trace ID. Included as a clustering column to avoid clashes (however unlikely)
PRIMARY KEY ((service_name, bucket), ts, trace_id)
)
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_window_size_seconds': '86400'}
Expand All @@ -31,11 +31,11 @@ CREATE TABLE IF NOT EXISTS zipkin.span_names (
AND default_time_to_live = 259200;

CREATE TABLE IF NOT EXISTS zipkin.annotations_index (
annotation blob,
bucket int,
ts timestamp,
trace_id bigint,
PRIMARY KEY ((annotation, bucket), ts)
annotation blob, // Annotation.value or BinaryAnnotation.key
bucket int, // avoids hot spots by distributing writes across each bucket, usually 0-9
ts timestamp, // start timestamp of the span, truncated to millisecond precision
trace_id bigint, // trace ID. Included as a clustering column to avoid clashes (however unlikely)
PRIMARY KEY ((annotation, bucket), ts, trace_id)
)
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy', 'max_window_size_seconds': '86400'}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.storage.cassandra;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import zipkin.TraceKeys;
import zipkin.storage.QueryRequest;

import static org.assertj.core.api.Assertions.assertThat;

public class CassandraUtilTest {
@Rule
public ExpectedException thrown = ExpectedException.none();

@Test
public void annotationKeys_emptyRequest() {
assertThat(CassandraUtil.annotationKeys(QueryRequest.builder().build()))
.isEmpty();
}

@Test
public void annotationKeys_serviceNameRequired() {
thrown.expect(IllegalArgumentException.class);

CassandraUtil.annotationKeys(QueryRequest.builder().addAnnotation("sr").build());
}

@Test
public void annotationKeys() {
assertThat(CassandraUtil.annotationKeys(QueryRequest.builder()
.serviceName("service")
.addAnnotation("sr")
.addBinaryAnnotation(TraceKeys.HTTP_METHOD, "GET").build()))
.containsExactly("service:http.method:GET", "service:sr");
}

@Test
public void annotationKeys_dedupes() {
assertThat(CassandraUtil.annotationKeys(QueryRequest.builder()
.serviceName("service")
.addAnnotation("sr")
.addAnnotation("sr").build()))
.containsExactly("service:sr");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,31 @@
*/
package zipkin.storage.cassandra;

import org.junit.Rule;
import org.junit.rules.ExpectedException;
import zipkin.storage.SpanStoreTest;

public class CassandraWithOriginalSchemaSpanStoreTest extends SpanStoreTest {
@Rule
public ExpectedException thrown = ExpectedException.none();

private final CassandraStorage storage;

public CassandraWithOriginalSchemaSpanStoreTest() {
this.storage = CassandraWithOriginalSchemaTestGraph.INSTANCE.storage.get();
}

/**
* Overrides the shared many-traces test to pin the known limitation of the original schema.
*
* <p>The old schema PRIMARY KEY doesn't consider trace_id, so index entries for the same key and
* millisecond clash. As a result, queries will only see bucket count (10 by default) traces to a
* service per millisecond, rather than every trace that was stored.
*/
@Override public void getTraces_manyTraces() {
// Expectations have to be registered before the inherited test body runs.
thrown.expect(AssertionError.class);
// The inherited test stores 1000 traces; with the old indexes only 10 of them are found.
thrown.expectMessage("Expected size:<1000> but was:<10>");

super.getTraces_manyTraces();
}

@Override protected CassandraStorage storage() {
return storage;
}
Expand Down
2 changes: 1 addition & 1 deletion zipkin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
5 changes: 3 additions & 2 deletions zipkin/src/test/java/zipkin/TestObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ public final class TestObjects {
/** Reuse a builder, as it significantly slows tests to create 100000 of these! */
static Span.Builder spanBuilder() {
Endpoint e = Endpoint.create("service", 127 << 24 | 1, 8080);
Annotation ann = Annotation.create(System.currentTimeMillis() * 1000, SERVER_RECV, e);
return Span.builder().name("get").addAnnotation(ann);
Annotation sr = Annotation.create(System.currentTimeMillis() * 1000, SERVER_RECV, e);
Annotation ss = Annotation.create(sr.timestamp + 1000, SERVER_SEND, e);
return Span.builder().name("get").addAnnotation(sr).addAnnotation(ss);
}

/**
Expand Down
29 changes: 29 additions & 0 deletions zipkin/src/test/java/zipkin/storage/SpanStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package zipkin.storage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.GregorianCalendar;
Expand Down Expand Up @@ -278,6 +279,34 @@ public void getTraces_serviceNameInBinaryAnnotation() {
.containsExactly(asList(localTrace));
}

/**
 * Regression test for a bug where cassandra indexed at most bucket count traces per millisecond.
 * A large batch of spans is stored, then every supported query shape (no criteria, service name,
 * span name, annotation) is expected to return all of the stored traces.
 */
@Test
public void getTraces_manyTraces() {
  final int traceCount = 1000;
  Span firstSpan = TestObjects.LOTS_OF_SPANS[0];
  Annotation firstAnnotation = firstSpan.annotations.get(0);

  Span[] batch = Arrays.copyOfRange(TestObjects.LOTS_OF_SPANS, 0, traceCount);
  accept(batch);

  // No criteria other than the limit.
  QueryRequest unfiltered = new QueryRequest.Builder().limit(traceCount).build();
  assertThat(store().getTraces(unfiltered)).hasSize(traceCount);

  // The same builder is reused below, so each query adds a criterion to the previous one.
  QueryRequest.Builder query =
      QueryRequest.builder().limit(traceCount).serviceName(firstAnnotation.endpoint.serviceName);

  QueryRequest byServiceName = query.build();
  assertThat(store().getTraces(byServiceName)).hasSize(traceCount);

  QueryRequest bySpanName = query.spanName(firstSpan.name).build();
  assertThat(store().getTraces(bySpanName)).hasSize(traceCount);

  QueryRequest byAnnotation = query.addAnnotation(firstAnnotation.value).build();
  assertThat(store().getTraces(byAnnotation)).hasSize(traceCount);
}

/** Shows that duration queries go against the root span, not the child */
@Test
public void getTraces_duration() {
Expand Down

0 comments on commit 0d51d90

Please sign in to comment.