Skip to content

Commit

Permalink
HBASE-18158 Two running in-memory compaction threads may lose data fo…
Browse files Browse the repository at this point in the history
…r flushing
  • Loading branch information
chia7712 committed Jun 7, 2017
1 parent 9329a18 commit 9c8c749
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,15 @@ public CompactingMemStore(Configuration conf, CellComparator c,
this.store = store;
this.regionServices = regionServices;
this.pipeline = new CompactionPipeline(getRegionServices());
this.compactor = new MemStoreCompactor(this, compactionPolicy);
this.compactor = createMemStoreCompactor(compactionPolicy);
initInmemoryFlushSize(conf);
}

@VisibleForTesting
protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
return new MemStoreCompactor(this, compactionPolicy);
}

private void initInmemoryFlushSize(Configuration conf) {
long memstoreFlushSize = getRegionServices().getMemstoreFlushSize();
int numStores = getRegionServices().getNumStores();
Expand Down Expand Up @@ -410,7 +415,8 @@ private ThreadPoolExecutor getPool() {
return getRegionServices().getInMemoryCompactionPool();
}

private boolean shouldFlushInMemory() {
@VisibleForTesting
protected boolean shouldFlushInMemory() {
if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold
if (inWalReplay) { // when replaying edits from WAL there is no need in in-memory flush
return false; // regardless the size
Expand All @@ -430,7 +436,6 @@ private boolean shouldFlushInMemory() {
private void stopCompaction() {
if (inMemoryFlushInProgress.get()) {
compactor.stop();
inMemoryFlushInProgress.set(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
Expand Down Expand Up @@ -102,6 +104,7 @@
import org.mockito.Mockito;

import com.google.common.collect.Lists;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Test class for the Store
Expand Down Expand Up @@ -1213,6 +1216,59 @@ public void testReclaimChunkWhenScaning() throws IOException {
}
}

/**
* If there are two running InMemoryFlushRunnable, the later InMemoryFlushRunnable
* may change the versionedList. And the first InMemoryFlushRunnable will use the chagned
* versionedList to remove the corresponding segments.
* In short, there will be some segements which isn't in merge are removed.
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=30000)
public void testRunDoubleMemStoreCompactors() throws IOException, InterruptedException {
int flushSize = 500;
Configuration conf = HBaseConfiguration.create();
conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStoreWithCustomCompactor.class.getName());
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(flushSize));
// Set the lower threshold to invoke the "MERGE" policy
conf.set(MemStoreCompactor.COMPACTING_MEMSTORE_THRESHOLD_KEY, String.valueOf(0));
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
init(name.getMethodName(), conf, hcd);
byte[] value = Bytes.toBytes("thisisavarylargevalue");
MemstoreSize memStoreSize = new MemstoreSize();
long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
// older data whihc shouldn't be "seen" by client
store.add(createCell(qf1, ts, seqId, value), memStoreSize);
store.add(createCell(qf2, ts, seqId, value), memStoreSize);
store.add(createCell(qf3, ts, seqId, value), memStoreSize);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
storeFlushCtx.prepare();
// This shouldn't invoke another in-memory flush because the first compactor thread
// hasn't accomplished the in-memory compaction.
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
store.add(createCell(qf1, ts + 1, seqId + 1, value), memStoreSize);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
//okay. Let the compaction be completed
MyMemStoreCompactor.START_COMPACTOR_LATCH.countDown();
CompactingMemStore mem = (CompactingMemStore) ((HStore)store).memstore;
while (mem.isMemStoreFlushingInMemory()) {
TimeUnit.SECONDS.sleep(1);
}
// This should invoke another in-memory flush.
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
store.add(createCell(qf1, ts + 2, seqId + 2, value), memStoreSize);
assertEquals(2, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
String.valueOf(TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE));
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
}

private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
HColumnDescriptor hcd = new HColumnDescriptor(family);
Expand Down Expand Up @@ -1242,6 +1298,51 @@ private interface MyScannerHook {
void hook(MyStore store) throws IOException;
}

private static class MyMemStoreCompactor extends MemStoreCompactor {
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);
public MyMemStoreCompactor(CompactingMemStore compactingMemStore, MemoryCompactionPolicy compactionPolicy) {
super(compactingMemStore, compactionPolicy);
}

@Override
public boolean start() throws IOException {
boolean isFirst = RUNNER_COUNT.getAndIncrement() == 0;
boolean rval = super.start();
if (isFirst) {
try {
START_COMPACTOR_LATCH.await();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
return rval;
}
}

public static class MyCompactingMemStoreWithCustomCompactor extends CompactingMemStore {
private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
public MyCompactingMemStoreWithCustomCompactor(Configuration conf, CellComparator c,
HStore store, RegionServicesForStores regionServices,
MemoryCompactionPolicy compactionPolicy) throws IOException {
super(conf, c, store, regionServices, compactionPolicy);
}

@Override
protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy) {
return new MyMemStoreCompactor(this, compactionPolicy);
}

@Override
protected boolean shouldFlushInMemory() {
boolean rval = super.shouldFlushInMemory();
if (rval) {
RUNNER_COUNT.incrementAndGet();
}
return rval;
}
}

public static class MyCompactingMemStore extends CompactingMemStore {
private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
private final CountDownLatch getScannerLatch = new CountDownLatch(1);
Expand Down

0 comments on commit 9c8c749

Please sign in to comment.