Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update StreamWriterV2 to support trace id #895

Merged
merged 8 commits into from
Mar 2, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
* <p>TODO: Attach traceId.
*
* <p>TODO: Support batching.
*
* <p>TODO: Support schema change.
Expand Down Expand Up @@ -74,6 +72,11 @@ public class StreamWriterV2 implements AutoCloseable {
*/
private final long maxInflightBytes;

/*
* TraceId for debugging purpose.
*/
private final String traceId;

/*
* Tracks current inflight requests in the stream.
*/
Expand Down Expand Up @@ -143,6 +146,7 @@ private StreamWriterV2(Builder builder) throws IOException {
this.writerSchema = builder.writerSchema;
this.maxInflightRequests = builder.maxInflightRequest;
this.maxInflightBytes = builder.maxInflightBytes;
this.traceId = builder.traceId;
yayi-google marked this conversation as resolved.
Show resolved Hide resolved
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
if (builder.client == null) {
Expand Down Expand Up @@ -433,6 +437,9 @@ private AppendRowsRequest prepareRequestBasedOnPosition(
requestBuilder.getProtoRowsBuilder().setWriterSchema(this.writerSchema);
}
requestBuilder.setWriteStream(this.streamName);
if (this.traceId != null) {
requestBuilder.setTraceId(this.traceId);
}
} else {
requestBuilder.clearWriteStream();
requestBuilder.getProtoRowsBuilder().clearWriterSchema();
Expand Down Expand Up @@ -539,6 +546,8 @@ public static final class Builder {
private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();

private String traceId = null;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down Expand Up @@ -591,6 +600,20 @@ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
return this;
}

/**
* Sets traceId for debuging purpose. TraceId must follow the format of
* CustomerDomain:DebugString, e.g. DATAFLOW:job_id_x.
*/
public Builder setTraceId(String traceId) {
int colonIndex = traceId.indexOf(':');
if (colonIndex == -1 || colonIndex == 0 || colonIndex == traceId.length() - 1) {
throw new IllegalArgumentException(
"TraceId must follow the format of A:B. Actual:" + traceId);
}
this.traceId = traceId;
return this;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's perform some simple sanity check here.

On the server side, this needs to be something like key value pair like http header or A:B; is okay as well (we need to document it).

A simple processing at the client side will be splitting the trace id per the standard above if we can;
other wise convert it to UserTrace:xxx;

On top of it, adding the StreamWrite:Version?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The traceId name is strictly screened in the backend. In order for dataflow to be recognized, it has to start from Dataflow:, so apart from this function, maybe also provide a setDataflowJobId to set it directly to Dataflow:job_id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these requires backend support, which is not there yet.

I would also prefer to put all the logic you mentioned into the server side, because we have strong control of the codes there.

On the other hand, it is very hard to change the client library. So a free string seems most flexible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making dataflow do the right thing is also fine to me. Just that if they mess up, then we will have issues in debugging their traffic. The backend already parses these trace ids.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to perform such kind of logic in the server side, but here, let's perform some sanity check and makes sure that final trace id is in a good shape as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added check to make sure traceId follow the format of A:B.

}

/** Builds the {@code StreamWriterV2}. */
public StreamWriterV2 build() throws IOException {
return new StreamWriterV2(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
public class StreamWriterV2Test {
private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName());
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
private static MockServiceHelper serviceHelper;
Expand Down Expand Up @@ -84,7 +85,7 @@ public void tearDown() throws Exception {
}

private StreamWriterV2 getTestStreamWriterV2() throws IOException {
return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
return StreamWriterV2.newBuilder(TEST_STREAM, client).setTraceId(TEST_TRACE_ID).build();
}

private ProtoSchema createProtoSchema() {
Expand Down Expand Up @@ -184,10 +185,12 @@ private void verifyAppendRequests(long appendCount) {
// First request received by server should have schema and stream name.
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
assertEquals(serverRequest.getTraceId(), TEST_TRACE_ID);
} else {
// Following request should not have schema and stream name.
assertFalse(serverRequest.getProtoRows().hasWriterSchema());
assertEquals(serverRequest.getWriteStream(), "");
assertEquals(serverRequest.getTraceId(), "");
}
}
}
Expand All @@ -209,7 +212,10 @@ public void testBuildBigQueryWriteClientInWriter() throws Exception {
@Test
public void testAppendWithRowsSuccess() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM, client).setWriterSchema(createProtoSchema()).build();
StreamWriterV2.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.build();

long appendCount = 100;
for (int i = 0; i < appendCount; i++) {
Expand Down Expand Up @@ -269,6 +275,34 @@ public void run() throws Throwable {
assertTrue(ex.getStatus().getDescription().contains("Writer schema must be provided"));
}

@Test
public void testInvalidTraceId() throws Exception {
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriterV2.newBuilder(TEST_STREAM).setTraceId("abc");
}
});
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriterV2.newBuilder(TEST_STREAM).setTraceId("abc:");
}
});
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriterV2.newBuilder(TEST_STREAM).setTraceId(":abc");
}
});
}

@Test
public void testAppendSuccessAndConnectionError() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();
Expand Down