diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 009b2e688ce..0ae57bb421b 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -80,7 +80,7 @@
${wire.version}
- io.zipkin.proto3
+ org.apache.zipkin.proto3
zipkin-proto3
diff --git a/pom.xml b/pom.xml
index f94e16ddcb1..6e34b0a9418 100755
--- a/pom.xml
+++ b/pom.xml
@@ -154,6 +154,26 @@
https://github.com/openzipkin/zipkin/issues
+
+
+ apache.snapshots.https
+ https://repository.apache.org/content/repositories/snapshots/
+
+ false
+
+
+
+
+
+
+ apache.snapshots.https
+ https://repository.apache.org/content/repositories/snapshots/
+
+ false
+
+
+
+
@@ -362,9 +382,9 @@
- io.zipkin.proto3
+ org.apache.zipkin.proto3
zipkin-proto3
- 0.1.0
+ 0.2.1-SNAPSHOT
com.squareup.wire
@@ -627,6 +647,7 @@
SLASHSTAR_STYLE
+ SLASHSTAR_STYLE
.travis.yml
diff --git a/zipkin-server/README.md b/zipkin-server/README.md
index 5b18374162f..43d01fe6f83 100644
--- a/zipkin-server/README.md
+++ b/zipkin-server/README.md
@@ -370,6 +370,19 @@ Example usage:
$ RABBIT_ADDRESSES=localhost java -jar zipkin.jar
```
+### gRPC Collector (Experimental)
+You can enable a gRPC span collector endpoint by setting `COLLECTOR_GRPC_ENABLED=true`. The
+`zipkin.proto3.SpanService/Report` endpoint will run on the same port as normal http (9411).
+
+
+Example usage:
+
+```bash
+COLLECTOR_GRPC_ENABLED=true java -jar zipkin.jar
+```
+
+As this service is experimental, it is not recommended to run this in production environments.
+
### 128-bit trace IDs
Zipkin supports 64 and 128-bit trace identifiers, typically serialized
diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml
index 93fb1fbf260..d418a58181c 100644
--- a/zipkin-server/pom.xml
+++ b/zipkin-server/pom.xml
@@ -34,7 +34,9 @@
1.8
java18
zipkin.server.ZipkinServer
+ 1.3.30
3.2.0
+ ${project.build.directory}/generated-test-sources/wire
@@ -82,6 +84,10 @@
com.linecorp.armeria
armeria-zipkin
+
+ com.linecorp.armeria
+ armeria-grpc-protocol
+
@@ -125,6 +131,25 @@
1.11.3
+
+
+ org.apache.zipkin.proto3
+ zipkin-proto3
+ test
+
+
+ com.squareup.wire
+ wire-grpc-client
+ ${wire.version}
+ test
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib
+ ${kotlin.version}
+ test
+
+
${project.groupId}.zipkin2
@@ -348,6 +373,90 @@
+
+
+ maven-dependency-plugin
+
+
+ com.squareup.wire
+ wire-maven-plugin
+
+
+ generate-test-sources
+
+ generate-sources
+
+
+ ${proto.generatedSourceDirectory}
+
+
+
+
+
+ org.jetbrains.kotlin
+ kotlin-maven-plugin
+ ${kotlin.version}
+
+ enable
+
+
+
+ compile
+ none
+
+
+ test-compile
+
+ test-compile
+
+
+
+ ${project.basedir}/src/test/kotlin
+ ${project.basedir}/src/test/java
+ ${proto.generatedSourceDirectory}
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 3.0.0
+
+
+ add-test-source
+ generate-test-sources
+
+ add-test-source
+
+
+
+
+
+
+
+
+
+
+ maven-compiler-plugin
+ ${maven-compiler-plugin.version}
+
+
+
+ default-testCompile
+ none
+
+
+ java-test-compile
+ test-compile
+
+ testCompile
+
+
+
+
org.springframework.boot
spring-boot-maven-plugin
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java
index 1361e0c05dc..e275ab92a1a 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java
@@ -39,6 +39,7 @@
TracingConfiguration.class,
ZipkinQueryApiV2.class,
ZipkinHttpCollector.class,
+ ZipkinGrpcCollector.class,
ZipkinKafkaCollectorConfiguration.class,
ZipkinRabbitMQCollectorConfiguration.class,
MetricsHealthController.class,
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
new file mode 100644
index 00000000000..62c3f80554a
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal;
+
+import com.linecorp.armeria.common.grpc.protocol.AbstractUnaryGrpcService;
+import com.linecorp.armeria.spring.ArmeriaServerConfigurator;
+import java.util.concurrent.CompletableFuture;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import zipkin2.Callback;
+import zipkin2.codec.SpanBytesDecoder;
+import zipkin2.collector.Collector;
+import zipkin2.collector.CollectorMetrics;
+import zipkin2.collector.CollectorSampler;
+import zipkin2.storage.StorageComponent;
+
+/** Collector for receiving spans on a gRPC endpoint. */
+@ConditionalOnProperty(name = "zipkin.collector.grpc.enabled") // disabled by default
+final class ZipkinGrpcCollector {
+ static final byte[] EMPTY = new byte[0];
+
+ @Bean ArmeriaServerConfigurator grpcCollectorConfigurator(StorageComponent storage,
+ CollectorSampler sampler, CollectorMetrics metrics) {
+ CollectorMetrics grpcMetrics = metrics.forTransport("grpc");
+ Collector collector = Collector.newBuilder(getClass())
+ .storage(storage)
+ .sampler(sampler)
+ .metrics(grpcMetrics)
+ .build();
+
+ return sb ->
+ sb.service("/zipkin.proto3.SpanService/Report", new SpanService(collector, grpcMetrics));
+ }
+
+ static final class SpanService extends AbstractUnaryGrpcService {
+
+ final Collector collector;
+ final CollectorMetrics metrics;
+
+ SpanService(Collector collector, CollectorMetrics metrics) {
+ this.collector = collector;
+ this.metrics = metrics;
+ }
+
+ @Override protected CompletableFuture handleMessage(byte[] bytes) {
+ metrics.incrementMessages();
+ CompletableFutureCallback result = new CompletableFutureCallback();
+ collector.acceptSpans(bytes, SpanBytesDecoder.PROTO3, result);
+ return result;
+ }
+ }
+
+ static final class CompletableFutureCallback extends CompletableFuture
+ implements Callback {
+
+ @Override public void onSuccess(Void value) {
+ complete(EMPTY);
+ }
+
+ @Override public void onError(Throwable t) {
+ completeExceptionally(t);
+ }
+ }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
index b004479fd6a..cb381a7f338 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
@@ -98,7 +98,10 @@ public void addViewControllers(ViewControllerRegistry registry) {
// could split the collector's CORS policy into a different property, still allowing POST
// with content-type by default.
.allowRequestMethods(HttpMethod.GET, HttpMethod.POST)
- .allowRequestHeaders(HttpHeaderNames.CONTENT_TYPE);
+ .allowRequestHeaders(HttpHeaderNames.CONTENT_TYPE,
+ // Use literals to avoid a runtime dependency on armeria-grpc types
+ HttpHeaderNames.of("X-GRPC-WEB"))
+ .exposeHeaders("grpc-status", "grpc-message", "armeria.grpc.ThrowableProto-bin");
return builder -> builder.decorator(corsBuilder::build);
}
diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml
index 79bc9392d04..3236cb6ebb7 100644
--- a/zipkin-server/src/main/resources/zipkin-server-shared.yml
+++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml
@@ -12,6 +12,9 @@ zipkin:
http:
# Set to false to disable creation of spans via HTTP collector API
enabled: ${HTTP_COLLECTOR_ENABLED:true}
+ grpc:
+ # Set to true to enable the GRPC collector
+ enabled: ${COLLECTOR_GRPC_ENABLED:false}
kafka:
# Kafka bootstrap broker list, comma-separated host:port values. Setting this activates the
# Kafka 0.10+ collector.
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt
new file mode 100644
index 00000000000..c87d133ef3d
--- /dev/null
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zipkin2.server.internal
+
+import com.linecorp.armeria.server.Server
+import com.squareup.wire.GrpcClient
+import com.squareup.wire.Service
+import com.squareup.wire.WireRpc
+import kotlinx.coroutines.runBlocking
+import okhttp3.OkHttpClient
+import okhttp3.Protocol
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.context.junit4.SpringRunner
+import zipkin.server.ZipkinServer
+import zipkin2.Span
+import zipkin2.TestObjects
+import zipkin2.codec.SpanBytesDecoder
+import zipkin2.codec.SpanBytesEncoder
+import zipkin2.proto3.ListOfSpans
+import zipkin2.proto3.ReportResponse
+import zipkin2.storage.InMemoryStorage
+
+@SpringBootTest(classes = [ZipkinServer::class],
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = ["spring.config.name=zipkin-server", "zipkin.collector.grpc.enabled=true"])
+@RunWith(SpringRunner::class)
+// Written in Kotlin as Square Wire's grpc client is Kotlin.
+// Also, the first author of this test wanted an excuse to write Kotlin.
+class ITZipkinGrpcCollector {
+ @Autowired lateinit var storage: InMemoryStorage
+ @Autowired lateinit var server: Server
+
+ var request = ListOfSpans.ADAPTER.decode(SpanBytesEncoder.PROTO3.encodeList(TestObjects.TRACE))
+ lateinit var spanService: SpanService
+
+ interface SpanService : Service {
+ @WireRpc(
+ path = "/zipkin.proto3.SpanService/Report",
+ requestAdapter = "zipkin2.proto3.ListOfSpans#ADAPTER",
+ responseAdapter = "zipkin2.proto3.ReportResponse#ADAPTER"
+ )
+ suspend fun Report(request: ListOfSpans): ReportResponse
+ }
+
+ @Before fun sanityCheckCodecCompatible() {
+ assertThat(SpanBytesDecoder.PROTO3.decodeList(request.encode()))
+ .containsExactlyElementsOf(TestObjects.TRACE)
+ }
+
+ @Before fun createClient() {
+ spanService = GrpcClient.Builder()
+ .client(OkHttpClient.Builder().protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE)).build())
+ .baseUrl("http://localhost:" + server.activePort().get().localAddress().port)
+ .build().create(SpanService::class)
+ }
+
+ /** This tests that we accept messages constructed by other clients. */
+ @Test fun report_withWireGrpcLibrary() {
+ runBlocking {
+ spanService.Report(request) // Result is effectively void
+ }
+ assertThat>(storage.traces)
+ .containsExactly(TestObjects.TRACE)
+ }
+}