Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add sample about processing permanent writer failure #2057

Merged
merged 12 commits into from
Apr 3, 2023
Merged
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
Expand Down Expand Up @@ -123,6 +124,7 @@ private static class AppendContext {
private static class DataWriter {

private static final int MAX_RETRY_COUNT = 3;
private static final int MAX_RECREATE_COUNT = 3;
private static final ImmutableList<Code> RETRIABLE_ERROR_CODES =
ImmutableList.of(
Code.INTERNAL,
Expand All @@ -140,6 +142,8 @@ private static class DataWriter {
@GuardedBy("lock")
private RuntimeException error = null;

private AtomicInteger recreateCount = new AtomicInteger(0);

public void initialize(TableName parentTable)
throws DescriptorValidationException, IOException, InterruptedException {
// Use the JSON stream writer to send records in JSON format. Specify the table name to write
Expand All @@ -153,6 +157,13 @@ public void initialize(TableName parentTable)
public void append(AppendContext appendContext)
throws DescriptorValidationException, IOException {
synchronized (this.lock) {
if (!streamWriter.isUserClosed() && streamWriter.isClosed() &&
recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
streamWriter =
JsonStreamWriter.newBuilder(streamWriter.getStreamName(),
BigQueryWriteClient.create()).build();
this.error = null;
}
// If earlier appends have failed, we need to reset before continuing.
if (this.error != null) {
throw this.error;
Expand Down Expand Up @@ -194,6 +205,7 @@ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {

public void onSuccess(AppendRowsResponse response) {
System.out.format("Append success\n");
this.parent.recreateCount.set(0);
done();
}

Expand Down