Skip to content

Commit

Permalink
HBASE-26036 DBB released too early in HRegion.get() and dirty data fo…
Browse files Browse the repository at this point in the history
…r some operations (#3436) (#3486)

Signed-off-by: Michael Stack <[email protected]>
  • Loading branch information
sunhelly authored Jul 14, 2021
1 parent 12f2a16 commit 692d384
Show file tree
Hide file tree
Showing 7 changed files with 410 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,6 +75,13 @@ public class ByteBuffAllocator {

public static final String MIN_ALLOCATE_SIZE_KEY = "hbase.server.allocator.minimal.allocate.size";

/**
* Set an alternate bytebuffallocator by setting this config,
* e.g. we can config {@link DeallocateRewriteByteBuffAllocator} to find out
* prematurely release issues
*/
public static final String BYTEBUFF_ALLOCATOR_CLASS = "hbase.bytebuff.allocator.class";

/**
* @deprecated since 2.3.0 and will be removed in 4.0.0. Use
* {@link ByteBuffAllocator#ALLOCATOR_POOL_ENABLED_KEY} instead.
Expand Down Expand Up @@ -121,8 +129,8 @@ public interface Recycler {
void free();
}

private final boolean reservoirEnabled;
private final int bufSize;
protected final boolean reservoirEnabled;
protected final int bufSize;
private final int maxBufCount;
private final AtomicInteger usedBufCount = new AtomicInteger(0);

Expand Down Expand Up @@ -173,7 +181,9 @@ public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnab
conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2);
int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6);
return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
Class<?> clazz = conf.getClass(BYTEBUFF_ALLOCATOR_CLASS, ByteBuffAllocator.class);
return (ByteBuffAllocator) ReflectionUtils
.newInstance(clazz, true, maxBuffCount, poolBufSize, minSizeForReservoirUse);
} else {
return HEAP;
}
Expand All @@ -188,8 +198,8 @@ private static ByteBuffAllocator createOnHeap() {
return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE);
}

ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
int minSizeForReservoirUse) {
protected ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
int minSizeForReservoirUse) {
this.reservoirEnabled = reservoirEnabled;
this.maxBufCount = maxBufCount;
this.bufSize = bufSize;
Expand Down Expand Up @@ -381,7 +391,7 @@ private ByteBuffer getBuffer() {
* Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning.
* @param buf ByteBuffer to return.
*/
private void putbackBuffer(ByteBuffer buf) {
protected void putbackBuffer(ByteBuffer buf) {
if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A ByteBuffAllocator that rewrite the bytebuffers right after released.
* It can be used for test whether there are prematurely releasing backing bytebuffers.
*/
@InterfaceAudience.Private
public class DeallocateRewriteByteBuffAllocator extends ByteBuffAllocator {
private static final Logger LOG = LoggerFactory.getLogger(
DeallocateRewriteByteBuffAllocator.class);

DeallocateRewriteByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize,
int minSizeForReservoirUse) {
super(reservoirEnabled, maxBufCount, bufSize, minSizeForReservoirUse);
}

@Override
protected void putbackBuffer(ByteBuffer buf) {
if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) {
LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored");
return;
}
buf.clear();
byte[] tmp = generateTmpBytes(buf.capacity());
buf.put(tmp, 0, tmp.length);
super.putbackBuffer(buf);
}

private byte[] generateTmpBytes(int length) {
StringBuilder result = new StringBuilder();
while (result.length() < length) {
result.append("-");
}
return Bytes.toBytes(result.substring(0, length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
Expand All @@ -47,6 +48,7 @@
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -221,22 +223,27 @@ private boolean matches(Region region, ClientProtos.Condition condition) throws
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
}

List<Cell> result = region.get(get, false);
boolean matches = false;
if (filter != null) {
if (!result.isEmpty()) {
matches = true;
}
} else {
boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
if (result.isEmpty() && valueIsNull) {
matches = true;
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
matches = true;
} else if (result.size() == 1 && !valueIsNull) {
Cell kv = result.get(0);
int compareResult = PrivateCellUtil.compareValue(kv, comparator);
matches = matches(op, compareResult);
try (RegionScanner scanner = region.getScanner(new Scan(get))) {
// NOTE: Please don't use HRegion.get() instead,
// because it will copy cells to heap. See HBASE-26036
List<Cell> result = new ArrayList<>();
scanner.next(result);
if (filter != null) {
if (!result.isEmpty()) {
matches = true;
}
} else {
boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0;
if (result.isEmpty() && valueIsNull) {
matches = true;
} else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) {
matches = true;
} else if (result.size() == 1 && !valueIsNull) {
Cell kv = result.get(0);
int compareResult = PrivateCellUtil.compareValue(kv, comparator);
matches = matches(op, compareResult);
}
}
}
return matches;
Expand Down
Loading

0 comments on commit 692d384

Please sign in to comment.