Skip to content

Commit

Permalink
HBASE-23681 Add UT for procedure store region flusher (#1024)
Browse files Browse the repository at this point in the history
Signed-off-by: stack <[email protected]>
  • Loading branch information
Apache9 authored Jan 12, 2020
1 parent 0a9e1f8 commit 4ad12e0
Showing 1 changed file with 150 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ MasterTests.class, MediumTests.class })
public class TestRegionProcedureStoreFlush {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionProcedureStoreFlush.class);

private Configuration conf;

private HRegion region;

private RegionFlusherAndCompactor flusher;

private AtomicInteger flushCalled;

private AtomicLong memstoreHeapSize;

private AtomicLong memstoreOffHeapSize;

@Before
public void setUp() throws IOException {
conf = HBaseConfiguration.create();
region = mock(HRegion.class);
HStore store = mock(HStore.class);
when(store.getStorefilesCount()).thenReturn(1);
when(region.getStores()).thenReturn(Collections.singletonList(store));
flushCalled = new AtomicInteger(0);
memstoreHeapSize = new AtomicLong(0);
memstoreOffHeapSize = new AtomicLong(0);
when(region.getMemStoreHeapSize()).thenAnswer(invocation -> memstoreHeapSize.get());
when(region.getMemStoreOffHeapSize()).thenAnswer(invocation -> memstoreOffHeapSize.get());
when(region.flush(anyBoolean())).thenAnswer(invocation -> {
assertTrue(invocation.getArgument(0));
memstoreHeapSize.set(0);
memstoreOffHeapSize.set(0);
flushCalled.incrementAndGet();
return null;
});
}

@After
public void tearDown() {
if (flusher != null) {
flusher.close();
flusher = null;
}
}

private void initFlusher() {
flusher = new RegionFlusherAndCompactor(conf, new Abortable() {

@Override
public boolean isAborted() {
return false;
}

@Override
public void abort(String why, Throwable e) {
}
}, region);
}

@Test
public void testTriggerFlushBySize() throws IOException, InterruptedException {
conf.setLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY, 1024 * 1024);
initFlusher();
memstoreHeapSize.set(1000 * 1024);
flusher.onUpdate();
Thread.sleep(1000);
assertEquals(0, flushCalled.get());
memstoreOffHeapSize.set(1000 * 1024);
flusher.onUpdate();
Waiter.waitFor(conf, 2000, () -> flushCalled.get() == 1);
}

private void assertTriggerFlushByChanges(int changes) throws InterruptedException {
int currentFlushCalled = flushCalled.get();
for (int i = 0; i < changes; i++) {
flusher.onUpdate();
}
Thread.sleep(1000);
assertEquals(currentFlushCalled, flushCalled.get());
flusher.onUpdate();
Waiter.waitFor(conf, 5000, () -> flushCalled.get() == currentFlushCalled + 1);
}

@Test
public void testTriggerFlushByChanges() throws InterruptedException {
conf.setLong(RegionFlusherAndCompactor.FLUSH_PER_CHANGES_KEY, 10);
initFlusher();
assertTriggerFlushByChanges(10);
assertTriggerFlushByChanges(10);
}

@Test
public void testPeriodicalFlush() throws InterruptedException {
conf.setLong(RegionFlusherAndCompactor.FLUSH_INTERVAL_MS_KEY, 1000);
initFlusher();
assertEquals(0, flushCalled.get());
Thread.sleep(1500);
assertEquals(1, flushCalled.get());
Thread.sleep(1000);
assertEquals(2, flushCalled.get());

}
}

0 comments on commit 4ad12e0

Please sign in to comment.