Skip to content

Commit

Permalink
HBASE-22539 WAL corruption due to early DBBs re-use when Durability.A…
Browse files Browse the repository at this point in the history
…SYNC_WAL is used (#437)

Signed-off-by: Zheng Hu <[email protected]>
  • Loading branch information
Apache9 authored Aug 5, 2019
1 parent f6ece8d commit 66a2fc5
Show file tree
Hide file tree
Showing 11 changed files with 382 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
Expand Down Expand Up @@ -51,7 +51,7 @@
* the result.
*/
@InterfaceAudience.Private
abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {
public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, RpcResponse {

protected final int id; // the client's call id
protected final BlockingService service;
Expand Down Expand Up @@ -91,6 +91,12 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc
private long exceptionSize = 0;
private final boolean retryImmediatelySupported;

// This is a dirty hack to address HBASE-22539. The lowest bit is for normal rpc cleanup, and the
// second bit is for the WAL reference. We can only release the request resources (i.e. run
// reqCleanup) once both of them are zero. The reason why we cannot use general reference
// counting is that we may call cleanup multiple times in the current implementation. We should
// fix this in the future.
private final AtomicInteger reference = new AtomicInteger(0b01);

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Expand Down Expand Up @@ -141,14 +147,43 @@ public void done() {
cleanup();
}

/**
 * Atomically clears the bit(s) given by {@code mask} from {@link #reference}. If the masked
 * bit(s) are already clear this is a no-op. When the update leaves no bit set at all, the
 * request cleanup callback is run (if one is present).
 * @param mask the bit(s) of {@code reference} to clear
 */
private void release(int mask) {
  int current;
  int updated;
  do {
    current = reference.get();
    if ((current & mask) == 0) {
      // Nothing held under this mask, so there is nothing to release.
      return;
    }
    updated = current & ~mask;
    // Retry if another thread changed the value between the read and the CAS.
  } while (!reference.compareAndSet(current, updated));
  if (updated == 0 && this.reqCleanup != null) {
    this.reqCleanup.run();
  }
}

@Override
public void cleanup() {
if (this.reqCleanup != null) {
this.reqCleanup.run();
this.reqCleanup = null;
release(0b01);
}

/**
 * Atomically sets the second bit ({@code 0b10}) of {@link #reference}, retrying the
 * compare-and-set until it succeeds.
 */
public void retainByWAL() {
  int current;
  do {
    current = reference.get();
  } while (!reference.compareAndSet(current, current | 0b10));
}

/**
 * Clears the second bit ({@code 0b10}) of the reference word by delegating to
 * {@code release(int)}.
 */
public void releaseByWAL() {
  final int walBit = 0b10;
  release(walBit);
}

@Override
public String toString() {
return toShortString() + " param: " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.trace.TraceUtil;
Expand Down Expand Up @@ -971,7 +973,7 @@ boolean isUnflushedEntries() {
* Exposed for testing only. Use for tricks like halting the ring buffer appending.
*/
@VisibleForTesting
void atHeadOfRingBufferEventHandlerAppend() {
protected void atHeadOfRingBufferEventHandlerAppend() {
// Noop
}

Expand Down Expand Up @@ -1061,8 +1063,10 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
txidHolder.setValue(ringBuffer.next());
});
long txid = txidHolder.longValue();
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,9 @@ private void syncFailed(long epochWhenSync, Throwable error) {
private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) {
highestSyncedTxid.set(processedTxid);
for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) {
if (iter.next().getTxid() <= processedTxid) {
FSWALEntry entry = iter.next();
if (entry.getTxid() <= processedTxid) {
entry.release();
iter.remove();
} else {
break;
Expand Down Expand Up @@ -487,6 +489,7 @@ private void drainNonMarkerEditsAndFailSyncs() {
while (iter.hasNext()) {
FSWALEntry entry = iter.next();
if (!entry.getEdit().isMetaEdit()) {
entry.release();
hasNonMarkerEdits = true;
break;
}
Expand All @@ -497,7 +500,10 @@ private void drainNonMarkerEditsAndFailSyncs() {
if (!iter.hasNext()) {
break;
}
iter.next();
iter.next().release();
}
for (FSWALEntry entry : unackedAppends) {
entry.release();
}
unackedAppends.clear();
// fail the sync futures which are under the txid of the first remaining edit, if none, fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
Expand All @@ -39,7 +38,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -64,6 +62,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
Expand Down Expand Up @@ -985,7 +984,6 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
//TODO handle htrace API change, see HBASE-18895
//TraceScope scope = Trace.continueSpan(entry.detachSpan());
try {

if (this.exception != null) {
// Return to keep processing events coming off the ringbuffer
return;
Expand All @@ -1002,6 +1000,8 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
: new DamagedWALException("On sync", this.exception));
// Return to keep processing events coming off the ringbuffer
return;
} finally {
entry.release();
}
} else {
// What is this if not an append or sync. Fail all up to this!!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static java.util.stream.Collectors.toCollection;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
Expand All @@ -56,19 +54,24 @@ class FSWALEntry extends Entry {
private final transient boolean inMemstore;
private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames;
private final transient Optional<ServerCall<?>> rpcCall;

FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit,
final RegionInfo regionInfo, final boolean inMemstore) {
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
final boolean inMemstore, ServerCall<?> rpcCall) {
super(key, edit);
this.inMemstore = inMemstore;
this.regionInfo = regionInfo;
this.txid = txid;
if (inMemstore) {
// construct familyNames here to reduce the work of log sinker.
Set<byte []> families = edit.getFamilies();
this.familyNames = families != null? families: collectFamilies(edit.getCells());
Set<byte[]> families = edit.getFamilies();
this.familyNames = families != null ? families : collectFamilies(edit.getCells());
} else {
this.familyNames = Collections.<byte[]>emptySet();
this.familyNames = Collections.<byte[]> emptySet();
}
this.rpcCall = Optional.ofNullable(rpcCall);
if (rpcCall != null) {
rpcCall.retainByWAL();
}
}

Expand All @@ -77,12 +80,13 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
if (CollectionUtils.isEmpty(cells)) {
return Collections.emptySet();
} else {
return cells.stream()
.filter(v -> !CellUtil.matchingFamily(v, WALEdit.METAFAMILY))
.collect(toCollection(() -> new TreeSet<>(CellComparator.getInstance()::compareFamilies)))
.stream()
.map(CellUtil::cloneFamily)
.collect(toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
for (Cell cell: cells) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
set.add(CellUtil.cloneFamily(cell));
}
}
return set;
}
}

Expand Down Expand Up @@ -129,4 +133,8 @@ long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws
Set<byte[]> getFamilyNames() {
return familyNames;
}

/**
 * Invokes {@code releaseByWAL()} on the rpc call attached to this entry, if there is one;
 * does nothing otherwise.
 */
void release() {
  rpcCall.ifPresent(call -> call.releaseByWAL());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1156,9 +1156,8 @@ private WALEdit createWALEdit(final byte[] rowName, final byte[] family, Environ
private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
int index, NavigableMap<byte[], Integer> scopes) throws IOException {
FSWALEntry entry =
new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
rowName, family, ee, index), hri, true);
FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
createWALEdit(rowName, family, ee, index), hri, true, null);
entry.stampRegionSequenceId(mvcc.begin());
return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String logDir
failIfWALExists, prefix, suffix, GROUP, CHANNEL_CLASS) {

@Override
void atHeadOfRingBufferEventHandlerAppend() {
protected void atHeadOfRingBufferEventHandlerAppend() {
action.run();
super.atHeadOfRingBufferEventHandlerAppend();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected AbstractFSWAL<?> newSlowWAL(FileSystem fs, Path rootDir, String walDir
prefix, suffix) {

@Override
void atHeadOfRingBufferEventHandlerAppend() {
protected void atHeadOfRingBufferEventHandlerAppend() {
action.run();
super.atHeadOfRingBufferEventHandlerAppend();
}
Expand Down
Loading

0 comments on commit 66a2fc5

Please sign in to comment.