diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index f8f33e4..2fd73fd 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -20,7 +20,7 @@ <parent> <artifactId>zipkin-gcp-parent</artifactId> <groupId>io.zipkin.gcp</groupId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> </parent> <artifactId>benchmarks</artifactId> diff --git a/collector-pubsub/pom.xml b/collector-pubsub/pom.xml index c337823..b8bfbd4 100644 --- a/collector-pubsub/pom.xml +++ b/collector-pubsub/pom.xml @@ -18,7 +18,7 @@ <parent> <artifactId>zipkin-gcp-parent</artifactId> <groupId>io.zipkin.gcp</groupId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/encoder-stackdriver-brave/pom.xml b/encoder-stackdriver-brave/pom.xml index 9b014de..11d9ed5 100644 --- a/encoder-stackdriver-brave/pom.xml +++ b/encoder-stackdriver-brave/pom.xml @@ -18,7 +18,7 @@ <parent> <artifactId>zipkin-gcp-parent</artifactId> <groupId>io.zipkin.gcp</groupId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/encoder-stackdriver-zipkin/pom.xml b/encoder-stackdriver-zipkin/pom.xml index 11889e4..0b74560 100644 --- a/encoder-stackdriver-zipkin/pom.xml +++ b/encoder-stackdriver-zipkin/pom.xml @@ -18,7 +18,7 @@ <parent> <artifactId>zipkin-gcp-parent</artifactId> <groupId>io.zipkin.gcp</groupId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/module/pom.xml b/module/pom.xml index 82dbadb..f45bf2c 100644 --- a/module/pom.xml +++ b/module/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>io.zipkin.gcp</groupId> <artifactId>zipkin-gcp-parent</artifactId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> </parent> <artifactId>zipkin-module-gcp</artifactId> diff --git a/pom.xml b/pom.xml index db94524..6a10a7b 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ <groupId>io.zipkin.gcp</groupId> <artifactId>zipkin-gcp-parent</artifactId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> <packaging>pom</packaging> <modules> @@ -75,7 +75,7 @@ <zipkin.groupId>io.zipkin.zipkin2</zipkin.groupId> <!-- when updating, update docker/Dockerfile and storage/src/test/java/zipkin2/storage/kafka/IT* --> <zipkin.version>3.0.2</zipkin.version> - <zipkin-reporter.version>3.1.1</zipkin-reporter.version> + <zipkin-reporter.version>3.2.1</zipkin-reporter.version> <spring-boot.version>3.2.1</spring-boot.version> <!-- armeria.groupId allows you to test feature branches with jitpack --> <armeria.groupId>com.linecorp.armeria</armeria.groupId> diff --git a/propagation-stackdriver/pom.xml b/propagation-stackdriver/pom.xml index b9c8570..0f768d9 100644 --- a/propagation-stackdriver/pom.xml +++ b/propagation-stackdriver/pom.xml @@ -18,7 +18,7 @@ <parent> <artifactId>zipkin-gcp-parent</artifactId> <groupId>io.zipkin.gcp</groupId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/sender-pubsub/pom.xml b/sender-pubsub/pom.xml index a9d4252..0daeba6 100644 --- a/sender-pubsub/pom.xml +++ b/sender-pubsub/pom.xml @@ -18,7 +18,7 @@ <parent> <artifactId>zipkin-gcp-parent</artifactId> <groupId>io.zipkin.gcp</groupId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java index f194f44..0a49a4c 100644 --- a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java +++ b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java @@ -13,30 +13,22 @@ */ package zipkin2.reporter.pubsub; -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; -import com.google.api.core.ApiFutures; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; -import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.Topic; -import com.google.pubsub.v1.TopicName; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -public class PubSubSender extends Sender { +public class PubSubSender extends BytesMessageSender.Base { public static PubSubSender create(String topic) { return newBuilder().topic(topic).build(); @@ -109,7 +101,6 @@ public PubSubSender build() { if (topic == null) throw new NullPointerException("topic == null"); if (executorProvider == null) executorProvider = defaultExecutorProvider(); - ; if (publisher == null) { try { @@ -146,7 +137,6 @@ public Builder toBuilder() { final String topic; final int messageMaxBytes; - final Encoding encoding; final Publisher publisher; final ExecutorProvider executorProvider; final TopicAdminClient topicAdminClient; @@ -154,61 +144,43 @@ public Builder toBuilder() { volatile boolean closeCalled; PubSubSender(Builder builder) { - this.topic = builder.topic; - this.messageMaxBytes = builder.messageMaxBytes; - this.encoding = builder.encoding; - this.publisher = builder.publisher; - this.executorProvider = builder.executorProvider; - this.topicAdminClient = builder.topicAdminClient; + super(builder.encoding); + topic = builder.topic; + messageMaxBytes = builder.messageMaxBytes; + publisher = builder.publisher; + executorProvider = builder.executorProvider; + topicAdminClient = builder.topicAdminClient; } - /** - * If no permissions given sent back ok, f permissions and topic exist ok, if topic does not exist error - * - * @return - */ - @Override - public CheckResult check() { - try { - Topic topic = topicAdminClient.getTopic(TopicName.parse(this.topic)); - return CheckResult.OK; - } catch (ApiException e) { - return CheckResult.failed(e); - } - } - - @Override public Encoding encoding() { - return encoding; - } - - @Override - public int messageMaxBytes() { + @Override public int messageMaxBytes() { return messageMaxBytes; } - @Override - public int messageSizeInBytes(List<byte[]> bytes) { - return encoding().listSizeInBytes(bytes); - } - - @Override - public Call<Void> sendSpans(List<byte[]> byteList) { - if (closeCalled) throw new IllegalStateException("closed"); + @Override public void send(List<byte[]> byteList) throws IOException { + if (closeCalled) throw new ClosedSenderException(); byte[] messageBytes = BytesMessageEncoder.forEncoding(encoding()).encode(byteList); - PubsubMessage pubsubMessage = + PubsubMessage message = PubsubMessage.newBuilder().setData(ByteString.copyFrom(messageBytes)).build(); - return new PubSubCall(pubsubMessage); + try { + publisher.publish(message).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) throw (RuntimeException) cause; + if (cause instanceof Error) throw (Error) cause; + throw new RuntimeException(cause); + } } /** - * Shutdown on Publisher is not async thus moving the synchronized block to another function in order not to block until the shutdown is over - * - * @throws IOException + * Shutdown on Publisher is not async thus moving the synchronized block to another function in + * order not to block until the shutdown is over. */ - @Override - public void close() throws IOException { + @Override public void close() { if (!setClosed()) { return; } @@ -227,68 +199,4 @@ private synchronized boolean setClosed() { @Override public final String toString() { return "PubSubSender{topic=" + topic + "}"; } - - class PubSubCall extends Call.Base<Void> { - private final PubsubMessage message; - volatile ApiFuture<String> future; - - public PubSubCall(PubsubMessage message) { - this.message = message; - } - - @Override - protected Void doExecute() throws IOException { - try { - publisher.publish(message).get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - return null; - } - - @Override - protected void doEnqueue(Callback<Void> callback) { - future = publisher.publish(message); - ApiFutures.addCallback(future, new ApiFutureCallbackAdapter(callback), - executorProvider.getExecutor()); - if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans"); - } - - @Override - protected void doCancel() { - Future<String> maybeFuture = future; - if (maybeFuture != null) maybeFuture.cancel(true); - } - - @Override - protected boolean doIsCanceled() { - Future<String> maybeFuture = future; - return maybeFuture != null && maybeFuture.isCancelled(); - } - - @Override - public Call<Void> clone() { - PubsubMessage clone = PubsubMessage.newBuilder(message).build(); - return new PubSubCall(clone); - } - } - - static final class ApiFutureCallbackAdapter implements ApiFutureCallback<String> { - - final Callback<Void> callback; - - public ApiFutureCallbackAdapter(Callback<Void> callback) { - this.callback = callback; - } - - @Override - public void onFailure(Throwable t) { - callback.onError(t); - } - - @Override - public void onSuccess(String result) { - callback.onSuccess(null); - } - } } diff --git a/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java b/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java index 6c80592..d09c80b 100644 --- a/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java +++ b/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java @@ -54,13 +54,12 @@ import org.mockito.junit.jupiter.MockitoExtension; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.CheckResult; import zipkin2.reporter.Encoding; import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static zipkin2.TestObjects.CLIENT_SPAN; @@ -116,9 +115,8 @@ private InstantiatingExecutorProvider testExecutorProvider() { .build(); } - @Test void sendsSpans() throws Exception { - ArgumentCaptor<PublishRequest> requestCaptor = - ArgumentCaptor.forClass(PublishRequest.class); + @Test void send() throws Exception { + ArgumentCaptor<PublishRequest> requestCaptor = ArgumentCaptor.forClass(PublishRequest.class); doAnswer(invocationOnMock -> { StreamObserver<PublishResponse> responseObserver = invocationOnMock.getArgument(1); @@ -128,15 +126,14 @@ private InstantiatingExecutorProvider testExecutorProvider() { return null; }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(extractSpans(requestCaptor.getValue())) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_PROTO3() throws Exception { - ArgumentCaptor<PublishRequest> requestCaptor = - ArgumentCaptor.forClass(PublishRequest.class); + @Test void send_empty() throws Exception { + ArgumentCaptor<PublishRequest> requestCaptor = ArgumentCaptor.forClass(PublishRequest.class); doAnswer(invocationOnMock -> { StreamObserver<PublishResponse> responseObserver = invocationOnMock.getArgument(1); @@ -146,17 +143,14 @@ private InstantiatingExecutorProvider testExecutorProvider() { return null; }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); - sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(); assertThat(extractSpans(requestCaptor.getValue())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + .isEmpty(); } - @Test void sendsSpans_json_unicode() throws Exception { - ArgumentCaptor<PublishRequest> requestCaptor = - ArgumentCaptor.forClass(PublishRequest.class); + @Test void send_PROTO3() throws Exception { + ArgumentCaptor<PublishRequest> requestCaptor = ArgumentCaptor.forClass(PublishRequest.class); doAnswer(invocationOnMock -> { StreamObserver<PublishResponse> responseObserver = invocationOnMock.getArgument(1); @@ -166,39 +160,42 @@ private InstantiatingExecutorProvider testExecutorProvider() { return null; }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); - Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build(); - send(unicode).execute(); + sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - assertThat(extractSpans(requestCaptor.getValue())).containsExactly(unicode); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); + + assertThat(extractSpans(requestCaptor.getValue())) + .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void checkPasses() throws Exception { - ArgumentCaptor<GetTopicRequest> captor = - ArgumentCaptor.forClass(GetTopicRequest.class); + @Test void send_json_unicode() throws Exception { + ArgumentCaptor<PublishRequest> requestCaptor = ArgumentCaptor.forClass(PublishRequest.class); doAnswer(invocationOnMock -> { - StreamObserver<Topic> responseObserver = invocationOnMock.getArgument(1); - responseObserver.onNext(Topic.newBuilder().setName("topic-name").build()); + StreamObserver<PublishResponse> responseObserver = invocationOnMock.getArgument(1); + responseObserver.onNext( + PublishResponse.newBuilder().addMessageIds(UUID.randomUUID().toString()).build()); responseObserver.onCompleted(); return null; - }).when(publisherImplBase).getTopic(captor.capture(), any(StreamObserver.class)); + }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); - CheckResult result = sender.check(); - assertThat(result.ok()).isTrue(); + Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build(); + sendSpans(unicode); + + assertThat(extractSpans(requestCaptor.getValue())).containsExactly(unicode); } - @Test void checkFailsWithStreamNotActive() throws Exception { - ArgumentCaptor<GetTopicRequest> captor = - ArgumentCaptor.forClass(GetTopicRequest.class); + @Test void sendFailsWithStreamNotActive() { + ArgumentCaptor<PublishRequest> requestCaptor = ArgumentCaptor.forClass(PublishRequest.class); doAnswer(invocationOnMock -> { StreamObserver<Topic> responseObserver = invocationOnMock.getArgument(1); responseObserver.onError(new io.grpc.StatusRuntimeException(Status.NOT_FOUND)); return null; - }).when(publisherImplBase).getTopic(captor.capture(), any(StreamObserver.class)); + }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); - CheckResult result = sender.check(); - assertThat(result.error()).isInstanceOf(ApiException.class); + assertThatThrownBy(this::sendSpans) + .isInstanceOf(ApiException.class); } private List<Span> extractSpans(PublishRequest publishRequest) { @@ -217,9 +214,9 @@ Stream<Span> extractSpans(PubsubMessage pubsubMessage) { return SpanBytesDecoder.PROTO3.decodeList(messageBytes).stream(); } - Call<Void> send(zipkin2.Span... spans) { + void sendSpans(zipkin2.Span... spans) throws Exception { SpanBytesEncoder bytesEncoder = sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3; - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } } diff --git a/sender-stackdriver/pom.xml b/sender-stackdriver/pom.xml index ff6d2e4..bbf2be3 100644 --- a/sender-stackdriver/pom.xml +++ b/sender-stackdriver/pom.xml @@ -18,7 +18,7 @@ <parent> <artifactId>zipkin-gcp-parent</artifactId> <groupId>io.zipkin.gcp</groupId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/AwaitableUnaryClientCallListener.java b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/AwaitableUnaryClientCallListener.java similarity index 97% rename from sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/AwaitableUnaryClientCallListener.java rename to sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/AwaitableUnaryClientCallListener.java index 0ff93db..852d66f 100644 --- a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/AwaitableUnaryClientCallListener.java +++ b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/AwaitableUnaryClientCallListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -11,7 +11,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin2.reporter.stackdriver.internal; +package zipkin2.reporter.stackdriver; import io.grpc.ClientCall; import io.grpc.Metadata; diff --git a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/StackdriverSender.java b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/StackdriverSender.java index 16c3ba8..53fbb0d 100644 --- a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/StackdriverSender.java +++ b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/StackdriverSender.java @@ -22,23 +22,22 @@ import com.google.protobuf.UnsafeByteOperations; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ClientCall; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; +import io.grpc.Metadata; import java.io.IOException; import java.util.List; -import zipkin2.reporter.Call; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -import zipkin2.reporter.stackdriver.internal.UnaryClientCall; import static com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag; import static io.grpc.CallOptions.DEFAULT; -import static zipkin2.reporter.stackdriver.internal.UnaryClientCall.DEFAULT_SERVER_TIMEOUT_MS; -public final class StackdriverSender extends Sender { +public final class StackdriverSender extends BytesMessageSender.Base { + static final int DEFAULT_SERVER_TIMEOUT_MS = 5000; + public static Builder newBuilder() { ManagedChannel channel = ManagedChannelBuilder.forTarget("cloudtrace.googleapis.com").build(); Builder result = newBuilder(channel); @@ -100,9 +99,8 @@ public StackdriverSender build() { final int spanNameFieldSize; final long serverResponseTimeoutMs; - final BatchWriteSpansCall healthcheckCall; - StackdriverSender(Builder builder) { + super(Encoding.PROTO3); channel = builder.channel; callOptions = builder.callOptions; projectName = ByteString.copyFromUtf8("projects/" + builder.projectId); @@ -117,26 +115,13 @@ public StackdriverSender build() { spanNameFieldSize = CodedOutputStream.computeTagSize(1) + CodedOutputStream.computeUInt32SizeNoTag(spanNameSize) + spanNameSize; - - BatchWriteSpansRequest healthcheckRequest = BatchWriteSpansRequest.newBuilder() - .setNameBytes(projectName) - .addSpans(Span.newBuilder().build()) - .build(); - healthcheckCall = new BatchWriteSpansCall(healthcheckRequest); } - @Override - public Encoding encoding() { - return Encoding.PROTO3; - } - - @Override - public int messageMaxBytes() { + @Override public int messageMaxBytes() { return 1024 * 1024; // 1 MiB for now } - @Override - public int messageSizeInBytes(List<byte[]> traceIdPrefixedSpans) { + @Override public int messageSizeInBytes(List<byte[]> traceIdPrefixedSpans) { int length = traceIdPrefixedSpans.size(); if (length == 0) return 0; if (length == 1) return messageSizeInBytes(traceIdPrefixedSpans.get(0).length); @@ -149,19 +134,15 @@ public int messageSizeInBytes(List<byte[]> traceIdPrefixedSpans) { return size; } - @Override - public int messageSizeInBytes(int traceIdPrefixedSpanSize) { + @Override public int messageSizeInBytes(int traceIdPrefixedSpanSize) { return projectNameFieldSize + spanFieldSize(traceIdPrefixedSpanSize); } /** close is typically called from a different thread */ volatile boolean closeCalled; - @Override - public Call<Void> sendSpans(List<byte[]> traceIdPrefixedSpans) { - if (closeCalled) throw new IllegalStateException("closed"); - int length = traceIdPrefixedSpans.size(); - if (length == 0) return Call.create(null); + @Override public void send(List<byte[]> traceIdPrefixedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); BatchWriteSpansRequest.Builder request = BatchWriteSpansRequest.newBuilder() .setNameBytes(projectName); @@ -169,42 +150,28 @@ public Call<Void> sendSpans(List<byte[]> traceIdPrefixedSpans) { request.addSpans(parseTraceIdPrefixedSpan(traceIdPrefixedSpan, spanNameSize, traceIdPrefix)); } - return new BatchWriteSpansCall(request.build()).map(EmptyToVoid.INSTANCE); - } + ClientCall<BatchWriteSpansRequest, Empty> call = + channel.newCall(TraceServiceGrpc.getBatchWriteSpansMethod(), callOptions); - /** - * Sends a malformed call to Stackdriver Trace to validate service health. - * - * @return successful status if Stackdriver Trace API responds with expected validation - * error (or happens to respond as success -- unexpected but okay); otherwise returns error status - * wrapping the underlying exception. - */ - @Override - public CheckResult check() { + AwaitableUnaryClientCallListener<Empty> listener = + new AwaitableUnaryClientCallListener<>(serverResponseTimeoutMs); try { - healthcheckCall.clone().execute(); - } catch (StatusRuntimeException sre) { - if (sre.getStatus().getCode() == Status.Code.INVALID_ARGUMENT) { - return CheckResult.OK; - } - return CheckResult.failed(sre); - } catch (Exception e) { - return CheckResult.failed(e); + call.start(listener, new Metadata()); + call.request(1); + call.sendMessage(request.build()); + call.halfClose(); + } catch (RuntimeException | Error t) { + call.cancel(null, t); + throw t; } - - // Currently the rpc throws a validation exception on malformed input, which we handle above. - // If we get here despite the known malformed input, the implementation changed and we need to - // update this check. It's unlikely enough that we can wait and see. - return CheckResult.OK; + listener.await(); } - @Override - public final String toString() { + @Override public String toString() { return "StackdriverSender{" + projectName.toStringUtf8() + "}"; } - @Override - public void close() { + @Override public void close() { if (!shutdownChannelOnClose) return; if (closeCalled) return; closeCalled = true; @@ -244,31 +211,4 @@ int spanFieldSize(int traceIdPrefixedSpanSize) { return CodedOutputStream.computeTagSize(2) + computeUInt32SizeNoTag(sizeOfSpanMessage) + sizeOfSpanMessage; } - - final class BatchWriteSpansCall extends UnaryClientCall<BatchWriteSpansRequest, Empty> { - - BatchWriteSpansCall(BatchWriteSpansRequest request) { - super(channel, TraceServiceGrpc.getBatchWriteSpansMethod(), callOptions, request, - serverResponseTimeoutMs); - } - - @Override - public String toString() { - return "BatchWriteSpansCall{" + request() + "}"; - } - - @Override - public BatchWriteSpansCall clone() { - return new BatchWriteSpansCall(request()); - } - } - - enum EmptyToVoid implements Call.Mapper<Empty, Void> { - INSTANCE { - @Override - public Void map(Empty empty) { - return null; - } - } - } } diff --git a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/CallbackToUnaryClientCallListener.java b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/CallbackToUnaryClientCallListener.java deleted file mode 100644 index ab8e9d0..0000000 --- a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/CallbackToUnaryClientCallListener.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2016-2024 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.stackdriver.internal; - -import io.grpc.ClientCall; -import io.grpc.Metadata; -import io.grpc.Status; -import zipkin2.reporter.Callback; - -final class CallbackToUnaryClientCallListener<RespT> extends ClientCall.Listener<RespT> { - private final Callback<RespT> callback; - /** this differentiates between not yet set and null */ - boolean valueSet; // guarded by this - - RespT value; // guarded by this - - CallbackToUnaryClientCallListener(Callback<RespT> callback) { - this.callback = callback; - } - - @Override - public void onHeaders(Metadata headers) { - } - - @Override - public synchronized void onMessage(RespT value) { - if (valueSet) { - throw Status.INTERNAL - .withDescription("More than one value received for unary call") - .asRuntimeException(); - } - valueSet = true; - this.value = value; - } - - @Override - public synchronized void onClose(Status status, Metadata trailers) { - if (status.isOk()) { - if (!valueSet) { - callback.onError( - Status.INTERNAL - .withDescription("No value received for unary call") - .asRuntimeException(trailers)); - } - callback.onSuccess(value); - } else { - callback.onError(status.asRuntimeException(trailers)); - } - } -} diff --git a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/UnaryClientCall.java b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/UnaryClientCall.java deleted file mode 100644 index 91f5972..0000000 --- a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/UnaryClientCall.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2016-2024 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.stackdriver.internal; - -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import java.io.IOException; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; - -public abstract class UnaryClientCall<ReqT, RespT> extends Call.Base<RespT> { - public static final int DEFAULT_SERVER_TIMEOUT_MS = 5000; - final ClientCall<ReqT, RespT> call; - final ReqT request; - final long serverTimeoutMs; - - protected UnaryClientCall( - Channel channel, - MethodDescriptor<ReqT, RespT> descriptor, - CallOptions callOptions, - ReqT request, - long serverTimeoutMs) { - this.call = channel.newCall(descriptor, callOptions); - this.request = request; - this.serverTimeoutMs = serverTimeoutMs; - } - - protected final ReqT request() { - return request; - } - - @Override - protected final RespT doExecute() throws IOException { - AwaitableUnaryClientCallListener<RespT> listener = - new AwaitableUnaryClientCallListener<>(this.serverTimeoutMs); - beginUnaryCall(listener); - return listener.await(); - } - - @Override - protected final void doEnqueue(Callback<RespT> callback) { - ClientCall.Listener<RespT> listener = new CallbackToUnaryClientCallListener<>(callback); - try { - beginUnaryCall(listener); - } catch (RuntimeException | Error t) { - callback.onError(t); - throw t; - } - } - - void beginUnaryCall(ClientCall.Listener<RespT> listener) { - try { - call.start(listener, new Metadata()); - call.request(1); - call.sendMessage(request); - call.halfClose(); - } catch (RuntimeException | Error t) { - call.cancel(null, t); - throw t; - } - } - - @Override - protected final void doCancel() { - call.cancel(null, null); - } -} diff --git a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/AsyncReporterStackdriverSenderTest.java b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/AsyncReporterStackdriverSenderTest.java index 2a24481..20fd481 100644 --- a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/AsyncReporterStackdriverSenderTest.java +++ b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/AsyncReporterStackdriverSenderTest.java @@ -71,13 +71,21 @@ class AsyncReporterStackdriverSenderTest { .build(StackdriverEncoder.V2); } - @Test void sendSpans_empty() { + @Test void send_empty() { reporter.flush(); - verify(traceService, never()).batchWriteSpans(any(), any()); + ArgumentCaptor<BatchWriteSpansRequest> requestCaptor = + ArgumentCaptor.forClass(BatchWriteSpansRequest.class); + + verify(traceService).batchWriteSpans(requestCaptor.capture(), any()); + + BatchWriteSpansRequest request = requestCaptor.getValue(); + assertThat(request.getName()).isEqualTo("projects/" + projectId); + + assertThat(request.getSpansList()).isEmpty(); } - @Test void sendSpans() { + @Test void send() { onClientCall( observer -> { observer.onNext(Empty.getDefaultInstance()); diff --git a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/ITStackdriverSender.java b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/ITStackdriverSender.java index 287c8f7..e3a8730 100644 --- a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/ITStackdriverSender.java +++ b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/ITStackdriverSender.java @@ -52,8 +52,7 @@ public class ITStackdriverSender { AsyncReporter<Span> reporterNoPermission; TraceServiceGrpc.TraceServiceBlockingStub traceServiceGrpcV1; - @BeforeEach - public void setUp() throws IOException { + @BeforeEach void setUp() throws IOException { // Application Default credential is configured using the GOOGLE_APPLICATION_CREDENTIALS env var // See: https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application @@ -90,8 +89,7 @@ public void setUp() throws IOException { .build(StackdriverEncoder.V2); } - @AfterEach - public void tearDown() { + @AfterEach void tearDown() { if (reporter != null) { reporter.close(); } diff --git a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/StackdriverSenderTest.java b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/StackdriverSenderTest.java index 8d9396d..721d41a 100644 --- a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/StackdriverSenderTest.java +++ b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/StackdriverSenderTest.java @@ -28,6 +28,7 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -37,11 +38,11 @@ import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import zipkin2.Span; -import zipkin2.reporter.CheckResult; import zipkin2.reporter.stackdriver.zipkin.StackdriverEncoder; import zipkin2.translation.stackdriver.SpanTranslator; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -82,7 +83,7 @@ class StackdriverSenderTest { observer.onCompleted(); }); - sender.sendSpans(encodedSpans).execute(); + sender.send(encodedSpans); // verify our estimate is correct int actualSize = takeRequest().getSerializedSize(); @@ -124,7 +125,7 @@ void verifyRequestSent(List<Span> spans) throws IOException { List<byte[]> encodedSpans = spans.stream().map(StackdriverEncoder.V2::encode).collect(Collectors.toList()); - sender.sendSpans(encodedSpans).execute(); + sender.send(encodedSpans); BatchWriteSpansRequest request = takeRequest(); @@ -139,41 +140,33 @@ void verifyRequestSent(List<Span> spans) throws IOException { assertThat(sender.messageSizeInBytes(encodedSpans)).isEqualTo(actualSize); } - @Test void verifyCheckReturnsFailureWhenServiceFailsWithKnownGrpcFailure() { + @Test void sendFailureWhenServiceFailsWithKnownGrpcFailure() { onClientCall(observer -> { observer.onError(new StatusRuntimeException(Status.RESOURCE_EXHAUSTED)); }); - CheckResult result = sender.check(); - assertThat(result.ok()).isFalse(); - assertThat(result.error()) + + assertThatThrownBy(() -> sender.send(Collections.emptyList())) .isInstanceOf(StatusRuntimeException.class) .hasMessageContaining("RESOURCE_EXHAUSTED"); } - @Test void verifyCheckReturnsFailureWhenServiceFailsForUnknownReason() { + @Test void sendFailureWhenServiceFailsForUnknownReason() { onClientCall(observer -> { observer.onError(new RuntimeException("oh no")); }); - CheckResult result = sender.check(); - assertThat(result.ok()).isFalse(); - assertThat(result.error()) + + assertThatThrownBy(() -> sender.send(Collections.emptyList())) .isInstanceOf(RuntimeException.class) .hasMessageContaining("UNKNOWN"); } - @Test void verifyCheckReturnsOkWhenExpectedValidationFailure() { - onClientCall(observer -> { - observer.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); - }); - assertThat(sender.check()).isSameAs(CheckResult.OK); - } - - @Test void verifyCheckReturnsOkWhenServiceSucceeds() { + @Test void send_empty() throws IOException { onClientCall(observer -> { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); }); - assertThat(sender.check()).isSameAs(CheckResult.OK); + + sender.send(Collections.emptyList()); } void onClientCall(Consumer<StreamObserver<Empty>> onClientCall) { diff --git a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/internal/UnaryClientCallTest.java b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/internal/UnaryClientCallTest.java deleted file mode 100644 index cdf416d..0000000 --- a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/internal/UnaryClientCallTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Copyright 2016-2024 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.stackdriver.internal; - -import com.asarkar.grpc.test.GrpcCleanupExtension; -import com.asarkar.grpc.test.Resources; -import com.google.devtools.cloudtrace.v2.BatchWriteSpansRequest; -import com.google.devtools.cloudtrace.v2.TraceServiceGrpc; -import com.google.protobuf.Empty; -import io.grpc.Channel; -import io.grpc.ManagedChannel; -import io.grpc.Server; -import io.grpc.StatusRuntimeException; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.stub.StreamObserver; -import java.time.Duration; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.stubbing.Answer; -import zipkin2.reporter.Callback; - -import static io.grpc.CallOptions.DEFAULT; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static zipkin2.reporter.stackdriver.internal.UnaryClientCall.DEFAULT_SERVER_TIMEOUT_MS; - -@ExtendWith(GrpcCleanupExtension.class) -class UnaryClientCallTest { - final TestTraceService traceService = spy(new TestTraceService()); - - static class BatchWriteSpansCall extends UnaryClientCall<BatchWriteSpansRequest, Empty> { - final Channel channel; - - BatchWriteSpansCall(Channel channel, BatchWriteSpansRequest request, - long serverResponseTimeout) { - super(channel, TraceServiceGrpc.getBatchWriteSpansMethod(), DEFAULT, request, - serverResponseTimeout); - this.channel = channel; - } - - @Override - public BatchWriteSpansCall clone() { - return new BatchWriteSpansCall(channel, request(), DEFAULT_SERVER_TIMEOUT_MS); - } - } - - BatchWriteSpansCall call; - - @BeforeEach void setUp(Resources resources) throws Exception { - String serverName = InProcessServerBuilder.generateName(); - - Server server = InProcessServerBuilder - .forName(serverName) - .directExecutor() - .addService(traceService) - .build().start(); - resources.register(server, Duration.ofSeconds(10)); // shutdown deadline - - ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); - resources.register(channel, Duration.ofSeconds(10));// close deadline - - call = new BatchWriteSpansCall(channel, BatchWriteSpansRequest.newBuilder().build(), - DEFAULT_SERVER_TIMEOUT_MS); - } - - @Test void execute_success() throws Throwable { - onClientCall( - observer -> { - observer.onNext(Empty.getDefaultInstance()); - observer.onCompleted(); - }); - - call.execute(); - - verifyPatchRequestSent(); - } - - @Test void enqueue_success() throws Throwable { - onClientCall( - observer -> { - observer.onNext(Empty.getDefaultInstance()); - observer.onCompleted(); - }); - - awaitCallbackResult(); - - verifyPatchRequestSent(); - } - - void verifyPatchRequestSent() { - ArgumentCaptor<BatchWriteSpansRequest> requestCaptor = - ArgumentCaptor.forClass(BatchWriteSpansRequest.class); - - verify(traceService).batchWriteSpans(requestCaptor.capture(), any()); - - BatchWriteSpansRequest request = requestCaptor.getValue(); - assertThat(request).isEqualTo(BatchWriteSpansRequest.getDefaultInstance()); - } - - @Test void accept_execute_serverError() throws Throwable { - assertThrows(StatusRuntimeException.class, () -> { - onClientCall(observer -> observer.onError(new IllegalStateException())); - - call.execute(); - }); - } - - @Test void accept_enqueue_serverError() throws Throwable { - assertThrows(StatusRuntimeException.class, () -> { - onClientCall(observer -> observer.onError(new IllegalStateException())); - - awaitCallbackResult(); - }); - } - - @Test void execute_timeout() throws Throwable { - assertThrows(IllegalStateException.class, () -> { - long overriddenTimeout = 50; - call = new BatchWriteSpansCall(call.channel, BatchWriteSpansRequest.newBuilder().build(), - overriddenTimeout); - onClientCall( - observer -> - Executors.newSingleThreadExecutor().submit(() -> - { - try { - Thread.sleep(overriddenTimeout + 10); - } catch (InterruptedException e) { - } - observer.onCompleted(); - })); - - call.execute(); - }); - } - - static class TestTraceService extends TraceServiceGrpc.TraceServiceImplBase { - } - - void awaitCallbackResult() throws Throwable { - AtomicReference<Throwable> ref = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - call.enqueue( - new Callback<Empty>() { - @Override - public void onSuccess(Empty empty) { - latch.countDown(); - } - - @Override - public void onError(Throwable throwable) { - ref.set(throwable); - latch.countDown(); - } - }); - latch.await(10, TimeUnit.MILLISECONDS); - if (ref.get() != null) throw ref.get(); - } - - void onClientCall(Consumer<StreamObserver<Empty>> onClientCall) { - doAnswer( - (Answer<Void>) - invocationOnMock -> { - StreamObserver<Empty> observer = - ((StreamObserver) invocationOnMock.getArguments()[1]); - onClientCall.accept(observer); - return null; - }) - .when(traceService) - .batchWriteSpans(any(BatchWriteSpansRequest.class), any(StreamObserver.class)); - } -} diff --git a/storage-stackdriver/pom.xml b/storage-stackdriver/pom.xml index 5cde510..3b257eb 100644 --- a/storage-stackdriver/pom.xml +++ b/storage-stackdriver/pom.xml @@ -18,7 +18,7 @@ <parent> <artifactId>zipkin-gcp-parent</artifactId> <groupId>io.zipkin.gcp</groupId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> diff --git a/translation-stackdriver/pom.xml b/translation-stackdriver/pom.xml index c1b98c5..76a74c9 100644 --- a/translation-stackdriver/pom.xml +++ b/translation-stackdriver/pom.xml @@ -18,7 +18,7 @@ <parent> <artifactId>zipkin-gcp-parent</artifactId> <groupId>io.zipkin.gcp</groupId> - <version>2.1.2-SNAPSHOT</version> + <version>2.2.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion>