Skip to content

Commit

Permalink
Add a randomly picked region info with replication marker edits
Browse files Browse the repository at this point in the history
  • Loading branch information
shahrs87 committed Oct 10, 2022
1 parent ebcba5d commit d497720
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.WALObserver;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.wal.NoRegionWALEdit;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
Expand Down Expand Up @@ -145,10 +144,6 @@ public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdi
if (this.coprocEnvironments.isEmpty()) {
return;
}
// Skip running this hook if edit is not bound to any region.
if (logEdit instanceof NoRegionWALEdit) {
return;
}
execOperation(new WALObserverOperation() {
@Override
public void call(WALObserver oserver) throws IOException {
Expand All @@ -159,10 +154,6 @@ public void call(WALObserver oserver) throws IOException {

public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
// Skip running this hook if edit is not bound to any region.
if (logEdit instanceof NoRegionWALEdit) {
return;
}
execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {
@Override
protected void call(WALObserver observer) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.NoRegionWALEdit;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
Expand Down Expand Up @@ -248,6 +247,6 @@ public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrenc
NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL);
writeMarker(wal, replicationScope, regionInfo,
NoRegionWALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null, null);
WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
Expand Down Expand Up @@ -60,6 +63,7 @@ public class ReplicationMarkerChore extends ScheduledChore {
private final Configuration conf;
private final RegionServerServices rsServices;
private WAL wal;
Random random = new Random();

public static final String REPLICATION_MARKER_ENABLED_KEY =
"hbase.regionserver.replication.marker.enabled";
Expand Down Expand Up @@ -95,8 +99,18 @@ protected void chore() {
if (LOG.isTraceEnabled()) {
LOG.trace("Creating replication marker edit.");
}

// This creates a new ArrayList of all the online regions for every call.
List<HRegion> regions = rsServices.getRegions();

if (regions.isEmpty()) {
LOG.info("There are no online regions for this server, so skipping adding replication marker"
+ " rows for this regionserver");
return;
}
HRegion region = regions.get(random.nextInt(regions.size()));
try {
WALUtil.writeReplicationMarkerAndSync(wal, MVCC, REGION_INFO, rowKey, timeStamp);
WALUtil.writeReplicationMarkerAndSync(wal, MVCC, region.getRegionInfo(), rowKey, timeStamp);
} catch (IOException ioe) {
LOG.error("Exception while sync'ing replication tracker edit", ioe);
// TODO: Should we stop region server or add a metric and keep going.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;
Expand Down Expand Up @@ -243,6 +244,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
if (put == null) {
continue;
}
table = REPLICATION_SINK_TRACKER_TABLE_NAME;
List<UUID> clusterIds = new ArrayList<>();
for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
clusterIds.add(toUUID(clusterId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
Expand Down Expand Up @@ -143,11 +144,13 @@ public void run() {
}
batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
if (batch == null) {
LOG.info("RSS batch null");
// got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
continue;
}
LOG.info("RSS batch not null");
// if we have already switched a file, skip reading and put it directly to the ship queue
if (!batch.isEndOfFile()) {
readWALEntries(entryStream, batch);
Expand Down Expand Up @@ -347,6 +350,10 @@ private WALEntryBatch createBatch(WALEntryStream entryStream) {
}

protected final Entry filterEntry(Entry entry) {
// Always replicate if this edit is Replication Marker edit.
if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
return entry;
}
Entry filtered = filter.filter(entry);
if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
LOG.trace("Filtered entry for replication: {}", entry);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,18 @@ private WALEdit addCell(Cell cell) {
return this;
}

/**
* Creates a replication tracker edit with {@link #METAFAMILY} family and
* {@link #REPLICATION_MARKER} qualifier and has null value.
* @param rowKey rowkey
* @param timestamp timestamp
*/
public static WALEdit createReplicationMarkerEdit(byte[] rowKey, long timestamp) {
KeyValue kv =
new KeyValue(rowKey, METAFAMILY, REPLICATION_MARKER, timestamp, KeyValue.Type.Put);
return new WALEdit().add(kv);
}

/**
* Checks whether this edit is a replication marker edit.
* @param edit edit
Expand All @@ -506,8 +518,4 @@ public static boolean isReplicationMarkerEdit(WALEdit edit) {
return edit.getCells().size() == 1
&& CellUtil.matchingColumn(edit.getCells().get(0), METAFAMILY, REPLICATION_MARKER);
}

public boolean isEditBoundToRegion() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.NoRegionWALEdit;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -44,15 +43,15 @@ public class TestWALEdit {

/**
* Tests that
* {@link org.apache.hadoop.hbase.wal.NoRegionWALEdit#createReplicationMarkerEdit(byte[], long)}
* {@link org.apache.hadoop.hbase.wal.WALEdit#createReplicationMarkerEdit(byte[], long)}
* method is creating WALEdit with correct family and qualifier.
*/
@Test
public void testCreateReplicationMarkerEdit() {
long timestamp = EnvironmentEdgeManager.currentTime();

byte[] rowkey = ReplicationMarkerChore.getRowKey(RS_NAME, timestamp);
WALEdit edit = NoRegionWALEdit.createReplicationMarkerEdit(rowkey, timestamp);
WALEdit edit = WALEdit.createReplicationMarkerEdit(rowkey, timestamp);
assertEquals(1, edit.getCells().size());
Cell cell = edit.getCells().get(0);
assertTrue(CellUtil.matchingFamily(cell, METAFAMILY));
Expand Down

0 comments on commit d497720

Please sign in to comment.