Skip to content

Commit

Permalink
HBASE-26768 Avoid unnecessary replication suspending in RegionReplica… (
Browse files Browse the repository at this point in the history
  • Loading branch information
comnetwork authored Mar 10, 2022
1 parent 1dd29db commit 1057da8
Show file tree
Hide file tree
Showing 3 changed files with 359 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ public Result getResult() {
}

/** A result object from prepare flush cache stage */
static class PrepareFlushResult {
protected static class PrepareFlushResult {
final FlushResultImpl result; // indicating a failure result from prepare
final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
final TreeMap<byte[], List<Path>> committedFiles;
Expand Down Expand Up @@ -729,7 +729,7 @@ void sawNoSuchFamily() {

private final StoreHotnessProtector storeHotnessProtector;

private Optional<RegionReplicationSink> regionReplicationSink = Optional.empty();
protected Optional<RegionReplicationSink> regionReplicationSink = Optional.empty();

/**
* HRegion constructor. This constructor should only be used for testing and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver.regionreplication;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -204,7 +204,7 @@ public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescri
this.failedReplicas = new IntHashSet(regionReplication - 1);
}

private void onComplete(List<SinkEntry> sent,
void onComplete(List<SinkEntry> sent,
Map<Integer, MutableObject<Throwable>> replica2Error) {
long maxSequenceId = Long.MIN_VALUE;
long toReleaseSize = 0;
Expand All @@ -214,31 +214,32 @@ private void onComplete(List<SinkEntry> sent,
toReleaseSize += entry.size;
}
manager.decrease(toReleaseSize);
Set<Integer> failed = new HashSet<>();
for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
Integer replicaId = entry.getKey();
Throwable error = entry.getValue().getValue();
if (error != null) {
if (maxSequenceId > lastFlushedSequenceId) {
LOG.warn(
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
" id of sunk entris is {}, which is greater than the last flush SN {}," +
" we will stop replicating for a while and trigger a flush",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
failed.add(replicaId);
} else {
LOG.warn(
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
" id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
" we will not stop replicating",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
}
}
}
synchronized (entries) {
pendingSize -= toReleaseSize;
if (!failed.isEmpty()) {
failedReplicas.addAll(failed);
boolean addFailedReplicas = false;
for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
Integer replicaId = entry.getKey();
Throwable error = entry.getValue().getValue();
if (error != null) {
if (maxSequenceId > lastFlushedSequenceId) {
LOG.warn(
"Failed to replicate to secondary replica {} for {}, since the max sequence"
+ " id of sunk entris is {}, which is greater than the last flush SN {},"
+ " we will stop replicating for a while and trigger a flush",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
failedReplicas.add(replicaId);
addFailedReplicas = true;
} else {
LOG.warn(
"Failed to replicate to secondary replica {} for {}, since the max sequence"
+ " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
+ " we will not stop replicating",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
}
}
}

if (addFailedReplicas) {
flushRequester.requestFlush(maxSequenceId);
}
sending = false;
Expand Down Expand Up @@ -323,7 +324,7 @@ private boolean isStartFlushAllStores(FlushDescriptor flushDesc) {
return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
}

private Optional<FlushDescriptor> getStartFlushAllDescriptor(Cell metaCell) {
Optional<FlushDescriptor> getStartFlushAllDescriptor(Cell metaCell) {
if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
return Optional.empty();
}
Expand Down Expand Up @@ -448,4 +449,12 @@ public void waitUntilStopped() throws InterruptedException {
}
}
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
IntHashSet getFailedReplicas() {
synchronized (entries) {
return this.failedReplicas;
}
}
}
Loading

0 comments on commit 1057da8

Please sign in to comment.