diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index c4bbe67f3220..38051ff475b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -341,13 +341,31 @@ private void syncCompleted(AsyncWriter writer, long processedTxid, long startTim requestLogRoll(); } + // find all the sync futures between these two txids to see if we need to issue a hsync, if no + // sync futures then just use the default one. + private boolean isHsync(long beginTxid, long endTxid) { + SortedSet futures = + syncFutures.subSet(new SyncFuture().reset(beginTxid), new SyncFuture().reset(endTxid + 1)); + if (futures.isEmpty()) { + return useHsync; + } + for (SyncFuture future : futures) { + if (future.isForceSync()) { + return true; + } + } + return false; + } + private void sync(AsyncWriter writer) { fileLengthAtLastSync = writer.getLength(); long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; + boolean shouldUseHsync = + isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid); highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; final long startTimeNs = System.nanoTime(); final long epoch = (long) epochAndState >>> 2L; - addListener(writer.sync(useHsync), (result, error) -> { + addListener(writer.sync(shouldUseHsync), (result, error) -> { if (error != null) { syncFailed(epoch, error); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index d63af0bfdcae..407edc1910bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -574,7 +574,7 @@ public void run() { Throwable lastException = null; try { TraceUtil.addTimelineAnnotation("syncing writer"); - writer.sync(useHsync); + writer.sync(takeSyncFuture.isForceSync()); TraceUtil.addTimelineAnnotation("writer synced"); currentSequence = updateHighestSyncedSequence(currentSequence); } catch (IOException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java index 04996c0ff30d..f9dee0729a7b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -25,6 +26,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -72,11 +74,19 @@ protected void resetSyncFlag(CustomAsyncFSWAL wal) { protected Boolean getSyncFlag(CustomAsyncFSWAL wal) { return wal.getSyncFlag(); } + + @Override + protected Boolean getWriterSyncFlag(CustomAsyncFSWAL wal) { + return wal.getWriterSyncFlag(); + } } class CustomAsyncFSWAL extends AsyncFSWAL { + private Boolean syncFlag; + private Boolean writerSyncFlag; + public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuration conf, EventLoopGroup eventLoopGroup, Class channelClass) throws FailedLogCloseException, IOException { @@ -84,6 +94,34 @@ public CustomAsyncFSWAL(FileSystem fs, Path rootDir, String logDir, Configuratio eventLoopGroup, channelClass); } + @Override + protected AsyncWriter createWriterInstance(Path path) throws IOException { + AsyncWriter writer = super.createWriterInstance(path); + return new AsyncWriter() { + + @Override + public void close() throws IOException { + writer.close(); + } + + @Override + public long getLength() { + return writer.getLength(); + } + + @Override + public CompletableFuture sync(boolean forceSync) { + writerSyncFlag = forceSync; + return writer.sync(forceSync); + } + + @Override + public void append(Entry entry) { + writer.append(entry); + } + }; + } + @Override public void sync(boolean forceSync) throws IOException { syncFlag = forceSync; @@ -98,9 +136,14 @@ public void sync(long txid, boolean forceSync) throws IOException { void resetSyncFlag() { this.syncFlag = null; + this.writerSyncFlag = null; } Boolean getSyncFlag() { return syncFlag; } + + Boolean getWriterSyncFlag() { + return writerSyncFlag; + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java index e7f73d0c6d9f..9c460588fdbc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.junit.ClassRule; import org.junit.experimental.categories.Category; @@ -51,16 +52,51 @@ protected void resetSyncFlag(CustomFSHLog wal) { protected Boolean getSyncFlag(CustomFSHLog wal) { return wal.getSyncFlag(); } + + @Override + protected Boolean getWriterSyncFlag(CustomFSHLog wal) { + return wal.getWriterSyncFlag(); + } } class CustomFSHLog extends FSHLog { private Boolean syncFlag; + private Boolean writerSyncFlag; + public CustomFSHLog(FileSystem fs, Path root, String logDir, Configuration conf) throws IOException { super(fs, root, logDir, conf); } + @Override + protected Writer createWriterInstance(Path path) throws IOException { + Writer writer = super.createWriterInstance(path); + return new Writer() { + + @Override + public void close() throws IOException { + writer.close(); + } + + @Override + public long getLength() { + return writer.getLength(); + } + + @Override + public void sync(boolean forceSync) throws IOException { + writerSyncFlag = forceSync; + writer.sync(forceSync); + } + + @Override + public void append(Entry entry) throws IOException { + writer.append(entry); + } + }; + } + @Override public void sync(boolean forceSync) throws IOException { syncFlag = forceSync; @@ -75,9 +111,14 @@ public void sync(long txid, boolean forceSync) throws IOException { void resetSyncFlag() { this.syncFlag = null; + this.writerSyncFlag = null; } Boolean getSyncFlag() { return syncFlag; } + + Boolean getWriterSyncFlag() { + return writerSyncFlag; + } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java index bc0255b3ea42..f100b06904dc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WALDurabilityTestBase.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -77,25 +76,39 @@ protected abstract T getWAL(FileSystem fs, Path root, String logDir, Configurati protected abstract Boolean getSyncFlag(T wal); + protected abstract Boolean getWriterSyncFlag(T wal); + @Test public void testWALDurability() throws IOException { + byte[] bytes = Bytes.toBytes(getName()); + Put put = new Put(bytes); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); + // global hbase.wal.hsync false, no override in put call - hflush conf.set(HRegion.WAL_HSYNC_CONF_KEY, "false"); FileSystem fs = FileSystem.get(conf); Path rootDir = new Path(dir + getName()); T wal = getWAL(fs, rootDir, getName(), conf); HRegion region = initHRegion(tableName, null, null, wal); - byte[] bytes = Bytes.toBytes(getName()); - Put put = new Put(bytes); - put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); - - resetSyncFlag(wal); - assertNull(getSyncFlag(wal)); - region.put(put); - assertFalse(getSyncFlag(wal)); - - region.close(); - wal.close(); + try { + resetSyncFlag(wal); + assertNull(getSyncFlag(wal)); + assertNull(getWriterSyncFlag(wal)); + region.put(put); + assertFalse(getSyncFlag(wal)); + assertFalse(getWriterSyncFlag(wal)); + + // global hbase.wal.hsync false, durability set in put call - fsync + put.setDurability(Durability.FSYNC_WAL); + resetSyncFlag(wal); + assertNull(getSyncFlag(wal)); + assertNull(getWriterSyncFlag(wal)); + region.put(put); + assertTrue(getSyncFlag(wal)); + assertTrue(getWriterSyncFlag(wal)); + } finally { + HBaseTestingUtility.closeRegionAndWAL(region); + } // global hbase.wal.hsync true, no override in put call conf.set(HRegion.WAL_HSYNC_CONF_KEY, "true"); @@ -103,28 +116,36 @@ public void testWALDurability() throws IOException { wal = getWAL(fs, rootDir, getName(), conf); region = initHRegion(tableName, null, null, wal); - resetSyncFlag(wal); - assertNull(getSyncFlag(wal)); - region.put(put); - assertEquals(getSyncFlag(wal), true); - - // global hbase.wal.hsync true, durability set in put call - fsync - put.setDurability(Durability.FSYNC_WAL); - resetSyncFlag(wal); - assertNull(getSyncFlag(wal)); - region.put(put); - assertTrue(getSyncFlag(wal)); - - // global hbase.wal.hsync true, durability set in put call - sync - put = new Put(bytes); - put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); - put.setDurability(Durability.SYNC_WAL); - resetSyncFlag(wal); - assertNull(getSyncFlag(wal)); - region.put(put); - assertFalse(getSyncFlag(wal)); - - HBaseTestingUtility.closeRegionAndWAL(region); + try { + resetSyncFlag(wal); + assertNull(getSyncFlag(wal)); + assertNull(getWriterSyncFlag(wal)); + region.put(put); + assertTrue(getSyncFlag(wal)); + assertTrue(getWriterSyncFlag(wal)); + + // global hbase.wal.hsync true, durability set in put call - fsync + put.setDurability(Durability.FSYNC_WAL); + resetSyncFlag(wal); + assertNull(getSyncFlag(wal)); + assertNull(getWriterSyncFlag(wal)); + region.put(put); + assertTrue(getSyncFlag(wal)); + assertTrue(getWriterSyncFlag(wal)); + + // global hbase.wal.hsync true, durability set in put call - sync + put = new Put(bytes); + put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); + put.setDurability(Durability.SYNC_WAL); + resetSyncFlag(wal); + assertNull(getSyncFlag(wal)); + assertNull(getWriterSyncFlag(wal)); + region.put(put); + assertFalse(getSyncFlag(wal)); + assertFalse(getWriterSyncFlag(wal)); + } finally { + HBaseTestingUtility.closeRegionAndWAL(region); + } } private String getName() {