Skip to content

Commit

Permalink
fix: Revive schema update e2e test and adjust some test names (#921)
Browse files Browse the repository at this point in the history
* fix: Revive schema update e2e test and adjust some test names

* .
  • Loading branch information
yirutang authored Mar 9, 2021
1 parent f7c645a commit dd392e5
Showing 1 changed file with 131 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void testBatchWriteWithCommittedStream()
}

@Test
public void testJsonStreamWriterWriteWithCommittedStream()
public void testJsonStreamWriterCommittedStream()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException {
String tableName = "JsonTable";
Expand Down Expand Up @@ -283,7 +283,7 @@ public void testJsonStreamWriterWriteWithCommittedStream()
}

@Test
public void testJsonStreamWriterBatchWriteWithDefaultStream()
public void testJsonStreamWriterWithDefaultStream()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException {
String tableName = "JsonTableDefaultStream";
Expand Down Expand Up @@ -355,141 +355,137 @@ public void testJsonStreamWriterBatchWriteWithDefaultStream()
}
}

// @Test
// public void testJsonStreamWriterSchemaUpdate()
// throws IOException, InterruptedException, ExecutionException,
// Descriptors.DescriptorValidationException {
// String tableName = "SchemaUpdateTable";
// TableInfo tableInfo =
// TableInfo.newBuilder(
// TableId.of(DATASET, tableName),
// StandardTableDefinition.of(
// Schema.of(
// com.google.cloud.bigquery.Field.newBuilder("foo",
// LegacySQLTypeName.STRING)
// .build())))
// .build();
//
// bigquery.create(tableInfo);
// TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
// WriteStream writeStream =
// client.createWriteStream(
// CreateWriteStreamRequest.newBuilder()
// .setParent(parent.toString())
// .setWriteStream(
// WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
// .build());
//
// try (JsonStreamWriter jsonStreamWriter =
// JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
// .setBatchingSettings(
// StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
// .toBuilder()
// .setElementCountThreshold(1L)
// .build())
// .build()) {
// // 1). Send 1 row
// JSONObject foo = new JSONObject();
// foo.put("foo", "aaa");
// JSONArray jsonArr = new JSONArray();
// jsonArr.put(foo);
//
// ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(jsonArr, -1);
// assertEquals(0, response.get().getAppendResult().getOffset().getValue());
// // 2). Schema update and wait until querying it returns a new schema.
// try {
// com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName);
// Schema schema = table.getDefinition().getSchema();
// FieldList fields = schema.getFields();
// Field newField =
// Field.newBuilder("bar",
// LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
//
// List<Field> fieldList = new ArrayList<Field>();
// fieldList.add(fields.get(0));
// fieldList.add(newField);
// Schema newSchema = Schema.of(fieldList);
// // Update the table with the new schema
// com.google.cloud.bigquery.Table updatedTable =
// table.toBuilder().setDefinition(StandardTableDefinition.of(newSchema)).build();
// updatedTable.update();
// int millis = 0;
// while (millis <= 10000) {
// if (newSchema.equals(table.reload().getDefinition().getSchema())) {
// break;
// }
// Thread.sleep(1000);
// millis += 1000;
// }
// newSchema = schema;
// LOG.info(
// "bar column successfully added to table in "
// + millis
// + " millis: "
// + bigquery.getTable(DATASET, tableName).getDefinition().getSchema());
// } catch (BigQueryException e) {
// LOG.severe("bar column was not added. \n" + e.toString());
// }
// // 3). Send rows to wait for updatedSchema to be returned.
// JSONObject foo2 = new JSONObject();
// foo2.put("foo", "bbb");
// JSONArray jsonArr2 = new JSONArray();
// jsonArr2.put(foo2);
//
// int next = 0;
// for (int i = 1; i < 100; i++) {
// ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, -1);
// assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
// if (response2.get().hasUpdatedSchema()) {
// next = i;
// break;
// } else {
// Thread.sleep(1000);
// }
// }
//
// int millis = 0;
// while (millis <= 10000) {
// if (jsonStreamWriter.getDescriptor().getFields().size() == 2) {
// LOG.info("JsonStreamWriter successfully updated internal descriptor!");
// break;
// }
// Thread.sleep(100);
// millis += 100;
// }
// assertTrue(jsonStreamWriter.getDescriptor().getFields().size() == 2);
// // 4). Send rows with updated schema.
// JSONObject updatedFoo = new JSONObject();
// updatedFoo.put("foo", "ccc");
// updatedFoo.put("bar", "ddd");
// JSONArray updatedJsonArr = new JSONArray();
// updatedJsonArr.put(updatedFoo);
// for (int i = 0; i < 10; i++) {
// ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(updatedJsonArr, -1);
// assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue());
// response3.get();
// }
//
// TableResult result3 =
// bigquery.listTableData(
// tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
// Iterator<FieldValueList> iter3 = result3.getValues().iterator();
// assertEquals("aaa", iter3.next().get(0).getStringValue());
// for (int j = 1; j <= next; j++) {
// assertEquals("bbb", iter3.next().get(0).getStringValue());
// }
// for (int j = next + 1; j < next + 1 + 10; j++) {
// FieldValueList temp = iter3.next();
// assertEquals("ccc", temp.get(0).getStringValue());
// assertEquals("ddd", temp.get(1).getStringValue());
// }
// assertEquals(false, iter3.hasNext());
// }
// }
// E2E test: verifies that a JsonStreamWriter on a COMMITTED stream observes a
// BigQuery table schema change (adding a NULLABLE "bar" column) mid-stream via
// AppendRowsResponse.updatedSchema, refreshes its internal descriptor, and can
// then append rows that populate the new column. Requires live BigQuery access
// (uses the shared `bigquery` and `client` test fixtures).
@Test
public void testJsonStreamWriterSchemaUpdate()
    throws IOException, InterruptedException, ExecutionException,
        Descriptors.DescriptorValidationException {
  String tableName = "SchemaUpdateTable";
  // Initial table schema: a single STRING column "foo".
  TableInfo tableInfo =
      TableInfo.newBuilder(
              TableId.of(DATASET, tableName),
              StandardTableDefinition.of(
                  Schema.of(
                      com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
                          .build())))
          .build();

  // Create a Bigquery table.
  bigquery.create(tableInfo);
  TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
  // Create a Write Api stream. COMMITTED type makes appended rows immediately
  // visible to listTableData below, without an explicit commit step.
  WriteStream writeStream =
      client.createWriteStream(
          CreateWriteStreamRequest.newBuilder()
              .setParent(parent.toString())
              .setWriteStream(
                  WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
              .build());

  try (JsonStreamWriter jsonStreamWriter =
      JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
    // 1). Send 1 row
    JSONObject foo = new JSONObject();
    foo.put("foo", "aaa");
    JSONArray jsonArr = new JSONArray();
    jsonArr.put(foo);

    // Explicit offset 0: first row on the stream.
    ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(jsonArr, 0);
    assertEquals(0, response.get().getAppendResult().getOffset().getValue());
    // 2). Schema update and wait until querying it returns a new schema.
    try {
      com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName);
      Schema schema = table.getDefinition().getSchema();
      FieldList fields = schema.getFields();
      // The new column must be NULLABLE so existing rows remain valid.
      Field newField =
          Field.newBuilder("bar", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();

      List<Field> fieldList = new ArrayList<Field>();
      fieldList.add(fields.get(0));
      fieldList.add(newField);
      Schema newSchema = Schema.of(fieldList);
      // Update the table with the new schema
      com.google.cloud.bigquery.Table updatedTable =
          table.toBuilder().setDefinition(StandardTableDefinition.of(newSchema)).build();
      updatedTable.update();
      // Poll (up to ~10s) until a table reload reflects the new schema.
      int millis = 0;
      while (millis <= 10000) {
        if (newSchema.equals(table.reload().getDefinition().getSchema())) {
          break;
        }
        Thread.sleep(1000);
        millis += 1000;
      }
      // NOTE(review): this reassignment looks like a leftover — it overwrites
      // newSchema with the OLD schema and newSchema is not read afterwards.
      // Appears to be dead code; confirm before removing.
      newSchema = schema;
      LOG.info(
          "bar column successfully added to table in "
              + millis
              + " millis: "
              + bigquery.getTable(DATASET, tableName).getDefinition().getSchema());
    } catch (BigQueryException e) {
      // Best-effort: schema update failure is logged, not rethrown; the
      // subsequent descriptor assertion will fail the test if it never landed.
      LOG.severe("bar column was not added. \n" + e.toString());
    }
    // 3). Send rows to wait for updatedSchema to be returned.
    JSONObject foo2 = new JSONObject();
    foo2.put("foo", "bbb");
    JSONArray jsonArr2 = new JSONArray();
    jsonArr2.put(foo2);

    // `next` records the offset at which the server first reported the
    // updated schema; rows 1..next all contain only "foo" = "bbb".
    int next = 0;
    for (int i = 1; i < 100; i++) {
      ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, i);
      assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
      if (response2.get().hasUpdatedSchema()) {
        next = i;
        break;
      } else {
        // Back off between appends while waiting for the server-side schema
        // propagation to surface in a response.
        Thread.sleep(1000);
      }
    }

    // Poll (up to ~10s) for the writer's internal descriptor to pick up the
    // second field; the update happens asynchronously after the response.
    int millis = 0;
    while (millis <= 10000) {
      if (jsonStreamWriter.getDescriptor().getFields().size() == 2) {
        LOG.info("JsonStreamWriter successfully updated internal descriptor!");
        break;
      }
      Thread.sleep(100);
      millis += 100;
    }
    assertTrue(jsonStreamWriter.getDescriptor().getFields().size() == 2);
    // 4). Send rows with updated schema.
    JSONObject updatedFoo = new JSONObject();
    updatedFoo.put("foo", "ccc");
    updatedFoo.put("bar", "ddd");
    JSONArray updatedJsonArr = new JSONArray();
    updatedJsonArr.put(updatedFoo);
    // Ten two-column rows, continuing the offset sequence after `next`.
    for (int i = 0; i < 10; i++) {
      ApiFuture<AppendRowsResponse> response3 =
          jsonStreamWriter.append(updatedJsonArr, next + 1 + i);
      assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue());
      response3.get();
    }

    // Verify the committed table contents in append order:
    // row 0 = "aaa", rows 1..next = "bbb", then ten ("ccc", "ddd") rows.
    TableResult result3 =
        bigquery.listTableData(
            tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
    Iterator<FieldValueList> iter3 = result3.getValues().iterator();
    assertEquals("aaa", iter3.next().get(0).getStringValue());
    for (int j = 1; j <= next; j++) {
      assertEquals("bbb", iter3.next().get(0).getStringValue());
    }
    for (int j = next + 1; j < next + 1 + 10; j++) {
      FieldValueList temp = iter3.next();
      assertEquals("ccc", temp.get(0).getStringValue());
      assertEquals("ddd", temp.get(1).getStringValue());
    }
    // No extra rows beyond the ones appended above.
    assertEquals(false, iter3.hasNext());
  }
}

@Test
public void testComplicateSchemaWithPendingStream()
throws IOException, InterruptedException, ExecutionException {
LOG.info("Create a write stream");
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
Expand Down Expand Up @@ -520,6 +516,7 @@ public void testComplicateSchemaWithPendingStream()
Iterator<FieldValueList> iter = result.getValues().iterator();
assertEquals(false, iter.hasNext());

LOG.info("Finalize a write stream");
finalizeResponse =
client.finalizeWriteStream(
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
Expand All @@ -536,8 +533,8 @@ public void testComplicateSchemaWithPendingStream()
LOG.info("Got exception: " + expected.toString());
}
}
// Finalize row count is not populated.
assertEquals(2, finalizeResponse.getRowCount());
LOG.info("Commit a write stream");
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
client.batchCommitWriteStreams(
BatchCommitWriteStreamsRequest.newBuilder()
Expand Down

0 comments on commit dd392e5

Please sign in to comment.