Skip to content

Commit

Permalink
Merge pull request #95 from openzipkin/kafka
Browse files Browse the repository at this point in the history
Adds Kafka Transport
  • Loading branch information
adriancole committed Mar 15, 2016
2 parents ca0b940 + be01b97 commit 524f115
Show file tree
Hide file tree
Showing 15 changed files with 606 additions and 16 deletions.
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<module>benchmarks</module>
<module>interop</module>
<module>zipkin-spanstores</module>
<module>zipkin-transports</module>
<module>zipkin-server</module>
</modules>

Expand All @@ -41,7 +42,7 @@

<moshi.version>1.1.0</moshi.version>
<okio.version>1.6.0</okio.version>
<spring-boot.version>1.3.2.RELEASE</spring-boot.version>
<spring-boot.version>1.3.3.RELEASE</spring-boot.version>
<zipkin-scala.version>1.35.0</zipkin-scala.version>
<!-- MySQL connector is GPL, even if it has an OSS exception.
https://www.mysql.com/about/legal/licensing/foss-exception/
Expand Down Expand Up @@ -145,6 +146,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>transport-kafka</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-server</artifactId>
Expand Down
14 changes: 14 additions & 0 deletions zipkin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ Example usage:
$ STORAGE_TYPE=mysql MYSQL_USER=root ./mvnw -pl zipkin-server spring-boot:run
```

### Kafka
The following apply when `KAFKA_ZOOKEEPER` is set:

* `KAFKA_ZOOKEEPER`: ZooKeeper host string, comma-separated host:port value. no default.
* `KAFKA_TOPIC`: Defaults to zipkin
* `KAFKA_GROUP_ID`: Consumer group this process is consuming on behalf of. Defaults to zipkin
* `KAFKA_STREAMS`: Count of consumer threads consuming the topic. defaults to 1.

Example usage:

```bash
$ TRANSPORT_TYPE=kafka KAFKA_ZOOKEEPER=127.0.0.1:2181 ./mvnw -pl zipkin-server spring-boot:run
```

## Running with Docker
Released versions of zipkin-server are published to Docker Hub as `openzipkin/zipkin-java`.
See [docker-zipkin-java](https://github.com/openzipkin/docker-zipkin-java) for details.
37 changes: 22 additions & 15 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,6 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<!-- Trace api controller activity with Brave -->
<dependency>
<groupId>com.github.kristofa</groupId>
<artifactId>brave-spring-web-servlet-interceptor</artifactId>
<version>${brave.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.github.kristofa</groupId>
<artifactId>brave-spancollector-scribe</artifactId>
<version>${brave.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
Expand Down Expand Up @@ -108,6 +93,28 @@
<optional>true</optional>
</dependency>

<!-- Kafka transport -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>transport-kafka</artifactId>
<optional>true</optional>
</dependency>

<!-- Trace api controller activity with Brave -->
<dependency>
<groupId>com.github.kristofa</groupId>
<artifactId>brave-spring-web-servlet-interceptor</artifactId>
<version>${brave.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.github.kristofa</groupId>
<artifactId>brave-spancollector-scribe</artifactId>
<version>${brave.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.server;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("kafka")
class ZipkinKafkaProperties {
private String topic = "zipkin";
private String zookeeper;
private String groupId = "zipkin";
private int streams = 1;

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getZookeeper() {
return zookeeper;
}

public void setZookeeper(String zookeeper) {
this.zookeeper = "".equals(zookeeper) ? null : zookeeper;
}

public String getGroupId() {
return groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}

public int getStreams() {
return streams;
}

public void setStreams(int streams) {
this.streams = streams;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,27 @@
package zipkin.server;

import com.github.kristofa.brave.Brave;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.sql.DataSource;
import org.jooq.ExecuteListenerProvider;
import org.jooq.conf.Settings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.scheduling.annotation.EnableAsync;
import zipkin.Codec;
import zipkin.InMemorySpanStore;
Expand All @@ -34,6 +43,8 @@
import zipkin.cassandra.CassandraConfig;
import zipkin.cassandra.CassandraSpanStore;
import zipkin.jdbc.JDBCSpanStore;
import zipkin.kafka.KafkaConfig;
import zipkin.kafka.KafkaTransport;
import zipkin.server.ZipkinServerProperties.Store.Type;
import zipkin.server.brave.TraceWritesSpanStore;

Expand Down Expand Up @@ -105,4 +116,40 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
return bean;
}
}

/**
* This transport consumes a topic, decodes spans from thrift messages and stores them subject to
* sampling policy.
*/
@Configuration
@EnableConfigurationProperties(ZipkinKafkaProperties.class)
@ConditionalOnKafkaZookeeper
static class KafkaConfiguration {
@Bean KafkaTransport kafkaTransport(ZipkinKafkaProperties kafka, ZipkinSpanWriter writer) {
KafkaConfig config = KafkaConfig.builder()
.topic(kafka.getTopic())
.zookeeper(kafka.getZookeeper())
.groupId(kafka.getGroupId())
.streams(kafka.getStreams()).build();
return new KafkaTransport(config, writer);
}
}

/**
* This condition passes when Kafka classes are available and {@link
* ZipkinKafkaProperties#getZookeeper()} is set.
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Conditional(ConditionalOnKafkaZookeeper.KafkaEnabledCondition.class)
@ConditionalOnClass(name = "zipkin.kafka.KafkaTransport") @interface ConditionalOnKafkaZookeeper {
class KafkaEnabledCondition extends SpringBootCondition {
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
return context.getEnvironment().getProperty("kafka.zookeeper").isEmpty() ?
ConditionOutcome.noMatch("kafka.zookeeper isn't set") :
ConditionOutcome.match();
}
}
}
}
9 changes: 9 additions & 0 deletions zipkin-server/src/main/resources/zipkin-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@ mysql:
db: ${MYSQL_DB:zipkin}
max-active: ${MYSQL_MAX_CONNECTIONS:10}
use-ssl: ${MYSQL_USE_SSL:false}
kafka:
# ZooKeeper host string, comma-separated host:port value.
zookeeper: ${KAFKA_ZOOKEEPER:}
# Name of topic to poll for spans
topic: ${KAFKA_TOPIC:zipkin}
# Consumer group this process is consuming on behalf of.
group-id: ${KAFKA_GROUP_ID:zipkin}
# Count of consumer threads consuming the topic
streams: ${KAFKA_STREAMS:1}
zipkin:
collector:
# percentage to traces to retain
Expand Down
24 changes: 24 additions & 0 deletions zipkin-transports/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# transport-kafka
This transport polls a Kafka 8.2.2+ topic for messages that contain
TBinaryProtocol big-endian encoded lists of spans. These spans are
pushed to a span consumer.

`zipkin.kafka.KafkaConfig` includes defaults that will operate
against a local Cassandra installation.


## Encoding spans into Kafka messages
`Codec.THRIFT.writeSpans(spans)` encodes spans in the following fashion:

The message's binary data includes a list header followed by N spans serialized in TBinaryProtocol
```
write_byte(12) // type of the list elements: 12 == struct
write_i32(count) // count of spans that will follow
for (int i = 0; i < count; i++) {
writeTBinaryProtocol(spans(i))
}
```

### Legacy encoding
Older versions of zipkin accepted a single span per message, as opposed
to a list per message. This practice is deprecated, but still supported.
57 changes: 57 additions & 0 deletions zipkin-transports/kafka/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2015-2016 The OpenZipkin Authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-transports</artifactId>
<version>0.7.1-SNAPSHOT</version>
</parent>

<artifactId>transport-kafka</artifactId>
<name>Span Transport: Kafka</name>

<properties>
<main.basedir>${project.basedir}/..</main.basedir>
<!-- This is pinned to Kafka 0.8.x client as 0.9.x brokers work with them, but not visa-versa
http://docs.confluent.io/2.0.0/upgrade.html -->
<kafka.version>0.8.2.2</kafka.version>
<!-- pinned to 0.8.2.2 -->
<kafka-junit.version>1.7</kafka-junit.version>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>com.github.charithe</groupId>
<artifactId>kafka-junit</artifactId>
<version>${kafka-junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 524f115

Please sign in to comment.