diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 7bbac03708..c8a41f38e3 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -251,6 +251,9 @@ private void setException(Throwable t) { * @return the message ID wrapped in a future. */ public ApiFuture append(AppendRowsRequest message) { + if (shutdown.get()) { + throw new IllegalStateException("Cannot append to a shutdown writer"); + } appendAndRefreshAppendLock.lock(); Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer."); Preconditions.checkNotNull(message, "Message is null."); @@ -284,14 +287,9 @@ public ApiFuture append(AppendRowsRequest message) { * @throws Exception */ public void flushAll(long timeoutMillis) throws Exception { - appendAndRefreshAppendLock.lock(); - try { - writeAllOutstanding(); - synchronized (messagesWaiter) { - messagesWaiter.waitComplete(timeoutMillis); - } - } finally { - appendAndRefreshAppendLock.unlock(); + writeAllOutstanding(); + synchronized (messagesWaiter) { + messagesWaiter.waitComplete(timeoutMillis); } exceptionLock.lock(); try { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 2b67f9f3d1..e0c78000e7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -1049,4 +1049,28 @@ public void testShutdownWithConnectionError() throws Exception { assertEquals("Request aborted due to previous failures", e.getCause().getMessage()); } } + + @Test + public void testAppendAfterShutdown() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .build()) + .build(); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) + .build()); + writer.shutdown(); + try { + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + fail("Should fail with exception"); + } catch (IllegalStateException e) { + assertEquals("Cannot append to a shutdown writer", e.getMessage()); + } + } }