From c0555593afb76af6f6e61c41029d11db9614d961 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Thu, 8 Nov 2018 12:50:33 -0800 Subject: [PATCH 1/6] change to watchdog provider --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 44 +++- .../spanner/spi/v1/WatchdogInterceptor.java | 235 ------------------ 2 files changed, 43 insertions(+), 236 deletions(-) delete mode 100644 google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/WatchdogInterceptor.java diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 4f367772cdcc..39e8586e1420 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -19,6 +19,7 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; import com.google.api.core.ApiFunction; +import com.google.api.core.NanoClock; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.GaxProperties; import com.google.api.gax.core.InstantiatingExecutorProvider; @@ -29,12 +30,14 @@ import com.google.api.gax.rpc.AlreadyExistsException; import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.InstantiatingWatchdogProvider; import com.google.api.gax.rpc.OperationCallable; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.api.gax.rpc.WatchdogProvider; import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.ServiceOptions; import com.google.cloud.grpc.GrpcTransportOptions; @@ -54,6 +57,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.longrunning.GetOperationRequest; import com.google.longrunning.Operation; import com.google.protobuf.Empty; @@ -101,8 +105,10 @@ import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.annotation.Nullable; +import org.threeten.bp.Duration; /** Implementation of Cloud Spanner remote calls using Gapic libraries. */ public class GapicSpannerRpc implements SpannerRpc { @@ -113,6 +119,12 @@ public class GapicSpannerRpc implements SpannerRpc { PathTemplate.create("{database=projects/*/instances/*/databases/*}/operations/{operation}"); private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024; private static final int MAX_METADATA_SIZE = 32 * 1024; // bytes + private static final String PROPERTY_TIMEOUT_SECONDS = + "com.google.cloud.spanner.watchdogTimeoutSeconds"; + private static final String PROPERTY_PERIOD_SECONDS = + "com.google.cloud.spanner.watchdogPeriodSeconds"; + private static final int DEFAULT_TIMEOUT_SECONDS = 30 * 60; + private static final int DEFAULT_PERIOD_SECONDS = 10; private final SpannerStub spannerStub; private final InstanceAdminStub instanceAdminStub; @@ -120,6 +132,9 @@ public class GapicSpannerRpc implements SpannerRpc { private final String projectId; private final String projectName; private final SpannerMetadataProvider metadataProvider; + private final Duration waitTimeout = systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS); + private final Duration idleTimeout = systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS); + private final Duration checkInterval = systemProperty(PROPERTY_PERIOD_SECONDS, DEFAULT_PERIOD_SECONDS); public static GapicSpannerRpc create(SpannerOptions options) { return new GapicSpannerRpc(options); @@ -173,6 +188,17 @@ public GapicSpannerRpc(SpannerOptions options) { CredentialsProvider credentialsProvider = GrpcTransportOptions.setUpCredentialsProvider(options); + WatchdogProvider watchdogProvider = + InstantiatingWatchdogProvider.create() + .withExecutor( + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Cloud-Spanner-WatchdogInterceptor-%d") + .build())) + .withCheckInterval(checkInterval) + .withClock(NanoClock.getDefaultClock()); + // Disabling retry for now because spanner handles retry in SpannerImpl. // We will finally want to improve gax but for smooth transitioning we // preserve the retry in SpannerImpl @@ -184,6 +210,7 @@ public GapicSpannerRpc(SpannerOptions options) { SpannerStubSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) + .setStreamWatchdogProvider(watchdogProvider) .applyToAllUnaryMethods( new ApiFunction, Void>() { @Override @@ -199,6 +226,7 @@ public Void apply(UnaryCallSettings.Builder builder) { InstanceAdminStubSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) + .setStreamWatchdogProvider(watchdogProvider) .applyToAllUnaryMethods( new ApiFunction, Void>() { @Override @@ -213,6 +241,7 @@ public Void apply(UnaryCallSettings.Builder builder) { DatabaseAdminStubSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) + .setStreamWatchdogProvider(watchdogProvider) .applyToAllUnaryMethods( new ApiFunction, Void>() { @Override @@ -527,12 +556,20 @@ private static T get(final Future future) throws SpannerException { } } - private GrpcCallContext newCallContext(@Nullable Map options, String resource) { + private GrpcCallContext newCallContext( + @Nullable Map options, + String resource) { GrpcCallContext context = GrpcCallContext.createDefault(); if (options != null) { context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); + if (waitTimeout != null) { + context = context.withStreamWaitTimeout(waitTimeout); + } + if (idleTimeout != null) { + context = context.withStreamIdleTimeout(waitTimeout); + } return context; } @@ -582,4 +619,9 @@ StreamController getController() { return Preconditions.checkNotNull(this.controller); } } + + private static Duration systemProperty(String name, int defaultValue) { + String stringValue = System.getProperty(name, ""); + return Duration.ofSeconds(stringValue.isEmpty() ? defaultValue : Integer.parseInt(stringValue)); + } } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/WatchdogInterceptor.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/WatchdogInterceptor.java deleted file mode 100644 index bfe3de37e32b..000000000000 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/WatchdogInterceptor.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Copyright 2017 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner.spi.v1; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.Ticker; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.ForwardingClientCall; -import io.grpc.ForwardingClientCallListener; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.Status; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; - -/** - * A gRPC interceptor that monitors activity on outstanding RPCs. If an RPC sees no activity for a - * configurable period of time, the watchdog will cancel that RPC and cause it to return an {@code - * UNAVAILABLE} error so that higher level code will retry the operation. This class exists to work - * around any edge case that may cause an RPC to hang indefinitely. - */ -class WatchdogInterceptor implements ClientInterceptor { - private static final String PROPERTY_TIMEOUT_SECONDS = - "com.google.cloud.spanner.watchdogTimeoutSeconds"; - private static final String PROPERTY_PERIOD_SECONDS = - "com.google.cloud.spanner.watchdogPeriodSeconds"; - private static final int DEFAULT_TIMEOUT_SECONDS = 30 * 60; - private static final int DEFAULT_PERIOD_SECONDS = 10; - - private static final Logger logger = Logger.getLogger(WatchdogInterceptor.class.getName()); - - private final long activityTimeoutNanos; - private final TimeUnit activityTimeoutUnits; - private final Ticker ticker; - private final ConcurrentHashMap, MonitoredCall> monitoredCalls; - - WatchdogInterceptor(long activityTimeout, TimeUnit activityTimeoutUnits) { - this(activityTimeout, activityTimeoutUnits, Ticker.systemTicker()); - } - - WatchdogInterceptor(long activityTimeout, TimeUnit activityTimeoutUnits, Ticker ticker) { - checkArgument(activityTimeout > 0, "activityTimeout must be positive"); - this.activityTimeoutNanos = activityTimeoutUnits.toNanos(activityTimeout); - this.activityTimeoutUnits = checkNotNull(activityTimeoutUnits); - this.ticker = checkNotNull(ticker); - // Expect up to ~100 RPCs/channel for initial capacity. Default concurrency level of 16 is - // likely about right for most applications. - this.monitoredCalls = new ConcurrentHashMap<>(128 /* initialCapacity */); - } - - private static int systemProperty(String name, int defaultValue) { - String stringValue = System.getProperty(name, ""); - return stringValue.isEmpty() ? defaultValue : Integer.parseInt(stringValue); - } - - /** - * Creates a default instance based on the system property {@code - * com.google.cloud.spanner.watchdogTimeoutSeconds}, or a no-op interceptor if none configured. - */ - @Nullable - static ClientInterceptor newDefaultWatchdogInterceptor() { - int timeoutSeconds = systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS); - if (timeoutSeconds <= 0) { - return new ClientInterceptor() { - @Override - public ClientCall interceptCall( - MethodDescriptor methodDescriptor, - CallOptions callOptions, - Channel channel) { - return channel.newCall(methodDescriptor, callOptions); - } - }; - } - int periodSeconds = systemProperty(PROPERTY_PERIOD_SECONDS, DEFAULT_PERIOD_SECONDS); - final WatchdogInterceptor interceptor = - new WatchdogInterceptor(timeoutSeconds, TimeUnit.SECONDS); - ScheduledExecutorService executor = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Cloud-Spanner-WatchdogInterceptor-%d") - .build()); - executor.scheduleWithFixedDelay( - new Runnable() { - @Override - public void run() { - interceptor.tick(); - } - }, - periodSeconds, - periodSeconds, - TimeUnit.SECONDS); - logger.log( - Level.FINE, - "Created watchdog interceptor with activity timeout of {0}s and period {1}s", - new Object[] {timeoutSeconds, periodSeconds}); - return interceptor; - } - - /** - * Scans over RPCs currently known to the interceptor, cancelling any that have not seen activity - * within the timeout. - */ - void tick() { - for (MonitoredCall call : monitoredCalls.keySet()) { - call.checkActivity(); - } - } - - @Override - public ClientCall interceptCall( - MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { - return new MonitoredCall<>(channel.newCall(methodDescriptor, callOptions)); - } - - private void registerCall(MonitoredCall call) { - // Cannot put a null value into the map, so use "call". - monitoredCalls.put(call, call); - } - - private void unregisterCall(MonitoredCall call) { - monitoredCalls.remove(call); - } - - private class MonitoredCall - extends ForwardingClientCall.SimpleForwardingClientCall { - private volatile long lastActivityNanos; - private volatile boolean stoppedByWatchdog; - private final AtomicBoolean cancelled = new AtomicBoolean(false); - - MonitoredCall(ClientCall delegate) { - super(delegate); - } - - @Override - public void start(Listener responseListener, Metadata headers) { - recordActivity(); - registerCall(this); - - ForwardingClientCallListener.SimpleForwardingClientCallListener listener = - new ForwardingClientCallListener.SimpleForwardingClientCallListener( - responseListener) { - @Override - public void onHeaders(Metadata headers) { - recordActivity(); - super.onHeaders(headers); - } - - @Override - public void onMessage(RespT message) { - recordActivity(); - super.onMessage(message); - } - - @Override - public void onReady() { - recordActivity(); - super.onReady(); - } - - @Override - public void onClose(Status status, Metadata trailers) { - unregisterCall(MonitoredCall.this); - super.onClose(handleStatus(status), trailers); - } - }; - - super.start(listener, headers); - } - - void recordActivity() { - lastActivityNanos = ticker.read(); - } - - void checkActivity() { - if (ticker.read() - lastActivityNanos > activityTimeoutNanos - && cancelled.compareAndSet(false, true)) { - stoppedByWatchdog = true; - delegate().cancel("Cancelled by activity watchdog", null); - logger.log( - Level.WARNING, - "Cancelled due to exceeding inactivity timeout of {0} {1}", - new Object[] { - activityTimeoutUnits.convert(activityTimeoutNanos, TimeUnit.NANOSECONDS), - activityTimeoutUnits - }); - } - } - - Status handleStatus(Status status) { - if (stoppedByWatchdog && status.getCode() == Status.Code.CANCELLED) { - return Status.UNAVAILABLE.withDescription( - "Aborted by RPC activity watchdog [timeout=" - + activityTimeoutUnits.convert(activityTimeoutNanos, TimeUnit.NANOSECONDS) - + " " - + activityTimeoutUnits - + "]"); - } - return status; - } - - @Override - public void cancel(@Nullable String message, @Nullable Throwable cause) { - if (cancelled.compareAndSet(false, true)) { - super.cancel(message, cause); - } - } - } -} From 659888fbd2001c73a621f5fca165c4e19de661be Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Thu, 8 Nov 2018 12:55:29 -0800 Subject: [PATCH 2/6] format --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 39e8586e1420..9e58caac09e8 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -132,9 +132,12 @@ public class GapicSpannerRpc implements SpannerRpc { private final String projectId; private final String projectName; private final SpannerMetadataProvider metadataProvider; - private final Duration waitTimeout = systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS); - private final Duration idleTimeout = systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS); - private final Duration checkInterval = systemProperty(PROPERTY_PERIOD_SECONDS, DEFAULT_PERIOD_SECONDS); + private final Duration waitTimeout = + systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS); + private final Duration idleTimeout = + systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS); + private final Duration checkInterval = + systemProperty(PROPERTY_PERIOD_SECONDS, DEFAULT_PERIOD_SECONDS); public static GapicSpannerRpc create(SpannerOptions options) { return new GapicSpannerRpc(options); @@ -556,21 +559,13 @@ private static T get(final Future future) throws SpannerException { } } - private GrpcCallContext newCallContext( - @Nullable Map options, - String resource) { + private GrpcCallContext newCallContext(@Nullable Map options, String resource) { GrpcCallContext context = GrpcCallContext.createDefault(); if (options != null) { context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); - if (waitTimeout != null) { - context = context.withStreamWaitTimeout(waitTimeout); - } - if (idleTimeout != null) { - context = context.withStreamIdleTimeout(waitTimeout); - } - return context; + return context.withStreamWaitTimeout(waitTimeout).withStreamIdleTimeout(waitTimeout); } public void shutdown() { From 8998c74b10d92bf4ca171de8444090d786ef7b0f Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Thu, 8 Nov 2018 12:56:58 -0800 Subject: [PATCH 3/6] work --- .../cloud/spanner/spi/v1/SpannerInterceptorProvider.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java index c51966837d00..060f4ca9d1cc 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java @@ -33,8 +33,7 @@ public class SpannerInterceptorProvider implements GrpcInterceptorProvider { private static final List defaultInterceptors = ImmutableList.of( new SpannerErrorInterceptor(), - new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER), - WatchdogInterceptor.newDefaultWatchdogInterceptor()); + new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER)); private final List clientInterceptors; From e2d6ae3cb66c059b12284680396250fb8878b996 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Thu, 8 Nov 2018 13:49:02 -0800 Subject: [PATCH 4/6] tests --- .../spi/v1/WatchdogInterceptorTest.java | 202 ------------------ 1 file changed, 202 deletions(-) delete mode 100644 google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/WatchdogInterceptorTest.java diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/WatchdogInterceptorTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/WatchdogInterceptorTest.java deleted file mode 100644 index 2c5ab421835a..000000000000 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/WatchdogInterceptorTest.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright 2017 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner.spi.v1; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.same; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.api.gax.grpc.testing.FakeMethodDescriptor; -import com.google.common.testing.FakeTicker; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.Status; -import java.util.concurrent.TimeUnit; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -/** Unit tests for {@link WatchdogInterceptor}. */ -@RunWith(JUnit4.class) -public class WatchdogInterceptorTest { - private static final long TIMEOUT_NANOS = 1000000; - - FakeTicker ticker; - WatchdogInterceptor watchdog; - - CallOptions options; - Metadata metadata; - MethodDescriptor descriptor = FakeMethodDescriptor.create(); - - @Mock Channel channel; - @Mock ClientCall innerCall; - @Mock ClientCall.Listener listener; - - @Captor ArgumentCaptor> innerListener; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - ticker = new FakeTicker(); - watchdog = new WatchdogInterceptor(TIMEOUT_NANOS, TimeUnit.NANOSECONDS, ticker); - // Nothing special about the deadline: we just want a particular CallOptions instance to expect. - options = CallOptions.DEFAULT.withDeadlineAfter(10, TimeUnit.SECONDS); - metadata = new Metadata(); - } - - private ClientCall startCall() { - when(channel.newCall(Mockito.>any(), same(options))) - .thenReturn(innerCall); - doNothing().when(innerCall).start(innerListener.capture(), Mockito.any()); - - ClientCall watchdogCall = watchdog.interceptCall(descriptor, options, channel); - watchdogCall.start(listener, metadata); - return watchdogCall; - } - - @Test - public void noTimeout() { - startCall(); - - // Finish RPC with ABORTED. - Metadata trailers = new Metadata(); - innerListener.getValue().onClose(Status.ABORTED, trailers); - - // Check that this was propagated. - verify(listener).onClose(Status.ABORTED, trailers); - } - - @Test - public void timeout() { - startCall(); - - ticker.advance(TIMEOUT_NANOS + 1); - watchdog.tick(); - - verify(innerCall, times(1)).cancel(anyString(), Mockito.any()); - - // Finish RPC with CANCELLED. - Metadata trailers = new Metadata(); - innerListener.getValue().onClose(Status.CANCELLED, trailers); - - // Should translate to UNAVAILABLE. - verify(listener).onClose(argThat(isStatusWithCode(Status.Code.UNAVAILABLE)), same(trailers)); - } - - @Test - public void callerCancelsBeforeTimeout() { - ClientCall call = startCall(); - - // Cancel the call before the watchdog does. - call.cancel("Cancelled by user", null); - - ticker.advance(TIMEOUT_NANOS + 1); - - // Finish RPC with CANCELLED. - Metadata trailers = new Metadata(); - innerListener.getValue().onClose(Status.CANCELLED, trailers); - - watchdog.tick(); - - // The call should be cancelled exactly once. - verify(innerCall, times(1)).cancel(anyString(), Mockito.any()); - - // No translation, since the user beat the watchdog. - verify(listener).onClose(argThat(isStatusWithCode(Status.Code.CANCELLED)), same(trailers)); - } - - @Test - public void callerCancelsAfterTimeout() { - ClientCall call = startCall(); - - ticker.advance(TIMEOUT_NANOS + 1); - // Cancel the call before the watchdog does. - call.cancel("Cancelled by user", null); - watchdog.tick(); - - // The call should be cancelled exactly once. - verify(innerCall, times(1)).cancel(anyString(), Mockito.any()); - - // Finish RPC with CANCELLED. - Metadata trailers = new Metadata(); - innerListener.getValue().onClose(Status.CANCELLED, trailers); - - // No translation, since the user beat the watchdog. - verify(listener).onClose(argThat(isStatusWithCode(Status.Code.CANCELLED)), same(trailers)); - } - - @Test - public void failureAfterTimeout() { - startCall(); - - ticker.advance(TIMEOUT_NANOS + 1); - watchdog.tick(); - - // The call should be cancelled exactly once. - verify(innerCall, times(1)).cancel(anyString(), Mockito.any()); - - // Finish RPC with error. - Metadata trailers = new Metadata(); - innerListener.getValue().onClose(Status.DATA_LOSS, trailers); - - // No translation expected. - verify(listener).onClose(argThat(isStatusWithCode(Status.Code.DATA_LOSS)), same(trailers)); - } - - private static Matcher isStatusWithCode(Status.Code code) { - return new StatusCodeMatcher(code); - } - - private static class StatusCodeMatcher extends BaseMatcher { - private final Status.Code expectedCode; - - StatusCodeMatcher(Status.Code expectedCode) { - this.expectedCode = checkNotNull(expectedCode); - } - - @Override - public boolean matches(Object item) { - if (!(item instanceof Status)) { - return false; - } - Status s = (Status) item; - return s.getCode() == expectedCode; - } - - @Override - public void describeTo(Description description) { - description.appendText("Status[" + expectedCode + "]"); - } - } -} From 69122c115ed81fe8f73aff2e8df8d0573a50b81c Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Fri, 9 Nov 2018 10:47:51 -0800 Subject: [PATCH 5/6] format --- .../java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 9e58caac09e8..5940bd47e249 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -565,7 +565,7 @@ private GrpcCallContext newCallContext(@Nullable Map options, String context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); } context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); - return context.withStreamWaitTimeout(waitTimeout).withStreamIdleTimeout(waitTimeout); + return context.withStreamWaitTimeout(waitTimeout).withStreamIdleTimeout(idleTimeout); } public void shutdown() { From 89434125c51916036be9cb663f3f074a94b3139b Mon Sep 17 00:00:00 2001 From: Hanzhen Yi Date: Fri, 9 Nov 2018 11:22:39 -0800 Subject: [PATCH 6/6] code review --- .../java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 5940bd47e249..607f3fd3e41a 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -197,7 +197,7 @@ public GapicSpannerRpc(SpannerOptions options) { Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("Cloud-Spanner-WatchdogInterceptor-%d") + .setNameFormat("Cloud-Spanner-WatchdogProvider-%d") .build())) .withCheckInterval(checkInterval) .withClock(NanoClock.getDefaultClock());