diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java index 3ac2d1fbc3bf..c594122c29b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.WALObserver; import org.apache.hadoop.hbase.metrics.MetricRegistry; -import org.apache.hadoop.hbase.wal.NoRegionWALEdit; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; @@ -145,10 +144,6 @@ public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdi if (this.coprocEnvironments.isEmpty()) { return; } - // Skip running this hook if edit is not bound to any region. - if (logEdit instanceof NoRegionWALEdit) { - return; - } execOperation(new WALObserverOperation() { @Override public void call(WALObserver oserver) throws IOException { @@ -159,10 +154,6 @@ public void call(WALObserver oserver) throws IOException { public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { - // Skip running this hook if edit is not bound to any region. - if (logEdit instanceof NoRegionWALEdit) { - return; - } execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { @Override protected void call(WALObserver observer) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index e78e7eaf806c..d9c9a10a163b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.wal.NoRegionWALEdit; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; @@ -248,6 +247,6 @@ public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrenc NavigableMap replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL); writeMarker(wal, replicationScope, regionInfo, - NoRegionWALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null, null); + WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null, null); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java index 9c96bc780b40..a646baf689fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java @@ -20,11 +20,14 @@ import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; import java.io.IOException; +import java.util.List; +import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; @@ -60,6 +63,7 @@ public class ReplicationMarkerChore extends ScheduledChore { private final Configuration conf; private final RegionServerServices rsServices; private WAL wal; + Random random = new Random(); public static final String REPLICATION_MARKER_ENABLED_KEY = "hbase.regionserver.replication.marker.enabled"; @@ -95,8 +99,18 @@ protected void chore() { if (LOG.isTraceEnabled()) { LOG.trace("Creating replication marker edit."); } + + // This creates a new ArrayList of all the online regions for every call. + List regions = rsServices.getRegions(); + + if (regions.isEmpty()) { + LOG.info("There are no online regions for this server, so skipping adding replication marker" + + " rows for this regionserver"); + return; + } + HRegion region = regions.get(random.nextInt(regions.size())); try { - WALUtil.writeReplicationMarkerAndSync(wal, MVCC, REGION_INFO, rowKey, timeStamp); + WALUtil.writeReplicationMarkerAndSync(wal, MVCC, region.getRegionInfo(), rowKey, timeStamp); } catch (IOException ioe) { LOG.error("Exception while sync'ing replication tracker edit", ioe); // TODO: Should we stop region server or add a metric and keep going. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 54c582e86e56..baac2dcb3a05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY; import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN; import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN; import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN; @@ -243,6 +244,7 @@ public void replicateEntries(List entries, final CellScanner cells, if (put == null) { continue; } + table = REPLICATION_SINK_TRACKER_TABLE_NAME; List clusterIds = new ArrayList<>(); for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { clusterIds.add(toUUID(clusterId)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 49c002845b4a..2f53c4026dbf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -143,11 +144,13 @@ public void run() { } batch = tryAdvanceStreamAndCreateWALBatch(entryStream); if (batch == null) { + LOG.info("RSS batch null"); // got no entries and didn't advance position in WAL handleEmptyWALEntryBatch(); entryStream.reset(); // reuse stream continue; } + LOG.info("RSS batch not null"); // if we have already switched a file, skip reading and put it directly to the ship queue if (!batch.isEndOfFile()) { readWALEntries(entryStream, batch); @@ -347,6 +350,10 @@ private WALEntryBatch createBatch(WALEntryStream entryStream) { } protected final Entry filterEntry(Entry entry) { + // Always replicate if this edit is Replication Marker edit. + if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) { + return entry; + } Entry filtered = filter.filter(entry); if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { LOG.trace("Filtered entry for replication: {}", entry); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NoRegionWALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NoRegionWALEdit.java deleted file mode 100644 index 1f1d3beeedf0..000000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/NoRegionWALEdit.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.wal; - -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * This creates WALEdit which are not tied to any HRegion. We skip running co-processor methods - * {@link org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost#preWALWrite( RegionInfo, WALKey, WALEdit)} - * and @{@link org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost#postWALWrite( RegionInfo, WALKey, WALEdit)} - * for this edit. - */ -@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION, - HBaseInterfaceAudience.COPROC }) -public class NoRegionWALEdit extends WALEdit { - - public NoRegionWALEdit() { - super(); - } - - /** - * Creates a replication tracker edit with {@link #METAFAMILY} family and - * {@link #REPLICATION_MARKER} qualifier and has null value. - * @param rowKey rowkey - * @param timestamp timestamp - */ - public static WALEdit createReplicationMarkerEdit(byte[] rowKey, long timestamp) { - KeyValue kv = - new KeyValue(rowKey, METAFAMILY, REPLICATION_MARKER, timestamp, KeyValue.Type.Put); - return new NoRegionWALEdit().add(kv); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java index 658f64a823f7..0a68efe1d7b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java @@ -495,6 +495,18 @@ private WALEdit addCell(Cell cell) { return this; } + /** + * Creates a replication tracker edit with {@link #METAFAMILY} family and + * {@link #REPLICATION_MARKER} qualifier and has null value. + * @param rowKey rowkey + * @param timestamp timestamp + */ + public static WALEdit createReplicationMarkerEdit(byte[] rowKey, long timestamp) { + KeyValue kv = + new KeyValue(rowKey, METAFAMILY, REPLICATION_MARKER, timestamp, KeyValue.Type.Put); + return new WALEdit().add(kv); + } + /** * Checks whether this edit is a replication marker edit. * @param edit edit @@ -506,8 +518,4 @@ public static boolean isReplicationMarkerEdit(WALEdit edit) { return edit.getCells().size() == 1 && CellUtil.matchingColumn(edit.getCells().get(0), METAFAMILY, REPLICATION_MARKER); } - - public boolean isEditBoundToRegion() { - return true; - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java index a458cce0c786..33c6bfc18e1d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.wal.NoRegionWALEdit; import org.apache.hadoop.hbase.wal.WALEdit; import org.junit.ClassRule; import org.junit.Test; @@ -44,7 +43,7 @@ public class TestWALEdit { /** * Tests that - * {@link org.apache.hadoop.hbase.wal.NoRegionWALEdit#createReplicationMarkerEdit(byte[], long)} + * {@link org.apache.hadoop.hbase.wal.WALEdit#createReplicationMarkerEdit(byte[], long)} * method is creating WALEdit with correct family and qualifier. */ @Test @@ -52,7 +51,7 @@ public void testCreateReplicationMarkerEdit() { long timestamp = EnvironmentEdgeManager.currentTime(); byte[] rowkey = ReplicationMarkerChore.getRowKey(RS_NAME, timestamp); - WALEdit edit = NoRegionWALEdit.createReplicationMarkerEdit(rowkey, timestamp); + WALEdit edit = WALEdit.createReplicationMarkerEdit(rowkey, timestamp); assertEquals(1, edit.getCells().size()); Cell cell = edit.getCells().get(0); assertTrue(CellUtil.matchingFamily(cell, METAFAMILY));