Skip to content

Commit

Permalink
Use span name table instead of materialized view (#1392)
Browse files Browse the repository at this point in the history
- This switches span name and service name lookups to use a table instead of a materialized view.  This change will fix #1360
- Change limit logic to limit on a set of trace IDs instead of limiting on the provided collection first. In practice this didn't make a noticeable difference in the results, but it seems like the intended logic.
- update stress profiles for new span_name_by_service tables

references:
 - #1392
 - #1360
 - #1374
  • Loading branch information
michaelsembwever authored and adriancole committed Nov 6, 2016
1 parent 8f56d23 commit 194fff8
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ public Session session() {
for (String cf : ImmutableList.of(
Schema.TABLE_TRACES,
Schema.TABLE_TRACE_BY_SERVICE_SPAN,
Schema.TABLE_SERVICE_SPANS,
Schema.TABLE_DEPENDENCIES
)) {
futures.add(session.get().executeAsync(format("TRUNCATE %s", cf)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,21 @@
import zipkin.storage.cassandra3.Schema.BinaryAnnotationUDT;
import zipkin.storage.guava.GuavaSpanConsumer;

import static com.google.common.util.concurrent.Futures.transform;
import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.storage.cassandra3.CassandraUtil.bindWithName;
import static zipkin.storage.cassandra3.CassandraUtil.durationIndexBucket;
import static com.google.common.util.concurrent.Futures.transform;

final class CassandraSpanConsumer implements GuavaSpanConsumer {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanConsumer.class);
private static final Function<Object, Void> TO_VOID = Functions.<Void>constant(null);

private static final long WRITTEN_NAMES_TTL
= Long.getLong("zipkin.store.cassandra3.internal.writtenNamesTtl", 60 * 60 * 1000);

private final Session session;
private final PreparedStatement insertSpan;
private final PreparedStatement insertTraceServiceSpanName;
private final PreparedStatement insertServiceSpanName;
private final Schema.Metadata metadata;

Expand All @@ -70,7 +74,7 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
.value("binary_annotations", QueryBuilder.bindMarker("binary_annotations"))
.value("all_annotations", QueryBuilder.bindMarker("all_annotations")));

insertServiceSpanName = session.prepare(
insertTraceServiceSpanName = session.prepare(
QueryBuilder
.insertInto(Schema.TABLE_TRACE_BY_SERVICE_SPAN)
.value("service_name", QueryBuilder.bindMarker("service_name"))
Expand All @@ -79,6 +83,12 @@ final class CassandraSpanConsumer implements GuavaSpanConsumer {
.value("ts", QueryBuilder.bindMarker("ts"))
.value("trace_id", QueryBuilder.bindMarker("trace_id"))
.value("duration", QueryBuilder.bindMarker("duration")));

insertServiceSpanName = session.prepare(
QueryBuilder
.insertInto(Schema.TABLE_SERVICE_SPANS)
.value("service_name", QueryBuilder.bindMarker("service_name"))
.value("span_name", QueryBuilder.bindMarker("span_name")));
}

/**
Expand All @@ -98,13 +108,14 @@ public ListenableFuture<Void> accept(List<Span> rawSpans) {
for (String serviceName : span.serviceNames()) {
// QueryRequest.min/maxDuration
if (timestamp != null) {
// Contract for Repository.storeServiceSpanName is to store the span twice, once with
// Contract for Repository.storeTraceServiceSpanName is to store the span twice, once with
// the span name and another with empty string.
futures.add(storeServiceSpanName(serviceName, span.name, timestamp, span.duration,
futures.add(storeTraceServiceSpanName(serviceName, span.name, timestamp, span.duration,
traceId));
if (!span.name.isEmpty()) { // If span.name == "", this would be redundant
futures.add(storeServiceSpanName(serviceName, "", timestamp, span.duration, traceId));
futures.add(storeTraceServiceSpanName(serviceName, "", timestamp, span.duration, traceId));
}
futures.add(storeServiceSpanName(serviceName, span.name));
}
}
}
Expand Down Expand Up @@ -162,32 +173,50 @@ ListenableFuture<?> storeSpan(Span span, BigInteger traceId, Long timestamp) {
}
}

ListenableFuture<?> storeServiceSpanName(
ListenableFuture<?> storeTraceServiceSpanName(
String serviceName,
String spanName,
long timestamp_micro,
Long duration,
BigInteger traceId) {

int bucket = durationIndexBucket(timestamp_micro);
UUID ts = new UUID(
UUIDs.startOf(timestamp_micro / 1000).getMostSignificantBits(),
UUIDs.random().getLeastSignificantBits());
try {
BoundStatement bound =
bindWithName(insertServiceSpanName, "insert-service-span-name")
bindWithName(insertTraceServiceSpanName, "insert-trace-service-span-name")
.setString("service_name", serviceName)
.setString("span_name", spanName)
.setInt("bucket", bucket)
.setUUID("ts", new UUID(
UUIDs.startOf(timestamp_micro / 1000).getMostSignificantBits(),
UUIDs.random().getLeastSignificantBits()))
.setUUID("ts", ts)
.setVarint("trace_id", traceId);

if (null != duration) {
bound = bound.setLong("duration", duration);
}

return session.executeAsync(bound);

} catch (RuntimeException ex) {
return Futures.immediateFailedFuture(ex);
}
}

/**
 * Writes one (service name, span name) pair using the prepared
 * {@code insertServiceSpanName} statement.
 *
 * <p>Any {@link RuntimeException} raised while binding the values or submitting the statement is
 * not propagated to the caller; it is returned as an already-failed future instead.
 *
 * @param serviceName value bound to the {@code service_name} column
 * @param spanName value bound to the {@code span_name} column
 * @return the future of the asynchronous insert, or a failed future wrapping the exception
 */
ListenableFuture<?> storeServiceSpanName(String serviceName, String spanName) {
  try {
    BoundStatement statement = bindWithName(insertServiceSpanName, "insert-service-span-name");
    statement = statement.setString("service_name", serviceName);
    statement = statement.setString("span_name", spanName);
    return session.executeAsync(statement);
  } catch (RuntimeException e) {
    // Keep the contract asynchronous: report failures through the returned future.
    return Futures.immediateFailedFuture(e);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.base.Function;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.AsyncFunction;
Expand Down Expand Up @@ -67,6 +67,7 @@
import static zipkin.internal.Util.getDays;
import static zipkin.storage.cassandra3.Schema.TABLE_TRACES;
import static zipkin.storage.cassandra3.Schema.TABLE_TRACE_BY_SERVICE_SPAN;
import static zipkin.storage.cassandra3.Schema.TABLE_SERVICE_SPANS;

final class CassandraSpanStore implements GuavaSpanStore {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSpanStore.class);
Expand Down Expand Up @@ -116,11 +117,11 @@ public int compare(List<Span> left, List<Span> right) {
selectServiceNames = session.prepare(
QueryBuilder.select("service_name")
.distinct()
.from(Schema.VIEW_TRACE_BY_SERVICE));
.from(TABLE_SERVICE_SPANS));

selectSpanNames = session.prepare(
QueryBuilder.select("span_name")
.from(Schema.VIEW_TRACE_BY_SERVICE)
.from(TABLE_SERVICE_SPANS)
.where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name")))
.limit(QueryBuilder.bindMarker("limit_")));

Expand Down Expand Up @@ -223,8 +224,9 @@ public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest request)
}
return transform(traceIds, new AsyncFunction<Collection<BigInteger>, List<List<Span>>>() {
@Override public ListenableFuture<List<List<Span>>> apply(Collection<BigInteger> traceIds) {
traceIds = FluentIterable.from(traceIds).limit(request.limit).toSet();
return transform(getSpansByTraceIds(ImmutableSet.copyOf(traceIds), maxTraceCols), AdjustTraces.INSTANCE);
ImmutableSet<BigInteger> set = ImmutableSet.copyOf(traceIds);
set = ImmutableSet.copyOf(Iterators.limit(set.iterator(), request.limit));
return transform(getSpansByTraceIds(set, maxTraceCols), AdjustTraces.INSTANCE);
}

@Override public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ final class Schema {

static final String TABLE_TRACES = "traces";
static final String TABLE_TRACE_BY_SERVICE_SPAN = "trace_by_service_span";
static final String TABLE_SERVICE_SPANS = "span_name_by_service";
static final String TABLE_DEPENDENCIES = "dependencies";
static final String VIEW_TRACE_BY_SERVICE = "trace_by_service";

static final String DEFAULT_KEYSPACE = "zipkin3";
private static final String SCHEMA_RESOURCE = "/cassandra3-schema.cql";
Expand Down
37 changes: 19 additions & 18 deletions zipkin-storage/cassandra3/src/main/resources/cassandra3-schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -30,45 +30,44 @@ CREATE TABLE IF NOT EXISTS zipkin3.traces (
duration bigint,
annotations list<frozen<annotation>>,
binary_annotations list<frozen<binary_annotation>>,
all_annotations text, // can't do SASI on set<text>: comma-joined until CASSANDRA-11182
all_annotations text, //-- can't do SASI on set<text>: comma-joined until CASSANDRA-11182
PRIMARY KEY (trace_id, ts_uuid, id)
)
WITH CLUSTERING ORDER BY (ts_uuid DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}
AND default_time_to_live = 604800;


CREATE TABLE zipkin3.trace_by_service_span (
service_name text, // service name
span_name text, // span name, or blank for queries without span name
bucket int, // time bucket, calculated as ts/interval (in microseconds), for some pre-configured interval like 1 day.
ts timeuuid, // start timestamp of the span, truncated to millisecond precision
trace_id varint, // trace ID
duration bigint, // span duration, in microseconds
CREATE TABLE IF NOT EXISTS zipkin3.trace_by_service_span (
service_name text, //-- service name
span_name text, //-- span name, or blank for queries without span name
bucket int, //-- time bucket, calculated as ts/interval (in microseconds), for some pre-configured interval like 1 day.
ts timeuuid, //-- start timestamp of the span, truncated to millisecond precision
trace_id varint, //-- trace ID
duration bigint, //-- span duration, in microseconds
PRIMARY KEY ((service_name, span_name, bucket), ts)
)
WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'}
AND default_time_to_live = 259200;

CREATE TABLE IF NOT EXISTS zipkin3.span_name_by_service (
service_name text,
span_name text,
PRIMARY KEY (service_name, span_name)
)
WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
AND default_time_to_live = 259200;

CREATE MATERIALIZED VIEW IF NOT EXISTS zipkin3.trace_by_service
AS SELECT service_name, span_name, bucket, ts, trace_id FROM trace_by_service_span
WHERE service_name is not null AND span_name is not null AND bucket is not null AND ts is not null
PRIMARY KEY (service_name, bucket, span_name, ts)
WITH CLUSTERING ORDER BY (bucket DESC, ts DESC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
AND default_time_to_live = 259200;
CREATE CUSTOM INDEX ON zipkin3.traces (all_annotations) USING 'org.apache.cassandra.index.sasi.SASIIndex'
CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin3.traces (all_annotations) USING 'org.apache.cassandra.index.sasi.SASIIndex'
WITH OPTIONS = {
'mode': 'CONTAINS',
'analyzed': 'true',
'analyzer_class':'org.apache.cassandra.index.sasi.analyzer.NonTokenizingAnalyzer',
'case_sensitive': 'false'
};

CREATE CUSTOM INDEX ON zipkin3.trace_by_service_span (duration) USING 'org.apache.cassandra.index.sasi.SASIIndex'
CREATE CUSTOM INDEX IF NOT EXISTS ON zipkin3.trace_by_service_span (duration) USING 'org.apache.cassandra.index.sasi.SASIIndex'
WITH OPTIONS = {'mode': 'PREFIX'};


Expand All @@ -80,3 +79,5 @@ CREATE TABLE IF NOT EXISTS zipkin3.dependencies (
WITH compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'unchecked_tombstone_compaction': 'true', 'tombstone_threshold': '0.2'}
AND default_time_to_live = 259200;

//-- Empty span name list in UI with Cassandra3 storage component – https://github.com/openzipkin/zipkin/issues/1360
DROP MATERIALIZED VIEW IF EXISTS zipkin3.trace_by_service;
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2015-2016 The OpenZipkin Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
#
###
### Stress test for zipkin3 span_name_by_service table
###
### Stress testing is done using the `cassandra-stress` tool
###
### For example
### cassandra-stress user profile=span_name_by_service-stress.yaml ops\(insert=1\) duration=1m -rate threads=4 throttle=50/s
###
### after a benchmark has been run with only writes, a mixed read-write benchmark can be run with
### cassandra-stress user profile=span_name_by_service-stress.yaml ops\(insert=1,select=1,select_span_names=1\) duration=1m -rate threads=4 throttle=50/s

# Keyspace Name
keyspace: zipkin3

# Table name
table: span_name_by_service


### Column Distribution Specifications ###

columnspec:
- name: service_name
size: uniform(5..20)
population: uniform(1..100)

- name: span_name
size: uniform(5..20)
population: uniform(1..100)


### Batch Ratio Distribution Specifications ###

insert:
partitions: fixed(1) # 1 partition key at a time inserts to model a message being generated
select: fixed(1)/1000
batchtype: UNLOGGED # Unlogged batches


#
# A set of basic queries
#
queries:
select:
cql: SELECT DISTINCT service_name FROM span_name_by_service
fields: samerow
select_span_names:
cql: SELECT span_name FROM span_name_by_service WHERE service_name = ? LIMIT 1000
fields: samerow

0 comments on commit 194fff8

Please sign in to comment.