From be01b978c93473dd4673d9456a787bead6fb3fe7 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Mon, 14 Mar 2016 17:28:53 +0800 Subject: [PATCH] Adds Kafka Transport This adds a Kafka transport, which receives messages in the same way as zipkin-scala, using the same environment variables. --- pom.xml | 9 +- zipkin-server/README.md | 14 +++ zipkin-server/pom.xml | 37 ++++--- .../zipkin/server/ZipkinKafkaProperties.java | 56 +++++++++++ .../server/ZipkinServerConfiguration.java | 47 +++++++++ .../src/main/resources/zipkin-server.yml | 9 ++ zipkin-transports/kafka/README.md | 24 +++++ zipkin-transports/kafka/pom.xml | 57 +++++++++++ .../main/java/zipkin/kafka/KafkaConfig.java | 84 ++++++++++++++++ .../zipkin/kafka/KafkaStreamProcessor.java | 41 ++++++++ .../java/zipkin/kafka/KafkaTransport.java | 54 ++++++++++ .../main/java/zipkin/kafka/SpansDecoder.java | 47 +++++++++ .../java/zipkin/kafka/KafkaTransportTest.java | 98 +++++++++++++++++++ .../kafka/src/test/resources/log4j.properties | 12 +++ zipkin-transports/pom.xml | 33 +++++++ 15 files changed, 606 insertions(+), 16 deletions(-) create mode 100644 zipkin-server/src/main/java/zipkin/server/ZipkinKafkaProperties.java create mode 100644 zipkin-transports/kafka/README.md create mode 100644 zipkin-transports/kafka/pom.xml create mode 100644 zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaConfig.java create mode 100644 zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaStreamProcessor.java create mode 100644 zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaTransport.java create mode 100644 zipkin-transports/kafka/src/main/java/zipkin/kafka/SpansDecoder.java create mode 100644 zipkin-transports/kafka/src/test/java/zipkin/kafka/KafkaTransportTest.java create mode 100644 zipkin-transports/kafka/src/test/resources/log4j.properties create mode 100644 zipkin-transports/pom.xml diff --git a/pom.xml b/pom.xml index a1973a03d49..20c1b2fb18e 100644 --- a/pom.xml +++ b/pom.xml @@ -28,6 +28,7 @@ benchmarks interop zipkin-spanstores + zipkin-transports zipkin-server @@ -41,7 +42,7 @@ 1.1.0 1.6.0 - 1.3.2.RELEASE + 1.3.3.RELEASE 1.35.0 - - com.github.kristofa - brave-spring-web-servlet-interceptor - ${brave.version} - true - - - - com.github.kristofa - brave-spancollector-scribe - ${brave.version} - true - - org.springframework.boot spring-boot-configuration-processor @@ -108,6 +93,28 @@ true + + + ${project.groupId} + transport-kafka + true + + + + + com.github.kristofa + brave-spring-web-servlet-interceptor + ${brave.version} + true + + + + com.github.kristofa + brave-spancollector-scribe + ${brave.version} + true + + org.springframework.boot spring-boot-starter-test diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinKafkaProperties.java b/zipkin-server/src/main/java/zipkin/server/ZipkinKafkaProperties.java new file mode 100644 index 00000000000..7905b4d0761 --- /dev/null +++ b/zipkin-server/src/main/java/zipkin/server/ZipkinKafkaProperties.java @@ -0,0 +1,56 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.server; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("kafka") +class ZipkinKafkaProperties { + private String topic = "zipkin"; + private String zookeeper; + private String groupId = "zipkin"; + private int streams = 1; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getZookeeper() { + return zookeeper; + } + + public void setZookeeper(String zookeeper) { + this.zookeeper = "".equals(zookeeper) ? null : zookeeper; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public int getStreams() { + return streams; + } + + public void setStreams(int streams) { + this.streams = streams; + } +} diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java index 30a8f2e463b..83deeb2c756 100644 --- a/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java +++ b/zipkin-server/src/main/java/zipkin/server/ZipkinServerConfiguration.java @@ -14,6 +14,10 @@ package zipkin.server; import com.github.kristofa.brave.Brave; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; import javax.sql.DataSource; import org.jooq.ExecuteListenerProvider; import org.jooq.conf.Settings; @@ -21,11 +25,16 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.boot.autoconfigure.condition.ConditionOutcome; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.SpringBootCondition; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; +import org.springframework.core.type.AnnotatedTypeMetadata; import org.springframework.scheduling.annotation.EnableAsync; import zipkin.Codec; import zipkin.InMemorySpanStore; @@ -34,6 +43,8 @@ import zipkin.cassandra.CassandraConfig; import zipkin.cassandra.CassandraSpanStore; import zipkin.jdbc.JDBCSpanStore; +import zipkin.kafka.KafkaConfig; +import zipkin.kafka.KafkaTransport; import zipkin.server.ZipkinServerProperties.Store.Type; import zipkin.server.brave.TraceWritesSpanStore; @@ -105,4 +116,40 @@ public Object postProcessAfterInitialization(Object bean, String beanName) { return bean; } } + + /** + * This transport consumes a topic, decodes spans from thrift messages and stores them subject to + * sampling policy. + */ + @Configuration + @EnableConfigurationProperties(ZipkinKafkaProperties.class) + @ConditionalOnKafkaZookeeper + static class KafkaConfiguration { + @Bean KafkaTransport kafkaTransport(ZipkinKafkaProperties kafka, ZipkinSpanWriter writer) { + KafkaConfig config = KafkaConfig.builder() + .topic(kafka.getTopic()) + .zookeeper(kafka.getZookeeper()) + .groupId(kafka.getGroupId()) + .streams(kafka.getStreams()).build(); + return new KafkaTransport(config, writer); + } + } + + /** + * This condition passes when Kafka classes are available and {@link + * ZipkinKafkaProperties#getZookeeper()} is set. + */ + @Target(ElementType.TYPE) + @Retention(RetentionPolicy.RUNTIME) + @Conditional(ConditionalOnKafkaZookeeper.KafkaEnabledCondition.class) + @ConditionalOnClass(name = "zipkin.kafka.KafkaTransport") @interface ConditionalOnKafkaZookeeper { + class KafkaEnabledCondition extends SpringBootCondition { + @Override + public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) { + return context.getEnvironment().getProperty("kafka.zookeeper").isEmpty() ? + ConditionOutcome.noMatch("kafka.zookeeper isn't set") : + ConditionOutcome.match(); + } + } + } } diff --git a/zipkin-server/src/main/resources/zipkin-server.yml b/zipkin-server/src/main/resources/zipkin-server.yml index eadbced70b3..bdd17491b4e 100644 --- a/zipkin-server/src/main/resources/zipkin-server.yml +++ b/zipkin-server/src/main/resources/zipkin-server.yml @@ -19,6 +19,15 @@ mysql: db: ${MYSQL_DB:zipkin} max-active: ${MYSQL_MAX_CONNECTIONS:10} use-ssl: ${MYSQL_USE_SSL:false} +kafka: + # ZooKeeper host string, comma-separated host:port value. + zookeeper: ${KAFKA_ZOOKEEPER:} + # Name of topic to poll for spans + topic: ${KAFKA_TOPIC:zipkin} + # Consumer group this process is consuming on behalf of. + group-id: ${KAFKA_GROUP_ID:zipkin} + # Count of consumer threads consuming the topic + streams: ${KAFKA_STREAMS:1} zipkin: collector: # percentage to traces to retain diff --git a/zipkin-transports/kafka/README.md b/zipkin-transports/kafka/README.md new file mode 100644 index 00000000000..7b6a39e26f3 --- /dev/null +++ b/zipkin-transports/kafka/README.md @@ -0,0 +1,24 @@ +# transport-kafka +This transport polls a Kafka 8.2.2+ topic for messages that contain +TBinaryProtocol big-endian encoded lists of spans. These spans are +pushed to a span consumer. + +`zipkin.kafka.KafkaConfig` includes defaults that will operate +against a local Cassandra installation. + + +## Encoding spans into Kafka messages +`Codec.THRIFT.writeSpans(spans)` encodes spans in the following fashion: + +The message's binary data includes a list header followed by N spans serialized in TBinaryProtocol +``` +write_byte(12) // type of the list elements: 12 == struct +write_i32(count) // count of spans that will follow +for (int i = 0; i < count; i++) { + writeTBinaryProtocol(spans(i)) +} +``` + +### Legacy encoding +Older versions of zipkin accepted a single span per message, as opposed +to a list per message. This practice is deprecated, but still supported. \ No newline at end of file diff --git a/zipkin-transports/kafka/pom.xml b/zipkin-transports/kafka/pom.xml new file mode 100644 index 00000000000..4c2cbc6ab54 --- /dev/null +++ b/zipkin-transports/kafka/pom.xml @@ -0,0 +1,57 @@ + + + + 4.0.0 + + + io.zipkin.java + zipkin-transports + 0.7.1-SNAPSHOT + + + transport-kafka + Span Transport: Kafka + + + ${project.basedir}/.. + + 0.8.2.2 + + 1.7 + + + + + ${project.groupId} + zipkin + + + + org.apache.kafka + kafka_2.11 + ${kafka.version} + + + + com.github.charithe + kafka-junit + ${kafka-junit.version} + test + + + diff --git a/zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaConfig.java b/zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaConfig.java new file mode 100644 index 00000000000..1b12ae330c6 --- /dev/null +++ b/zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaConfig.java @@ -0,0 +1,84 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin.kafka; + +import java.util.Properties; +import kafka.consumer.ConsumerConfig; + +import static zipkin.internal.Util.checkNotNull; + +/** Configuration including defaults needed to consume spans from a Kafka topic. */ +public final class KafkaConfig { + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private String topic = "zipkin"; + private String zookeeper; + private String groupId = "zipkin"; + private int streams = 1; + + /** Topic zipkin spans will be consumed from. Defaults to "zipkin" */ + public Builder topic(String topic) { + this.topic = topic; + return this; + } + + /** The zookeeper connect string, ex. 127.0.0.1:2181. No default */ + public Builder zookeeper(String zookeeper) { + this.zookeeper = zookeeper; + return this; + } + + /** The consumer group this process is consuming on behalf of. Defaults to "zipkin" */ + public Builder groupId(String groupId) { + this.groupId = groupId; + return this; + } + + /** Count of threads/streams consuming the topic. Defaults to 1 */ + public Builder streams(int streams) { + this.streams = streams; + return this; + } + + public KafkaConfig build() { + return new KafkaConfig(this); + } + } + + final String topic; + final String zookeeper; + final String groupId; + final int streams; + + KafkaConfig(Builder builder) { + this.topic = checkNotNull(builder.topic, "topic"); + this.zookeeper = checkNotNull(builder.zookeeper, "zookeeper"); + this.groupId = checkNotNull(builder.groupId, "groupId"); + this.streams = builder.streams; + } + + ConsumerConfig forConsumer() { + Properties props = new Properties(); + props.put("zookeeper.connect", zookeeper); + props.put("group.id", groupId); + // Same default as zipkin-scala, and keeps tests from hanging + props.put("auto.offset.reset", "smallest"); + return new ConsumerConfig(props); + } +} diff --git a/zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaStreamProcessor.java b/zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaStreamProcessor.java new file mode 100644 index 00000000000..912ae5c6cd7 --- /dev/null +++ b/zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaStreamProcessor.java @@ -0,0 +1,41 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.kafka; + +import java.util.List; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import zipkin.Span; +import zipkin.SpanConsumer; + +final class KafkaStreamProcessor implements Runnable { + + private final KafkaStream> stream; + private final SpanConsumer spanConsumer; + + KafkaStreamProcessor(KafkaStream> stream, SpanConsumer spanConsumer) { + this.stream = stream; + this.spanConsumer = spanConsumer; + } + + @Override + public void run() { + ConsumerIterator> messages = stream.iterator(); + while (messages.hasNext()) { + List spans = messages.next().message(); + if (spans.isEmpty()) continue; + spanConsumer.accept(spans); + } + } +} diff --git a/zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaTransport.java b/zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaTransport.java new file mode 100644 index 00000000000..fbb73bcc77f --- /dev/null +++ b/zipkin-transports/kafka/src/main/java/zipkin/kafka/KafkaTransport.java @@ -0,0 +1,54 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.kafka; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.serializer.StringDecoder; +import zipkin.SpanConsumer; + +import static kafka.consumer.Consumer.createJavaConsumerConnector; + +/** + * This transport polls a Kafka topic for messages that contain TBinaryProtocol big-endian encoded + * lists of spans. These spans are pushed to a {@link SpanConsumer#accept(List) span consumer}. + */ +public final class KafkaTransport implements AutoCloseable { + + final ExecutorService pool; + + public KafkaTransport(KafkaConfig config, SpanConsumer spanConsumer) { + this.pool = config.streams == 1 + ? Executors.newSingleThreadExecutor() + : Executors.newFixedThreadPool(config.streams); + ConsumerConnector connector = createJavaConsumerConnector(config.forConsumer()); + + Map topicCountMap = new LinkedHashMap<>(1); + topicCountMap.put(config.topic, config.streams); + + connector.createMessageStreams(topicCountMap, new StringDecoder(null), new SpansDecoder()) + .get(config.topic).forEach(stream -> + pool.execute(new KafkaStreamProcessor(stream, spanConsumer)) + ); + } + + @Override + public void close() { + pool.shutdown(); + } +} diff --git a/zipkin-transports/kafka/src/main/java/zipkin/kafka/SpansDecoder.java b/zipkin-transports/kafka/src/main/java/zipkin/kafka/SpansDecoder.java new file mode 100644 index 00000000000..7de9430e884 --- /dev/null +++ b/zipkin-transports/kafka/src/main/java/zipkin/kafka/SpansDecoder.java @@ -0,0 +1,47 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.kafka; + +import java.util.Collections; +import java.util.List; +import kafka.serializer.Decoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin.Codec; +import zipkin.Span; + +/** + * Conditionally decodes depending on whether the input bytes are encoded as a single span or a + * list. Malformed input is ignored. + */ +final class SpansDecoder implements Decoder> { + + @Override + public List fromBytes(byte[] bytes) { + try { + // Given the thrift encoding is TBinaryProtocol.. + // .. When serializing a Span (Struct), the first byte will be the type of a field + // .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT + // Span has no STRUCT fields: we assume that if the first byte is TType.STRUCT is a list. + if (bytes[0] == 12 /* TType.STRUCT */) { + return Codec.THRIFT.readSpans(bytes); + } else { + return Collections.singletonList(Codec.THRIFT.readSpan(bytes)); + } + } catch (IllegalArgumentException ignored) { + // binary decoding messages aren't useful enough to clutter logs with. + return Collections.emptyList(); + } + } +} diff --git a/zipkin-transports/kafka/src/test/java/zipkin/kafka/KafkaTransportTest.java b/zipkin-transports/kafka/src/test/java/zipkin/kafka/KafkaTransportTest.java new file mode 100644 index 00000000000..1890b1a6f41 --- /dev/null +++ b/zipkin-transports/kafka/src/test/java/zipkin/kafka/KafkaTransportTest.java @@ -0,0 +1,98 @@ +/** + * Copyright 2015-2016 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.kafka; + +import com.github.charithe.kafka.KafkaJunitRule; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import org.junit.ClassRule; +import org.junit.Test; +import zipkin.Annotation; +import zipkin.Codec; +import zipkin.Endpoint; +import zipkin.Span; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static zipkin.Constants.SERVER_RECV; + +public class KafkaTransportTest { + @ClassRule public static KafkaJunitRule kafka = new KafkaJunitRule(); + + Endpoint endpoint = Endpoint.create("web", 127 << 24 | 1, 80); + Annotation ann = Annotation.create(System.currentTimeMillis() * 1000, SERVER_RECV, endpoint); + Span span = new Span.Builder().id(1L).traceId(1L).timestamp(ann.timestamp).name("get") + .addAnnotation(ann).build(); + + /** Ensures legacy encoding works: a single TBinaryProtocol encoded span */ + @Test + public void messageWithSingleSpan() throws Exception { + KafkaConfig config = KafkaConfig.builder() + .zookeeper(kafka.zookeeperConnectionString()) + .topic("single_span").build(); + + CompletableFuture> promise = new CompletableFuture<>(); + + Producer producer = new Producer<>(kafka.producerConfigWithDefaultEncoder()); + producer.send(new KeyedMessage<>(config.topic, Codec.THRIFT.writeSpan(span))); + producer.close(); + + try (KafkaTransport processor = new KafkaTransport(config, promise::complete)) { + assertThat(promise.get()).containsOnly(span); + } + } + + /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */ + @Test + public void messageWithMultipleSpans() throws Exception { + KafkaConfig config = KafkaConfig.builder() + .zookeeper(kafka.zookeeperConnectionString()) + .topic("multiple_spans").build(); + + CompletableFuture> promise = new CompletableFuture<>(); + + Producer producer = new Producer<>(kafka.producerConfigWithDefaultEncoder()); + producer.send(new KeyedMessage<>(config.topic, Codec.THRIFT.writeSpans(asList(span, span)))); + producer.close(); + + try (KafkaTransport processor = new KafkaTransport(config, promise::complete)) { + assertThat(promise.get()).containsExactly(span, span); + } + } + + /** Ensures malformed spans don't hang the processor */ + @Test + public void skipsMalformedData() throws Exception { + KafkaConfig config = KafkaConfig.builder() + .zookeeper(kafka.zookeeperConnectionString()) + .topic("malformed").build(); + + LinkedBlockingQueue> recvdSpans = new LinkedBlockingQueue<>(); + + Producer producer = new Producer<>(kafka.producerConfigWithDefaultEncoder()); + producer.send(new KeyedMessage<>(config.topic, Codec.THRIFT.writeSpans(asList(span)))); + producer.send(new KeyedMessage<>(config.topic, "malformed".getBytes())); + producer.send(new KeyedMessage<>(config.topic, Codec.THRIFT.writeSpans(asList(span)))); + producer.close(); + + try (KafkaTransport processor = new KafkaTransport(config, recvdSpans::add)) { + assertThat(recvdSpans.take()).containsExactly(span); + // the only way we could read this, is if the malformed span was skipped. + assertThat(recvdSpans.take()).containsExactly(span); + } + } +} diff --git a/zipkin-transports/kafka/src/test/resources/log4j.properties b/zipkin-transports/kafka/src/test/resources/log4j.properties new file mode 100644 index 00000000000..e5e9ae0044c --- /dev/null +++ b/zipkin-transports/kafka/src/test/resources/log4j.properties @@ -0,0 +1,12 @@ +# By default, everything goes to console and file +log4j.rootLogger=WARN, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n +log4j.appender.A1.ImmediateFlush=true + +log4j.logger.kafka.utils=WARN, A1 +log4j.logger.kafka.consumer=WARN, A1 +log4j.logger.kafka.producer=WARN, A1 diff --git a/zipkin-transports/pom.xml b/zipkin-transports/pom.xml new file mode 100644 index 00000000000..37b3cefafea --- /dev/null +++ b/zipkin-transports/pom.xml @@ -0,0 +1,33 @@ + + + + 4.0.0 + + + io.zipkin.java + parent + 0.7.1-SNAPSHOT + + + zipkin-transports + Zipkin Span Transports + pom + + + kafka + +