diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java index 5ea6144d0376..e6db8f430173 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java @@ -21,12 +21,10 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; @@ -48,8 +46,6 @@ public class SlowLogTableAccessor { private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class); - private static Connection connection; - /** * hbase:slowlog table name - can be enabled with config - * hbase.regionserver.slowlog.systable.enabled @@ -66,10 +62,10 @@ private static void doPut(final Connection connection, final List puts) thr /** * Add slow/large log records to hbase:slowlog table * @param slowLogPayloads List of SlowLogPayload to process - * @param configuration Configuration to use for connection + * @param connection connection */ public static void addSlowLogRecords(final List slowLogPayloads, - final Configuration configuration) { + Connection connection) { List puts = new ArrayList<>(slowLogPayloads.size()); for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) { final byte[] rowKey = getRowKey(slowLogPayload); @@ -102,26 +98,12 @@ public static void addSlowLogRecords(final List slowL puts.add(put); } try { - if (connection == null) { - createConnection(configuration); - } doPut(connection, puts); } catch (Exception e) { LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e); } } - private static synchronized void createConnection(Configuration configuration) - throws IOException { - Configuration conf = new Configuration(configuration); - // rpc timeout: 20s - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000); - // retry count: 5 - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); - conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); - connection = ConnectionFactory.createConnection(conf); - } - /** * Create rowKey: currentTime APPEND slowLogPayload.hashcode Scan on slowlog table should keep * records with sorted order of time, however records added at the very same time could be in @@ -140,5 +122,4 @@ private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload) final long rowKeyLong = Long.parseLong(timeAndHashcode); return Bytes.toBytes(rowKeyLong); } - } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 91861d34e927..609da6378528 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1643,6 +1643,14 @@ public enum OperationStatusCode { "hbase.regionserver.slowlog.systable.enabled"; public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false; + @Deprecated + // since and will be removed in + // Instead use hbase.regionserver.named.queue.chore.duration config property + public static final String SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY = + "hbase.slowlog.systable.chore.duration"; + // Default 10 mins. + public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000; + public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY = "hbase.shell.timestamp.format.epoch"; diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index c0212607d56a..25d4ebac53f3 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -2004,7 +2004,7 @@ possible configurations would overwhelm and obscure the important. hbase.namedqueue.provider.classes - org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService + org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService,org.apache.hadoop.hbase.namequeues.WALEventTrackerQueueService Default values for NamedQueueService implementors. This comma separated full class names represent all implementors of NamedQueueService that we would like to be invoked by diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java new file mode 100644 index 000000000000..8bd95aefe8e1 --- /dev/null +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import org.apache.hadoop.hbase.metrics.BaseSource; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public interface MetricsWALEventTrackerSource extends BaseSource { + /** + * The name of the metrics + */ + String METRICS_NAME = "WALEventTracker"; + + /** + * The name of the metrics context that metrics will be under. + */ + String METRICS_CONTEXT = "regionserver"; + + /** + * Description + */ + String METRICS_DESCRIPTION = "Metrics about HBase RegionServer WALEventTracker"; + + /** + * The name of the metrics context that metrics will be under in jmx + */ + String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; + + String NUM_FAILED_PUTS = "numFailedPuts"; + String NUM_FAILED_PUTS_DESC = "Number of put requests that failed"; + + String NUM_RECORDS_FAILED_PUTS = "numRecordsFailedPuts"; + String NUM_RECORDS_FAILED_PUTS_DESC = "number of records in failed puts"; + + /* + * Increment 2 counters, numFailedPuts and numRecordsFailedPuts + */ + void incrFailedPuts(long numRecords); + + /* + * Get the failed puts counter. + */ + long getFailedPuts(); + + /* + * Get the number of records in failed puts. + */ + long getNumRecordsFailedPuts(); +} diff --git a/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource b/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource new file mode 100644 index 000000000000..5870bf1a9cf6 --- /dev/null +++ b/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSourceImpl diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java new file mode 100644 index 000000000000..0ae5b12c4d6a --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class MetricsWALEventTrackerSourceImpl extends BaseSourceImpl + implements MetricsWALEventTrackerSource { + + private final MutableFastCounter numFailedPutsCount; + private final MutableFastCounter numRecordsFailedPutsCount; + + public MetricsWALEventTrackerSourceImpl() { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); + } + + public MetricsWALEventTrackerSourceImpl(String metricsName, String metricsDescription, + String metricsContext, String metricsJmxContext) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + numFailedPutsCount = + this.getMetricsRegistry().newCounter(NUM_FAILED_PUTS, NUM_FAILED_PUTS_DESC, 0L); + numRecordsFailedPutsCount = this.getMetricsRegistry().newCounter(NUM_RECORDS_FAILED_PUTS, + NUM_RECORDS_FAILED_PUTS_DESC, 0L); + } + + @Override + public void incrFailedPuts(long numRecords) { + numFailedPutsCount.incr(); + numRecordsFailedPutsCount.incr(numRecords); + } + + @Override + public long getFailedPuts() { + return numFailedPutsCount.value(); + } + + @Override + public long getNumRecordsFailedPuts() { + return numRecordsFailedPutsCount.value(); + } +} diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto index 48a108bb8a79..ba12dcf3edfc 100644 --- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto +++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto @@ -182,3 +182,12 @@ message RegionEventDescriptor { */ message WALTrailer { } + +/** + * Special WAL entry for replication marker event. + */ +message ReplicationMarkerDescriptor { + required string region_server_name = 1; + required string wal_name = 2; + required uint64 offset = 3; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c49c0db06728..5a9d9fc1aef1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator; import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer; import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer; import org.apache.hadoop.hbase.mob.MobFileCleanerChore; @@ -214,6 +215,7 @@ import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader; +import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator; import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.SecurityConstants; @@ -1243,6 +1245,10 @@ private void finishActiveMasterInitialization(MonitoredTask status) final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this); slowLogMasterService.init(); + WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this); + // Create REPLICATION.SINK_TRACKER table if needed. + ReplicationSinkTrackerTableCreator.createIfNeededAndNotExists(conf, this); + // clear the dead servers with same host name and port of online server because we are not // removing dead server with same hostname and port of rs which is trying to check in before // master initialization. See HBASE-5916. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java new file mode 100644 index 000000000000..8ad1b93f77ec --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.waleventtracker; + +import static org.apache.hadoop.hbase.HConstants.NO_NONCE; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME_STR; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * WALEventTracker Table creation to be used by HMaster + */ +@InterfaceAudience.Private +public final class WALEventTrackerTableCreator { + private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableCreator.class); + + public static final String WAL_EVENT_TRACKER_ENABLED_KEY = + "hbase.regionserver.wal.event.tracker.enabled"; + public static final boolean WAL_EVENT_TRACKER_ENABLED_DEFAULT = false; + + /** The walEventTracker info family as a string */ + private static final String WAL_EVENT_TRACKER_INFO_FAMILY_STR = "info"; + + /** The walEventTracker info family in array of bytes */ + public static final byte[] WAL_EVENT_TRACKER_INFO_FAMILY = + Bytes.toBytes(WAL_EVENT_TRACKER_INFO_FAMILY_STR); + + private static final long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds + + private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = TableDescriptorBuilder + .newBuilder(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME).setRegionReplication(1) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(WAL_EVENT_TRACKER_INFO_FAMILY) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false).setMaxVersions(1) + .setTimeToLive((int) TTL).build()); + + /* Private default constructor */ + private WALEventTrackerTableCreator() { + } + + /* + * We will create this table only if hbase.regionserver.wal.event.tracker.enabled is enabled and + * table doesn't exists already. + */ + public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices) + throws IOException { + boolean walEventTrackerEnabled = + conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT); + if (!walEventTrackerEnabled) { + LOG.info("wal event tracker requests logging to table " + WAL_EVENT_TRACKER_TABLE_NAME_STR + + " is disabled. Quitting."); + return; + } + if ( + !masterServices.getTableDescriptors() + .exists(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME) + ) { + LOG.info(WAL_EVENT_TRACKER_TABLE_NAME_STR + " table not found. Creating."); + masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java index ed4b470d577f..2d6f5bf57348 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.yetus.audience.InterfaceAudience; @@ -70,7 +71,8 @@ class LogEventHandler implements EventHandler { namedQueueServices.put(namedQueueService.getEvent(), namedQueueService); } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { - LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz); + LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz, + e); } } } @@ -105,8 +107,8 @@ boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { * Add all in memory queue records to system table. The implementors can use system table or * direct HDFS file or ZK as persistence system. */ - void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { - namedQueueServices.get(namedQueueEvent).persistAll(); + void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) { + namedQueueServices.get(namedQueueEvent).persistAll(connection); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java index ba2eb3322d6e..39cc093b2aa1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java @@ -29,7 +29,8 @@ public class NamedQueuePayload { public enum NamedQueueEvent { SLOW_LOG(0), BALANCE_DECISION(1), - BALANCE_REJECTION(2); + BALANCE_REJECTION(2), + WAL_EVENT_TRACKER(3); private final int value; @@ -48,6 +49,9 @@ public static NamedQueueEvent getEventByOrdinal(int value) { case 2: { return BALANCE_REJECTION; } + case 3: { + return WAL_EVENT_TRACKER; + } default: { throw new IllegalArgumentException( "NamedQueue event with ordinal " + value + " not defined"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java index efe512b1a856..e079cef97455 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java @@ -22,6 +22,7 @@ import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.hadoop.hbase.util.Threads; @@ -60,7 +61,7 @@ private NamedQueueRecorder(Configuration conf) { // disruptor initialization with BlockingWaitStrategy this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount), - new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d") + new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".named-queue-events-pool-%d") .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), ProducerType.MULTI, new BlockingWaitStrategy()); this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); @@ -139,10 +140,9 @@ public void addRecord(NamedQueuePayload namedQueuePayload) { * Add all in memory queue records to system table. The implementors can use system table or * direct HDFS file or ZK as persistence system. */ - public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) { + public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) { if (this.logEventHandler != null) { - this.logEventHandler.persistAll(namedQueueEvent); + this.logEventHandler.persistAll(namedQueueEvent, connection); } } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java index 889323d9592d..e0504a3c495c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.namequeues; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; import org.apache.yetus.audience.InterfaceAudience; @@ -58,5 +59,5 @@ public interface NamedQueueService { * Add all in memory queue records to system table. The implementors can use system table or * direct HDFS file or ZK as persistence system. */ - void persistAll(); + void persistAll(Connection connection); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java similarity index 61% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java index 0de6c8769895..67974681252e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Connection; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,11 +28,16 @@ * Chore to insert multiple accumulated slow/large logs to hbase:slowlog system table */ @InterfaceAudience.Private -public class SlowLogTableOpsChore extends ScheduledChore { +public class NamedQueueServiceChore extends ScheduledChore { - private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class); + private static final Logger LOG = LoggerFactory.getLogger(NamedQueueServiceChore.class); + public static final String NAMED_QUEUE_CHORE_DURATION_KEY = + "hbase.regionserver.named.queue.chore.duration"; + // 10 mins default. + public static final int NAMED_QUEUE_CHORE_DURATION_DEFAULT = 10 * 60 * 1000; private final NamedQueueRecorder namedQueueRecorder; + private final Connection connection; /** * Chore Constructor @@ -41,21 +47,23 @@ public class SlowLogTableOpsChore extends ScheduledChore { * scheduled * @param namedQueueRecorder {@link NamedQueueRecorder} instance */ - public SlowLogTableOpsChore(final Stoppable stopper, final int period, - final NamedQueueRecorder namedQueueRecorder) { - super("SlowLogTableOpsChore", stopper, period); + public NamedQueueServiceChore(final Stoppable stopper, final int period, + final NamedQueueRecorder namedQueueRecorder, Connection connection) { + super("NamedQueueServiceChore", stopper, period); this.namedQueueRecorder = namedQueueRecorder; + this.connection = connection; } @Override protected void chore() { - if (LOG.isTraceEnabled()) { - LOG.trace("SlowLog Table Ops Chore is starting up."); - } - namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG); - if (LOG.isTraceEnabled()) { - LOG.trace("SlowLog Table Ops Chore is closing."); + for (NamedQueuePayload.NamedQueueEvent event : NamedQueuePayload.NamedQueueEvent.values()) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Starting chore for event %s", event.name())); + } + namedQueueRecorder.persistAll(event, connection); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Stopping chore for event %s", event.name())); + } } } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java index 95c1ed53f52c..b4104e6008f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java @@ -22,6 +22,7 @@ import java.util.Queue; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -66,7 +67,7 @@ public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) { /** * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch */ - public void addAllLogsToSysTable() { + public void addAllLogsToSysTable(Connection connection) { if (queueForSysTable == null) { LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting."); return; @@ -82,13 +83,13 @@ public void addAllLogsToSysTable() { slowLogPayloads.add(queueForSysTable.poll()); i++; if (i == SYSTABLE_PUT_BATCH_SIZE) { - SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); + SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection); slowLogPayloads.clear(); i = 0; } } if (slowLogPayloads.size() > 0) { - SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration); + SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection); } } finally { LOCK.unlock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java new file mode 100644 index 000000000000..9f549a72e51a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class WALEventTrackerPayload extends NamedQueuePayload { + + private final String rsName; + private final String walName; + private final long timeStamp; + private final String state; + private final long walLength; + + public WALEventTrackerPayload(String rsName, String walName, long timeStamp, String state, + long walLength) { + super(NamedQueueEvent.WAL_EVENT_TRACKER.getValue()); + this.rsName = rsName; + this.walName = walName; + this.timeStamp = timeStamp; + this.state = state; + this.walLength = walLength; + } + + public String getRsName() { + return rsName; + } + + public String getWalName() { + return walName; + } + + public long getTimeStamp() { + return timeStamp; + } + + public String getState() { + return state; + } + + public long getWalLength() { + return walLength; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(this.getClass().getSimpleName()); + sb.append("["); + sb.append("rsName=").append(rsName); + sb.append(", walName=").append(walName); + sb.append(", timeStamp=").append(timeStamp); + sb.append(", walState=").append(state); + sb.append(", walLength=").append(walLength); + sb.append("]"); + return sb.toString(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java new file mode 100644 index 000000000000..ee57e23ab996 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT; +import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; + +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Queue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; + +/* + This class provides the queue to save Wal events from backing RingBuffer. + */ +@InterfaceAudience.Private +public class WALEventTrackerQueueService implements NamedQueueService { + + private EvictingQueue queue; + private static final String WAL_EVENT_TRACKER_RING_BUFFER_SIZE = + "hbase.regionserver.wal.event.tracker.ringbuffer.size"; + private final boolean walEventTrackerEnabled; + private int queueSize; + private MetricsWALEventTrackerSource source = null; + + private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerQueueService.class); + + public WALEventTrackerQueueService(Configuration conf) { + this(conf, null); + } + + public WALEventTrackerQueueService(Configuration conf, MetricsWALEventTrackerSource source) { + this.walEventTrackerEnabled = + conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT); + if (!walEventTrackerEnabled) { + return; + } + + this.queueSize = conf.getInt(WAL_EVENT_TRACKER_RING_BUFFER_SIZE, 256); + queue = EvictingQueue.create(queueSize); + if (source == null) { + this.source = CompatibilitySingletonFactory.getInstance(MetricsWALEventTrackerSource.class); + } else { + this.source = source; + } + } + + @Override + public NamedQueuePayload.NamedQueueEvent getEvent() { + return NamedQueuePayload.NamedQueueEvent.WAL_EVENT_TRACKER; + } + + @Override + public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { + if (!walEventTrackerEnabled) { + return; + } + if (!(namedQueuePayload instanceof WALEventTrackerPayload)) { + LOG.warn("WALEventTrackerQueueService: NamedQueuePayload is not of type" + + " WALEventTrackerPayload."); + return; + } + + WALEventTrackerPayload payload = (WALEventTrackerPayload) namedQueuePayload; + if (LOG.isDebugEnabled()) { + LOG.debug("Adding wal event tracker payload " + payload); + } + addToQueue(payload); + } + + /* + * Made it default to use it in testing. + */ + synchronized void addToQueue(WALEventTrackerPayload payload) { + queue.add(payload); + } + + @Override + public boolean clearNamedQueue() { + if (!walEventTrackerEnabled) { + return false; + } + LOG.debug("Clearing wal event tracker queue"); + queue.clear(); + return true; + } + + @Override + public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { + return null; + } + + @Override + public void persistAll(Connection connection) { + if (!walEventTrackerEnabled) { + return; + } + if (queue.isEmpty()) { + LOG.debug("Wal Event tracker queue is empty."); + return; + } + + Queue queue = getWALEventTrackerList(); + try { + WALEventTrackerTableAccessor.addWalEventTrackerRows(queue, connection); + } catch (Exception ioe) { + // If we fail to persist the records with retries then just forget about them. + // This is a best effort service. + LOG.error("Failed while persisting wal tracker records", ioe); + // Increment metrics for failed puts + source.incrFailedPuts(queue.size()); + } + } + + private synchronized Queue getWALEventTrackerList() { + Queue retQueue = new ArrayDeque<>(); + Iterator iterator = queue.iterator(); + while (iterator.hasNext()) { + retQueue.add(iterator.next()); + } + queue.clear(); + return retQueue; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java new file mode 100644 index 000000000000..46ac5e3f9145 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_INFO_FAMILY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public final class WALEventTrackerTableAccessor { + public static final String RS_COLUMN = "region_server_name"; + public static final String WAL_NAME_COLUMN = "wal_name"; + public static final String TIMESTAMP_COLUMN = "timestamp"; + public static final String WAL_STATE_COLUMN = "wal_state"; + public static final String WAL_LENGTH_COLUMN = "wal_length"; + public static final String MAX_ATTEMPTS_KEY = "wal.event.tracker.max.attempts"; + public static final String SLEEP_INTERVAL_KEY = "wal.event.tracker.sleep.interval.msec"; + public static final String MAX_SLEEP_TIME_KEY = "wal.event.tracker.max.sleep.time.msec"; + public static final int DEFAULT_MAX_ATTEMPTS = 3; + public static final long DEFAULT_SLEEP_INTERVAL = 1000L; // 1 second + public static final long DEFAULT_MAX_SLEEP_TIME = 60000L; // 60 seconds + public static final String WAL_EVENT_TRACKER_TABLE_NAME_STR = "REPLICATION.WALEVENTTRACKER"; + public static final String DELIMITER = "_"; + + private WALEventTrackerTableAccessor() { + } + + /** + * {@link #WAL_EVENT_TRACKER_TABLE_NAME_STR} table name - can be enabled with config - + * hbase.regionserver.wal.event.tracker.enabled + */ + public static final TableName WAL_EVENT_TRACKER_TABLE_NAME = + TableName.valueOf(WAL_EVENT_TRACKER_TABLE_NAME_STR); + + private static void doPut(final Connection connection, final List puts) throws Exception { + RetryCounter retryCounter = getRetryFactory(connection.getConfiguration()).create(); + while (true) { + try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME)) { + table.put(puts); + return; + } catch (IOException ioe) { + retryOrThrow(retryCounter, ioe); + } + retryCounter.sleepUntilNextRetry(); + } + } + + private static RetryCounterFactory getRetryFactory(Configuration conf) { + int maxAttempts = conf.getInt(MAX_ATTEMPTS_KEY, DEFAULT_MAX_ATTEMPTS); + long sleepIntervalMs = conf.getLong(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL); + long maxSleepTimeMs = conf.getLong(MAX_SLEEP_TIME_KEY, DEFAULT_MAX_SLEEP_TIME); + RetryCounter.RetryConfig retryConfig = + new RetryCounter.RetryConfig(maxAttempts, sleepIntervalMs, maxSleepTimeMs, + TimeUnit.MILLISECONDS, new RetryCounter.ExponentialBackoffPolicyWithLimit()); + return new RetryCounterFactory(retryConfig); + } + + private static void retryOrThrow(RetryCounter retryCounter, IOException ioe) throws IOException { + if (retryCounter.shouldRetry()) { + return; + } + throw ioe; + } + + /** + * Add wal event tracker rows to hbase:waleventtracker table + * @param walEventPayloads List of walevents to process + * @param connection Connection to use. + */ + public static void addWalEventTrackerRows(Queue walEventPayloads, + final Connection connection) throws Exception { + List puts = new ArrayList<>(walEventPayloads.size()); + for (WALEventTrackerPayload payload : walEventPayloads) { + final byte[] rowKey = getRowKey(payload); + final Put put = new Put(rowKey); + // TODO Do we need to SKIP_WAL ? + put.setPriority(HConstants.NORMAL_QOS); + put + .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(RS_COLUMN), + Bytes.toBytes(payload.getRsName())) + .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_NAME_COLUMN), + Bytes.toBytes(payload.getWalName())) + .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(TIMESTAMP_COLUMN), + Bytes.toBytes(payload.getTimeStamp())) + .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_STATE_COLUMN), + Bytes.toBytes(payload.getState())) + .addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_LENGTH_COLUMN), + Bytes.toBytes(payload.getWalLength())); + puts.add(put); + } + doPut(connection, puts); + } + + /** + * Create rowKey: 1. We want RS name to be the leading part of rowkey so that we can query by RS + * name filter. WAL name contains rs name as a leading part. 2. Timestamp when the event was + * generated. 3. Add state of the wal. Combination of 1 + 2 + 3 is definitely going to create a + * unique rowkey. + * @param payload payload to process + * @return rowKey byte[] + */ + public static byte[] getRowKey(final WALEventTrackerPayload payload) { + String walName = payload.getWalName(); + // converting to string since this will help seeing the timestamp in string format using + // hbase shell commands. + String timestampStr = String.valueOf(payload.getTimeStamp()); + String walState = payload.getState(); + final String rowKeyStr = walName + DELIMITER + timestampStr + DELIMITER + walState; + return Bytes.toBytes(rowKeyStr); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java index 45bfca112700..885e2d44279c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.BalancerDecision; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; @@ -141,7 +142,7 @@ public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) } @Override - public void persistAll() { + public void persistAll(Connection connection) { // no-op for now } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java index 79b7325b305d..fb94db2b917d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.BalancerRejection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails; import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; @@ -127,7 +128,7 @@ public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) } @Override - public void persistAll() { + public void persistAll(Connection connection) { // no-op for now } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java index 03b6aa719ea9..86b24e9d975e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.SlowLogParams; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.namequeues.LogHandlerUtils; @@ -223,12 +224,12 @@ private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) * table. */ @Override - public void persistAll() { + public void persistAll(Connection connection) { if (!isOnlineLogProviderEnabled) { return; } if (slowLogPersistentService != null) { - slowLogPersistentService.addAllLogsToSysTable(); + slowLogPersistentService.addAllLogsToSysTable(connection); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b3bda6c27d79..c3589d57df12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -19,8 +19,17 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION; import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT; +import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; +import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT; +import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; import io.opentelemetry.api.trace.Span; @@ -129,12 +138,11 @@ import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.LoadBalancer; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.mob.MobFileCache; import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; -import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore; +import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; @@ -156,7 +164,10 @@ import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.security.SecurityConstants; @@ -457,7 +468,7 @@ public class HRegionServer extends Thread private final RegionServerAccounting regionServerAccounting; - private SlowLogTableOpsChore slowLogTableOpsChore = null; + private NamedQueueServiceChore namedQueueServiceChore = null; // Block cache private BlockCache blockCache; @@ -590,6 +601,11 @@ public class HRegionServer extends Thread // A timer to shutdown the process if abort takes too long private Timer abortMonitor; + /* + * Chore that creates replication marker rows. + */ + private ReplicationMarkerChore replicationMarkerChore; + /** * Starts a HRegionServer at the default location. *

@@ -636,7 +652,7 @@ public HRegionServer(final Configuration conf) throws IOException { this.abortRequested = new AtomicBoolean(false); this.stopped = false; - initNamedQueueRecorder(conf); + this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf); rpcServices = createRpcServices(); useThisHostnameInstead = getUseThisHostnameInstead(conf); @@ -725,26 +741,6 @@ public HRegionServer(final Configuration conf) throws IOException { } } - private void initNamedQueueRecorder(Configuration conf) { - if (!(this instanceof HMaster)) { - final boolean isOnlineLogProviderEnabled = conf.getBoolean( - HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED); - if (isOnlineLogProviderEnabled) { - this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf); - } - } else { - final boolean isBalancerDecisionRecording = - conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED, - BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED); - final boolean isBalancerRejectionRecording = - conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED, - BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED); - if (isBalancerDecisionRecording || isBalancerRejectionRecording) { - this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf); - } - } - } - // HMaster should override this method to load the specific config for master protected String getUseThisHostnameInstead(Configuration conf) throws IOException { String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY); @@ -1036,6 +1032,17 @@ public boolean isClusterUp() { || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp()); } + private void initializeReplicationMarkerChore() { + boolean replicationMarkerEnabled = + conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); + // If replication or replication marker is not enabled then return immediately. + if (replicationMarkerEnabled) { + int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY, + REPLICATION_MARKER_CHORE_DURATION_DEFAULT); + replicationMarkerChore = new ReplicationMarkerChore(this, this, period); + } + } + /** * The HRegionServer sticks in this loop until closed. */ @@ -2049,9 +2056,23 @@ private void setupWALAndReplication() throws IOException { } // Instantiate replication if replication enabled. Pass it the log directories. createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory); + + WALActionsListener walEventListener = getWALEventTrackerListener(conf); + if (walEventListener != null && factory.getWALProvider() != null) { + factory.getWALProvider().addWALActionsListener(walEventListener); + } this.walFactory = factory; } + private WALActionsListener getWALEventTrackerListener(Configuration conf) { + if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) { + WALEventTrackerListener listener = + new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName()); + return listener; + } + return null; + } + /** * Start up replication source and sink handlers. */ @@ -2205,16 +2226,19 @@ executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_SNAPSHOT_OP if (this.fsUtilizationChore != null) { choreService.scheduleChore(fsUtilizationChore); } - if (this.slowLogTableOpsChore != null) { - choreService.scheduleChore(slowLogTableOpsChore); + if (this.namedQueueServiceChore != null) { + choreService.scheduleChore(namedQueueServiceChore); } if (this.brokenStoreFileCleaner != null) { choreService.scheduleChore(brokenStoreFileCleaner); } - if (this.rsMobFileCleanerChore != null) { choreService.scheduleChore(rsMobFileCleanerChore); } + if (replicationMarkerChore != null) { + LOG.info("Starting replication marker chore"); + choreService.scheduleChore(replicationMarkerChore); + } // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. @@ -2262,10 +2286,22 @@ private void initializeThreads() { final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY); - if (isSlowLogTableEnabled) { + final boolean walEventTrackerEnabled = + conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT); + + if (isSlowLogTableEnabled || walEventTrackerEnabled) { // default chore duration: 10 min - final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000); - slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder); + // After , we will remove hbase.slowlog.systable.chore.duration conf property + final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY, + DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION); + + final int namedQueueChoreDuration = + conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT); + // Considering min of slowLogChoreDuration and namedQueueChoreDuration + int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration); + + namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration, + this.namedQueueRecorder, this.getConnection()); } if (this.nonceManager != null) { @@ -2315,6 +2351,7 @@ private void initializeThreads() { this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this); registerConfigurationObservers(); + initializeReplicationMarkerChore(); } private void registerConfigurationObservers() { @@ -2795,7 +2832,8 @@ protected void stopServiceThreads() { shutdownChore(healthCheckChore); shutdownChore(storefileRefresher); shutdownChore(fsUtilizationChore); - shutdownChore(slowLogTableOpsChore); + shutdownChore(namedQueueServiceChore); + shutdownChore(replicationMarkerChore); shutdownChore(rsMobFileCleanerChore); // cancel the remaining scheduled chores (in case we missed out any) // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 3d7678c37c6e..18b8f7990c8c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -601,7 +601,7 @@ private Path getNewPath() throws IOException { return newPath; } - Path getOldPath() { + public Path getOldPath() { long currentFilenum = this.filenum.get(); Path oldPath = null; if (currentFilenum > 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java new file mode 100644 index 000000000000..487c7de41707 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; +import org.apache.hadoop.hbase.namequeues.WALEventTrackerPayload; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class WALEventTrackerListener implements WALActionsListener { + private final Configuration conf; + private final NamedQueueRecorder namedQueueRecorder; + private final String serverName; + + public enum WalState { + ROLLING, + ROLLED, + ACTIVE + } + + public WALEventTrackerListener(Configuration conf, NamedQueueRecorder namedQueueRecorder, + ServerName serverName) { + this.conf = conf; + this.namedQueueRecorder = namedQueueRecorder; + this.serverName = serverName.getHostname(); + } + + @Override + public void preLogRoll(Path oldPath, Path newPath) { + if (oldPath != null) { + // oldPath can be null for first wal + // Just persist the last component of path not the whole walName which includes filesystem + // scheme, walDir. + WALEventTrackerPayload payloadForOldPath = + getPayload(oldPath.getName(), WalState.ROLLING.name(), 0L); + this.namedQueueRecorder.addRecord(payloadForOldPath); + } + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) { + // Create 2 entries entry in RingBuffer. + // 1. Change state to Rolled for oldPath + // 2. Change state to Active for newPath. + if (oldPath != null) { + // oldPath can be null for first wal + // Just persist the last component of path not the whole walName which includes filesystem + // scheme, walDir. + + long fileLength = 0L; + try { + FileSystem fs = oldPath.getFileSystem(this.conf); + fileLength = fs.getFileStatus(oldPath).getLen(); + } catch (IOException ioe) { + // Saving wal length is best effort. In case of any exception just ignore. + } + WALEventTrackerPayload payloadForOldPath = + getPayload(oldPath.getName(), WalState.ROLLED.name(), fileLength); + this.namedQueueRecorder.addRecord(payloadForOldPath); + } + + WALEventTrackerPayload payloadForNewPath = + getPayload(newPath.getName(), WalState.ACTIVE.name(), 0L); + this.namedQueueRecorder.addRecord(payloadForNewPath); + } + + private WALEventTrackerPayload getPayload(String path, String state, long walLength) { + long timestamp = EnvironmentEdgeManager.currentTime(); + WALEventTrackerPayload payload = + new WALEventTrackerPayload(serverName, path, timestamp, state, walLength); + return payload; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index d89d03a145c0..cf41beb10746 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -17,10 +17,13 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; + import java.io.IOException; import java.util.ArrayList; import java.util.Map; import java.util.NavigableMap; +import java.util.TreeMap; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -28,6 +31,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.WAL; @@ -217,4 +221,12 @@ public static void filterCells(WALEdit edit, Function mapper) { cells.trimToSize(); } } + + public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrencyControl mvcc, + RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException { + NavigableMap replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); + replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL); + writeMarker(wal, replicationScope, regionInfo, + WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java new file mode 100644 index 000000000000..38cf33090d9b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.master; + +import static org.apache.hadoop.hbase.HConstants.NO_NONCE; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This will create {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table if + * hbase.regionserver.replication.sink.tracker.enabled config key is enabled and table not created + **/ +@InterfaceAudience.Private +public final class ReplicationSinkTrackerTableCreator { + private static final Logger LOG = + LoggerFactory.getLogger(ReplicationSinkTrackerTableCreator.class); + private static final long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds + + public static final byte[] RS_COLUMN = Bytes.toBytes("region_server_name"); + public static final byte[] WAL_NAME_COLUMN = Bytes.toBytes("wal_name"); + public static final byte[] TIMESTAMP_COLUMN = Bytes.toBytes("timestamp"); + public static final byte[] OFFSET_COLUMN = Bytes.toBytes("offset"); + + /** Will create {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table if this conf is enabled **/ + public static final String REPLICATION_SINK_TRACKER_ENABLED_KEY = + "hbase.regionserver.replication.sink.tracker.enabled"; + public static final boolean REPLICATION_SINK_TRACKER_ENABLED_DEFAULT = false; + + /** The {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} info family as a string */ + private static final String REPLICATION_SINK_TRACKER_INFO_FAMILY_STR = "info"; + + /** The {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} info family in array of bytes */ + public static final byte[] REPLICATION_SINK_TRACKER_INFO_FAMILY = + Bytes.toBytes(REPLICATION_SINK_TRACKER_INFO_FAMILY_STR); + + public static final String REPLICATION_SINK_TRACKER_TABLE_NAME_STR = "REPLICATION.SINK_TRACKER"; + + /* Private default constructor */ + private ReplicationSinkTrackerTableCreator() { + } + + /** + * {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table name - can be enabled with config - + * hbase.regionserver.replication.sink.tracker.enabled + */ + public static final TableName REPLICATION_SINK_TRACKER_TABLE_NAME = + TableName.valueOf(REPLICATION_SINK_TRACKER_TABLE_NAME_STR); + + private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = TableDescriptorBuilder + .newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).setRegionReplication(1) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(REPLICATION_SINK_TRACKER_INFO_FAMILY) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false).setMaxVersions(1) + .setTimeToLive((int) TTL).build()); + + /* + * We will create this table only if hbase.regionserver.replication.sink.tracker.enabled is + * enabled and table doesn't exists already. + */ + public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices) + throws IOException { + boolean replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, + REPLICATION_SINK_TRACKER_ENABLED_DEFAULT); + if (!replicationSinkTrackerEnabled) { + LOG.info("replication sink tracker requests logging to table {} is disabled." + " Quitting.", + REPLICATION_SINK_TRACKER_TABLE_NAME_STR); + return; + } + if (!masterServices.getTableDescriptors().exists(REPLICATION_SINK_TRACKER_TABLE_NAME)) { + LOG.info("{} table not found. Creating.", REPLICATION_SINK_TRACKER_TABLE_NAME_STR); + masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java new file mode 100644 index 000000000000..b14f546540f6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This chore is responsible to create replication marker rows with special WALEdit with family as + * {@link org.apache.hadoop.hbase.wal.WALEdit#METAFAMILY} and column qualifier as + * {@link WALEdit#REPLICATION_MARKER} and empty value. If config key + * {@link #REPLICATION_MARKER_ENABLED_KEY} is set to true, then we will create 1 marker row every + * {@link #REPLICATION_MARKER_CHORE_DURATION_KEY} ms + * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader} will populate + * the Replication Marker edit with region_server_name, wal_name and wal_offset encoded in + * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor} + * object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the + * REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster, + * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the + * ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR table. + */ +@InterfaceAudience.Private +public class ReplicationMarkerChore extends ScheduledChore { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationMarkerChore.class); + private static final MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl(); + public static final RegionInfo REGION_INFO = + RegionInfoBuilder.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).build(); + private static final String DELIMITER = "_"; + private final RegionServerServices rsServices; + private WAL wal; + + public static final String REPLICATION_MARKER_ENABLED_KEY = + "hbase.regionserver.replication.marker.enabled"; + public static final boolean REPLICATION_MARKER_ENABLED_DEFAULT = false; + + public static final String REPLICATION_MARKER_CHORE_DURATION_KEY = + "hbase.regionserver.replication.marker.chore.duration"; + public static final int REPLICATION_MARKER_CHORE_DURATION_DEFAULT = 30 * 1000; // 30 seconds + + public ReplicationMarkerChore(final Stoppable stopper, final RegionServerServices rsServices, + int period) { + super("ReplicationTrackerChore", stopper, period); + this.rsServices = rsServices; + } + + @Override + protected void chore() { + if (wal == null) { + try { + // TODO: We need to add support for multi WAL implementation. + wal = rsServices.getWAL(null); + } catch (IOException ioe) { + LOG.warn("Unable to get WAL ", ioe); + // Shouldn't happen. Ignore and wait for the next chore run. + return; + } + } + String serverName = rsServices.getServerName().getServerName(); + long timeStamp = EnvironmentEdgeManager.currentTime(); + // We only have timestamp in ReplicationMarkerDescriptor and the remaining properties walname, + // regionserver name and wal offset at ReplicationSourceWALReaderThread. + byte[] rowKey = getRowKey(serverName, timeStamp); + if (LOG.isTraceEnabled()) { + LOG.trace("Creating replication marker edit."); + } + + // This creates a new ArrayList of all the online regions for every call. + List regions = rsServices.getRegions(); + + if (regions.isEmpty()) { + LOG.info("There are no online regions for this server, so skipping adding replication marker" + + " rows for this regionserver"); + return; + } + Region region = regions.get(ThreadLocalRandom.current().nextInt(regions.size())); + try { + WALUtil.writeReplicationMarkerAndSync(wal, MVCC, region.getRegionInfo(), rowKey, timeStamp); + } catch (IOException ioe) { + LOG.error("Exception while sync'ing replication tracker edit", ioe); + // TODO: Should we stop region server or add a metric and keep going. + } + } + + /** + * Creates a rowkey with region server name and timestamp. + * @param serverName region server name + * @param timestamp timestamp + */ + public static byte[] getRowKey(String serverName, long timestamp) { + // converting to string since this will help seeing the timestamp in string format using + // hbase shell commands. + String timestampStr = String.valueOf(timestamp); + final String rowKeyStr = serverName + DELIMITER + timestampStr; + return Bytes.toBytes(rowKeyStr); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 3b4ec3ea8b98..7c5583a87429 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -17,6 +17,16 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN; + +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -61,6 +71,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; @@ -107,6 +118,7 @@ public class ReplicationSink { * Row size threshold for multi requests above which a warning is logged */ private final int rowSizeWarnThreshold; + private boolean replicationSinkTrackerEnabled; /** * Create a sink for replication @@ -117,6 +129,8 @@ public ReplicationSink(Configuration conf) throws IOException { this.conf = HBaseConfiguration.create(conf); rowSizeWarnThreshold = conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); + replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, + REPLICATION_SINK_TRACKER_ENABLED_DEFAULT); decorateConf(); this.metrics = new MetricsSink(); this.walEntrySinkFilter = setupWALEntrySinkFilter(); @@ -230,6 +244,18 @@ public void replicateEntries(List entries, final CellScanner cells, bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>()); buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); } + } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) { + Mutation put = processReplicationMarkerEntry(cell); + if (put == null) { + continue; + } + table = REPLICATION_SINK_TRACKER_TABLE_NAME; + List clusterIds = new ArrayList<>(); + for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { + clusterIds.add(toUUID(clusterId)); + } + put.setClusterIds(clusterIds); + addToHashMultiMap(rowMap, table, clusterIds, put); } else { // Handle wal replication if (isNewRowOrType(previousCell, cell)) { @@ -292,6 +318,33 @@ public void replicateEntries(List entries, final CellScanner cells, } } + /* + * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not. + * If false, then ignore this cell. If set to true, de-serialize value into + * ReplicationTrackerDescriptor. Create a Put mutation with regionserver name, walname, offset and + * timestamp from ReplicationMarkerDescriptor. + */ + private Put processReplicationMarkerEntry(Cell cell) throws IOException { + // If source is emitting replication marker rows but sink is not accepting them, + // ignore the edits. + if (!replicationSinkTrackerEnabled) { + return null; + } + WALProtos.ReplicationMarkerDescriptor descriptor = + WALProtos.ReplicationMarkerDescriptor.parseFrom(new ByteArrayInputStream(cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength())); + Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, RS_COLUMN, cell.getTimestamp(), + (Bytes.toBytes(descriptor.getRegionServerName()))); + put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, WAL_NAME_COLUMN, cell.getTimestamp(), + Bytes.toBytes(descriptor.getWalName())); + put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, TIMESTAMP_COLUMN, cell.getTimestamp(), + Bytes.toBytes(cell.getTimestamp())); + put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, OFFSET_COLUMN, cell.getTimestamp(), + Bytes.toBytes(descriptor.getOffset())); + return put; + } + private void buildBulkLoadHFileMap( final Map>>> bulkLoadHFileMap, TableName table, BulkLoadDescriptor bld) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index a827a2555fb0..2db8acbd2dc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -827,4 +827,9 @@ public ReplicationQueueStorage getReplicationQueueStorage() { public String logPeerId() { return "peerId=" + this.getPeerId() + ","; } + + // Visible for testing purpose + public long getTotalReplicatedEdits() { + return totalReplicatedEdits.get(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java index 6e5da0feffb1..7337694addbf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java @@ -69,6 +69,10 @@ static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) { if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { return; } + // Allow replication marker row to pass through. + if (WALEdit.isReplicationMarkerEdit(logEdit)) { + return; + } // For replay, or if all the cells are markers, do not need to store replication scope. if ( logEdit.isReplay() || logEdit.getCells().stream().allMatch(c -> WALEdit.isMetaEditFamily(c)) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 28768679fb65..d6351ea0eab7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -19,6 +19,7 @@ import java.io.EOFException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -42,6 +44,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; @@ -178,6 +183,7 @@ protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { } LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); + updateReplicationMarkerEdit(entry, batch.getLastWalPosition()); long entrySize = getEntrySizeIncludeBulkLoad(entry); long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); batch.addEntry(entry, entrySize); @@ -341,6 +347,10 @@ private WALEntryBatch createBatch(WALEntryStream entryStream) { } protected final Entry filterEntry(Entry entry) { + // Always replicate if this edit is Replication Marker edit. + if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) { + return entry; + } Entry filtered = filter.filter(entry); if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { LOG.trace("Filtered entry for replication: {}", entry); @@ -451,6 +461,36 @@ private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) { return totalStoreFilesSize; } + /* + * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to + * cell's value. + */ + private void updateReplicationMarkerEdit(Entry entry, long offset) { + WALEdit edit = entry.getEdit(); + // Return early if it is not ReplicationMarker edit. + if (!WALEdit.isReplicationMarkerEdit(edit)) { + return; + } + List cells = edit.getCells(); + Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell"); + Cell cell = cells.get(0); + // Create a descriptor with region_server_name, wal_name and offset + WALProtos.ReplicationMarkerDescriptor.Builder builder = + WALProtos.ReplicationMarkerDescriptor.newBuilder(); + builder.setRegionServerName(this.source.getServer().getServerName().getHostname()); + builder.setWalName(getCurrentPath().getName()); + builder.setOffset(offset); + WALProtos.ReplicationMarkerDescriptor descriptor = builder.build(); + + // Create a new KeyValue + KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), + CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray()); + ArrayList newCells = new ArrayList<>(); + newCells.add(kv); + // Update edit with new cell. + edit.setCells(newCells); + } + /** * @param size delta size for grown buffer * @return true if we should clear buffer and push all diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java index bc24f8e8d3c4..7a2086486bba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java @@ -132,6 +132,22 @@ public class WALEdit implements HeapSize { @InterfaceAudience.Private public static final byte[] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD"); + /** + * Periodically {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore} + * will create marker edits with family as {@link WALEdit#METAFAMILY} and + * {@link WALEdit#REPLICATION_MARKER} as qualifier and an empty value. + * org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader will populate the + * Replication Marker edit with region_server_name, wal_name and wal_offset encoded in + * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor} + * object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the + * REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster, + * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the + * ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR + * table. + */ + @InterfaceAudience.Private + public static final byte[] REPLICATION_MARKER = Bytes.toBytes("HBASE::REPLICATION_MARKER"); + private final transient boolean replay; private ArrayList cells; @@ -454,4 +470,28 @@ private WALEdit addCell(Cell cell) { this.cells.add(cell); return this; } + + /** + * Creates a replication tracker edit with {@link #METAFAMILY} family and + * {@link #REPLICATION_MARKER} qualifier and has null value. + * @param rowKey rowkey + * @param timestamp timestamp + */ + public static WALEdit createReplicationMarkerEdit(byte[] rowKey, long timestamp) { + KeyValue kv = + new KeyValue(rowKey, METAFAMILY, REPLICATION_MARKER, timestamp, KeyValue.Type.Put); + return new WALEdit().add(kv); + } + + /** + * Checks whether this edit is a replication marker edit. + * @param edit edit + * @return true if the cell within an edit has column = METAFAMILY and qualifier = + * REPLICATION_MARKER, false otherwise + */ + public static boolean isReplicationMarkerEdit(WALEdit edit) { + // Check just the first cell from the edit. ReplicationMarker edit will have only 1 cell. + return edit.getCells().size() == 1 + && CellUtil.matchingColumn(edit.getCells().get(0), METAFAMILY, REPLICATION_MARKER); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index a6463094beae..ee093855bcef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -318,6 +318,13 @@ SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) thr Entry entry; startTS = EnvironmentEdgeManager.currentTime(); while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) { + if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) { + // Skip processing the replication marker edits. + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring Replication marker edits."); + } + continue; + } byte[] region = entry.getKey().getEncodedRegionName(); String encodedRegionNameAsStr = Bytes.toString(region); Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java new file mode 100644 index 000000000000..d4d210879d3e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; +import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.RS_COLUMN; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.TIMESTAMP_COLUMN; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_LENGTH_COLUMN; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_NAME_COLUMN; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_STATE_COLUMN; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestWALEventTracker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALEventTracker.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); + private static HBaseTestingUtility TEST_UTIL; + public static Configuration CONF; + + @BeforeClass + public static void setup() throws Exception { + CONF = HBaseConfiguration.create(); + CONF.setBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, true); + // Set the chore for less than a second. + CONF.setInt(NAMED_QUEUE_CHORE_DURATION_KEY, 900); + CONF.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100); + TEST_UTIL = new HBaseTestingUtility(CONF); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void teardown() throws Exception { + LOG.info("Calling teardown"); + TEST_UTIL.shutdownMiniHBaseCluster(); + } + + @Before + public void waitForWalEventTrackerTableCreation() { + Waiter.waitFor(CONF, 10000, + (Waiter.Predicate) () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME)); + } + + @Test + public void testWALRolling() throws Exception { + Connection connection = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getConnection(); + waitForWALEventTrackerTable(connection); + List wals = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWALs(); + assertEquals(1, wals.size()); + AbstractFSWAL wal = (AbstractFSWAL) wals.get(0); + Path wal1Path = wal.getOldPath(); + wal.rollWriter(true); + + FileSystem fs = TEST_UTIL.getTestFileSystem(); + long wal1Length = fs.getFileStatus(wal1Path).getLen(); + Path wal2Path = wal.getOldPath(); + String hostName = + TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName().getHostname(); + + TEST_UTIL.waitFor(5000, () -> getTableCount(connection) >= 3); + List walEventsList = getRows(hostName, connection); + + // There should be atleast 2 events for wal1Name, with ROLLING and ROLLED state. Most of the + // time we will lose ACTIVE event for the first wal creates since hmaster will take some time + // to create hbase:waleventtracker table and by that time RS will already create the first wal + // and will try to persist it. + compareEvents(hostName, wal1Path.getName(), walEventsList, + new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ROLLING.name(), + WALEventTrackerListener.WalState.ROLLED.name())), + false); + + // There should be only 1 event for wal2Name which is current wal, with ACTIVE state + compareEvents(hostName, wal2Path.getName(), walEventsList, + new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ACTIVE.name())), true); + + // Check that event with wal1Path and state ROLLED has the wal length set. + checkWALRolledEventHasSize(walEventsList, wal1Path.getName(), wal1Length); + } + + private void checkWALRolledEventHasSize(List walEvents, String walName, + long actualSize) { + List eventsFilteredByNameState = new ArrayList<>(); + // Filter the list by walName and wal state. + for (WALEventTrackerPayload event : walEvents) { + if ( + walName.equals(event.getWalName()) + && WALEventTrackerListener.WalState.ROLLED.name().equals(event.getState()) + ) { + eventsFilteredByNameState.add(event); + } + } + + assertEquals(1, eventsFilteredByNameState.size()); + // We are not comparing the size of the WAL in the tracker table with actual size. + // For AsyncWAL implementation, since the WAL file is closed in an async fashion, the WAL length + // will always be incorrect. + // For FSHLog implementation, we close the WAL in an executor thread. So there will always be + // a difference of trailer size bytes. + // assertEquals(actualSize, eventsFilteredByNameState.get(0).getWalLength()); + } + + /** + * Compare the events from @{@link WALEventTrackerTableAccessor#WAL_EVENT_TRACKER_TABLE_NAME} + * @param hostName hostname + * @param walName walname + * @param walEvents event from table + * @param expectedStates expected states for the hostname and wal name + * @param strict whether to check strictly or not. Sometimes we lose the ACTIVE state + * event for the first wal since it takes some time for hmaster to create + * the table and by that time RS already creates the first WAL and will try + * to persist ACTIVE event to waleventtracker table. + */ + private void compareEvents(String hostName, String walName, + List walEvents, List expectedStates, boolean strict) { + List eventsFilteredByWalName = new ArrayList<>(); + + // Assert that all the events have the same host name i.e they came from the same RS. + for (WALEventTrackerPayload event : walEvents) { + assertEquals(hostName, event.getRsName()); + } + + // Filter the list by walName. + for (WALEventTrackerPayload event : walEvents) { + if (walName.equals(event.getWalName())) { + eventsFilteredByWalName.add(event); + } + } + + // Assert that the list of events after filtering by walName should be same as expected states. + if (strict) { + assertEquals(expectedStates.size(), eventsFilteredByWalName.size()); + } + + for (WALEventTrackerPayload event : eventsFilteredByWalName) { + expectedStates.remove(event.getState()); + } + assertEquals(0, expectedStates.size()); + } + + private void waitForWALEventTrackerTable(Connection connection) throws IOException { + TEST_UTIL.waitFor(5000, () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME)); + } + + private List getRows(String rowKeyPrefix, Connection connection) + throws IOException { + List list = new ArrayList<>(); + Scan scan = new Scan(); + scan.withStartRow(Bytes.toBytes(rowKeyPrefix)); + Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME); + ResultScanner scanner = table.getScanner(scan); + + Result r; + while ((r = scanner.next()) != null) { + List cells = r.listCells(); + list.add(getPayload(cells)); + } + return list; + } + + private WALEventTrackerPayload getPayload(List cells) { + String rsName = null, walName = null, walState = null; + long timestamp = 0L, walLength = 0L; + for (Cell cell : cells) { + byte[] qualifier = CellUtil.cloneQualifier(cell); + byte[] value = CellUtil.cloneValue(cell); + String qualifierStr = Bytes.toString(qualifier); + + if (RS_COLUMN.equals(qualifierStr)) { + rsName = Bytes.toString(value); + } else if (WAL_NAME_COLUMN.equals(qualifierStr)) { + walName = Bytes.toString(value); + } else if (WAL_STATE_COLUMN.equals(qualifierStr)) { + walState = Bytes.toString(value); + } else if (TIMESTAMP_COLUMN.equals(qualifierStr)) { + timestamp = Bytes.toLong(value); + } else if (WAL_LENGTH_COLUMN.equals(qualifierStr)) { + walLength = Bytes.toLong(value); + } + } + return new WALEventTrackerPayload(rsName, walName, timestamp, walState, walLength); + } + + private int getTableCount(Connection connection) throws Exception { + Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME); + ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM)); + int count = 0; + while (resultScanner.next() != null) { + count++; + } + LOG.info("Table count: " + count); + return count; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java new file mode 100644 index 000000000000..47ca5a8252bf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestWALEventTrackerTableAccessor { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALEventTrackerTableAccessor.class); + + /* + * Tests that rowkey is getting constructed correctly. + */ + @Test + public void testRowKey() { + String rsName = "test-region-server"; + String walName = "test-wal-0"; + long timeStamp = EnvironmentEdgeManager.currentTime(); + String walState = WALEventTrackerListener.WalState.ACTIVE.name(); + long walLength = 100L; + WALEventTrackerPayload payload = + new WALEventTrackerPayload(rsName, walName, timeStamp, walState, walLength); + byte[] rowKeyBytes = WALEventTrackerTableAccessor.getRowKey(payload); + + String rowKeyBytesStr = Bytes.toString(rowKeyBytes); + String[] fields = rowKeyBytesStr.split(WALEventTrackerTableAccessor.DELIMITER, -1); + // This is the format of rowkey: walName_timestamp_walState; + assertEquals(walName, fields[0]); + assertEquals(timeStamp, Long.valueOf(fields[1]).longValue()); + assertEquals(walState, fields[2]); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java new file mode 100644 index 000000000000..4fbb03b13eee --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.namequeues; + +import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY; +import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(SmallTests.class) +public class TestWalEventTrackerQueueService { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWalEventTrackerQueueService.class); + + @Rule + public TestName name = new TestName(); + + /* + * Test whether wal event tracker metrics are being incremented. + */ + @Test + public void testMetrics() throws Exception { + String rsName = "test-region-server"; + String walName = "test-wal-0"; + long timeStamp = EnvironmentEdgeManager.currentTime(); + String walState = WALEventTrackerListener.WalState.ACTIVE.name(); + long walLength = 100L; + WALEventTrackerPayload payload = + new WALEventTrackerPayload(rsName, walName, timeStamp, walState, walLength); + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, true); + conf.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100); + MetricsWALEventTrackerSourceImpl source = new MetricsWALEventTrackerSourceImpl( + name.getMethodName(), name.getMethodName(), name.getMethodName(), name.getMethodName()); + WALEventTrackerQueueService service = new WALEventTrackerQueueService(conf, source); + service.addToQueue(payload); + Connection mockConnection = mock(Connection.class); + doReturn(conf).when(mockConnection).getConfiguration(); + // Always throw IOException whenever mock connection is being used. + doThrow(new IOException()).when(mockConnection).getTable(WAL_EVENT_TRACKER_TABLE_NAME); + assertEquals(0L, source.getFailedPuts()); + assertEquals(0L, source.getNumRecordsFailedPuts()); + // Persist all the events. + service.persistAll(mockConnection); + assertEquals(1L, source.getFailedPuts()); + assertEquals(1L, source.getNumRecordsFailedPuts()); + // Verify that we tried MAX_RETRY_ATTEMPTS retry attempts to persist. + verify(mockConnection, times(1 + WALEventTrackerTableAccessor.DEFAULT_MAX_ATTEMPTS)) + .getTable(WAL_EVENT_TRACKER_TABLE_NAME); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java new file mode 100644 index 000000000000..00de2118795a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY; +import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class }) +public class TestWALEdit { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALEdit.class); + + private static final String RS_NAME = "test-region-server-name"; + + /** + * Tests that + * {@link org.apache.hadoop.hbase.wal.WALEdit#createReplicationMarkerEdit(byte[], long)} method is + * creating WALEdit with correct family and qualifier. + */ + @Test + public void testCreateReplicationMarkerEdit() { + long timestamp = EnvironmentEdgeManager.currentTime(); + + byte[] rowkey = ReplicationMarkerChore.getRowKey(RS_NAME, timestamp); + WALEdit edit = WALEdit.createReplicationMarkerEdit(rowkey, timestamp); + assertEquals(1, edit.getCells().size()); + Cell cell = edit.getCells().get(0); + assertTrue(CellUtil.matchingFamily(cell, METAFAMILY)); + assertTrue(CellUtil.matchingQualifier(cell, REPLICATION_MARKER)); + assertTrue(WALEdit.isReplicationMarkerEdit(edit)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java new file mode 100644 index 000000000000..5a30bc717112 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test creates 2 mini hbase cluster. One cluster with + * "hbase.regionserver.replication.marker.enabled" conf key. This will create + * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore} which will create + * marker rows to be replicated to sink cluster. Second cluster with + * "hbase.regionserver.replication.sink.tracker.enabled" conf key enabled. This will persist the + * marker rows coming from peer cluster to persist to REPLICATION.SINK_TRACKER table. + **/ +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationMarker { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationMarker.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationMarker.class); + + private static Configuration conf1; + private static Configuration conf2; + private static HBaseTestingUtility utility1; + private static HBaseTestingUtility utility2; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1 = HBaseConfiguration.create(); + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + conf2 = new Configuration(conf1); + // Run the replication marker chore in cluster1. + conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); + conf1.setLong(REPLICATION_MARKER_CHORE_DURATION_KEY, 1000); // 1 sec + utility1 = new HBaseTestingUtility(conf1); + + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + // Enable the replication sink tracker for cluster 2 + conf2.setBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, true); + utility2 = new HBaseTestingUtility(conf2); + + // Start cluster 2 first so that hbase:replicationsinktracker table gets created first. + utility2.startMiniCluster(1); + waitForReplicationTrackerTableCreation(); + + // Start cluster1 + utility1.startMiniCluster(1); + Admin admin1 = utility1.getAdmin(); + ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder(); + rpcBuilder.setClusterKey(utility2.getClusterKey()); + admin1.addReplicationPeer("1", rpcBuilder.build()); + + ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0) + .getReplicationSourceService().getReplicationManager(); + // Wait until the peer gets established. + Waiter.waitFor(conf1, 10000, (Waiter.Predicate) () -> manager.getSources().size() == 1); + } + + private static void waitForReplicationTrackerTableCreation() { + Waiter.waitFor(conf2, 10000, (Waiter.Predicate) () -> utility2.getAdmin() + .tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME)); + } + + @AfterClass + public static void tearDown() throws Exception { + utility1.shutdownMiniCluster(); + utility2.shutdownMiniCluster(); + } + + @Test + public void testReplicationMarkerRow() throws Exception { + // We have configured ReplicationTrackerChore to run every second. Sleeping so that it will + // create enough sentinel rows. + Thread.sleep(5000); + WAL wal1 = utility1.getHBaseCluster().getRegionServer(0).getWAL(null); + String walName1ForCluster1 = ((AbstractFSWAL) wal1).getCurrentFileName().getName(); + String rs1Name = utility1.getHBaseCluster().getRegionServer(0).getServerName().getHostname(); + // Since we sync the marker edits while appending to wal, all the edits should be visible + // to Replication threads immediately. + assertTrue(getReplicatedEntries() >= 5); + // Force log roll. + wal1.rollWriter(true); + String walName2ForCluster1 = ((AbstractFSWAL) wal1).getCurrentFileName().getName(); + Connection connection2 = utility2.getMiniHBaseCluster().getRegionServer(0).getConnection(); + // Sleep for 5 more seconds to get marker rows with new wal name. + Thread.sleep(5000); + // Wait for cluster 2 to have atleast 8 tracker rows from cluster1. + utility2.waitFor(5000, () -> getTableCount(connection2) >= 8); + // Get replication marker rows from cluster2 + List list = getRows(connection2); + for (ReplicationSinkTrackerRow desc : list) { + // All the tracker rows should have same region server name i.e. rs of cluster1 + assertEquals(rs1Name, desc.getRegionServerName()); + // All the tracker rows will have either wal1 or wal2 name. + assertTrue(walName1ForCluster1.equals(desc.getWalName()) + || walName2ForCluster1.equals(desc.getWalName())); + } + + // This table shouldn't exist on cluster1 since + // hbase.regionserver.replication.sink.tracker.enabled is not enabled on this cluster. + assertFalse(utility1.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME)); + // This table shouldn't exist on cluster1 since + // hbase.regionserver.replication.sink.tracker.enabled is enabled on this cluster. + assertTrue(utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME)); + } + + /* + * Get rows for replication sink tracker table. + */ + private List getRows(Connection connection) throws IOException { + List list = new ArrayList<>(); + Scan scan = new Scan(); + Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME); + ResultScanner scanner = table.getScanner(scan); + + Result r; + while ((r = scanner.next()) != null) { + List cells = r.listCells(); + list.add(getPayload(cells)); + } + return list; + } + + private ReplicationSinkTrackerRow getPayload(List cells) { + String rsName = null, walName = null; + Long offset = null; + long timestamp = 0L; + for (Cell cell : cells) { + byte[] qualifier = CellUtil.cloneQualifier(cell); + byte[] value = CellUtil.cloneValue(cell); + + if (Bytes.equals(RS_COLUMN, qualifier)) { + rsName = Bytes.toString(value); + } else if (Bytes.equals(WAL_NAME_COLUMN, qualifier)) { + walName = Bytes.toString(value); + } else if (Bytes.equals(TIMESTAMP_COLUMN, qualifier)) { + timestamp = Bytes.toLong(value); + } else if (Bytes.equals(OFFSET_COLUMN, qualifier)) { + offset = Bytes.toLong(value); + } + } + ReplicationSinkTrackerRow row = + new ReplicationSinkTrackerRow(rsName, walName, timestamp, offset); + return row; + } + + static class ReplicationSinkTrackerRow { + private String region_server_name; + private String wal_name; + private long timestamp; + private long offset; + + public ReplicationSinkTrackerRow(String region_server_name, String wal_name, long timestamp, + long offset) { + this.region_server_name = region_server_name; + this.wal_name = wal_name; + this.timestamp = timestamp; + this.offset = offset; + } + + public String getRegionServerName() { + return region_server_name; + } + + public String getWalName() { + return wal_name; + } + + public long getTimestamp() { + return timestamp; + } + + public long getOffset() { + return offset; + } + + @Override + public String toString() { + return "ReplicationSinkTrackerRow{" + "region_server_name='" + region_server_name + '\'' + + ", wal_name='" + wal_name + '\'' + ", timestamp=" + timestamp + ", offset=" + offset + + '}'; + } + } + + private int getTableCount(Connection connection) throws Exception { + Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME); + ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM)); + int count = 0; + while (resultScanner.next() != null) { + count++; + } + LOG.info("Table count: " + count); + return count; + } + + /* + * Return replicated entries from cluster1. + */ + private long getReplicatedEntries() { + ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0) + .getReplicationSourceService().getReplicationManager(); + List sources = manager.getSources(); + assertEquals(1, sources.size()); + ReplicationSource source = (ReplicationSource) sources.get(0); + return source.getTotalReplicatedEdits(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index c14c589a9196..c978a412a25b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.wal; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey; +import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY; +import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -60,10 +64,12 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; +import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -485,6 +491,39 @@ public void testSplitLeavesCompactionEventsEdits() throws IOException { assertEquals(11, countWAL(splitLog[0])); } + /* + * Tests that WalSplitter ignores replication marker edits. + */ + @Test(timeout = 30000) + public void testSplitRemovesReplicationMarkerEdits() throws IOException { + RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO; + Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1"); + generateReplicationMarkerEdits(path, regionInfo); + useDifferentDFSClient(); + List logFiles = + SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null); + assertEquals(1, logFiles.size()); + assertEquals(path, logFiles.get(0).getPath()); + List splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + // Make sure that WALSplitter doesn't fail. + assertEquals(0, splitPaths.size()); + } + + private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException { + long timestamp = EnvironmentEdgeManager.currentTime(); + fs.mkdirs(WALDIR); + try (Writer writer = wals.createWALWriter(fs, path)) { + WALProtos.ReplicationMarkerDescriptor.Builder builder = + WALProtos.ReplicationMarkerDescriptor.newBuilder(); + builder.setWalName("wal-name"); + builder.setRegionServerName("rs-name"); + builder.setOffset(0L); + WALProtos.ReplicationMarkerDescriptor desc = builder.build(); + appendEntry(writer, REPLICATION_SINK_TRACKER_TABLE_NAME, regionInfo.getEncodedNameAsBytes(), + getRowKey(desc.getRegionServerName(), timestamp), METAFAMILY, REPLICATION_MARKER, VALUE, 1); + } + } + /** * @param expectedEntries -1 to not assert * @return the count across all regions