Skip to content

Commit

Permalink
HBASE-26913 Replication Observability Framework (#4862)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Viraj Jasani <[email protected]>
  • Loading branch information
shahrs87 authored Nov 8, 2022
1 parent ea4ccf0 commit ecf3deb
Show file tree
Hide file tree
Showing 39 changed files with 1,971 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
Expand All @@ -48,8 +46,6 @@ public class SlowLogTableAccessor {

private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class);

private static Connection connection;

/**
* hbase:slowlog table name - can be enabled with config -
* hbase.regionserver.slowlog.systable.enabled
Expand All @@ -66,10 +62,10 @@ private static void doPut(final Connection connection, final List<Put> puts) thr
/**
* Add slow/large log records to hbase:slowlog table
* @param slowLogPayloads List of SlowLogPayload to process
* @param configuration Configuration to use for connection
* @param connection connection
*/
public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads,
final Configuration configuration) {
Connection connection) {
List<Put> puts = new ArrayList<>(slowLogPayloads.size());
for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) {
final byte[] rowKey = getRowKey(slowLogPayload);
Expand Down Expand Up @@ -102,26 +98,12 @@ public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowL
puts.add(put);
}
try {
if (connection == null) {
createConnection(configuration);
}
doPut(connection, puts);
} catch (Exception e) {
LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e);
}
}

private static synchronized void createConnection(Configuration configuration)
throws IOException {
Configuration conf = new Configuration(configuration);
// rpc timeout: 20s
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000);
// retry count: 5
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
connection = ConnectionFactory.createConnection(conf);
}

/**
* Create rowKey: currentTime APPEND slowLogPayload.hashcode Scan on slowlog table should keep
* records with sorted order of time, however records added at the very same time could be in
Expand All @@ -140,5 +122,4 @@ private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload)
final long rowKeyLong = Long.parseLong(timeAndHashcode);
return Bytes.toBytes(rowKeyLong);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1643,6 +1643,14 @@ public enum OperationStatusCode {
"hbase.regionserver.slowlog.systable.enabled";
public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;

@Deprecated
// since <need to know the version number> and will be removed in <version number>
// Instead use hbase.regionserver.named.queue.chore.duration config property
public static final String SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY =
"hbase.slowlog.systable.chore.duration";
// Default 10 mins.
public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000;

public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY =
"hbase.shell.timestamp.format.epoch";

Expand Down
2 changes: 1 addition & 1 deletion hbase-common/src/main/resources/hbase-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2004,7 +2004,7 @@ possible configurations would overwhelm and obscure the important.
</property>
<property>
<name>hbase.namedqueue.provider.classes</name>
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService</value>
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService,org.apache.hadoop.hbase.namequeues.WALEventTrackerQueueService</value>
<description>
Default values for NamedQueueService implementors. This comma separated full class names
represent all implementors of NamedQueueService that we would like to be invoked by
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;

import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public interface MetricsWALEventTrackerSource extends BaseSource {
/**
* The name of the metrics
*/
String METRICS_NAME = "WALEventTracker";

/**
* The name of the metrics context that metrics will be under.
*/
String METRICS_CONTEXT = "regionserver";

/**
* Description
*/
String METRICS_DESCRIPTION = "Metrics about HBase RegionServer WALEventTracker";

/**
* The name of the metrics context that metrics will be under in jmx
*/
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;

String NUM_FAILED_PUTS = "numFailedPuts";
String NUM_FAILED_PUTS_DESC = "Number of put requests that failed";

String NUM_RECORDS_FAILED_PUTS = "numRecordsFailedPuts";
String NUM_RECORDS_FAILED_PUTS_DESC = "number of records in failed puts";

/*
* Increment 2 counters, numFailedPuts and numRecordsFailedPuts
*/
void incrFailedPuts(long numRecords);

/*
* Get the failed puts counter.
*/
long getFailedPuts();

/*
* Get the number of records in failed puts.
*/
long getNumRecordsFailedPuts();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSourceImpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;

import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class MetricsWALEventTrackerSourceImpl extends BaseSourceImpl
implements MetricsWALEventTrackerSource {

private final MutableFastCounter numFailedPutsCount;
private final MutableFastCounter numRecordsFailedPutsCount;

public MetricsWALEventTrackerSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
}

public MetricsWALEventTrackerSourceImpl(String metricsName, String metricsDescription,
String metricsContext, String metricsJmxContext) {
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
numFailedPutsCount =
this.getMetricsRegistry().newCounter(NUM_FAILED_PUTS, NUM_FAILED_PUTS_DESC, 0L);
numRecordsFailedPutsCount = this.getMetricsRegistry().newCounter(NUM_RECORDS_FAILED_PUTS,
NUM_RECORDS_FAILED_PUTS_DESC, 0L);
}

@Override
public void incrFailedPuts(long numRecords) {
numFailedPutsCount.incr();
numRecordsFailedPutsCount.incr(numRecords);
}

@Override
public long getFailedPuts() {
return numFailedPutsCount.value();
}

@Override
public long getNumRecordsFailedPuts() {
return numRecordsFailedPutsCount.value();
}
}
9 changes: 9 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/WAL.proto
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,12 @@ message RegionEventDescriptor {
*/
message WALTrailer {
}

/**
* Special WAL entry for replication marker event.
*/
message ReplicationMarkerDescriptor {
required string region_server_name = 1;
required string wal_name = 2;
required uint64 offset = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
Expand Down Expand Up @@ -214,6 +215,7 @@
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.SecurityConstants;
Expand Down Expand Up @@ -1243,6 +1245,10 @@ private void finishActiveMasterInitialization(MonitoredTask status)
final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
slowLogMasterService.init();

WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this);
// Create REPLICATION.SINK_TRACKER table if needed.
ReplicationSinkTrackerTableCreator.createIfNeededAndNotExists(conf, this);

// clear the dead servers with same host name and port of online server because we are not
// removing dead server with same hostname and port of rs which is trying to check in before
// master initialization. See HBASE-5916.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.waleventtracker;

import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME_STR;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* WALEventTracker Table creation to be used by HMaster
*/
@InterfaceAudience.Private
public final class WALEventTrackerTableCreator {
private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableCreator.class);

public static final String WAL_EVENT_TRACKER_ENABLED_KEY =
"hbase.regionserver.wal.event.tracker.enabled";
public static final boolean WAL_EVENT_TRACKER_ENABLED_DEFAULT = false;

/** The walEventTracker info family as a string */
private static final String WAL_EVENT_TRACKER_INFO_FAMILY_STR = "info";

/** The walEventTracker info family in array of bytes */
public static final byte[] WAL_EVENT_TRACKER_INFO_FAMILY =
Bytes.toBytes(WAL_EVENT_TRACKER_INFO_FAMILY_STR);

private static final long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds

private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = TableDescriptorBuilder
.newBuilder(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME).setRegionReplication(1)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(WAL_EVENT_TRACKER_INFO_FAMILY)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false).setMaxVersions(1)
.setTimeToLive((int) TTL).build());

/* Private default constructor */
private WALEventTrackerTableCreator() {
}

/*
* We will create this table only if hbase.regionserver.wal.event.tracker.enabled is enabled and
* table doesn't exists already.
*/
public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices)
throws IOException {
boolean walEventTrackerEnabled =
conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
if (!walEventTrackerEnabled) {
LOG.info("wal event tracker requests logging to table " + WAL_EVENT_TRACKER_TABLE_NAME_STR
+ " is disabled. Quitting.");
return;
}
if (
!masterServices.getTableDescriptors()
.exists(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME)
) {
LOG.info(WAL_EVENT_TRACKER_TABLE_NAME_STR + " table not found. Creating.");
masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -70,7 +71,8 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz);
LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz,
e);
}
}
}
Expand Down Expand Up @@ -105,8 +107,8 @@ boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
* Add all in memory queue records to system table. The implementors can use system table or
* direct HDFS file or ZK as persistence system.
*/
void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
namedQueueServices.get(namedQueueEvent).persistAll();
void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) {
namedQueueServices.get(namedQueueEvent).persistAll(connection);
}

/**
Expand Down
Loading

0 comments on commit ecf3deb

Please sign in to comment.