Skip to content

Commit

Permalink
MySQL performance and connectivity improvements
Browse files Browse the repository at this point in the history
We have identified several improvements for Zipkin deployments that use a MySQL database for storage:

* Automatic reconnect for dropped connections so that the collector and query services do not need to be restarted
* Automatic retry for transient/recoverable exceptions so that web app users and clients sending traces to the collector service are not affected if seamless recovery is possible
* Proper back-pressure for maintaining stability under heavy load conditions instead of unbounded resource (Java heap, native threads) growth
* Database connection pooling for significantly improved collector service performance under heavy load conditions
* Prevent constraint violation errors if duplicate traces are sent
* Document recommended indexes for optimal performance in MySQL deployments
* Query service performance improved significantly for getServiceNames, getSpanNames, getTraceIdsByName, and getTraceIdsByAnnotation - includes both updated SQL and table schema additions
* Add timers for all SQL executions that can be viewed using the admin API

Signed-off-by: Adrian Cole <[email protected]>

RB_ID=710632
TBR=true
  • Loading branch information
Andrew Olson authored and Adrian Cole committed Jul 4, 2015
1 parent 6f478db commit 2f5f5f5
Show file tree
Hide file tree
Showing 10 changed files with 481 additions and 89 deletions.
3 changes: 2 additions & 1 deletion project/Project.scala
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,9 @@ object Zipkin extends Build {
).settings(
libraryDependencies ++= Seq(
"com.typesafe.play" %% "anorm" % "2.3.7",
"org.apache.commons" % "commons-dbcp2" % "2.1",
anormDriverDependencies("sqlite-persistent")
) ++ scalaTestDeps,
) ++ testDependencies ++ scalaTestDeps,

/* Add configs to resource path for ConfigSpec */
unmanagedResourceDirectories in Test <<= baseDirectory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,20 @@ import AnormThreads.inNewThread
* The top annotations methods are stubbed because they're not currently
* used anywhere; that feature was never completed.
*/
case class AnormAggregates(db: DB, openCon: Option[Connection] = None) extends Aggregates {
// Database connection object
private implicit val conn = openCon match {
case None => db.getConnection()
case Some(con) => con
}

/**
* Close the index
*/
def close() { conn.close() }
case class AnormAggregates(db: DB, openCon: Option[Connection] = None) extends Aggregates with DBPool {

/**
* Get the dependencies in a time range.
*
* endDate is optional and if not passed defaults to startDate plus one day.
*/
def getDependencies(startDate: Option[Time], endDate: Option[Time]=None): Future[Dependencies] = inNewThread {
def getDependencies(startDate: Option[Time], endDate: Option[Time]=None): Future[Dependencies] = db.inNewThreadWithRecoverableRetry {
val startMs = startDate.getOrElse(Time.now - 1.day).inMicroseconds
val endMs = endDate.getOrElse(Time.now).inMicroseconds

implicit val (conn, borrowTime) = borrowConn()
try {

val links: List[DependencyLink] = SQL(
"""SELECT parent, child, m0, m1, m2, m3, m4
|FROM zipkin_dependency_links AS l
Expand All @@ -73,6 +66,10 @@ case class AnormAggregates(db: DB, openCon: Option[Connection] = None) extends A
}) *)

new Dependencies(Time.fromMicroseconds(startMs), Time.fromMicroseconds(endMs), links)

} finally {
returnConn(conn, borrowTime, "getDependencies")
}
}

/**
Expand All @@ -81,7 +78,10 @@ case class AnormAggregates(db: DB, openCon: Option[Connection] = None) extends A
* Synchronize these so we don't do concurrent writes from the same box
*/
def storeDependencies(dependencies: Dependencies): Future[Unit] = inNewThread {
db.withTransaction(conn, { implicit conn: Connection =>
implicit val (conn, borrowTime) = borrowConn()
try {

db.withRecoverableTransaction(conn, { implicit conn: Connection =>
val dlid = SQL("""INSERT INTO zipkin_dependencies
| (start_ts, end_ts)
|VALUES ({startTs}, {endTs})
Expand All @@ -106,6 +106,10 @@ case class AnormAggregates(db: DB, openCon: Option[Connection] = None) extends A
.execute()
}
})

} finally {
returnConn(conn, borrowTime, "storeDependencies")
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,41 +40,47 @@ import com.twitter.zipkin.storage.anormdb.DB.byteArrayToStatement
* The index methods are stubs since SQL databases don't use NoSQL-style
* indexing.
*/
case class AnormIndex(db: DB, openCon: Option[Connection] = None) extends Index {
// Database connection object
private implicit val conn = openCon match {
case None => db.getConnection()
case Some(con) => con
}

/**
* Close the index
*/
def close() { conn.close() }
case class AnormIndex(db: DB, openCon: Option[Connection] = None) extends Index with DBPool {

/**
* Get the trace ids for this particular service and if provided, span name.
* Only return maximum of limit trace ids from before the endTs.
*/
def getTraceIdsByName(serviceName: String, spanName: Option[String],
endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] = inNewThread {
val result:List[(Long, Long)] = SQL(
"""SELECT trace_id, MAX(a_timestamp)
|FROM zipkin_annotations
|WHERE service_name = {service_name}
| AND (span_name = {span_name} OR {span_name} = '')
| AND a_timestamp < {end_ts}
|GROUP BY trace_id
|ORDER BY a_timestamp DESC
|LIMIT {limit}
""".stripMargin)
.on("service_name" -> serviceName)
.on("span_name" -> (if (spanName.isEmpty) "" else spanName.get))
.on("end_ts" -> endTs)
.on("limit" -> limit)
.as((long("trace_id") ~ long("MAX(a_timestamp)") map flatten) *)
result map { case (tId, ts) =>
IndexedTraceId(traceId = tId, timestamp = ts)
endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] = db.inNewThreadWithRecoverableRetry {

if (endTs <= 0 || limit <= 0) {
Seq.empty
}
else {
implicit val (conn, borrowTime) = borrowConn()
try {
val result:List[(Long, Long)] = SQL(
"""SELECT t1.trace_id, MAX(a_timestamp)
|FROM zipkin_annotations t1
|INNER JOIN (
| SELECT DISTINCT trace_id
| FROM zipkin_annotations
| WHERE service_name = {service_name}
| AND (span_name = {span_name} OR {span_name} = '')
| AND a_timestamp < {end_ts}
| ORDER BY a_timestamp DESC
| LIMIT {limit})
|AS t2 ON t1.trace_id = t2.trace_id
|GROUP BY t1.trace_id
|ORDER BY t1.a_timestamp DESC
""".stripMargin)
.on("service_name" -> serviceName)
.on("span_name" -> (if (spanName.isEmpty) "" else spanName.get))
.on("end_ts" -> endTs)
.on("limit" -> limit)
.as((long("trace_id") ~ long("MAX(a_timestamp)") map flatten) *)
result map { case (tId, ts) =>
IndexedTraceId(traceId = tId, timestamp = ts)
}
} finally {
returnConn(conn, borrowTime, "getTraceIdsByName")
}
}
}

Expand All @@ -85,27 +91,33 @@ case class AnormIndex(db: DB, openCon: Option[Connection] = None) extends Index
* Only return maximum of limit trace ids from before the endTs.
*/
def getTraceIdsByAnnotation(serviceName: String, annotation: String, value: Option[ByteBuffer],
endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] = inNewThread {
if ((Constants.CoreAnnotations ++ Constants.CoreAddress).contains(annotation)) {
endTs: Long, limit: Int): Future[Seq[IndexedTraceId]] = db.inNewThreadWithRecoverableRetry {
if ((Constants.CoreAnnotations ++ Constants.CoreAddress).contains(annotation) || endTs <= 0 || limit <= 0) {
Seq.empty
}
else {
implicit val (conn, borrowTime) = borrowConn()
try {

val result:List[(Long, Long)] = value match {
// Binary annotations
case Some(bytes) => {
SQL(
"""SELECT zba.trace_id, s.created_ts
|FROM zipkin_binary_annotations AS zba
|LEFT JOIN zipkin_spans AS s
| ON zba.trace_id = s.trace_id
|WHERE zba.service_name = {service_name}
| AND zba.annotation_key = {annotation}
| AND zba.annotation_value = {value}
| AND s.created_ts < {end_ts}
| AND s.created_ts IS NOT NULL
|GROUP BY zba.trace_id
|ORDER BY s.created_ts DESC
|LIMIT {limit}
"""SELECT t1.trace_id, t1.created_ts
|FROM zipkin_spans t1
|INNER JOIN (
| SELECT DISTINCT trace_id
| FROM zipkin_binary_annotations
| WHERE service_name = {service_name}
| AND annotation_key = {annotation}
| AND annotation_value = {value}
| AND annotation_ts < {end_ts}
| AND annotation_ts IS NOT NULL
| ORDER BY annotation_ts DESC
| LIMIT {limit})
|AS t2 ON t1.trace_id = t2.trace_id
|GROUP BY t1.trace_id
|ORDER BY t1.created_ts DESC
""".stripMargin)
.on("service_name" -> serviceName)
.on("annotation" -> annotation)
Expand Down Expand Up @@ -136,6 +148,10 @@ case class AnormIndex(db: DB, openCon: Option[Connection] = None) extends Index
result map { case (tId, ts) =>
IndexedTraceId(traceId = tId, timestamp = ts)
}

} finally {
returnConn(conn, borrowTime, "getTraceIdsByAnnotation")
}
}
}

Expand All @@ -144,7 +160,10 @@ case class AnormIndex(db: DB, openCon: Option[Connection] = None) extends Index
*
* Duration returned in microseconds.
*/
def getTracesDuration(traceIds: Seq[Long]): Future[Seq[TraceIdDuration]] = inNewThread {
def getTracesDuration(traceIds: Seq[Long]): Future[Seq[TraceIdDuration]] = db.inNewThreadWithRecoverableRetry {
implicit val (conn, borrowTime) = borrowConn()
try {

val result:List[(Long, Option[Long], Long)] = SQL(
"""SELECT trace_id, duration, created_ts
|FROM zipkin_spans
Expand All @@ -156,35 +175,53 @@ case class AnormIndex(db: DB, openCon: Option[Connection] = None) extends Index
// trace ID, duration, start TS
TraceIdDuration(traceId, duration.getOrElse(0), startTs)
}

} finally {
returnConn(conn, borrowTime, "getTracesDuration")
}
}

/**
* Get all the service names.
*/
def getServiceNames: Future[Set[String]] = inNewThread {
def getServiceNames: Future[Set[String]] = db.inNewThreadWithRecoverableRetry {
implicit val (conn, borrowTime) = borrowConn()
try {

SQL(
"""SELECT service_name
|FROM zipkin_annotations
|FROM zipkin_service_spans
|GROUP BY service_name
|ORDER BY service_name ASC
""".stripMargin)
.as(str("service_name") *).toSet

} finally {
returnConn(conn, borrowTime, "getServiceNames")
}
}

/**
* Get all the span names for a particular service.
*/
def getSpanNames(service: String): Future[Set[String]] = inNewThread {
def getSpanNames(service: String): Future[Set[String]] = db.inNewThreadWithRecoverableRetry {
implicit val (conn, borrowTime) = borrowConn()
try {

SQL(
"""SELECT span_name
|FROM zipkin_annotations
|FROM zipkin_service_spans
|WHERE service_name = {service} AND span_name <> ''
|GROUP BY span_name
|ORDER BY span_name ASC
""".stripMargin)
.on("service" -> service)
.as(str("span_name") *)
.toSet

} finally {
returnConn(conn, borrowTime, "getSpanNames")
}
}

/**
Expand Down
Loading

0 comments on commit 2f5f5f5

Please sign in to comment.