diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index cf65859356c0..2fa13031a8d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2794,7 +2794,8 @@ protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, long flushOpSeqId = writeEntry.getWriteNumber(); FlushResultImpl flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, - "Nothing to flush", writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); + "Nothing to flush", + writeCanNotFlushMarkerToWAL(writeEntry, wal, writeFlushWalMarker)); mvcc.completeAndWait(writeEntry); // Set to null so we don't complete it again down in finally block. writeEntry = null; @@ -2975,17 +2976,33 @@ private boolean isAllFamilies(Collection families) { } /** - * Writes a marker to WAL indicating a flush is requested but cannot be complete due to various - * reasons. Ignores exceptions from WAL. Returns whether the write succeeded. + * Only used when a flush is requested but the memstore is empty. If writeFlushWalMarker is true, + * the {@link FlushAction#CANNOT_FLUSH} marker is written to the WAL; if it is false and a region + * replication sink exists, the marker is only passed to that sink. Ignores exceptions from WAL. 
* @return whether WAL write was successful */ - private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) { + private boolean writeCanNotFlushMarkerToWAL(WriteEntry flushOpSeqIdMVCCEntry, WAL wal, + boolean writeFlushWalMarker) { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH, getRegionInfo(), + -1, new TreeMap<>(Bytes.BYTES_COMPARATOR)); + RegionReplicationSink sink = regionReplicationSink.orElse(null); + + if (sink != null && !writeFlushWalMarker) { + /** + * The secondary region replica can use {@link FlushAction#CANNOT_FLUSH} to recover, so even + * though writeFlushWalMarker is false we create a {@link WALEdit} for the + * {@link FlushDescriptor} and attach {@link RegionReplicationSink#add} to the + * flushOpSeqIdMVCCEntry. See HBASE-26960 for more details. + */ + this.attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry, + desc, sink); + return false; + } + if (writeFlushWalMarker && wal != null && !writestate.readOnly) { - FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH, - getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR)); try { WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc, - regionReplicationSink.orElse(null)); + sink); return true; } catch (IOException e) { LOG.warn(getRegionInfo().getEncodedName() + " : " + @@ -2995,6 +3012,24 @@ private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarke return false; } + /** + * Create {@link WALEdit} for {@link FlushDescriptor} and attach {@link RegionReplicationSink#add} + * to the flushOpSeqIdMVCCEntry as its completion action. 
+ */ + private void attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry, + FlushDescriptor desc, RegionReplicationSink sink) { + assert !flushOpSeqIdMVCCEntry.getCompletionAction().isPresent(); + WALEdit flushMarkerWALEdit = WALEdit.createFlushWALEdit(getRegionInfo(), desc); + WALKeyImpl walKey = + WALUtil.createWALKey(getRegionInfo(), mvcc, this.getReplicationScope(), null); + walKey.setWriteEntry(flushOpSeqIdMVCCEntry); + /** + * Here the {@link ServerCall} is null for {@link RegionReplicationSink#add} because the + * flushMarkerWALEdit is created by ourselves, not from rpc. + */ + flushOpSeqIdMVCCEntry.attachCompletionAction(() -> sink.add(walKey, flushMarkerWALEdit, null)); + } + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", justification="Intentional; notify is about completed flush") FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java index 91ae03d461e4..1832feb2cb08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java @@ -303,6 +303,10 @@ private void runCompletionAction() { completionAction.ifPresent(Runnable::run); } + public Optional getCompletionAction() { + return completionAction; + } + public long getWriteNumber() { return this.writeNumber; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index 2076dd4fb35b..d0011d501526 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -158,8 +158,7 @@ private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal, final MultiVersionConcurrencyControl mvcc, final Map extendedAttributes, final boolean sync, final RegionReplicationSink sink) throws IOException { // TODO: Pass in current time to use? - WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), - EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes); + WALKeyImpl walKey = createWALKey(hri, mvcc, replicationScope, extendedAttributes); long trx = MultiVersionConcurrencyControl.NONE; try { trx = wal.appendMarker(hri, walKey, edit); @@ -182,6 +181,13 @@ private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal, return walKey; } + public static WALKeyImpl createWALKey(final RegionInfo hri, MultiVersionConcurrencyControl mvcc, + final NavigableMap replicationScope, + final Map extendedAttributes) { + return new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), + EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes); + } + /** * Blocksize returned here is 2x the default HDFS blocksize unless explicitly set in * Configuration. Works in tandem with hbase.regionserver.logroll.multiplier. See comment in diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationForFlushMarker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationForFlushMarker.java new file mode 100644 index 000000000000..f3033df5496c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationForFlushMarker.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.regionreplication; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.StartTestingClusterOption; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.TableDescriptor; +import 
org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.MemStoreFlusher; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; + +@Category({ RegionServerTests.class, LargeTests.class }) +public class TestRegionReplicationForFlushMarker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + 
HBaseClassTestRule.forClass(TestRegionReplicationForFlushMarker.class); + + private static final byte[] FAMILY = Bytes.toBytes("family_test"); + + private static final byte[] QUAL = Bytes.toBytes("qualifier_test"); + + private static final HBaseTestingUtil HTU = new HBaseTestingUtil(); + private static final int NB_SERVERS = 2; + + private static TableName tableName = TableName.valueOf("TestRegionReplicationForFlushMarker"); + private static volatile boolean startTest = false; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = HTU.getConfiguration(); + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); + conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); + conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 1); + conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000); + conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000); + conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000); + conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000); + conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); + conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 3); + HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class) + .numRegionServers(NB_SERVERS).build()); + + } + + @AfterClass + public static void tearDown() throws Exception { + HTU.shutdownMiniCluster(); + } + + /** + * This test is for HBASE-26960. Before HBASE-26960, {@link MemStoreFlusher} does not write the + * {@link FlushAction#CANNOT_FLUSH} marker to the WAL when the memstore is empty, so if the + * {@link RegionReplicationSink} requests a flush when the memstore is empty, it could not receive + * the {@link FlushAction#CANNOT_FLUSH} and the replication may hang. 
After HBASE-26768, when + * the {@link RegionReplicationSink} requests a flush when the memstore is empty, even if it does + * not write the {@link FlushAction#CANNOT_FLUSH} marker to the WAL, we also replicate the + * {@link FlushAction#CANNOT_FLUSH} marker to the secondary region replica. + */ + @Test + public void testCannotFlushMarker() throws Exception { + final HRegionForTest[] regions = this.createTable(); + RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get(); + assertTrue(regionReplicationSink != null); + + String oldThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME); + try { + + byte[] rowKey1 = Bytes.toBytes(1); + startTest = true; + /** + * Write the first cell. Replicating it to the secondary replica fails, and then the + * {@link RegionReplicationSink} requests a flush. Once {@link RegionReplicationSink} receives + * the {@link FlushAction#START_FLUSH}, the {@link RegionReplicationSink#failedReplicas} is + * cleared, but replicating {@link FlushAction#START_FLUSH} fails again, so the + * {@link RegionReplicationSink} requests a flush once more. Now the memstore is empty, so the + * {@link MemStoreFlusher} just writes a {@link FlushAction#CANNOT_FLUSH} marker to the WAL. + */ + regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); + /** + * Wait until {@link FlushAction#CANNOT_FLUSH} is written and replication is initiated. + */ + regions[0].cyclicBarrier.await(); + assertTrue(regions[0].prepareFlushCounter.get() == 2); + /** + * The {@link RegionReplicationSink#failedReplicas} is cleared by the + * {@link FlushAction#CANNOT_FLUSH} marker. 
+ */ + assertTrue(regionReplicationSink.getFailedReplicas().isEmpty()); + } finally { + startTest = false; + Thread.currentThread().setName(oldThreadName); + } + } + + private HRegionForTest[] createTable() throws Exception { + TableDescriptor tableDescriptor = + TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) + .build(); + HTU.getAdmin().createTable(tableDescriptor); + final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS]; + for (int i = 0; i < NB_SERVERS; i++) { + HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); + List onlineRegions = rs.getRegions(tableName); + for (HRegion region : onlineRegions) { + int replicaId = region.getRegionInfo().getReplicaId(); + assertTrue(regions[replicaId] == null); + regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region; + } + } + for (Region region : regions) { + assertNotNull(region); + } + return regions; + } + + public static final class HRegionForTest extends HRegion { + static final String USER_THREAD_NAME = "TestRegionReplicationForFlushMarker"; + final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + final AtomicInteger prepareFlushCounter = new AtomicInteger(0); + + public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, + TableDescriptor htd, RegionServerServices rsServices) { + super(fs, wal, confParam, htd, rsServices); + } + + @SuppressWarnings("deprecation") + public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, + RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); + } + + public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) { + this.regionReplicationSink = Optional.of(regionReplicationSink); + } + + @Override + protected void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException { + // not 
write the region open marker to interrupt the test. + } + + @Override + protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, + Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, + FlushLifeCycleTracker tracker) throws IOException { + if (!startTest) { + return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, + writeFlushWalMarker, tracker); + } + + if (this.getRegionInfo().getReplicaId() != 0) { + return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, + writeFlushWalMarker, tracker); + } + + try { + PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush, + status, writeFlushWalMarker, tracker); + this.prepareFlushCounter.incrementAndGet(); + /** + * First flush is {@link FlushAction#START_FLUSH} marker and the second flush is + * {@link FlushAction#CANNOT_FLUSH} marker because the memstore is empty. + */ + if (this.prepareFlushCounter.get() == 2 + && result.getResult() != null + && result.getResult().getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { + + cyclicBarrier.await(); + } + return result; + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + + } + } + + public static final class ErrorReplayRSRpcServices extends RSRpcServices { + private static final AtomicInteger callCounter = new AtomicInteger(0); + + public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException { + super(rs); + } + + @Override + public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController, + ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException { + + if (!startTest) { + return super.replicateToReplica(rpcController, replicateWALEntryRequest); + } + + List entries = replicateWALEntryRequest.getEntryList(); + if (CollectionUtils.isEmpty(entries)) { + return ReplicateWALEntryResponse.getDefaultInstance(); + } + ByteString regionName = 
entries.get(0).getKey().getEncodedRegionName(); + + HRegion region; + try { + region = server.getRegionByEncodedName(regionName.toStringUtf8()); + } catch (NotServingRegionException e) { + throw new ServiceException(e); + } + + if (!region.getRegionInfo().getTable().equals(tableName) + || region.getRegionInfo().getReplicaId() != 1) { + return super.replicateToReplica(rpcController, replicateWALEntryRequest); + } + + /** + * Fail replication of the first cell write and of {@link FlushAction#START_FLUSH}. + */ + int count = callCounter.incrementAndGet(); + if (count > 2) { + return super.replicateToReplica(rpcController, replicateWALEntryRequest); + } + throw new ServiceException(new DoNotRetryIOException("Inject error!")); + } + } + + public static final class RSForTest + extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer { + + public RSForTest(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new ErrorReplayRSRpcServices(this); + } + } + +}