JsonStreamWriter fails to append rows with updated schemas right after updating the table #1465

Comments
Hi Bruno, thank you for your feedback. We are aware of this limitation (the JsonStreamWriter is delayed in knowing when the schema is updated because of the time taken by metadata updates). We added a new feature, ignoreUnknownField, which can mitigate the situation in error handling but does not solve the problem completely. Would it help with your use case? The best way to solve this is for the BigQuery Storage Write API to support JSON natively, which is being discussed internally (cc @yirutang @yayi-google). Meanwhile, we don't expect schema updates to happen very frequently (as you've acknowledged). Please let us know if it helps. Thanks!
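As a sketch of what that mitigation might look like from the caller's side, assuming the ignoreUnknownFields option is exposed on the JsonStreamWriter builder in the client version in use: unknown JSON fields are then dropped instead of failing the whole append, but data in new columns is still lost until the writer learns the updated schema.

```scala
import com.google.cloud.bigquery.storage.v1.{JsonStreamWriter, WriteStream}

// Sketch only: build a writer that drops JSON fields unknown to its current schema
// instead of rejecting the row. The setIgnoreUnknownFields flag is assumed to be
// available in the client version in use.
def tolerantWriter(writeStream: WriteStream): JsonStreamWriter =
  JsonStreamWriter
    .newBuilder(writeStream.getName, writeStream.getTableSchema)
    .setIgnoreUnknownFields(true)
    .build()
```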
Hello! Thank you.
Update: We opted for a workaround: re-instantiate both the WriteStream and the JsonStreamWriter. Hopefully this workaround suffices for the foreseeable future. In any case, I will leave the ticket open as it is still valid.
How is that different from what has been implemented in #1447?
In detail, #1447 only picks up the updated schema from responses to subsequent append calls. Given this scenario, the only way to have an up-to-date schema immediately after updating the table is to recreate the writer ourselves.
If you know explicitly when your schema is updated and when you need the new data in, you could just create a new JsonStreamWriter instead of reusing the existing one.
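A minimal sketch of that suggestion, assuming you control the moment of the schema change and already have the updated storage-API TableSchema at hand:

```scala
import com.google.cloud.bigquery.storage.v1.{JsonStreamWriter, TableSchema}

// Sketch only: keep the same write stream, but swap the JsonStreamWriter at a known
// schema-change point so that subsequent appends are validated against the new schema.
def replaceWriter(
    oldWriter: JsonStreamWriter,
    streamName: String,
    updatedSchema: TableSchema // storage-API schema that already contains the new column(s)
): JsonStreamWriter = {
  oldWriter.close() // release the old writer before the swap
  JsonStreamWriter.newBuilder(streamName, updatedSchema).build()
}
```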
Also, why do you want to use BUFFERED mode to write your data?
Thank you both for providing feedback.
Oh, I see that we can just build a new JsonStreamWriter with the updated schema. How do we convert the bigquery Schema into the TableSchema that the builder expects?
It seemed more suitable for our use case: ingesting information from our data pipelines into BigQuery (a streaming approach). Probably the COMMITTED mode would be a better fit, then.
There is something in the samples that does the conversion. Yeah, in most cases pending mode is good enough for batching behavior, and committed is good for streaming behavior.
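A simplified sketch of what that conversion utility does (the real sample class linked later in the thread, BqToBqStorageSchemaConverter, maps all types and field modes; this sketch only covers a couple of types for illustration):

```scala
import com.google.cloud.bigquery.{Field, Schema, StandardSQLTypeName}
import com.google.cloud.bigquery.storage.v1.{TableFieldSchema, TableSchema}
import scala.jdk.CollectionConverters._ // Scala 2.13 converters

// Simplified sketch: turn a bigquery Schema into the storage-API TableSchema expected
// by JsonStreamWriter. Only STRING and INT64 are mapped here; modes are forced to
// NULLABLE. See the sample class referenced below for the complete mapping.
object SchemaConverterSketch {
  private def convertField(field: Field): TableFieldSchema = {
    val storageType = field.getType.getStandardType match {
      case StandardSQLTypeName.STRING => TableFieldSchema.Type.STRING
      case StandardSQLTypeName.INT64  => TableFieldSchema.Type.INT64
      case other                      => sys.error(s"Type $other is not covered by this sketch")
    }
    TableFieldSchema.newBuilder()
      .setName(field.getName)
      .setType(storageType)
      .setMode(TableFieldSchema.Mode.NULLABLE)
      .build()
  }

  def convertTableSchema(schema: Schema): TableSchema = {
    val builder = TableSchema.newBuilder()
    schema.getFields.asScala.foreach(f => builder.addFields(convertField(f)))
    builder.build()
  }
}
```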
Thank you! That really helped! That utility class seems useful enough to be exposed to others (rather than living in a sample). As a reference for others, I will leave the updated integration test that verifies that your solution works.

Manually updating the JsonStreamWriter schema:

```scala
// Imports assumed for this test, plus the `test` / assertion helpers from whatever
// test framework is in use (e.g. org.junit.Assert.{assertEquals, assertFalse}):
import java.util.UUID
import com.google.cloud.ServiceOptions
import com.google.cloud.bigquery.{BigQueryOptions, Field, Schema, StandardSQLTypeName, StandardTableDefinition, TableDefinition, TableId, TableInfo}
import com.google.cloud.bigquery.storage.v1.{BigQueryWriteClient, CreateWriteStreamRequest, FlushRowsRequest, JsonStreamWriter, TableName, WriteStream}
import com.google.common.collect.ImmutableList
import com.google.protobuf.Int64Value
import org.json.{JSONArray, JSONObject}

test("Schema migrations using JsonStreamWriter fails if appending rows with new schema") {
  val client = BigQueryWriteClient.create()
  val bigquery = BigQueryOptions.newBuilder
    .setProjectId("project")
    .build
    .getService
  val DATASET = "dataset"
  val tableName =
    "SchemaUpdateTestTable" + UUID.randomUUID().toString.replace("-", "").substring(0, 5)
  val tableId = TableId.of(DATASET, tableName)
  val col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build
  val originalSchema = Schema.of(col1)
  val tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build
  bigquery.create(tableInfo)
  val parent = TableName.of(ServiceOptions.getDefaultProjectId, DATASET, tableName)
  val writeStream = client.createWriteStream(
    CreateWriteStreamRequest.newBuilder
      .setParent(parent.toString)
      .setWriteStream(WriteStream.newBuilder.setType(WriteStream.Type.BUFFERED).build)
      .build
  )
  try {
    val jsonStreamWriter =
      JsonStreamWriter.newBuilder(writeStream.getName, writeStream.getTableSchema).build
    try { // write the 1st row
      var currentOffset = 0L
      val foo = new JSONObject()
      foo.put("col1", "aaa")
      val jsonArr = new JSONArray()
      jsonArr.put(foo)
      val response = jsonStreamWriter.append(jsonArr, currentOffset)
      currentOffset += jsonArr.length() - 1 // offset of the last row appended so far
      assertEquals(0L, response.get.getAppendResult.getOffset.getValue)
      assertEquals(
        0L,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )
      // update schema with a new column
      val col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build
      val updatedSchema = Schema.of(ImmutableList.of(col1, col2))
      val updatedTableInfo =
        TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build
      val updatedTable = bigquery.update(updatedTableInfo)
      assertEquals(
        updatedSchema,
        updatedTable.getDefinition.asInstanceOf[TableDefinition].getSchema,
      )
      // NEW: Manually rebuild the JsonStreamWriter with the updated schema.
      Thread.sleep(30000) // wait until BQ's backend is ready to accept requests with the new schema; 30 s is likely more than needed, but it works
      val updatedJsonWriter = JsonStreamWriter
        .newBuilder(writeStream.getName, BqToBqStorageSchemaConverter.convertTableSchema(updatedSchema))
        .build()
      // write rows with updated schema.
      val updatedFoo = new JSONObject()
      updatedFoo.put("col1", "ccc")
      updatedFoo.put("col2", "ddd")
      val updatedJsonArr = new JSONArray()
      updatedJsonArr.put(updatedFoo)
      for (i <- 0 until 10) {
        currentOffset += updatedJsonArr.length()
        val response3 = updatedJsonWriter.append(updatedJsonArr, currentOffset)
        assertEquals(currentOffset, response3.get.getAppendResult.getOffset.getValue)
      }
      assertEquals(
        currentOffset,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )
      // verify table data correctness
      val rowsIter = bigquery.listTableData(tableId).getValues.iterator
      // 1 row of aaa
      assertEquals("aaa", rowsIter.next.get(0).getStringValue)
      // 10 rows of ccc, ddd
      for (_ <- 0 until 10) {
        val temp = rowsIter.next
        assertEquals("ccc", temp.get(0).getStringValue)
        assertEquals("ddd", temp.get(1).getStringValue)
      }
      assertFalse(rowsIter.hasNext)
    } finally if (jsonStreamWriter != null) jsonStreamWriter.close()
  }
}
// Add the content of https://github.com/googleapis/java-bigquerystorage/blob/main/samples/snippets/src/main/java/com/example/bigquerystorage/BqToBqStorageSchemaConverter.java
```
Yeah, it was in the client lib, but bigquerystorage cannot reference bigquery, so the function cannot be included in the client lib...
Here is a bit more context on that decision. As things evolve, we can explore ways to improve the client if you have other suggestions.
I see. Have you considered having a separate dependency that just holds the "model" (table definition, field definition, etc.)? That way, both the BigQuery API and the BigQuery Storage API would use the same types to represent BigQuery entities.
Thank you both for your help. It still requires closing the previous JsonStreamWriter and creating a new one, but in any case we are not functionally blocked. The ticket itself still makes sense, but I'll leave the decision to you on how you want to triage this. Happy to help if you need anything.
Thanks, closing for now.
Hello,
My team and I are exploring the BigQuery Storage Write API to stream information to BigQuery (`BUFFERED` mode). The problem comes with schema updates. We can successfully update the BigQuery table's schema once we deem it necessary; however, appending rows with the new schema using `JsonStreamWriter` fails because the underlying table schema is outdated, leading to `JSONObject has fields unknown to BigQuery: root.col2.`

I am aware that #1447 adds support for updated table schemas; however, it depends on the client making new append calls until the underlying schema is updated (looking at the test).
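Condensed, the failing sequence looks roughly like the sketch below (distilled from the test shown earlier in the thread; `bigquery`, `tableId`, `col1`, and `writer` stand for the BigQuery client, the table, its original column, and the JsonStreamWriter created while the table still had only `col1`):

```scala
import com.google.cloud.bigquery.{BigQuery, Field, Schema, StandardSQLTypeName, StandardTableDefinition, TableId, TableInfo}
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter
import org.json.{JSONArray, JSONObject}

// Sketch: update the table with a new column and immediately append a row that uses it
// through a writer still holding the old schema.
def reproduceFailure(bigquery: BigQuery, tableId: TableId, col1: Field, writer: JsonStreamWriter): Unit = {
  val col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build
  bigquery.update(
    TableInfo.newBuilder(tableId, StandardTableDefinition.of(Schema.of(col1, col2))).build
  )

  val row = new JSONObject()
  row.put("col1", "ccc")
  row.put("col2", "ddd") // unknown to the writer's cached schema

  val rows = new JSONArray()
  rows.put(row)
  // Fails with: JSONObject has fields unknown to BigQuery: root.col2.
  writer.append(rows, 1L).get()
}
```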
For reference, I copied your integration tests to our project (Scala) to ease discussion:
Works with `BUFFERED` mode
Does not work if appending rows with new schema right after updating table
Error:
Describe the solution you'd like
Two ideas follow, but you are best suited to make the design decisions.

1. `JsonStreamWriter` recovers from this error by checking the latest `TableDefinition`; however, this API call can be costly depending on how often schema updates occur (in our case, rarely). It could be a configurable option. This works for our use case.
2. `JsonStreamWriter` could support changing the underlying schema, e.g. a `JsonStreamWriter::setTableDefinition` that does something similar to this (see the sketch below). Edit: this introduces mutability, so it is not the best solution.
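Purely as an illustration of the second idea, the call site might look roughly like this; no such setter exists in the current JsonStreamWriter API, the trait and method name are hypothetical, and a TableSchema-based variant is sketched since the storage client does not depend on the bigquery client:

```scala
import com.google.cloud.bigquery.storage.v1.TableSchema

// Hypothetical only: this setter is NOT part of the current JsonStreamWriter API.
// It sketches the shape of idea 2: push the new schema into the existing writer
// instead of closing it and building a new one.
trait SchemaUpdatableJsonWriter {
  def setTableSchema(updatedSchema: TableSchema): Unit // hypothetical mutator
}
```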
Describe alternatives you've considered

Attempting to recreate the `WriteStream`, then the `JsonStreamWriter`, but that is disruptive given our streaming approach.

Environment details
OS type and version: macOS (Apple M1)
Java version: Corretto 11.0.12
bigquerystorage version(s): 2.8.0
bigquery: 2.5.1
Steps to reproduce
See second code sample provided. Happy to help!
Code example
See the second code sample provided. I essentially removed the logic under `// continue writing rows until backend acknowledges schema update` from the first sample, using `BUFFERED` mode.

Stack trace