Skip to content

Commit

Permalink
HBASE-27947 RegionServer OOM when outbound channel backed up (apache#…
Browse files Browse the repository at this point in the history
…5350)

Signed-off-by: Duo Zhang <[email protected]>
Reviewed-by: Norman Maurer <[email protected]>
  • Loading branch information
bbeaudreault authored Aug 18, 2023
1 parent b1fd92d commit 8ccb910
Show file tree
Hide file tree
Showing 15 changed files with 619 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundBuffer;

/**
* Wraps some usages of netty's unsafe API, for ease of maintainability.
*/
@InterfaceAudience.Private
public final class NettyUnsafeUtils {

private NettyUnsafeUtils() {
}

/**
* Directly closes the channel, setting SO_LINGER to 0 and skipping any handlers in the pipeline.
* This is useful for cases where it's important to immediately close without any delay.
* Otherwise, pipeline handlers and even general TCP flows can cause a normal close to take
* upwards of a few second or more. This will likely cause the client side to see either a
* "Connection reset by peer" or unexpected ConnectionClosedException.
* <p>
* <b>It's necessary to call this from within the channel's eventLoop!</b>
*/
public static void closeImmediately(Channel channel) {
assert channel.eventLoop().inEventLoop();
channel.config().setOption(ChannelOption.SO_LINGER, 0);
channel.unsafe().close(channel.voidPromise());
}

/**
* Get total bytes pending write to socket
*/
public static long getTotalPendingOutboundBytes(Channel channel) {
ChannelOutboundBuffer outboundBuffer = channel.unsafe().outboundBuffer();
// can be null when the channel is closing
if (outboundBuffer == null) {
return 0;
}
return outboundBuffer.totalPendingWriteBytes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String PROCESS_CALL_TIME_DESC = "Processing call time.";
String TOTAL_CALL_TIME_NAME = "totalCallTime";
String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";

String UNWRITABLE_TIME_NAME = "unwritableTime";
String UNWRITABLE_TIME_DESC =
"Time where an channel was unwritable due to having too many outbound bytes";
String MAX_OUTBOUND_BYTES_EXCEEDED_NAME = "maxOutboundBytesExceeded";
String MAX_OUTBOUND_BYTES_EXCEEDED_DESC =
"Number of times a connection was closed because the channel outbound "
+ "bytes exceeded the configured max.";
String QUEUE_SIZE_NAME = "queueSize";
String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and "
+ "parsed and is waiting to run or is currently being executed.";
Expand Down Expand Up @@ -97,6 +105,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String NETTY_DM_USAGE_NAME = "nettyDirectMemoryUsage";

String NETTY_DM_USAGE_DESC = "Current Netty direct memory usage.";
String NETTY_TOTAL_PENDING_OUTBOUND_NAME = "nettyTotalPendingOutboundBytes";
String NETTY_TOTAL_PENDING_OUTBOUND_DESC = "Current total bytes pending write to all channel";
String NETTY_MAX_PENDING_OUTBOUND_NAME = "nettyMaxPendingOutboundBytes";
String NETTY_MAX_PENDING_OUTBOUND_DESC = "Current maximum bytes pending write to any channel";

void authorizationSuccess();

Expand All @@ -121,4 +133,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
void processedCall(int processingTime);

void queuedAndProcessedCall(int totalTime);

void unwritableTime(long unwritableTime);

void maxOutboundBytesExceeded();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.hbase.metrics.ExceptionTrackingSourceImpl;
import org.apache.hadoop.hbase.metrics.Interns;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
Expand All @@ -36,10 +37,12 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
private final MutableFastCounter authenticationFallbacks;
private final MutableFastCounter sentBytes;
private final MutableFastCounter receivedBytes;
private final MutableFastCounter maxOutboundBytesExceeded;

private MetricHistogram queueCallTime;
private MetricHistogram processCallTime;
private MetricHistogram totalCallTime;
private MetricHistogram unwritableTime;
private MetricHistogram requestSize;
private MetricHistogram responseSize;

Expand Down Expand Up @@ -67,6 +70,10 @@ public MetricsHBaseServerSourceImpl(String metricsName, String metricsDescriptio
this.getMetricsRegistry().newTimeHistogram(PROCESS_CALL_TIME_NAME, PROCESS_CALL_TIME_DESC);
this.totalCallTime =
this.getMetricsRegistry().newTimeHistogram(TOTAL_CALL_TIME_NAME, TOTAL_CALL_TIME_DESC);
this.unwritableTime =
this.getMetricsRegistry().newTimeHistogram(UNWRITABLE_TIME_NAME, UNWRITABLE_TIME_DESC);
this.maxOutboundBytesExceeded = this.getMetricsRegistry()
.newCounter(MAX_OUTBOUND_BYTES_EXCEEDED_NAME, MAX_OUTBOUND_BYTES_EXCEEDED_DESC, 0);
this.requestSize =
this.getMetricsRegistry().newSizeHistogram(REQUEST_SIZE_NAME, REQUEST_SIZE_DESC);
this.responseSize =
Expand Down Expand Up @@ -133,6 +140,16 @@ public void queuedAndProcessedCall(int totalTime) {
totalCallTime.add(totalTime);
}

@Override
public void unwritableTime(long unwritableTime) {
this.unwritableTime.add(unwritableTime);
}

@Override
public void maxOutboundBytesExceeded() {
maxOutboundBytesExceeded.incr();
}

@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName);
Expand Down Expand Up @@ -177,6 +194,13 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
wrapper.getActiveScanRpcHandlerCount())
.addGauge(Interns.info(NETTY_DM_USAGE_NAME, NETTY_DM_USAGE_DESC),
wrapper.getNettyDmUsage());

Pair<Long, Long> totalAndMax = wrapper.getTotalAndMaxNettyOutboundBytes();
mrb.addGauge(
Interns.info(NETTY_TOTAL_PENDING_OUTBOUND_NAME, NETTY_TOTAL_PENDING_OUTBOUND_DESC),
totalAndMax.getFirst());
mrb.addGauge(Interns.info(NETTY_MAX_PENDING_OUTBOUND_NAME, NETTY_MAX_PENDING_OUTBOUND_DESC),
totalAndMax.getSecond());
}

metricsRegistry.snapshot(mrb, all);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.ipc;

import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
Expand Down Expand Up @@ -64,4 +65,10 @@ public interface MetricsHBaseServerWrapper {
int getActiveScanRpcHandlerCount();

long getNettyDmUsage();

/**
* These two metrics are calculated together, so we want to return them in one call
* @return pair containing total (first) and max (second) pending outbound bytes.
*/
Pair<Long, Long> getTotalAndMaxNettyOutboundBytes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ void totalCall(int totalTime) {
source.queuedAndProcessedCall(totalTime);
}

void unwritableTime(long unwritableTime) {
source.unwritableTime(unwritableTime);
}

void maxOutboundBytesExceeded() {
source.maxOutboundBytesExceeded();
}

public void exception(Throwable throwable) {
source.exception();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.ipc;

import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
Expand Down Expand Up @@ -209,4 +210,16 @@ public long getNettyDmUsage() {

return DirectMemoryUtils.getNettyDirectMemoryUsage();
}

@Override
public Pair<Long, Long> getTotalAndMaxNettyOutboundBytes() {
if (
!isServerStarted() || this.server.getScheduler() == null
|| !(this.server instanceof NettyRpcServer)
) {
return Pair.newPair(0L, 0L);
}

return ((NettyRpcServer) server).getTotalAndMaxNettyOutboundBytes();
}
}
Loading

0 comments on commit 8ccb910

Please sign in to comment.