Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update StreamWriterV2 to support trace id #895

Merged
merged 8 commits into from
Mar 2, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
*
* <p>TODO: Attach schema.
*
* <p>TODO: Attach traceId.
*
* <p>TODO: Support batching.
*
* <p>TODO: Support schema change.
Expand All @@ -69,6 +67,11 @@ public class StreamWriterV2 implements AutoCloseable {
*/
private final long maxInflightBytes;

/*
* TraceId for debugging purpose.
*/
private final String traceId;

/*
* Tracks current inflight requests in the stream.
*/
Expand Down Expand Up @@ -137,6 +140,7 @@ private StreamWriterV2(Builder builder) throws IOException {
this.streamName = builder.streamName;
this.maxInflightRequests = builder.maxInflightRequest;
this.maxInflightBytes = builder.maxInflightBytes;
this.traceId = builder.traceId;
yayi-google marked this conversation as resolved.
Show resolved Hide resolved
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
if (builder.client == null) {
Expand Down Expand Up @@ -381,6 +385,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition(
AppendRowsRequest.Builder requestBuilder = original.toBuilder();
if (isFirstRequest) {
requestBuilder.setWriteStream(this.streamName);
if (this.traceId != null) {
requestBuilder.setTraceId(this.traceId);
}
} else {
requestBuilder.clearWriteStream();
requestBuilder.getProtoRowsBuilder().clearWriterSchema();
Expand Down Expand Up @@ -485,6 +492,8 @@ public static final class Builder {
private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();

private String traceId = null;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down Expand Up @@ -531,6 +540,12 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
return this;
}

/** TraceId for debuging purpose. */
public Builder setTraceId(String traceId) {
this.traceId = traceId;
return this;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's perform some simple sanity check here.

On the server side, this needs to be something like key value pair like http header or A:B; is okay as well (we need to document it).

A simple processing at the client side will be splitting the trace id per the standard above if we can;
other wise convert it to UserTrace:xxx;

On top of it, adding the StreamWrite:Version?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The traceId name is strictly screened in the backend. In order for dataflow to be recognized, it has to start from Dataflow:, so apart from this function, maybe also provide a setDataflowJobId to set it directly to Dataflow:job_id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these requires backend support, which is not there yet.

I would also prefer to put all the logic you mentioned into the server side, because we have strong control of the codes there.

On the other hand, it is very hard to change the client library. So a free string seems most flexible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making dataflow do the right thing is also fine to me. Just that if they mess up, then we will have issues in debugging their traffic. The backend already parses these trace ids.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to perform such kind of logic in the server side, but here, let's perform some sanity check and makes sure that final trace id is in a good shape as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added check to make sure traceId follow the format of A:B.

}

/** Builds the {@code StreamWriterV2}. */
public StreamWriterV2 build() throws IOException {
return new StreamWriterV2(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
public class StreamWriterV2Test {
private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName());
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_TRACE_ID = "test trace id";
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
private static MockServiceHelper serviceHelper;
Expand Down Expand Up @@ -84,7 +85,7 @@ public void tearDown() throws Exception {
}

private StreamWriterV2 getTestStreamWriterV2() throws IOException {
return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
return StreamWriterV2.newBuilder(TEST_STREAM, client).setTraceId(TEST_TRACE_ID).build();
}

private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
Expand Down Expand Up @@ -205,10 +206,12 @@ public void testAppendSuccess() throws Exception {
// First request received by server should have schema and stream name.
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
assertEquals(serverRequest.getTraceId(), TEST_TRACE_ID);
} else {
// Following request should not have schema and stream name.
assertFalse(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), "");
assertEquals(serverRequest.getTraceId(), "");
}
}

Expand Down