Skip to content

Commit

Permalink
HBASE-24037 Add ut for root dir and wal root dir are different (#1336)
Browse files Browse the repository at this point in the history
Signed-off-by: stack <[email protected]>
  • Loading branch information
infraio committed Mar 25, 2020
1 parent 8521207 commit 41baf71
Showing 1 changed file with 62 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class TestWALSplitToHFile {
private WALFactory wals;

private static final byte[] ROW = Bytes.toBytes("row");
private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static final byte[] VALUE1 = Bytes.toBytes("value1");
private static final byte[] VALUE2 = Bytes.toBytes("value2");
private static final int countPerFamily = 10;
Expand Down Expand Up @@ -178,6 +179,12 @@ private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws
return wal;
}

private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
wal.init();
return wal;
}

private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
final TableDescriptor td = createBasic3FamilyTD(tableName);
Expand All @@ -190,29 +197,62 @@ private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOExcepti
return new Pair<>(td, ri);
}

private void writeData(TableDescriptor td, HRegion region) throws IOException {
final long timestamp = this.ee.currentTime();
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
}
}

@Test
public void testCorruptRecoveredHFile() throws Exception {
public void testDifferentRootDirAndWALRootDir() throws Exception {
// Change wal root dir and reset the configuration
Path walRootDir = UTIL.createWALRootDir();
this.conf = HBaseConfiguration.create(UTIL.getConfiguration());

FileSystem walFs = FSUtils.getWALFileSystem(this.conf);
this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
String serverName =
ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
.toString();
this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
this.logDir = new Path(walRootDir, logName);
this.wals = new WALFactory(conf, TEST_NAME.getMethodName());

Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
TableDescriptor td = pair.getFirst();
RegionInfo ri = pair.getSecond();

WAL wal = createWAL(this.conf, rootDir, logName);
WAL wal = createWAL(walFs, walRootDir, logName);
HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
final long timestamp = this.ee.currentTime();
// Write data and flush
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE1));
}
region.flush(true);
writeData(td, region);

// Now assert edits made it in.
Result result1 = region.get(new Get(ROW));
assertEquals(td.getColumnFamilies().length, result1.size());
// Now close the region without flush
region.close(true);
wal.shutdown();
// split the log
WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

WAL wal2 = createWAL(walFs, walRootDir, logName);
HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
Result result2 = region2.get(new Get(ROW));
assertEquals(td.getColumnFamilies().length, result2.size());
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
assertTrue(Bytes.equals(VALUE1, result1.getValue(cfd.getName(), Bytes.toBytes("x"))));
assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
}
}

// Now close the region
@Test
public void testCorruptRecoveredHFile() throws Exception {
Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
TableDescriptor td = pair.getFirst();
RegionInfo ri = pair.getSecond();

WAL wal = createWAL(this.conf, rootDir, logName);
HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
writeData(td, region);

// Now close the region without flush
region.close(true);
wal.shutdown();
// split the log
Expand Down Expand Up @@ -244,7 +284,7 @@ public void testCorruptRecoveredHFile() throws Exception {
Result result2 = region2.get(new Get(ROW));
assertEquals(td.getColumnFamilies().length, result2.size());
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), Bytes.toBytes("x"))));
assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
// Assert the corrupt file was skipped and still exist
FileStatus[] files =
WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
Expand All @@ -265,22 +305,15 @@ public void testPutWithSameTimestamp() throws Exception {
final long timestamp = this.ee.currentTime();
// Write data and flush
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE1));
region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
}
region.flush(true);

// Now assert edits made it in.
Result result1 = region.get(new Get(ROW));
assertEquals(td.getColumnFamilies().length, result1.size());
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
assertTrue(Bytes.equals(VALUE1, result1.getValue(cfd.getName(), Bytes.toBytes("x"))));
}

// Write data with same timestamp and do not flush
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE2));
region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
}
// Now close the region (without flush)
// Now close the region without flush
region.close(true);
wal.shutdown();
// split the log
Expand All @@ -292,7 +325,7 @@ public void testPutWithSameTimestamp() throws Exception {
Result result2 = region2.get(new Get(ROW));
assertEquals(td.getColumnFamilies().length, result2.size());
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), Bytes.toBytes("x"))));
assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
}
}

Expand All @@ -308,17 +341,17 @@ public void testRecoverSequenceId() throws Exception {
// Write data and do not flush
for (int i = 0; i < countPerFamily; i++) {
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), Bytes.toBytes("x"), VALUE1));
region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), Bytes.toBytes("x"))));
assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
List<Cell> cells = result.listCells();
assertEquals(1, cells.size());
seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
cells.get(0).getSequenceId());
}
}

// Now close the region (without flush)
// Now close the region without flush
region.close(true);
wal.shutdown();
// split the log
Expand All @@ -331,7 +364,7 @@ public void testRecoverSequenceId() throws Exception {
for (int i = 0; i < countPerFamily; i++) {
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), Bytes.toBytes("x"))));
assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
List<Cell> cells = result.listCells();
assertEquals(1, cells.size());
assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()),
Expand Down

0 comments on commit 41baf71

Please sign in to comment.