diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala
index 7483d435b1c..e572446238f 100644
--- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala
+++ b/zipkin-server/src/main/scala/com/twitter/zipkin/query/QueryService.scala
@@ -17,29 +17,36 @@
 package com.twitter.zipkin.query
 
 import com.twitter.conversions.time._
+import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
+import com.twitter.finagle.tracing.{Trace => FTrace}
 import com.twitter.logging.Logger
-import com.twitter.ostrich.stats.Stats
 import com.twitter.ostrich.admin.Service
-import com.twitter.finagle.tracing.{Trace => FTrace}
 import com.twitter.util.Future
+import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter}
+import com.twitter.zipkin.common.TraceSummary
 import com.twitter.zipkin.gen
 import com.twitter.zipkin.query.adjusters.Adjuster
 import com.twitter.zipkin.storage.{Aggregates, TraceIdDuration, Index, Storage}
 import java.nio.ByteBuffer
 import org.apache.thrift.TException
 import scala.collection.Set
-import com.twitter.zipkin.common.TraceSummary
-import com.twitter.zipkin.adapter.{ThriftQueryAdapter, ThriftAdapter}
+import java.util.concurrent.atomic.AtomicBoolean
 
 /**
  * Able to respond to users queries regarding the traces. Usually does so
  * by lookup the information in the index and then fetch the required trace data
  * from the storage.
  */
-class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjusterMap: Map[gen.Adjust, Adjuster])
-  extends gen.ZipkinQuery.FutureIface with Service {
+class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjusterMap: Map[gen.Adjust, Adjuster],
+                   statsReceiver: StatsReceiver = NullStatsReceiver) extends gen.ZipkinQuery.FutureIface with Service {
   private val log = Logger.get
-  private var running = false
+  private val running = new AtomicBoolean(false)
+
+  // Stats scoped under "QueryService": per-method call counters, error counters and call timings.
+  private val stats = statsReceiver.scope("QueryService")
+  private val methodStats = stats.scope("methods")
+  private val errorStats = stats.scope("errors")
+  private val timingStats = stats.scope("timing")
 
   // how to sort the trace summaries
   private val OrderByDurationDesc = {
@@ -60,330 +67,223 @@ class QueryService(storage: Storage, index: Index, aggregates: Aggregates, adjus
   var traceDurationFetchBatchSize = 500
 
   def start() {
-    running = true
+    running.set(true)
   }
 
   def shutdown() {
-    running = false
+    running.set(false)
     storage.close
   }
 
   def getTraceIdsBySpanName(serviceName: String, spanName: String, endTs: Long,
                             limit: Int, order: gen.Order): Future[Seq[Long]] = {
-    checkIfRunning
-    Stats.incr("query.get_trace_ids_name")
-    log.debug("getTraceIdsByName. serviceName: " + serviceName + " spanName: " + spanName +
-      " endTs: " + endTs + " limit: " + limit + " order:" + order)
-
-    if (serviceName == null || "".equals(serviceName)) {
-      Stats.incr("query.error_get_trace_ids_name_no_service")
-      return Future.exception(gen.QueryException("No service name provided, we need one"))
-    }
+    val method = "getTraceIdsBySpanName"
+    log.debug("%s. serviceName: %s spanName: %s endTs: %s limit: %s order: %s".format(method, serviceName, spanName,
+      endTs, limit, order))
+    call(method) {
+      if (serviceName == null || "".equals(serviceName)) {
+        errorStats.counter("%s_no_service".format(method)).incr()
+        Future.exception(gen.QueryException("No service name provided"))
+      } else {
 
-    // do we have a valid span name to query indexes by?
-    val span = convertToOption(spanName)
+        // do we have a valid span name to query indexes by?
+        val span = convertToOption(spanName)
 
-    FTrace.recordBinary("serviceName", serviceName)
-    FTrace.recordBinary("spanName", spanName)
-    FTrace.recordBinary("endTs", endTs)
-    FTrace.recordBinary("limit", limit)
-    FTrace.recordBinary("order", order)
+        FTrace.recordBinary("serviceName", serviceName)
+        FTrace.recordBinary("spanName", spanName)
+        FTrace.recordBinary("endTs", endTs)
+        FTrace.recordBinary("limit", limit)
+        FTrace.recordBinary("order", order)
 
-    Stats.timeFutureMillis("query.getTraceIdsByName") {
-      {
-        val traceIds = index.getTraceIdsByName(serviceName, span, endTs, limit)
-        sortTraceIds(traceIds, limit, order)
-      } rescue {
-        case e: Exception => {
-          log.error(e, "getTraceIdsByName query failed")
-          Stats.incr("query.error_get_trace_ids_name_exception")
-          Future.exception(gen.QueryException(e.toString))
-        }
-      }
+        val traceIds = index.getTraceIdsByName(serviceName, span, endTs, limit)
+        sortTraceIds(traceIds, limit, order)
+      }
     }
   }
 
   def getTraceIdsByServiceName(serviceName: String, endTs: Long,
                                limit: Int, order: gen.Order): Future[Seq[Long]] = {
-    checkIfRunning
-    Stats.incr("query.get_trace_ids_by_service_name")
-
-    log.debug("getTraceIdsByServiceName. serviceName: " + serviceName + " endTs: " +
-      endTs + " limit: " + limit + " order:" + order)
+    val method = "getTraceIdsByServiceName"
+    log.debug("%s. serviceName: %s endTs: %s limit: %s order: %s".format(method, serviceName, endTs, limit, order))
+    call(method) {
+      if (serviceName == null || "".equals(serviceName)) {
+        errorStats.counter("%s_no_service".format(method)).incr()
+        Future.exception(gen.QueryException("No service name provided"))
+      } else {
 
-    if (serviceName == null || "".equals(serviceName)) {
-      Stats.incr("query.error_get_trace_ids_by_service_name_no_service")
-      return Future.exception(gen.QueryException("No service name provided, we need one"))
-    }
+        FTrace.recordBinary("serviceName", serviceName)
+        FTrace.recordBinary("endTs", endTs)
+        FTrace.recordBinary("limit", limit)
+        FTrace.recordBinary("order", order)
 
-    FTrace.recordBinary("serviceName", serviceName)
-    FTrace.recordBinary("endTs", endTs)
-    FTrace.recordBinary("limit", limit)
-    FTrace.recordBinary("order", order)
-
-    Stats.timeFutureMillis("query.getTraceIdsByServiceName") {
-      {
-        val traceIds = index.getTraceIdsByName(serviceName, None, endTs, limit)
-        sortTraceIds(traceIds, limit, order)
-      } rescue {
-        case e: Exception =>
-          log.error(e, "getTraceIdsByServiceName query failed")
-          Stats.incr("query.error_get_trace_ids_by_service_name_exception")
-          Future.exception(gen.QueryException(e.toString))
-      }
+        val traceIds = index.getTraceIdsByName(serviceName, None, endTs, limit)
+        sortTraceIds(traceIds, limit, order)
+      }
     }
   }
 
   def getTraceIdsByAnnotation(serviceName: String, annotation: String, value: ByteBuffer, endTs: Long,
                               limit: Int, order: gen.Order): Future[Seq[Long]] = {
-    checkIfRunning
-    Stats.incr("query.get_trace_ids_by_annotation")
+    val method = "getTraceIdsByAnnotation"
+    log.debug("%s. serviceName: %s annotation: %s value: %s endTs: %s limit: %s order: %s".format(method, serviceName,
+      annotation, value, endTs, limit, order))
+    call(method) {
+      if (annotation == null || "".equals(annotation)) {
+        errorStats.counter("%s_no_annotation".format(method)).incr()
+        Future.exception(gen.QueryException("No annotation provided"))
+      } else {
 
-    log.debug("getTraceIdsByAnnotation. serviceName: " + serviceName + " annotation: " + annotation + " value: " + value +
-      " endTs: " + endTs + " limit: " + limit + " order:" + order)
+        // do we have a valid annotation value to query indexes by?
+        val valueOption = convertToOption(value)
 
-    if (annotation == null || "".equals(annotation)) {
-      Stats.incr("query.error_get_trace_ids_by_annotation_no_annotation")
-      return Future.exception(gen.QueryException("No annotation provided, we need one"))
-    }
+        FTrace.recordBinary("serviceName", serviceName)
+        FTrace.recordBinary("annotation", annotation)
+        FTrace.recordBinary("endTs", endTs)
+        FTrace.recordBinary("limit", limit)
+        FTrace.recordBinary("order", order)
 
-    // do we have a valid annotation value to query indexes by?
-    val valueOption = convertToOption(value)
-
-    FTrace.recordBinary("serviceName", serviceName)
-    FTrace.recordBinary("annotation", annotation)
-    FTrace.recordBinary("endTs", endTs)
-    FTrace.recordBinary("limit", limit)
-    FTrace.recordBinary("order", order)
-
-    Stats.timeFutureMillis("query.getTraceIdsByAnnotation") {
-      {
-        val traceIds = index.getTraceIdsByAnnotation(serviceName, annotation, valueOption, endTs, limit)
-        sortTraceIds(traceIds, limit, order)
-      } rescue {
-        case e: Exception =>
-          log.error(e, "getTraceIdsByAnnotation query failed")
-          Stats.incr("query.error_get_trace_ids_by_annotation_exception")
-          Future.exception(gen.QueryException(e.toString))
-      }
+        val traceIds = index.getTraceIdsByAnnotation(serviceName, annotation, valueOption, endTs, limit)
+        sortTraceIds(traceIds, limit, order)
+      }
     }
   }
 
   def getTracesByIds(traceIds: Seq[Long], adjust: Seq[gen.Adjust]): Future[Seq[gen.Trace]] = {
-    checkIfRunning
-    Stats.incr("query.get_trace_by_id")
     log.debug("getTracesByIds. " + traceIds + " adjust " + adjust)
+    call("getTracesByIds") {
+      val adjusters = getAdjusters(adjust)
+      FTrace.recordBinary("numIds", traceIds.length)
 
-    val adjusters = getAdjusters(adjust)
-
-    FTrace.recordBinary("numIds", traceIds.length)
-
-    Stats.timeFutureMillis("query.getTracesByIds") {
       storage.getTracesByIds(traceIds).map { traces =>
         traces.map { trace =>
           ThriftQueryAdapter(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t)))
         }
-      } rescue {
-        case e: Exception =>
-          log.error(e, "getTracesByIds query failed")
-          Stats.incr("query.error_get_trace_by_id_exception")
-          Future.exception(gen.QueryException(e.toString))
       }
     }
   }
 
   def getTraceTimelinesByIds(traceIds: Seq[Long],
                              adjust: Seq[gen.Adjust]): Future[Seq[gen.TraceTimeline]] = {
-    checkIfRunning
-    Stats.incr("query.get_trace_timelines_by_ids")
     log.debug("getTraceTimelinesByIds. " + traceIds + " adjust " + adjust)
+    call("getTraceTimelinesByIds") {
+      val adjusters = getAdjusters(adjust)
+      FTrace.recordBinary("numIds", traceIds.length)
 
-    val adjusters = getAdjusters(adjust)
-
-    FTrace.recordBinary("numIds", traceIds.length)
-
-    Stats.timeFutureMillis("query.getTraceTimelinesByIds") {
       storage.getTracesByIds(traceIds).map { traces =>
         traces.flatMap { trace =>
           TraceTimeline(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))).map(ThriftQueryAdapter(_))
         }
-      } rescue {
-        case e: Exception =>
-          log.error(e, "getTraceTimelinesByIds query failed")
-          Stats.incr("query.error_get_trace_timelines_by_ids_exception")
-          Future.exception(gen.QueryException(e.toString))
       }
     }
   }
 
   def getTraceSummariesByIds(traceIds: Seq[Long],
                              adjust: Seq[gen.Adjust]): Future[Seq[gen.TraceSummary]] = {
-    checkIfRunning
-    Stats.incr("query.get_traces_summary_id")
     log.debug("getTraceSummariesByIds. traceIds: " + traceIds + " adjust " + adjust)
+    call("getTraceSummariesByIds") {
+      val adjusters = getAdjusters(adjust)
+      FTrace.recordBinary("numIds", traceIds.length)
 
-    val adjusters = getAdjusters(adjust)
-
-    FTrace.recordBinary("numIds", traceIds.length)
-
-    Stats.timeFutureMillis("query.getTraceSummariesByIds") {
       storage.getTracesByIds(traceIds.toList).map { traces =>
         traces.flatMap { trace =>
           TraceSummary(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))).map(ThriftAdapter(_))
         }
-      } rescue {
-        case e: Exception =>
-          log.error(e, "getTraceSummariesByIds query failed")
-          Stats.incr("query.error_get_traces_summary_id_exception")
-          Future.exception(gen.QueryException(e.toString))
       }
     }
   }
 
   def getTraceCombosByIds(traceIds: Seq[Long],
                           adjust: Seq[gen.Adjust]): Future[Seq[gen.TraceCombo]] = {
-    checkIfRunning
-    Stats.incr("query.get_trace_combo_by_ids")
     log.debug("getTraceComboByIds. traceIds: " + traceIds + " adjust " + adjust)
+    call("getTraceComboByIds") {
+      val adjusters = getAdjusters(adjust)
+      FTrace.recordBinary("numIds", traceIds.length)
 
-    val adjusters = getAdjusters(adjust)
-
-    FTrace.recordBinary("numIds", traceIds.length)
-
-    Stats.timeFutureMillis("query.getTraceComboByIds") {
       storage.getTracesByIds(traceIds).map { traces =>
         traces.map { trace =>
           ThriftQueryAdapter(TraceCombo(adjusters.foldLeft(trace)((t, adjuster) => adjuster.adjust(t))))
         }
-      } rescue {
-        case e: Exception =>
-          log.error(e, "getTraceCombosByIds query failed")
-          Stats.incr("query.error_get_trace_combo_by_ids_exception")
-          Future.exception(gen.QueryException(e.toString))
       }
     }
   }
 
   def getDataTimeToLive: Future[Int] = {
-    checkIfRunning
-    Stats.incr("query.get_data_ttl")
     log.debug("getDataTimeToLive")
-
-    Stats.timeFutureMillis("query.getDataTimeToLive") {
-      {
-        Future(storage.getDataTimeToLive)
-      } rescue {
-        case e: Exception =>
-          log.error(e, "getDataTimeToLive failed")
-          Stats.incr("query.error_get_data_ttl_exception")
-          Future.exception(gen.QueryException(e.toString))
-      }
+    call("getDataTimeToLive") {
+      Future(storage.getDataTimeToLive)
     }
   }
 
   def getServiceNames: Future[Set[String]] = {
-    checkIfRunning
-    Stats.incr("query.get_services_names")
     log.debug("getServiceNames")
-
-    Stats.timeFutureMillis("query.getServiceNames") {
-      {
-        index.getServiceNames
-      } rescue {
-        case e: Exception =>
-          log.error(e, "getServiceNames query failed")
-          Stats.incr("query.error_get_services_names_exception")
-          Future.exception(gen.QueryException(e.toString))
-      }
+    call("getServiceNames") {
+      index.getServiceNames
     }
   }
 
   def getSpanNames(service: String): Future[Set[String]] = {
-    checkIfRunning
-    Stats.incr("query.get_span_names")
     log.debug("getSpanNames")
-
-    Stats.timeFutureMillis("query.getSpanNames") {
-      {
-        index.getSpanNames(service)
-      } rescue {
-        case e: Exception =>
-          log.error(e, "getSpanNames query failed")
-          Stats.incr("query.error_get_span_names_exception")
-          Future.exception(gen.QueryException(e.toString))
-      }
+    call("getSpanNames") {
+      index.getSpanNames(service)
     }
   }
 
   def setTraceTimeToLive(traceId: Long, ttlSeconds: Int): Future[Unit] = {
-    checkIfRunning
-    Stats.getCounter("query.set_ttl").incr()
     log.debug("setTimeToLive: " + traceId + " " + ttlSeconds)
-
-    Stats.timeFutureMillis("query.setTimeToLive") {
-      {
-        storage.setTimeToLive(traceId, ttlSeconds.seconds)
-      } rescue {
-        case e: Exception =>
-          log.error(e, "setTimeToLive failed")
-          Stats.getCounter("query.error_set_ttl_exception").incr()
-          Future.exception(gen.QueryException(e.toString))
-      }
+    call("setTraceTimeToLive") {
+      storage.setTimeToLive(traceId, ttlSeconds.seconds)
     }
   }
 
   def getTraceTimeToLive(traceId: Long): Future[Int] = {
-    checkIfRunning
-    Stats.getCounter("query.get_ttl").incr()
     log.debug("getTimeToLive: " + traceId)
-
-    Stats.timeFutureMillis("query.getTimeToLive") {
-      {
-        storage.getTimeToLive(traceId).map(_.inSeconds)
-      } rescue {
-        case e: Exception =>
-          log.error(e, "getTimeToLive failed")
-          Stats.getCounter("query.error_get_ttl_exception").incr()
-          Future.exception(gen.QueryException(e.toString))
-      }
+    call("getTraceTimeToLive") {
+      storage.getTimeToLive(traceId).map(_.inSeconds)
     }
   }
 
   def getTopAnnotations(serviceName: String): Future[Seq[String]] = {
-    checkIfRunning()
-    Stats.getCounter("query.get_top_annotations").incr()
     log.debug("getTopAnnotations: " + serviceName)
-
-    Stats.timeFutureMillis("query.getTopAnnotations") {
-      aggregates.getTopAnnotations(serviceName) onFailure { e =>
-        log.error(e, "getTopAnnotations failed")
-        Stats.getCounter("query.error_get_top_annotations").incr()
-        Future.exception(gen.QueryException(e.toString))
-      }
+    call("getTopAnnotations") {
+      aggregates.getTopAnnotations(serviceName)
     }
   }
 
   def getTopKeyValueAnnotations(serviceName: String): Future[Seq[String]] = {
-    checkIfRunning()
-    Stats.getCounter("query.get_top_key_value_annotations").incr()
     log.debug("getTopKeyValueAnnotations: " + serviceName)
-
-    Stats.timeFutureMillis("query.getTopKeyValueAnnotations") {
-      aggregates.getTopKeyValueAnnotations(serviceName) onFailure { e =>
-        log.error(e, "getTopKeyValueAnnotations failed")
-        Stats.getCounter("query.error_get_top_key_value_annotations").incr()
-        Future.exception(gen.QueryException(e.toString))
-      }
+    call("getTopKeyValueAnnotations") {
+      aggregates.getTopKeyValueAnnotations(serviceName)
     }
   }
 
   private def checkIfRunning() = {
-    if (!running) {
+    if (!running.get) {
       log.warning("Server not running, throwing exception")
       throw new TException("Server not running")
     }
   }
 
+  /**
+   * Shared wrapper for the query methods: checks that the service is running, counts the call under
+   * `name`, times the resulting future and turns failures into gen.QueryException.
+   */
+  private[this] def call[T](name: String)(f: => Future[T]): Future[T] = {
+    checkIfRunning()
+    methodStats.counter(name).incr()
+
+    timingStats.timeFuture(name) {
+      f rescue {
+        // already a client-facing error (e.g. failed argument validation): hand it back untouched
+        case e: gen.QueryException => Future.exception(e)
+        case e: Exception => {
+          log.error(e, "%s failed".format(name))
+          errorStats.counter(name).incr()
+          Future.exception(gen.QueryException(e.toString))
+        }
+      }
+    }
+  }
+
   /**
    * Convert incoming Thrift order by enum into sort function.
    */
diff --git a/zipkin-server/src/main/scala/com/twitter/zipkin/query/ZipkinQuery.scala b/zipkin-server/src/main/scala/com/twitter/zipkin/query/ZipkinQuery.scala
index 6cb8e4c7688..c900527c400 100644
--- a/zipkin-server/src/main/scala/com/twitter/zipkin/query/ZipkinQuery.scala
+++ b/zipkin-server/src/main/scala/com/twitter/zipkin/query/ZipkinQuery.scala
@@ -41,7 +41,7 @@ class ZipkinQuery(
     log.info("Starting query thrift service on addr " + serverAddr)
     val cluster = new ZookeeperServerSetCluster(serverSet)
 
-    val queryService = new QueryService(storage, index, aggregates, config.adjusterMap)
+    val queryService = new QueryService(storage, index, aggregates, config.adjusterMap, config.statsReceiver)
     queryService.start()
 
     ServiceTracker.register(queryService)