From 75e446ae8acc9e6b70ab6b89c19b43eaa5bcd06b Mon Sep 17 00:00:00 2001 From: Diego Marquez Date: Fri, 21 Oct 2022 19:14:44 +0000 Subject: [PATCH] fix(batcher): exceptions in unaryCaller bubble up fixes #1826. When the BatcherImpl's unary caller throws an exception, this will bubble up to response.get() --- .ijwb/.bazelproject | 21 ++++++ .../google/api/gax/batching/BatcherImpl.java | 9 ++- .../api/gax/batching/BatcherImplTest.java | 75 +++++++++++++++++++ 3 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 .ijwb/.bazelproject diff --git a/.ijwb/.bazelproject b/.ijwb/.bazelproject new file mode 100644 index 000000000..b1a7b1bac --- /dev/null +++ b/.ijwb/.bazelproject @@ -0,0 +1,21 @@ +directories: + # Add the directories you want added as source here + # By default, we've added your entire workspace ('.') + . + +# Automatically includes all relevant targets under the 'directories' above +derive_targets_from_directories: true + +targets: + # If source code isn't resolving, add additional targets that compile it here + +additional_languages: + # Uncomment any additional languages you want supported + # android + # dart + # go + # javascript + # kotlin + # python + # scala + # typescript diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index d29e40d60..26eeb1a8e 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -274,8 +274,13 @@ public void sendOutstanding() { callContextWithOption = callContext.withOption(THROTTLED_TIME_KEY, accumulatedBatch.totalThrottledTimeMs); } - final ApiFuture batchResponse = - unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption); + ApiFuture batchResponse; + try { + batchResponse = + unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption); + } catch (Exception ex) { + batchResponse = ApiFutures.immediateFailedFuture(ex); + } numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index b503e28da..cd2595028 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -46,6 +46,7 @@ import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2; +import com.google.api.gax.rpc.testing.FakeCallContext; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Queues; @@ -931,6 +932,80 @@ public void testThrottlingNonBlocking() throws Exception { } } + /** + * If the batcher's unary callable throws an exception when obtaining a response, then the + * response .get() should throw the exception + */ + @Test + public void testAddDoesNotHangIfExceptionThrowStartingACall() { + BatchingDescriptor batchingDescriptor = + new BatchingDescriptor() { + @Override + public BatchingRequestBuilder newRequestBuilder(Object o) { + return new BatchingRequestBuilder() { + @Override + public void add(Object o) {} + + @Override + public Object build() { + return new Object(); + } + }; + } + + @Override + public void splitResponse(Object o, List> list) { + for (BatchEntry e : list) { + e.getResultFuture().set(new Object()); + } + } + + @Override + public void splitException(Throwable throwable, List> list) { + for (BatchEntry e : list) { + e.getResultFuture().setException(new RuntimeException("fake")); + } + } + + @Override + public long countBytes(Object o) { + return 1; + } + }; + + UnaryCallable unaryCallable = + new UnaryCallable() { + @Override + public ApiFuture futureCall(Object o, ApiCallContext apiCallContext) { + throw new RuntimeException("this should bubble up"); + } + }; + Object prototype = new Object(); + BatchingSettings batchingSettings = + BatchingSettings.newBuilder() + .setDelayThreshold(Duration.ofSeconds(1)) + .setElementCountThreshold(100L) + .setRequestByteThreshold(100L) + .setFlowControlSettings(FlowControlSettings.getDefaultInstance()) + .build(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); + FlowController flowController = new FlowController(batchingSettings.getFlowControlSettings()); + ApiCallContext callContext = FakeCallContext.createDefault(); + + BatcherImpl batcher = + new BatcherImpl<>( + batchingDescriptor, + unaryCallable, + prototype, + batchingSettings, + executor, + flowController, + callContext); + + ApiFuture f = batcher.add(new Object()); + Assert.assertThrows(ExecutionException.class, f::get); + } + private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = createDefaultBatcherImpl(settings, null); Future result = underTest.add(4);