From 8851e388992db8481b83307d4fad26935ad269e1 Mon Sep 17 00:00:00 2001 From: Eric Hauser Date: Sat, 15 Dec 2018 15:10:25 -0800 Subject: [PATCH] Adds gRPC endpoint /zipkin.proto3.SpanService/Report (#2328) This adds an opt-in gRPC endpoint for reporting spans. A request to the gRPC `/zipkin.proto3.SpanService/Report` service is the same proto used with our `POST /api/v2/spans`, `Content-Type: application/x-protobuf` endpoint: `zipkin.proto3.ListOfSpans`. This is currently disabled by default, as the feature is experimental. Enable it like so: ```bash $ COLLECTOR_GRPC_ENABLED=true java -jar zipkin.jar ``` Under the scenes the runtime is the same as the POST endpoint (Armeria), and uses the same port (9411). --- benchmarks/pom.xml | 2 +- pom.xml | 25 +++- zipkin-server/README.md | 13 +++ zipkin-server/pom.xml | 109 ++++++++++++++++++ .../internal/InternalZipkinConfiguration.java | 1 + .../server/internal/ZipkinGrpcCollector.java | 78 +++++++++++++ .../internal/ZipkinServerConfiguration.java | 5 +- .../main/resources/zipkin-server-shared.yml | 3 + .../server/internal/ITZipkinGrpcCollector.kt | 85 ++++++++++++++ 9 files changed, 317 insertions(+), 4 deletions(-) create mode 100644 zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java create mode 100644 zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 009b2e688ce..0ae57bb421b 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -80,7 +80,7 @@ ${wire.version} - io.zipkin.proto3 + org.apache.zipkin.proto3 zipkin-proto3 diff --git a/pom.xml b/pom.xml index f94e16ddcb1..6e34b0a9418 100755 --- a/pom.xml +++ b/pom.xml @@ -154,6 +154,26 @@ https://github.com/openzipkin/zipkin/issues + + + apache.snapshots.https + https://repository.apache.org/content/repositories/snapshots/ + + false + + + + + + + apache.snapshots.https + https://repository.apache.org/content/repositories/snapshots/ + + false + + + + @@ -362,9 +382,9 @@ - io.zipkin.proto3 + org.apache.zipkin.proto3 zipkin-proto3 - 0.1.0 + 0.2.1-SNAPSHOT com.squareup.wire @@ -627,6 +647,7 @@ SLASHSTAR_STYLE + SLASHSTAR_STYLE .travis.yml diff --git a/zipkin-server/README.md b/zipkin-server/README.md index 5b18374162f..43d01fe6f83 100644 --- a/zipkin-server/README.md +++ b/zipkin-server/README.md @@ -370,6 +370,19 @@ Example usage: $ RABBIT_ADDRESSES=localhost java -jar zipkin.jar ``` +### gRPC Collector (Experimental) +You can enable a gRPC span collector endpoint by setting `COLLECTOR_GRPC_ENABLED=true`. The +`zipkin.proto3.SpanService/Report` endpoint will run on the same port as normal http (9411). + + +Example usage: + +```bash +COLLECTOR_GRPC_ENABLED=true java -jar zipkin.jar +``` + +As this service is experimental, it is not recommended to run this in production environments. + ### 128-bit trace IDs Zipkin supports 64 and 128-bit trace identifiers, typically serialized diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index 93fb1fbf260..d418a58181c 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -34,7 +34,9 @@ 1.8 java18 zipkin.server.ZipkinServer + 1.3.30 3.2.0 + ${project.build.directory}/generated-test-sources/wire @@ -82,6 +84,10 @@ com.linecorp.armeria armeria-zipkin + + com.linecorp.armeria + armeria-grpc-protocol + @@ -125,6 +131,25 @@ 1.11.3 + + + org.apache.zipkin.proto3 + zipkin-proto3 + test + + + com.squareup.wire + wire-grpc-client + ${wire.version} + test + + + org.jetbrains.kotlin + kotlin-stdlib + ${kotlin.version} + test + + ${project.groupId}.zipkin2 @@ -348,6 +373,90 @@ + + + maven-dependency-plugin + + + com.squareup.wire + wire-maven-plugin + + + generate-test-sources + + generate-sources + + + ${proto.generatedSourceDirectory} + + + + + + org.jetbrains.kotlin + kotlin-maven-plugin + ${kotlin.version} + + enable + + + + compile + none + + + test-compile + + test-compile + + + + ${project.basedir}/src/test/kotlin + ${project.basedir}/src/test/java + ${proto.generatedSourceDirectory} + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.0.0 + + + add-test-source + generate-test-sources + + add-test-source + + + + ${proto.generatedSourceDirectory} + + + + + + + maven-compiler-plugin + ${maven-compiler-plugin.version} + + + + default-testCompile + none + + + java-test-compile + test-compile + + testCompile + + + + org.springframework.boot spring-boot-maven-plugin diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java index 1361e0c05dc..e275ab92a1a 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java @@ -39,6 +39,7 @@ TracingConfiguration.class, ZipkinQueryApiV2.class, ZipkinHttpCollector.class, + ZipkinGrpcCollector.class, ZipkinKafkaCollectorConfiguration.class, ZipkinRabbitMQCollectorConfiguration.class, MetricsHealthController.class, diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java new file mode 100644 index 00000000000..62c3f80554a --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package zipkin2.server.internal; + +import com.linecorp.armeria.common.grpc.protocol.AbstractUnaryGrpcService; +import com.linecorp.armeria.spring.ArmeriaServerConfigurator; +import java.util.concurrent.CompletableFuture; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import zipkin2.Callback; +import zipkin2.codec.SpanBytesDecoder; +import zipkin2.collector.Collector; +import zipkin2.collector.CollectorMetrics; +import zipkin2.collector.CollectorSampler; +import zipkin2.storage.StorageComponent; + +/** Collector for receiving spans on a gRPC endpoint. */ +@ConditionalOnProperty(name = "zipkin.collector.grpc.enabled") // disabled by default +final class ZipkinGrpcCollector { + static final byte[] EMPTY = new byte[0]; + + @Bean ArmeriaServerConfigurator grpcCollectorConfigurator(StorageComponent storage, + CollectorSampler sampler, CollectorMetrics metrics) { + CollectorMetrics grpcMetrics = metrics.forTransport("grpc"); + Collector collector = Collector.newBuilder(getClass()) + .storage(storage) + .sampler(sampler) + .metrics(grpcMetrics) + .build(); + + return sb -> + sb.service("/zipkin.proto3.SpanService/Report", new SpanService(collector, grpcMetrics)); + } + + static final class SpanService extends AbstractUnaryGrpcService { + + final Collector collector; + final CollectorMetrics metrics; + + SpanService(Collector collector, CollectorMetrics metrics) { + this.collector = collector; + this.metrics = metrics; + } + + @Override protected CompletableFuture handleMessage(byte[] bytes) { + metrics.incrementMessages(); + CompletableFutureCallback result = new CompletableFutureCallback(); + collector.acceptSpans(bytes, SpanBytesDecoder.PROTO3, result); + return result; + } + } + + static final class CompletableFutureCallback extends CompletableFuture + implements Callback { + + @Override public void onSuccess(Void value) { + complete(EMPTY); + } + + @Override public void onError(Throwable t) { + completeExceptionally(t); + } + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java index b004479fd6a..cb381a7f338 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java @@ -98,7 +98,10 @@ public void addViewControllers(ViewControllerRegistry registry) { // could split the collector's CORS policy into a different property, still allowing POST // with content-type by default. .allowRequestMethods(HttpMethod.GET, HttpMethod.POST) - .allowRequestHeaders(HttpHeaderNames.CONTENT_TYPE); + .allowRequestHeaders(HttpHeaderNames.CONTENT_TYPE, + // Use literals to avoid a runtime dependency on armeria-grpc types + HttpHeaderNames.of("X-GRPC-WEB")) + .exposeHeaders("grpc-status", "grpc-message", "armeria.grpc.ThrowableProto-bin"); return builder -> builder.decorator(corsBuilder::build); } diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 79bc9392d04..3236cb6ebb7 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -12,6 +12,9 @@ zipkin: http: # Set to false to disable creation of spans via HTTP collector API enabled: ${HTTP_COLLECTOR_ENABLED:true} + grpc: + # Set to true to enable the GRPC collector + enabled: ${COLLECTOR_GRPC_ENABLED:false} kafka: # Kafka bootstrap broker list, comma-separated host:port values. Setting this activates the # Kafka 0.10+ collector. diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt new file mode 100644 index 00000000000..c87d133ef3d --- /dev/null +++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zipkin2.server.internal + +import com.linecorp.armeria.server.Server +import com.squareup.wire.GrpcClient +import com.squareup.wire.Service +import com.squareup.wire.WireRpc +import kotlinx.coroutines.runBlocking +import okhttp3.OkHttpClient +import okhttp3.Protocol +import org.assertj.core.api.Assertions.assertThat +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit4.SpringRunner +import zipkin.server.ZipkinServer +import zipkin2.Span +import zipkin2.TestObjects +import zipkin2.codec.SpanBytesDecoder +import zipkin2.codec.SpanBytesEncoder +import zipkin2.proto3.ListOfSpans +import zipkin2.proto3.ReportResponse +import zipkin2.storage.InMemoryStorage + +@SpringBootTest(classes = [ZipkinServer::class], + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = ["spring.config.name=zipkin-server", "zipkin.collector.grpc.enabled=true"]) +@RunWith(SpringRunner::class) +// Written in Kotlin as Square Wire's grpc client is Kotlin. +// Also, the first author of this test wanted an excuse to write Kotlin. +class ITZipkinGrpcCollector { + @Autowired lateinit var storage: InMemoryStorage + @Autowired lateinit var server: Server + + var request = ListOfSpans.ADAPTER.decode(SpanBytesEncoder.PROTO3.encodeList(TestObjects.TRACE)) + lateinit var spanService: SpanService + + interface SpanService : Service { + @WireRpc( + path = "/zipkin.proto3.SpanService/Report", + requestAdapter = "zipkin2.proto3.ListOfSpans#ADAPTER", + responseAdapter = "zipkin2.proto3.ReportResponse#ADAPTER" + ) + suspend fun Report(request: ListOfSpans): ReportResponse + } + + @Before fun sanityCheckCodecCompatible() { + assertThat(SpanBytesDecoder.PROTO3.decodeList(request.encode())) + .containsExactlyElementsOf(TestObjects.TRACE) + } + + @Before fun createClient() { + spanService = GrpcClient.Builder() + .client(OkHttpClient.Builder().protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE)).build()) + .baseUrl("http://localhost:" + server.activePort().get().localAddress().port) + .build().create(SpanService::class) + } + + /** This tests that we accept messages constructed by other clients. */ + @Test fun report_withWireGrpcLibrary() { + runBlocking { + spanService.Report(request) // Result is effectively void + } + assertThat>(storage.traces) + .containsExactly(TestObjects.TRACE) + } +}