Skip to content

Commit

Permalink
HBASE-23221 Polish the WAL interface after HBASE-23181 (apache#774)
Browse files Browse the repository at this point in the history
Removes the closeRegion flag added by HBASE-23181 and instead
relies on reading meta WALEdit content. Modified how the qualifier is
written when the meta WALEdit is for a RegionEventDescriptor:
the 'type' is now added to the qualifier so we can figure out the type
w/o having to deserialize the protobuf value content, e.g.
HBASE::REGION_EVENT::REGION_CLOSE

Added doc on WALEdit and tried to formalize the 'meta' WALEdit
type and how it works. It needs a complete redo, in part as suggested
by HBASE-8457. In the meantime, some doc and cleanup.

Also changed the LogRoller constructor to remove redundant param.
Because of constructor change, need to change also
TestFailedAppendAndSync, TestWALLockup, TestAsyncFSWAL &
WALPerformanceEvaluation.java

Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Lijin Bin <[email protected]>
  • Loading branch information
saintstack authored and Apache9 committed Oct 31, 2019
1 parent ff1cf69 commit 702f429
Show file tree
Hide file tree
Showing 28 changed files with 315 additions and 235 deletions.
13 changes: 10 additions & 3 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ public static boolean matchingRow(final Cell left, final Cell right) {

/**
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Instead use
* {@link #matchingRows(Cell, byte[]))}
* {@link #matchingRows(Cell, byte[])}
*/
@Deprecated
public static boolean matchingRow(final Cell left, final byte[] buf) {
Expand Down Expand Up @@ -894,8 +894,15 @@ public static boolean matchingQualifier(final Cell left, final byte[] buf, final
}

public static boolean matchingColumn(final Cell left, final byte[] fam, final byte[] qual) {
if (!matchingFamily(left, fam)) return false;
return matchingQualifier(left, qual);
return matchingFamily(left, fam) && matchingQualifier(left, qual);
}

/**
 * Checks a cell against a column family and a qualifier prefix.
 * @param left the cell to examine
 * @param fam the column family the cell must match
 * @param qual the prefix the cell's qualifier must start with
 * @return True if matching column family and the qualifier starts with <code>qual</code>
 */
public static boolean matchingColumnFamilyAndQualifierPrefix(final Cell left, final byte[] fam,
    final byte[] qual) {
  // Family mismatch settles the answer; the qualifier is only inspected when the family matches.
  if (!matchingFamily(left, fam)) {
    return false;
  }
  return PrivateCellUtil.qualifierStartsWith(left, qual);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,10 @@ public void map(WALKey key, WALEdit value, Context context)
Delete del = null;
Cell lastCell = null;
for (Cell cell : value.getCells()) {
// filtering WAL meta entries
// Filtering WAL meta marker entries.
if (WALEdit.isMetaEditFamily(cell)) {
continue;
}

// Allow a subclass filter out this cell.
if (filter(context, cell)) {
// A WALEdit may contain multiple operations (HBASE-3584) and/or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3604,7 +3604,7 @@ public List<ReplicationPeerDescription> listReplicationPeers(String regex)
if (cpHost != null) {
cpHost.preListReplicationPeers(regex);
}
LOG.info(getClientIdAuditPrefix() + " list replication peers, regex=" + regex);
LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex);
Pattern pattern = regex == null ? null : Pattern.compile(regex);
List<ReplicationPeerDescription> peers =
this.replicationPeerManager.listPeers(pattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1797,7 +1797,7 @@ public Pair<byte[], Collection<HStoreFile>> call() throws IOException {

status.setStatus("Writing region close event to WAL");
// Always write close marker to wal even for read only table. This is not a big problem as we
// do not write any data into the region.
// do not write any data into the region; it is just a meta edit in the WAL file.
if (!abort && wal != null && getRegionServerServices() != null &&
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
writeRegionCloseMarker(wal);
Expand Down Expand Up @@ -2780,7 +2780,8 @@ private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId
}
}
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
LOG.info("Flushing " + storesToFlush.size() + "/" + stores.size() + " column families," +
LOG.info("Flushing " + this.getRegionInfo().getEncodedName() + " " +
storesToFlush.size() + "/" + stores.size() + " column families," +
" dataSize=" + StringUtils.byteDesc(mss.getDataSize()) +
" heapSize=" + StringUtils.byteDesc(mss.getHeapSize()) +
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
Expand Down Expand Up @@ -4986,7 +4987,7 @@ private long replayRecoveredEdits(final Path edits,
for (Cell cell: val.getCells()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
if (WALEdit.isMetaEditFamily(cell)) {
// if region names don't match, skip replaying compaction marker
if (!checkRowWithinBoundary) {
//this is a special edit, we should handle it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1954,7 +1954,7 @@ private void startServices() throws IOException {
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
}

this.walRoller = new LogRoller(this, this);
this.walRoller = new LogRoller(this);
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
this.procedureResultReporter = new RemoteProcedureResultReporter(this);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
Expand Down Expand Up @@ -58,7 +57,6 @@
public class LogRoller extends HasThread implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
private final Server server;
protected final RegionServerServices services;
private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
Expand Down Expand Up @@ -101,16 +99,14 @@ public void requestRollAll() {
}
}

/** @param server */
public LogRoller(final Server server, final RegionServerServices services) {
public LogRoller(RegionServerServices services) {
super("LogRoller");
this.server = server;
this.services = services;
this.rollPeriod = this.server.getConfiguration().
this.rollPeriod = this.services.getConfiguration().
getLong("hbase.regionserver.logroll.period", 3600000);
this.threadWakeFrequency = this.server.getConfiguration().
this.threadWakeFrequency = this.services.getConfiguration().
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
}

Expand Down Expand Up @@ -146,7 +142,7 @@ private void abort(String reason, Throwable cause) {
LOG.warn("Failed to shutdown wal", e);
}
}
server.abort(reason, cause);
this.services.abort(reason, cause);
}

@Override
Expand All @@ -158,7 +154,7 @@ public void run() {
periodic = (now - this.lastRollTime) > this.rollPeriod;
if (periodic) {
// Time for periodic roll, fall through
LOG.debug("Wal roll period {} ms elapsed", this.rollPeriod);
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
} else {
synchronized (this) {
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
private static final class WalProps {

/**
* Map the encoded region name to the highest sequence id. Contain all the regions it has
* entries of
* Map the encoded region name to the highest sequence id.
* <p/>Contains all the regions it has an entry for.
*/
public final Map<byte[], Long> encodedName2HighestSequenceId;

Expand Down Expand Up @@ -611,9 +611,9 @@ public int getNumLogFiles() {
}

/**
* If the number of un-archived WAL files is greater than maximum allowed, check the first
* (oldest) WAL file, and returns those regions which should be flushed so that it can be
* archived.
* If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
* check the first (oldest) WAL, and return those regions which should be flushed so that
* it can be let-go/'archived'.
* @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
*/
byte[][] findRegionsToForceFlush() throws IOException {
Expand Down Expand Up @@ -888,10 +888,6 @@ public void close() throws IOException {
/**
* Updates the sequence number of a specific store. Depending on the flag, replaces the current
* seq number if the given seq id is bigger, or even if it is lower than the existing one.
* @param encodedRegionName
* @param familyName
* @param sequenceid
* @param onlyIfGreater
*/
@Override
public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
Expand Down Expand Up @@ -1015,7 +1011,7 @@ protected final void postSync(long timeInNanos, int handlerSyncs) {
}

protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
if (this.closed) {
throw new IOException(
Expand All @@ -1029,7 +1025,7 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
Expand Down Expand Up @@ -1067,13 +1063,13 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {

@Override
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
return append(info, key, edits, true, false);
return append(info, key, edits, true);
}

@Override
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
throws IOException {
return append(info, key, edits, false, closeRegion);
return append(info, key, edits, false);
}

/**
Expand All @@ -1097,17 +1093,17 @@ public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
* @param inMemstore Always true except for case where we are writing a region event marker, for
* example, a compaction completion record into the WAL; in this case the entry is just
* so we can finish an unfinished compaction -- it is not an edit for memstore.
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
* region on this region server. The WAL implementation should remove all the related
* stuff, for example, the sequence id accounting.
* @param inMemstore Always true except for case where we are writing a region event meta
* marker edit, for example, a compaction completion record into the WAL or noting a
* Region Open event. In these cases the entry is just so we can finish an unfinished
* compaction after a crash when the new Server reads the WAL on recovery, etc. These
* transition event 'Markers' do not go via the memstore. When memstore is false,
* we presume a Marker event edit.
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
boolean closeRegion) throws IOException;
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException;

protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,12 +611,12 @@ protected boolean markerEditOnly() {
}

@Override
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
boolean closeRegion) throws IOException {
if (markerEditOnly() && !edits.isMetaEdit()) {
throw new IOException("WAL is closing, only marker edit is allowed");
}
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException {
if (markerEditOnly() && !edits.isMetaEdit()) {
throw new IOException("WAL is closing, only marker edit is allowed");
}
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
waitingConsumePayloads);
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ protected void doShutdown() throws IOException {

@Override
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
final boolean inMemstore, boolean closeRegion) throws IOException {
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
final boolean inMemstore) throws IOException {
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
disruptor.getRingBuffer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
/**
* A WAL Entry for {@link AbstractFSWAL} implementation. Immutable.
* A subclass of {@link Entry} that carries extra info across the ring buffer such as
* region sequence id (we want to use this later, just before we write the WAL to ensure region
* region sequenceid (we want to use this later, just before we write the WAL to ensure region
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
* hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on
* the assign of the region sequence id. See #stampRegionSequenceId().
Expand All @@ -50,25 +50,40 @@ class FSWALEntry extends Entry {
// The below data members are denoted 'transient' just to highlight these are not persisted;
// they are only in memory and held here while passing over the ring buffer.
private final transient long txid;

/**
* If false, means this is a meta edit written by the hbase system itself. It was not in
* memstore. HBase uses these edit types to note in the log operational transitions such
* as compactions, flushes, or region open/closes.
*/
private final transient boolean inMemstore;

/**
* Set if this is a meta edit and it is of close region type.
*/
private final transient boolean closeRegion;

private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames;
private final transient ServerCall<?> rpcCall;

/**
* @param inMemstore If true, then this is a data edit, one that came from client. If false, it
* is a meta edit made by the hbase system itself and is for the WAL only.
*/
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
final boolean inMemstore, ServerCall<?> rpcCall) {
super(key, edit);
this.inMemstore = inMemstore;
this.closeRegion = closeRegion;
this.closeRegion = !inMemstore && edit.isRegionCloseMarker();
this.regionInfo = regionInfo;
this.txid = txid;
if (inMemstore) {
// construct familyNames here to reduce the work of log sinker.
Set<byte[]> families = edit.getFamilies();
this.familyNames = families != null ? families : collectFamilies(edit.getCells());
} else {
this.familyNames = Collections.<byte[]> emptySet();
this.familyNames = Collections.emptySet();
}
this.rpcCall = rpcCall;
if (rpcCall != null) {
Expand All @@ -83,7 +98,7 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
} else {
Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
for (Cell cell: cells) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
if (!WALEdit.isMetaEditFamily(cell)) {
set.add(CellUtil.cloneFamily(cell));
}
}
Expand All @@ -94,7 +109,7 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
@Override
public String toString() {
return "sequence=" + this.txid + ", " + super.toString();
};
}

boolean isInMemStore() {
return this.inMemstore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@
import com.xiaomi.infra.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
* <p>
* Accounting of sequence ids per region and then by column family. So we can our accounting
* Accounting of sequence ids per region and then by column family. So we can keep our accounting
* current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can
* keep abreast of the state of sequence id persistence. Also call update per append.
* </p>
* <p>
* For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by
* For the implementation, we assume that all the {@code encodedRegionName} passed in are gotten by
* {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
* it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
* hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
Expand All @@ -53,8 +51,8 @@
*/
@InterfaceAudience.Private
class SequenceIdAccounting {

private static final Logger LOG = LoggerFactory.getLogger(SequenceIdAccounting.class);

/**
* This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and
* {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the
Expand Down Expand Up @@ -110,7 +108,6 @@ class SequenceIdAccounting {

/**
* Returns the lowest unflushed sequence id for the region.
* @param encodedRegionName
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code>. Will
* return {@link HConstants#NO_SEQNUM} when none.
*/
Expand All @@ -125,8 +122,6 @@ long getLowestSequenceId(final byte[] encodedRegionName) {
}

/**
* @param encodedRegionName
* @param familyName
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionname</code> and
* <code>familyName</code>. Returned sequenceid may be for an edit currently being
* flushed.
Expand Down
Loading

0 comments on commit 702f429

Please sign in to comment.