Skip to content

Commit

Permalink
migrates to zipkin-reporter 3.2 BytesMessageSender (#214)
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <[email protected]>
  • Loading branch information
codefromthecrypt authored Jan 14, 2024
1 parent 31ab49f commit 01e0aec
Show file tree
Hide file tree
Showing 21 changed files with 127 additions and 619 deletions.
2 changes: 1 addition & 1 deletion benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>

<artifactId>benchmarks</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion collector-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion encoder-stackdriver-brave/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion encoder-stackdriver-zipkin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>io.zipkin.gcp</groupId>
<artifactId>zipkin-gcp-parent</artifactId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-module-gcp</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

<groupId>io.zipkin.gcp</groupId>
<artifactId>zipkin-gcp-parent</artifactId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
<packaging>pom</packaging>

<modules>
Expand Down Expand Up @@ -75,7 +75,7 @@
<zipkin.groupId>io.zipkin.zipkin2</zipkin.groupId>
<!-- when updating, update docker/Dockerfile and storage/src/test/java/zipkin2/storage/kafka/IT* -->
<zipkin.version>3.0.2</zipkin.version>
<zipkin-reporter.version>3.1.1</zipkin-reporter.version>
<zipkin-reporter.version>3.2.1</zipkin-reporter.version>
<spring-boot.version>3.2.1</spring-boot.version>
<!-- armeria.groupId allows you to test feature branches with jitpack -->
<armeria.groupId>com.linecorp.armeria</armeria.groupId>
Expand Down
2 changes: 1 addition & 1 deletion propagation-stackdriver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion sender-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>zipkin-gcp-parent</artifactId>
<groupId>io.zipkin.gcp</groupId>
<version>2.1.2-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
148 changes: 28 additions & 120 deletions sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,22 @@
*/
package zipkin2.reporter.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.Call;
import zipkin2.reporter.Callback;
import zipkin2.reporter.CheckResult;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.Sender;

public class PubSubSender extends Sender {
public class PubSubSender extends BytesMessageSender.Base {

public static PubSubSender create(String topic) {
return newBuilder().topic(topic).build();
Expand Down Expand Up @@ -109,7 +101,6 @@ public PubSubSender build() {
if (topic == null) throw new NullPointerException("topic == null");

if (executorProvider == null) executorProvider = defaultExecutorProvider();
;

if (publisher == null) {
try {
Expand Down Expand Up @@ -146,69 +137,50 @@ public Builder toBuilder() {

final String topic;
final int messageMaxBytes;
final Encoding encoding;
final Publisher publisher;
final ExecutorProvider executorProvider;
final TopicAdminClient topicAdminClient;

volatile boolean closeCalled;

PubSubSender(Builder builder) {
this.topic = builder.topic;
this.messageMaxBytes = builder.messageMaxBytes;
this.encoding = builder.encoding;
this.publisher = builder.publisher;
this.executorProvider = builder.executorProvider;
this.topicAdminClient = builder.topicAdminClient;
super(builder.encoding);
topic = builder.topic;
messageMaxBytes = builder.messageMaxBytes;
publisher = builder.publisher;
executorProvider = builder.executorProvider;
topicAdminClient = builder.topicAdminClient;
}

/**
* If no permissions given sent back ok, f permissions and topic exist ok, if topic does not exist error
*
* @return
*/
@Override
public CheckResult check() {
try {
Topic topic = topicAdminClient.getTopic(TopicName.parse(this.topic));
return CheckResult.OK;
} catch (ApiException e) {
return CheckResult.failed(e);
}
}

@Override public Encoding encoding() {
return encoding;
}

@Override
public int messageMaxBytes() {
@Override public int messageMaxBytes() {
return messageMaxBytes;
}

@Override
public int messageSizeInBytes(List<byte[]> bytes) {
return encoding().listSizeInBytes(bytes);
}

@Override
public Call<Void> sendSpans(List<byte[]> byteList) {
if (closeCalled) throw new IllegalStateException("closed");
@Override public void send(List<byte[]> byteList) throws IOException {
if (closeCalled) throw new ClosedSenderException();

byte[] messageBytes = BytesMessageEncoder.forEncoding(encoding()).encode(byteList);
PubsubMessage pubsubMessage =
PubsubMessage message =
PubsubMessage.newBuilder().setData(ByteString.copyFrom(messageBytes)).build();

return new PubSubCall(pubsubMessage);
try {
publisher.publish(message).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) throw (RuntimeException) cause;
if (cause instanceof Error) throw (Error) cause;
throw new RuntimeException(cause);
}
}

/**
* Shutdown on Publisher is not async thus moving the synchronized block to another function in order not to block until the shutdown is over
*
* @throws IOException
* Shutdown on Publisher is not async thus moving the synchronized block to another function in
* order not to block until the shutdown is over.
*/
@Override
public void close() throws IOException {
@Override public void close() {
if (!setClosed()) {
return;
}
Expand All @@ -227,68 +199,4 @@ private synchronized boolean setClosed() {
@Override public final String toString() {
return "PubSubSender{topic=" + topic + "}";
}

class PubSubCall extends Call.Base<Void> {
private final PubsubMessage message;
volatile ApiFuture<String> future;

public PubSubCall(PubsubMessage message) {
this.message = message;
}

@Override
protected Void doExecute() throws IOException {
try {
publisher.publish(message).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return null;
}

@Override
protected void doEnqueue(Callback<Void> callback) {
future = publisher.publish(message);
ApiFutures.addCallback(future, new ApiFutureCallbackAdapter(callback),
executorProvider.getExecutor());
if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans");
}

@Override
protected void doCancel() {
Future<String> maybeFuture = future;
if (maybeFuture != null) maybeFuture.cancel(true);
}

@Override
protected boolean doIsCanceled() {
Future<String> maybeFuture = future;
return maybeFuture != null && maybeFuture.isCancelled();
}

@Override
public Call<Void> clone() {
PubsubMessage clone = PubsubMessage.newBuilder(message).build();
return new PubSubCall(clone);
}
}

static final class ApiFutureCallbackAdapter implements ApiFutureCallback<String> {

final Callback<Void> callback;

public ApiFutureCallbackAdapter(Callback<Void> callback) {
this.callback = callback;
}

@Override
public void onFailure(Throwable t) {
callback.onError(t);
}

@Override
public void onSuccess(String result) {
callback.onSuccess(null);
}
}
}
Loading

0 comments on commit 01e0aec

Please sign in to comment.