From 8ccb910fa93c10dd84aac5e38ac0c0e44db935bb Mon Sep 17 00:00:00 2001
From: Bryan Beaudreault
Date: Fri, 18 Aug 2023 09:59:38 -0400
Subject: [PATCH] HBASE-27947 RegionServer OOM when outbound channel backed up (#5350)

Signed-off-by: Duo Zhang
Reviewed-by: Norman Maurer
---
 .../hadoop/hbase/util/NettyUnsafeUtils.java   |  61 ++++++
 .../hbase/ipc/MetricsHBaseServerSource.java   |  16 ++
 .../ipc/MetricsHBaseServerSourceImpl.java     |  24 +++
 .../hbase/ipc/MetricsHBaseServerWrapper.java  |   7 +
 .../hadoop/hbase/ipc/MetricsHBaseServer.java  |   8 +
 .../ipc/MetricsHBaseServerWrapperImpl.java    |  13 ++
 .../hadoop/hbase/ipc/NettyRpcServer.java      | 161 +++++++++++++++-
 ...ttyRpcServerChannelWritabilityHandler.java | 125 ++++++++++++
 .../ipc/NettyRpcServerPreambleHandler.java    |  11 +-
 .../hadoop/hbase/ipc/NettyServerCall.java     |   2 +-
 .../hbase/ipc/FailingNettyRpcServer.java      |   9 +-
 .../ipc/MetricsHBaseServerWrapperStub.java    |   7 +
 .../ipc/TestNettyChannelWritability.java      | 182 ++++++++++++++++++
 .../hadoop/hbase/ipc/TestRpcMetrics.java      |   9 +
 .../ipc/TestRpcSkipInitialSaslHandshake.java  |  28 +--
 15 files changed, 619 insertions(+), 44 deletions(-)
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java
new file mode 100644
index 000000000000..8b246e978ea0
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundBuffer;
+
+/**
+ * Wraps some usages of netty's unsafe API, for ease of maintainability.
+ */
+@InterfaceAudience.Private
+public final class NettyUnsafeUtils {
+
+  private NettyUnsafeUtils() {
+  }
+
+  /**
+   * Directly closes the channel, setting SO_LINGER to 0 and skipping any handlers in the pipeline.
+   * This is useful for cases where it's important to immediately close without any delay.
+   * Otherwise, pipeline handlers and even general TCP flows can cause a normal close to take
+   * upwards of a few seconds or more. This will likely cause the client side to see either a
+   * "Connection reset by peer" or an unexpected ConnectionClosedException.
+   * <p>
+   * It's necessary to call this from within the channel's eventLoop!
+   */
+  public static void closeImmediately(Channel channel) {
+    assert channel.eventLoop().inEventLoop();
+    channel.config().setOption(ChannelOption.SO_LINGER, 0);
+    channel.unsafe().close(channel.voidPromise());
+  }
+
+  /**
+   * Get total bytes pending write to the socket.
+   */
+  public static long getTotalPendingOutboundBytes(Channel channel) {
+    ChannelOutboundBuffer outboundBuffer = channel.unsafe().outboundBuffer();
+    // can be null when the channel is closing
+    if (outboundBuffer == null) {
+      return 0;
+    }
+    return outboundBuffer.totalPendingWriteBytes();
+  }
+}

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 98ecf8b8d92d..df2e335a718f 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -46,6 +46,14 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
   String PROCESS_CALL_TIME_DESC = "Processing call time.";
   String TOTAL_CALL_TIME_NAME = "totalCallTime";
   String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
+
+  String UNWRITABLE_TIME_NAME = "unwritableTime";
+  String UNWRITABLE_TIME_DESC =
+    "Time that a channel was unwritable due to having too many outbound bytes";
+  String MAX_OUTBOUND_BYTES_EXCEEDED_NAME = "maxOutboundBytesExceeded";
+  String MAX_OUTBOUND_BYTES_EXCEEDED_DESC =
+    "Number of times a connection was closed because the channel outbound "
+      + "bytes exceeded the configured max.";
   String QUEUE_SIZE_NAME = "queueSize";
   String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and "
     + "parsed and is waiting to run or is currently being executed.";
@@ -97,6 +105,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
   String NETTY_DM_USAGE_NAME = "nettyDirectMemoryUsage";
   String NETTY_DM_USAGE_DESC = "Current Netty direct memory usage.";
+  String NETTY_TOTAL_PENDING_OUTBOUND_NAME = "nettyTotalPendingOutboundBytes";
+  String NETTY_TOTAL_PENDING_OUTBOUND_DESC = "Current total bytes pending write to all channels";
+  String NETTY_MAX_PENDING_OUTBOUND_NAME = "nettyMaxPendingOutboundBytes";
+  String NETTY_MAX_PENDING_OUTBOUND_DESC = "Current maximum bytes pending write to any channel";

   void authorizationSuccess();
@@ -121,4 +133,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
   void processedCall(int processingTime);

   void queuedAndProcessedCall(int totalTime);
+
+  void unwritableTime(long unwritableTime);
+
+  void maxOutboundBytesExceeded();
 }

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 9c75f4e6bcba..1a6d557d8adc 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -19,6 +19,7 @@
 import org.apache.hadoop.hbase.metrics.ExceptionTrackingSourceImpl;
 import org.apache.hadoop.hbase.metrics.Interns;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.metrics2.MetricHistogram;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import
org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -36,10 +37,12 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl private final MutableFastCounter authenticationFallbacks; private final MutableFastCounter sentBytes; private final MutableFastCounter receivedBytes; + private final MutableFastCounter maxOutboundBytesExceeded; private MetricHistogram queueCallTime; private MetricHistogram processCallTime; private MetricHistogram totalCallTime; + private MetricHistogram unwritableTime; private MetricHistogram requestSize; private MetricHistogram responseSize; @@ -67,6 +70,10 @@ public MetricsHBaseServerSourceImpl(String metricsName, String metricsDescriptio this.getMetricsRegistry().newTimeHistogram(PROCESS_CALL_TIME_NAME, PROCESS_CALL_TIME_DESC); this.totalCallTime = this.getMetricsRegistry().newTimeHistogram(TOTAL_CALL_TIME_NAME, TOTAL_CALL_TIME_DESC); + this.unwritableTime = + this.getMetricsRegistry().newTimeHistogram(UNWRITABLE_TIME_NAME, UNWRITABLE_TIME_DESC); + this.maxOutboundBytesExceeded = this.getMetricsRegistry() + .newCounter(MAX_OUTBOUND_BYTES_EXCEEDED_NAME, MAX_OUTBOUND_BYTES_EXCEEDED_DESC, 0); this.requestSize = this.getMetricsRegistry().newSizeHistogram(REQUEST_SIZE_NAME, REQUEST_SIZE_DESC); this.responseSize = @@ -133,6 +140,16 @@ public void queuedAndProcessedCall(int totalTime) { totalCallTime.add(totalTime); } + @Override + public void unwritableTime(long unwritableTime) { + this.unwritableTime.add(unwritableTime); + } + + @Override + public void maxOutboundBytesExceeded() { + maxOutboundBytesExceeded.incr(); + } + @Override public void getMetrics(MetricsCollector metricsCollector, boolean all) { MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName); @@ -177,6 +194,13 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) { wrapper.getActiveScanRpcHandlerCount()) .addGauge(Interns.info(NETTY_DM_USAGE_NAME, NETTY_DM_USAGE_DESC), wrapper.getNettyDmUsage()); + + Pair totalAndMax = wrapper.getTotalAndMaxNettyOutboundBytes(); + mrb.addGauge( + Interns.info(NETTY_TOTAL_PENDING_OUTBOUND_NAME, NETTY_TOTAL_PENDING_OUTBOUND_DESC), + totalAndMax.getFirst()); + mrb.addGauge(Interns.info(NETTY_MAX_PENDING_OUTBOUND_NAME, NETTY_MAX_PENDING_OUTBOUND_DESC), + totalAndMax.getSecond()); } metricsRegistry.snapshot(mrb, all); diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java index 1a8980bbc7bd..bb376cba930d 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.ipc; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private @@ -64,4 +65,10 @@ public interface MetricsHBaseServerWrapper { int getActiveScanRpcHandlerCount(); long getNettyDmUsage(); + + /** + * These two metrics are calculated together, so we want to return them in one call + * @return pair containing total (first) and max (second) pending outbound bytes. 
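+   * <p>
+   * Note: the NettyRpcServer implementation computes both values in a single pass over all open
+   * channels, so this is intended for periodic metrics collection rather than per-request use.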
+ */ + Pair getTotalAndMaxNettyOutboundBytes(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java index a4c73f925d3c..b5fbb5c43d15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java @@ -97,6 +97,14 @@ void totalCall(int totalTime) { source.queuedAndProcessedCall(totalTime); } + void unwritableTime(long unwritableTime) { + source.unwritableTime(unwritableTime); + } + + void maxOutboundBytesExceeded() { + source.maxOutboundBytesExceeded(); + } + public void exception(Throwable throwable) { source.exception(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java index 857315568c5e..1fc1806265d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import org.apache.hadoop.hbase.util.DirectMemoryUtils; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private @@ -209,4 +210,16 @@ public long getNettyDmUsage() { return DirectMemoryUtils.getNettyDirectMemoryUsage(); } + + @Override + public Pair getTotalAndMaxNettyOutboundBytes() { + if ( + !isServerStarted() || this.server.getScheduler() == null + || !(this.server instanceof NettyRpcServer) + ) { + return Pair.newPair(0L, 0L); + } + + return ((NettyRpcServer) server).getTotalAndMaxNettyOutboundBytes(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 0b7badf7d815..722ee1d28c91 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.io.crypto.tls.X509Util; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; +import org.apache.hadoop.hbase.util.NettyUnsafeUtils; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.yetus.audience.InterfaceAudience; @@ -53,6 +55,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel; +import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder; @@ -84,6 +87,38 @@ public class NettyRpcServer extends RpcServer { static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled"; static final String HEAP_ALLOCATOR_TYPE = "heap"; + /** + * Low watermark for pending outbound bytes of a single netty channel. If the high watermark was + * exceeded, channel will have setAutoRead to true again. 
The server will start reading incoming
+   * bytes (requests) from the client channel once pending outbound bytes drop back below this
+   * low watermark.
+   */
+  public static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY =
+    "hbase.server.netty.writable.watermark.low";
+  private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0;
+
+  /**
+   * High watermark for pending outbound bytes of a single netty channel. If the number of pending
+   * outbound bytes exceeds this threshold, setAutoRead will be false for the channel. The server
+   * will stop reading incoming requests from the client channel.
+   * <p>
+   * Note: any requests already in the call queue will still be processed.
+   */
+  public static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY =
+    "hbase.server.netty.writable.watermark.high";
+  private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0;
+
+  /**
+   * Fatal watermark for pending outbound bytes of a single netty channel. If the number of pending
+   * outbound bytes exceeds this threshold, the connection will be forcibly closed so that memory
+   * can be reclaimed. The client will have to re-establish a new connection and retry any in-flight
+   * requests.
+   * <p>
+ * Note: must be higher than the high watermark, otherwise it's ignored. + */ + public static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY = + "hbase.server.netty.writable.watermark.fatal"; + private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0; + private final InetSocketAddress bindAddress; private final CountDownLatch closed = new CountDownLatch(1); @@ -94,6 +129,9 @@ public class NettyRpcServer extends RpcServer { private final AtomicReference keyStoreWatcher = new AtomicReference<>(); private final AtomicReference trustStoreWatcher = new AtomicReference<>(); + private volatile int writeBufferFatalThreshold; + private volatile WriteBufferWaterMark writeBufferWaterMark; + public NettyRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { @@ -108,6 +146,10 @@ public NettyRpcServer(Server server, String name, List channelClass = config.serverChannelClass(); ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass) @@ -117,6 +159,7 @@ public NettyRpcServer(Server server, String name, List() { @Override protected void initChannel(Channel ch) throws Exception { + ch.config().setWriteBufferWaterMark(writeBufferWaterMark); ch.config().setAllocator(channelAllocator); ChannelPipeline pipeline = ch.pipeline(); FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); @@ -124,12 +167,18 @@ protected void initChannel(Channel ch) throws Exception { if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) { initSSL(pipeline, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true)); } + NettyServerRpcConnection conn = createNettyServerRpcConnection(ch); pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, preambleDecoder) - .addLast(createNettyRpcServerPreambleHandler()) + .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this, conn)) // We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may // send RpcResponse to client. - .addLast(NettyRpcServerResponseEncoder.NAME, - new NettyRpcServerResponseEncoder(metrics)); + .addLast(NettyRpcServerResponseEncoder.NAME, new NettyRpcServerResponseEncoder(metrics)) + // Add writability handler after the response encoder, so we can abort writes before + // they get encoded, if the fatal threshold is exceeded. We pass in suppliers here so + // that the handler configs can be live updated via update_config. 
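+          // Each supplier re-reads the current volatile value on every call, so the handler
+          // picks up new thresholds without rebuilding the channel pipeline.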
+ .addLast(NettyRpcServerChannelWritabilityHandler.NAME, + new NettyRpcServerChannelWritabilityHandler(metrics, () -> writeBufferFatalThreshold, + () -> isWritabilityBackpressureEnabled())); } }); try { @@ -142,6 +191,91 @@ protected void initChannel(Channel ch) throws Exception { this.scheduler.init(new RpcSchedulerContext(this)); } + @Override + public void onConfigurationChange(Configuration newConf) { + super.onConfigurationChange(newConf); + configureNettyWatermarks(newConf); + } + + private void configureNettyWatermarks(Configuration conf) { + int watermarkLow = + conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT); + int watermarkHigh = + conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT); + int fatalThreshold = + conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT); + + WriteBufferWaterMark oldWaterMark = writeBufferWaterMark; + int oldFatalThreshold = writeBufferFatalThreshold; + + boolean disabled = false; + if (watermarkHigh == 0 && watermarkLow == 0) { + // if both are 0, use the netty default, which we will treat as "disabled". + // when disabled, we won't manage autoRead in response to writability changes. + writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; + disabled = true; + } else { + // netty checks pendingOutboundBytes < watermarkLow. It can never be less than 0, so set to + // 1 to avoid confusing behavior. + if (watermarkLow == 0) { + LOG.warn( + "Detected a {} value of 0, which is impossible to achieve " + + "due to how netty evaluates these thresholds, setting to 1", + CHANNEL_WRITABLE_LOW_WATERMARK_KEY); + watermarkLow = 1; + } + + // netty validates the watermarks and throws an exception if high < low, fail more gracefully + // by disabling the watermarks and warning. + if (watermarkHigh <= watermarkLow) { + LOG.warn( + "Detected {} value {}, lower than {} value {}. This will fail netty validation, " + + "so disabling", + CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, watermarkHigh, CHANNEL_WRITABLE_LOW_WATERMARK_KEY, + watermarkLow); + writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; + } else { + writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow, watermarkHigh); + } + + // only apply this check when watermark is enabled. this way we give the operator some + // flexibility if they want to try enabling fatal threshold without backpressure. + if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) { + LOG.warn("Detected a {} value of {}, which is lower than the {} value of {}, ignoring.", + CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold, CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, + watermarkHigh); + fatalThreshold = 0; + } + } + + writeBufferFatalThreshold = fatalThreshold; + + if ( + oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low() + || oldWaterMark.high() != writeBufferWaterMark.high() + || oldFatalThreshold != writeBufferFatalThreshold) + ) { + LOG.info("Updated netty outbound write buffer watermarks: low={}, high={}, fatal={}", + disabled ? "disabled" : writeBufferWaterMark.low(), + disabled ? "disabled" : writeBufferWaterMark.high(), + writeBufferFatalThreshold <= 0 ? 
"disabled" : writeBufferFatalThreshold); + } + + // update any existing channels + for (Channel channel : allChannels) { + channel.config().setWriteBufferWaterMark(writeBufferWaterMark); + // if disabling watermark, set auto read to true in case channel had been exceeding + // previous watermark + if (disabled) { + channel.config().setAutoRead(true); + } + } + } + + public boolean isWritabilityBackpressureEnabled() { + return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT; + } + private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException { final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY); if (value != null) { @@ -172,10 +306,10 @@ private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOExcept } } - // will be overriden in tests + // will be overridden in tests @InterfaceAudience.Private - protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { - return new NettyRpcServerPreambleHandler(NettyRpcServer.this); + protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { + return new NettyServerRpcConnection(NettyRpcServer.this, channel); } @Override @@ -296,4 +430,19 @@ SslContext getSslContext() throws X509Exception, IOException { } return result; } + + public int getWriteBufferFatalThreshold() { + return writeBufferFatalThreshold; + } + + public Pair getTotalAndMaxNettyOutboundBytes() { + long total = 0; + long max = 0; + for (Channel channel : allChannels) { + long outbound = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel); + total += outbound; + max = Math.max(max, outbound); + } + return Pair.newPair(total, max); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java new file mode 100644 index 000000000000..4b0b3878da81 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.function.BooleanSupplier;
+import java.util.function.IntSupplier;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
+import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
+
+/**
+ * Handler to enforce writability protections on our server channels:<br>
+ * - Responds to channel writability events, which are triggered when the total pending bytes for a
+ * channel passes the configured high and low watermarks. When the high watermark is exceeded, the
+ * channel is setAutoRead(false). This way, we won't accept new requests from the client until some
+ * pending outbound bytes are successfully received by the client.<br>
+ * - Pre-processes any channel write requests. If the total pending outbound bytes exceeds a fatal + * threshold, the channel is forcefully closed and the write is set to failed. This handler should + * be the last handler in the pipeline so that it's the first handler to receive any messages sent + * to channel.write() or channel.writeAndFlush(). + */ +@InterfaceAudience.Private +public class NettyRpcServerChannelWritabilityHandler extends ChannelDuplexHandler { + + static final String NAME = "NettyRpcServerChannelWritabilityHandler"; + + private final MetricsHBaseServer metrics; + private final IntSupplier pendingBytesFatalThreshold; + private final BooleanSupplier isWritabilityBackpressureEnabled; + + private boolean writable = true; + private long unwritableStartTime; + + NettyRpcServerChannelWritabilityHandler(MetricsHBaseServer metrics, + IntSupplier pendingBytesFatalThreshold, BooleanSupplier isWritabilityBackpressureEnabled) { + this.metrics = metrics; + this.pendingBytesFatalThreshold = pendingBytesFatalThreshold; + this.isWritabilityBackpressureEnabled = isWritabilityBackpressureEnabled; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (handleFatalThreshold(ctx)) { + promise.setFailure( + new ConnectionClosedException("Channel outbound bytes exceeded fatal threshold")); + if (msg instanceof RpcResponse) { + ((RpcResponse) msg).done(); + } else { + ReferenceCountUtil.release(msg); + } + return; + } + ctx.write(msg, promise); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + if (isWritabilityBackpressureEnabled.getAsBoolean()) { + handleWritabilityChanged(ctx); + } + ctx.fireChannelWritabilityChanged(); + } + + private boolean handleFatalThreshold(ChannelHandlerContext ctx) { + int fatalThreshold = pendingBytesFatalThreshold.getAsInt(); + if (fatalThreshold <= 0) { + return false; + } + + Channel channel = ctx.channel(); + long outboundBytes = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel); + if (outboundBytes < fatalThreshold) { + return false; + } + + if (channel.isOpen()) { + metrics.maxOutboundBytesExceeded(); + RpcServer.LOG.warn( + "{}: Closing connection because outbound buffer size of {} exceeds fatal threshold of {}", + channel.remoteAddress(), outboundBytes, fatalThreshold); + NettyUnsafeUtils.closeImmediately(channel); + } + + return true; + } + + private void handleWritabilityChanged(ChannelHandlerContext ctx) { + boolean oldWritableValue = this.writable; + + this.writable = ctx.channel().isWritable(); + ctx.channel().config().setAutoRead(this.writable); + + if (!oldWritableValue && this.writable) { + // changing from not writable to writable, update metrics + metrics.unwritableTime(EnvironmentEdgeManager.currentTime() - unwritableStartTime); + unwritableStartTime = 0; + } else if (oldWritableValue && !this.writable) { + // changing from writable to non-writable, set start time + unwritableStartTime = EnvironmentEdgeManager.currentTime(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java index 8269bbc60d88..b79a67f986e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java @@ -22,7 +22,6 @@ import 
org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; -import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; @@ -38,14 +37,15 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler static final String DECODER_NAME = "preambleDecoder"; private final NettyRpcServer rpcServer; + private final NettyServerRpcConnection conn; - public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer) { + public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer, NettyServerRpcConnection conn) { this.rpcServer = rpcServer; + this.conn = conn; } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - NettyServerRpcConnection conn = createNettyServerRpcConnection(ctx.channel()); ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes()); msg.readBytes(buf); buf.flip(); @@ -76,9 +76,4 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E ctx.channel().remoteAddress(), cause); NettyFutureUtils.safeClose(ctx); } - - // will be overridden in tests - protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { - return new NettyServerRpcConnection(rpcServer, channel); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java index fd0c6d75d888..4f0540da80a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java @@ -54,6 +54,6 @@ class NettyServerCall extends ServerCall { public synchronized void sendResponseIfReady() throws IOException { // set param null to reduce memory pressure this.param = null; - connection.channel.writeAndFlush(this); + connection.doRespond(this); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java index d5c408c23874..da4f70e3a247 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java @@ -49,12 +49,7 @@ public void processRequest(ByteBuff buf) throws IOException, InterruptedExceptio } @Override - protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { - return new NettyRpcServerPreambleHandler(FailingNettyRpcServer.this) { - @Override - protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { - return new FailingConnection(FailingNettyRpcServer.this, channel); - } - }; + protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { + return new FailingConnection(FailingNettyRpcServer.this, channel); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java index 6e5dfe87fc7b..7170413bee90 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java @@ -17,6 +17,8 @@ */ 
package org.apache.hadoop.hbase.ipc; +import org.apache.hadoop.hbase.util.Pair; + public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper { @Override public long getTotalQueueSize() { @@ -127,4 +129,9 @@ public int getMetaPriorityQueueLength() { public int getActiveMetaPriorityRpcHandlerCount() { return 1; } + + @Override + public Pair getTotalAndMaxNettyOutboundBytes() { + return Pair.newPair(100L, 5L); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java new file mode 100644 index 000000000000..001f6dbd22c7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CompatibilityFactory; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.test.MetricsAssertHelper; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; + +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos; + +@Category({ RPCTests.class, MediumTests.class }) +public class TestNettyChannelWritability { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + 
HBaseClassTestRule.forClass(TestNettyChannelWritability.class); + + private static final MetricsAssertHelper METRICS_ASSERT = + CompatibilityFactory.getInstance(MetricsAssertHelper.class); + + private static final byte[] CELL_BYTES = Bytes.toBytes("xyz"); + private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); + + /** + * Test that we properly send configured watermarks to netty, and trigger setWritable when + * necessary. + */ + @Test + public void testNettyWritableWatermarks() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 1); + conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 2); + + NettyRpcServer rpcServer = createRpcServer(conf, 0); + try { + sendAndReceive(conf, rpcServer, 5); + METRICS_ASSERT.assertCounterGt("unwritableTime_numOps", 0, + rpcServer.metrics.getMetricsSource()); + } finally { + rpcServer.stop(); + } + } + + /** + * Test that our fatal watermark is honored, which requires artificially causing some queueing so + * that pendingOutboundBytes increases. + */ + @Test + public void testNettyWritableFatalThreshold() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 1); + + // flushAfter is 3 here, with requestCount 5 below. If we never flush, the WriteTasks will sit + // in the eventloop. So we flush a few at once, which will ensure that we hit fatal threshold + NettyRpcServer rpcServer = createRpcServer(conf, 3); + try { + CompletionException exception = + assertThrows(CompletionException.class, () -> sendAndReceive(conf, rpcServer, 5)); + assertTrue(exception.getCause().getCause() instanceof ServiceException); + METRICS_ASSERT.assertCounterGt("maxOutboundBytesExceeded", 0, + rpcServer.metrics.getMetricsSource()); + } finally { + rpcServer.stop(); + } + } + + private void sendAndReceive(Configuration conf, NettyRpcServer rpcServer, int requestCount) + throws Exception { + List cells = new ArrayList<>(); + int count = 3; + for (int i = 0; i < count; i++) { + cells.add(CELL); + } + + try (NettyRpcClient client = new NettyRpcClient(conf)) { + rpcServer.start(); + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = + newBlockingStub(client, rpcServer.getListenerAddress()); + CompletableFuture[] futures = new CompletableFuture[requestCount]; + for (int i = 0; i < requestCount; i++) { + futures[i] = CompletableFuture.runAsync(() -> { + try { + sendMessage(cells, stub); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + CompletableFuture.allOf(futures).join(); + } + } + + private void sendMessage(List cells, + TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) throws Exception { + HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells)); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build()) + .getMessage()); + int index = 0; + CellScanner cellScanner = pcrc.cellScanner(); + assertNotNull(cellScanner); + while (cellScanner.advance()) { + assertEquals(CELL, cellScanner.current()); + index++; + } + assertEquals(cells.size(), index); + } + + private NettyRpcServer createRpcServer(Configuration conf, int flushAfter) throws IOException { + String name = "testRpcServer"; + ArrayList services = + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)); + + 
InetSocketAddress bindAddress = new InetSocketAddress("localhost", 0); + FifoRpcScheduler scheduler = new FifoRpcScheduler(conf, 1); + + AtomicInteger writeCount = new AtomicInteger(0); + + return new NettyRpcServer(null, name, services, bindAddress, conf, scheduler, true) { + @Override + protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { + return new NettyServerRpcConnection(this, channel) { + @Override + protected void doRespond(RpcResponse resp) { + if (writeCount.incrementAndGet() >= flushAfter) { + super.doRespond(resp); + } else { + channel.write(resp); + } + } + }; + } + }; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java index 288bb3fe2624..c55568d392ac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java @@ -89,6 +89,9 @@ public void testWrapperSource() { HELPER.assertGauge("numCallsInWriteQueue", 50, serverSource); HELPER.assertGauge("numCallsInReadQueue", 50, serverSource); HELPER.assertGauge("numCallsInScanQueue", 2, serverSource); + HELPER.assertGauge("nettyDirectMemoryUsage", 100, serverSource); + HELPER.assertGauge("nettyTotalPendingOutboundBytes", 100, serverSource); + HELPER.assertGauge("nettyMaxPendingOutboundBytes", 5, serverSource); } /** @@ -100,6 +103,12 @@ public void testSourceMethods() { new MetricsHBaseServer("HMaster", new MetricsHBaseServerWrapperStub()); MetricsHBaseServerSource serverSource = mrpc.getMetricsSource(); + mrpc.unwritableTime(100); + mrpc.maxOutboundBytesExceeded(); + mrpc.maxOutboundBytesExceeded(); + HELPER.assertCounter("maxOutboundBytesExceeded", 2, serverSource); + HELPER.assertCounter("unwritableTime_NumOps", 1, serverSource); + for (int i = 0; i < 12; i++) { mrpc.authenticationFailure(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java index 9f6b7d54430b..bc791754a12e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java @@ -28,7 +28,7 @@ import java.io.File; import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; @@ -49,9 +49,7 @@ import org.mockito.Mockito; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.Channel; -import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; @@ -131,29 +129,15 @@ public void test() throws Exception { .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL); SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock); - final AtomicBoolean useSaslRef = new AtomicBoolean(false); + final AtomicReference conn = new AtomicReference<>(null); NettyRpcServer rpcServer = new NettyRpcServer(null, 
getClass().getSimpleName(), Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(HOST, 0), serverConf, new FifoRpcScheduler(serverConf, 1), true) { @Override - protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { - return new NettyRpcServerPreambleHandler(this) { - private NettyServerRpcConnection conn; - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - super.channelRead0(ctx, msg); - useSaslRef.set(conn.useSasl); - - } - - @Override - protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { - conn = super.createNettyServerRpcConnection(channel); - return conn; - } - }; + protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) { + conn.set(super.createNettyServerRpcConnection(channel)); + return conn.get(); } }; @@ -167,7 +151,7 @@ protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channe stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage("test").build()) .getMessage(); assertTrue("test".equals(response)); - assertFalse(useSaslRef.get()); + assertFalse(conn.get().useSasl); } finally { rpcServer.stop();