Skip to content

Commit

Permalink
feat: Add schema comparision to the main request loop for multiplexin…
Browse files Browse the repository at this point in the history
…g to correctly update schema (#1865)

* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destinationt

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests
also fixed a tiny bug inside fake bigquery write impl for getting thre
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

* feat: some fixes for multiplexing client

* feat: fix some todos, and reject the mixed behavior of passed in client or not

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: fix the bug that we may peek into the write_stream field but it's
possible the proto schema does not contain this field

* feat: add getInflightWaitSeconds implementation

* feat: Add schema comparision in connection loop to ensure schema update for
the same stream name can be notified

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
GaoleMeng and gcf-owl-bot[bot] authored Nov 4, 2022
1 parent 5699122 commit cb18d28
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.1.3')
implementation platform('com.google.cloud:libraries-bom:26.1.4')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,6 @@ private void appendLoop() {
&& !streamName.isEmpty()
&& !originalRequest.getWriteStream().equals(streamName)) {
streamName = originalRequest.getWriteStream();
writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
firstRequestForDestinationSwitch = true;
}
Expand All @@ -470,17 +469,22 @@ private void appendLoop() {
if (this.traceId != null) {
originalRequestBuilder.setTraceId(this.traceId);
}
firstRequestForDestinationSwitch = false;
} else if (isMultiplexing) {
// If we are not at the first request after table switch, but we are in multiplexing
// mode, we only need the stream name but not the schema in the request.
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
} else {
// If we are not at the first request or in multiplexing, create request with no schema
// and no stream name.
} else if (!isMultiplexing) {
// If we are not in multiplexing and not in the first request, clear the stream name.
originalRequestBuilder.clearWriteStream();
}

// We don't use message differencer to speed up the comparing process.
// `equals(...)` can bring us false positive, e.g. two repeated field can be considered the
// same but is not considered equals(). However as long as it's never provide false negative
// we will always correctly pass writer schema to backend.
if (firstRequestForDestinationSwitch
|| !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema)) {
writerSchema = originalRequest.getProtoRows().getWriterSchema();
} else {
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
}
firstRequestForDestinationSwitch = false;

// Send should only throw an exception if there is a problem with the request. The catch
// block will handle this case, and return the exception with the result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,118 @@ public void testMultiplexedAppendSuccess() throws Exception {
}
}

@Test
public void testAppendInSameStream_switchSchema() throws Exception {
try (ConnectionWorker connectionWorker = createConnectionWorker()) {
long appendCount = 20;
for (long i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();

// Schema1 and schema2 are the same content, but different instance.
ProtoSchema schema1 = createProtoSchema("foo");
ProtoSchema schema2 = createProtoSchema("foo");
// Schema3 is a different schema
ProtoSchema schema3 = createProtoSchema("bar");

// We do a pattern of:
// send to stream1, schema1
// send to stream1, schema2
// send to stream1, schema3
// send to stream1, schema3
// send to stream1, schema1
// ...
for (long i = 0; i < appendCount; i++) {
switch ((int) i % 4) {
case 0:
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema1,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
case 1:
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema2,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
case 2:
case 3:
futures.add(
sendTestMessage(
connectionWorker,
TEST_STREAM_1,
schema3,
createFooProtoRows(new String[] {String.valueOf(i)}),
i));
break;
default: // fall out
break;
}
}
// In the real world the response won't contain offset for default stream, but we use offset
// here just to test response.
for (int i = 0; i < appendCount; i++) {
Int64Value offset = futures.get(i).get().getAppendResult().getOffset();
assertThat(offset).isEqualTo(Int64Value.of(i));
}
assertThat(testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount);
for (int i = 0; i < appendCount; i++) {
AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
assertThat(serverRequest.getProtoRows().getRows().getSerializedRowsCount())
.isGreaterThan(0);
assertThat(serverRequest.getOffset().getValue()).isEqualTo(i);

// We will get the request as the pattern of:
// (writer_stream: t1, schema: schema1)
// (writer_stream: _, schema: _)
// (writer_stream: _, schema: schema3)
// (writer_stream: _, schema: _)
// (writer_stream: _, schema: schema1)
// (writer_stream: _, schema: _)
switch (i % 4) {
case 0:
if (i == 0) {
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
}
assertThat(
serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
.isEqualTo("foo");
break;
case 1:
assertThat(serverRequest.getWriteStream()).isEmpty();
// Schema is empty if not at the first request after table switch.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
case 2:
assertThat(serverRequest.getWriteStream()).isEmpty();
// Schema is populated after table switch.
assertThat(
serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
.isEqualTo("bar");
break;
case 3:
assertThat(serverRequest.getWriteStream()).isEmpty();
// Schema is empty if not at the first request after table switch.
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
break;
default: // fall out
break;
}
}

assertThat(connectionWorker.getLoad().destinationCount()).isEqualTo(1);
assertThat(connectionWorker.getLoad().inFlightRequestsBytes()).isEqualTo(0);
}
}

private AppendRowsResponse createAppendResponse(long offset) {
return AppendRowsResponse.newBuilder()
.setAppendResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
@RunWith(JUnit4.class)
public class StreamWriterTest {
private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM_1 = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_2 = "projects/p/datasets/d/tables/t/streams/s";
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/s1";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/s2";
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
Expand Down Expand Up @@ -112,13 +112,17 @@ private StreamWriter getTestStreamWriter() throws IOException {
}

private ProtoSchema createProtoSchema() {
return createProtoSchema("foo");
}

private ProtoSchema createProtoSchema(String fieldName) {
return ProtoSchema.newBuilder()
.setProtoDescriptor(
DescriptorProtos.DescriptorProto.newBuilder()
.setName("Message")
.addField(
DescriptorProtos.FieldDescriptorProto.newBuilder()
.setName("foo")
.setName(fieldName)
.setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
.setNumber(1)
.build())
Expand Down Expand Up @@ -562,6 +566,107 @@ public void testOneMaxInflightRequests_MultiplexingCase() throws Exception {
writer2.close();
}

@Test
public void testProtoSchemaPiping_nonMultiplexingCase() throws Exception {
ProtoSchema protoSchema = createProtoSchema();
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(protoSchema)
.setMaxInflightBytes(1)
.build();
long appendCount = 5;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
for (int i = 0; i < appendCount; i++) {
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
}

for (int i = 0; i < appendCount; i++) {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}
assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());
for (int i = 0; i < appendCount; i++) {
AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
assertEquals(i, appendRowsRequest.getOffset().getValue());
if (i == 0) {
appendRowsRequest.getProtoRows().getWriterSchema().equals(protoSchema);
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1);
} else {
appendRowsRequest.getProtoRows().getWriterSchema().equals(ProtoSchema.getDefaultInstance());
}
}
writer.close();
}

@Test
public void testProtoSchemaPiping_multiplexingCase() throws Exception {
// Use the shared connection mode.
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
ProtoSchema schema1 = createProtoSchema("Schema1");
ProtoSchema schema2 = createProtoSchema("Schema2");
StreamWriter writer1 =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(schema1)
.setLocation("US")
.setEnableConnectionPool(true)
.setMaxInflightRequests(1)
.build();
StreamWriter writer2 =
StreamWriter.newBuilder(TEST_STREAM_2, client)
.setWriterSchema(schema2)
.setMaxInflightRequests(1)
.setEnableConnectionPool(true)
.setLocation("US")
.build();

long appendCountPerStream = 5;
for (int i = 0; i < appendCountPerStream * 4; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}

List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
// In total insert append `appendCountPerStream` * 4 requests.
// We insert using the pattern of streamWriter1, streamWriter1, streamWriter2, streamWriter2
for (int i = 0; i < appendCountPerStream; i++) {
futures.add(writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4));
futures.add(writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 1));
futures.add(writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 2));
futures.add(writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 3));
}

for (int i = 0; i < appendCountPerStream * 4; i++) {
AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
assertEquals(i, appendRowsRequest.getOffset().getValue());
if (i % 4 == 0) {
assertEquals(appendRowsRequest.getProtoRows().getWriterSchema(), schema1);
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1);
} else if (i % 4 == 1) {
assertEquals(
appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance());
// Before entering multiplexing (i == 1) case, the write stream won't be populated.
if (i == 1) {
assertEquals(appendRowsRequest.getWriteStream(), "");
} else {
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_1);
}
} else if (i % 4 == 2) {
assertEquals(appendRowsRequest.getProtoRows().getWriterSchema(), schema2);
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2);
} else {
assertEquals(
appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance());
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2);
}
}

writer1.close();
writer2.close();
}

@Test
public void testAppendsWithTinyMaxInflightBytes() throws Exception {
StreamWriter writer =
Expand Down

0 comments on commit cb18d28

Please sign in to comment.