From 02fbd246cb9907f5cc689f51da082fcad658a9d2 Mon Sep 17 00:00:00 2001 From: Johan Oskarsson Date: Wed, 13 Jun 2012 12:46:51 -0700 Subject: [PATCH] Reduce the garbage generated in the collector. Before this change the collector generated about 400 KB of garbage per sampled request; after it, about 300 KB. Includes these changes: * don't recalculate the lowercase version of service names in span each time they are used * don't create the incoming data log message unless debug is turned on * reuse some data instead of recalculating when indexing in Cassandra Author: johanoskarsson Pull Request: #25 URL: https://github.com/twitter/zipkin/pull/25 --- .../com/twitter/zipkin/common/Span.scala | 7 +++--- .../com/twitter/zipkin/common/SpanSpec.scala | 10 ++++++-- .../zipkin/collector/WriteQueueWorker.scala | 2 +- .../sampler/EverythingGlobalSampler.scala | 24 +++++++++++++++++++ .../storage/cassandra/CassandraIndex.scala | 10 ++++---- 5 files changed, 42 insertions(+), 11 deletions(-) create mode 100644 zipkin-server/src/main/scala/com/twitter/zipkin/collector/sampler/EverythingGlobalSampler.scala diff --git a/zipkin-common/src/main/scala/com/twitter/zipkin/common/Span.scala b/zipkin-common/src/main/scala/com/twitter/zipkin/common/Span.scala index 005920ff200..9e694cfb52b 100644 --- a/zipkin-common/src/main/scala/com/twitter/zipkin/common/Span.scala +++ b/zipkin-common/src/main/scala/com/twitter/zipkin/common/Span.scala @@ -80,9 +80,10 @@ case class Span(traceId: Long, name: String, id: Long, parentId: Option[Long], binaryAnnotations) } - def serviceNames: Set[String] = { - annotations.flatMap(a => a.host.map(h => h.serviceName.toLowerCase)).toSet - } + /** + * @return the unique set of service names as lower case + */ + lazy val serviceNames: Set[String] = annotations.flatMap(a => a.host.map(h => h.serviceName.toLowerCase)).toSet /** * Tries to extract the best possible service name diff --git a/zipkin-common/src/test/scala/com/twitter/zipkin/common/SpanSpec.scala 
b/zipkin-common/src/test/scala/com/twitter/zipkin/common/SpanSpec.scala index 98e09eb8429..95668f9514a 100644 --- a/zipkin-common/src/test/scala/com/twitter/zipkin/common/SpanSpec.scala +++ b/zipkin-common/src/test/scala/com/twitter/zipkin/common/SpanSpec.scala @@ -27,7 +27,7 @@ class SpanSpec extends Specification { List(expectedAnnotation), Nil) val annotation1 = Annotation(1, "value1", Some(Endpoint(1, 2, "service"))) - val annotation2 = Annotation(2, "value2", Some(Endpoint(3, 4, "service"))) + val annotation2 = Annotation(2, "value2", Some(Endpoint(3, 4, "Service"))) // upper case service name val annotation3 = Annotation(3, "value3", Some(Endpoint(5, 6, "service"))) val spanWith3Annotations = Span(12345, "methodcall", 666, None, @@ -52,7 +52,13 @@ class SpanSpec extends Specification { Span.fromThrift(noBinaryAnnotationsSpan) mustEqual Span(0, "name", 0, None, List(), Seq()) } - "getAnnotationsAsMap" in { + "serviceNames is lowercase" in { + val names = spanWith3Annotations.serviceNames + names.size mustBe 1 + names.head mustEqual "service" // Set.apply tests membership, so read the single element via head + } + + "getAnnotationsAsMap" in { val map = expectedSpan.getAnnotationsAsMap val actualAnnotation = map.get(annotationValue).get expectedAnnotation mustEqual actualAnnotation diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueueWorker.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueueWorker.scala index d302c8e9a17..dce63950512 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueueWorker.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/WriteQueueWorker.scala @@ -45,7 +45,7 @@ class WriteQueueWorker(queue: BlockingQueue[List[String]], def processScribeMessage(msg: String) { try { val span = Stats.time("deserializeSpan") { deserializer.fromString(msg) } - log.debug("Processing span: " + span + " from " + msg) + log.ifDebug("Processing span: " + span + " from " + msg) processSpan(Span.fromThrift(span)) } catch { case e: Exception => { 
diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/collector/sampler/EverythingGlobalSampler.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/sampler/EverythingGlobalSampler.scala new file mode 100644 index 00000000000..aed8b0a60a1 --- /dev/null +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/collector/sampler/EverythingGlobalSampler.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2012 Twitter Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.twitter.zipkin.collector.sampler + +/** + * Global sampler that lets every trace through: apply returns true for any traceId. + */ +object EverythingGlobalSampler extends GlobalSampler { + override def apply(traceId: Long): Boolean = true +} diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala index 1e1b373b393..5fc62312b00 100644 --- a/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala +++ b/zipkin-server/src/main/scala/com/twitter/zipkin/storage/cassandra/CassandraIndex.scala @@ -220,14 +220,13 @@ trait CassandraIndex extends Index with Cassandra { val futures = serviceNames.map(serviceName => { WRITE_REQUEST_COUNTER.incr() - val serviceSpanIndexKey = serviceName.toLowerCase + "." + span.name.toLowerCase + val serviceSpanIndexKey = serviceName + "." 
+ span.name.toLowerCase val serviceSpanIndexCol = Column[Long, Long](timestamp, span.traceId).ttl(config.tracesTimeToLive) val serviceSpanNameFuture = serviceSpanNameIndex.insert(serviceSpanIndexKey, serviceSpanIndexCol) WRITE_REQUEST_COUNTER.incr() - val serviceIndexKey = serviceName.toLowerCase val serviceIndexCol = Column[Long, Long](timestamp, span.traceId).ttl(config.tracesTimeToLive) - val serviceNameFuture = serviceNameIndex.insert(serviceIndexKey, serviceIndexCol) + val serviceNameFuture = serviceNameIndex.insert(serviceName, serviceIndexCol) List(serviceSpanNameFuture, serviceNameFuture) }).toList.flatten Future.join(futures) @@ -264,9 +263,10 @@ trait CassandraIndex extends Index with Cassandra { ba.host match { case Some(endpoint) => { WRITE_REQUEST_COUNTER.incr(2) + val key = encode(endpoint.serviceName, ba.key).getBytes val col = Column[Long, Long](timestamp, span.traceId).ttl(config.tracesTimeToLive) - batch.insert(ByteBuffer.wrap(encode(endpoint.serviceName, ba.key).getBytes ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col) - batch.insert(ByteBuffer.wrap(encode(endpoint.serviceName, ba.key).getBytes), col) + batch.insert(ByteBuffer.wrap(key ++ INDEX_DELIMITER.getBytes ++ Util.getArrayFromBuffer(ba.value)), col) + batch.insert(ByteBuffer.wrap(key), col) } case None => }