Skip to content

Commit

Permalink
HBASE-27536: Include more request information in slowlog for Scans (#…
Browse files Browse the repository at this point in the history
…5155)

Signed-off-by: Viraj Jasani <[email protected]>
Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
rmdmattingly authored Apr 16, 2023
1 parent 1e75a2a commit 2b098b0
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Optional;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
Expand Down Expand Up @@ -52,6 +53,11 @@ final public class OnlineLogRecord extends LogEntry {
if (slowLogPayload.getMultiServiceCalls() == 0) {
jsonObj.remove("multiServiceCalls");
}
if (slowLogPayload.getScan().isPresent()) {
jsonObj.add("scan", gson.toJsonTree(slowLogPayload.getScan().get().toMap()));
} else {
jsonObj.remove("scan");
}
return jsonObj;
}).create();

Expand All @@ -72,6 +78,7 @@ final public class OnlineLogRecord extends LogEntry {
private final int multiGetsCount;
private final int multiMutationsCount;
private final int multiServiceCalls;
private final Optional<Scan> scan;

public long getStartTime() {
return startTime;
Expand Down Expand Up @@ -136,11 +143,20 @@ public int getMultiServiceCalls() {
return multiServiceCalls;
}

private OnlineLogRecord(final long startTime, final int processingTime, final int queueTime,
/**
* If {@value org.apache.hadoop.hbase.HConstants#SLOW_LOG_SCAN_PAYLOAD_ENABLED} is enabled then
* this value may be present and should represent the Scan that produced the given
* {@link OnlineLogRecord}
*/
public Optional<Scan> getScan() {
return scan;
}

OnlineLogRecord(final long startTime, final int processingTime, final int queueTime,
final long responseSize, final long blockBytesScanned, final String clientAddress,
final String serverClass, final String methodName, final String callDetails, final String param,
final String regionName, final String userName, final int multiGetsCount,
final int multiMutationsCount, final int multiServiceCalls) {
final int multiMutationsCount, final int multiServiceCalls, final Scan scan) {
this.startTime = startTime;
this.processingTime = processingTime;
this.queueTime = queueTime;
Expand All @@ -156,6 +172,7 @@ private OnlineLogRecord(final long startTime, final int processingTime, final in
this.multiGetsCount = multiGetsCount;
this.multiMutationsCount = multiMutationsCount;
this.multiServiceCalls = multiServiceCalls;
this.scan = Optional.ofNullable(scan);
}

public static class OnlineLogRecordBuilder {
Expand All @@ -174,6 +191,7 @@ public static class OnlineLogRecordBuilder {
private int multiGetsCount;
private int multiMutationsCount;
private int multiServiceCalls;
private Scan scan = null;

public OnlineLogRecordBuilder setStartTime(long startTime) {
this.startTime = startTime;
Expand Down Expand Up @@ -253,10 +271,15 @@ public OnlineLogRecordBuilder setMultiServiceCalls(int multiServiceCalls) {
return this;
}

public OnlineLogRecordBuilder setScan(Scan scan) {
this.scan = scan;
return this;
}

public OnlineLogRecord build() {
return new OnlineLogRecord(startTime, processingTime, queueTime, responseSize,
blockBytesScanned, clientAddress, serverClass, methodName, callDetails, param, regionName,
userName, multiGetsCount, multiMutationsCount, multiServiceCalls);
userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan);
}
}

Expand All @@ -280,15 +303,17 @@ public boolean equals(Object o) {
.append(multiServiceCalls, that.multiServiceCalls).append(clientAddress, that.clientAddress)
.append(serverClass, that.serverClass).append(methodName, that.methodName)
.append(callDetails, that.callDetails).append(param, that.param)
.append(regionName, that.regionName).append(userName, that.userName).isEquals();
.append(regionName, that.regionName).append(userName, that.userName).append(scan, that.scan)
.isEquals();
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(startTime).append(processingTime).append(queueTime)
.append(responseSize).append(blockBytesScanned).append(clientAddress).append(serverClass)
.append(methodName).append(callDetails).append(param).append(regionName).append(userName)
.append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).toHashCode();
.append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).append(scan)
.toHashCode();
}

@Override
Expand All @@ -305,7 +330,7 @@ public String toString() {
.append("methodName", methodName).append("callDetails", callDetails).append("param", param)
.append("regionName", regionName).append("userName", userName)
.append("multiGetsCount", multiGetsCount).append("multiMutationsCount", multiMutationsCount)
.append("multiServiceCalls", multiServiceCalls).toString();
.append("multiServiceCalls", multiServiceCalls).append("scan", scan).toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
* SlowLog params object that contains detailed info as params and region name : to be used for
* filter purpose
Expand All @@ -32,15 +34,24 @@ public class SlowLogParams {

private final String regionName;
private final String params;
private final ClientProtos.Scan scan;

public SlowLogParams(String regionName, String params, ClientProtos.Scan scan) {
this.regionName = regionName;
this.params = params;
this.scan = scan;
}

public SlowLogParams(String regionName, String params) {
this.regionName = regionName;
this.params = params;
this.scan = null;
}

public SlowLogParams(String params) {
this.regionName = StringUtils.EMPTY;
this.params = params;
this.scan = null;
}

public String getRegionName() {
Expand All @@ -51,10 +62,14 @@ public String getParams() {
return params;
}

public ClientProtos.Scan getScan() {
return scan;
}

@Override
public String toString() {
return new ToStringBuilder(this).append("regionName", regionName).append("params", params)
.toString();
.append("scan", scan).toString();
}

@Override
Expand All @@ -67,11 +82,11 @@ public boolean equals(Object o) {
}
SlowLogParams that = (SlowLogParams) o;
return new EqualsBuilder().append(regionName, that.regionName).append(params, that.params)
.isEquals();
.append("scan", scan).isEquals();
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(regionName).append(params).toHashCode();
return new HashCodeBuilder(17, 37).append(regionName).append(params).append(scan).toHashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
Expand Down Expand Up @@ -231,6 +233,8 @@
@InterfaceAudience.Private // TODO: some clients (Hive, etc) use this class
public final class ProtobufUtil {

private static final Logger LOG = LoggerFactory.getLogger(ProtobufUtil.class.getName());

private ProtobufUtil() {
}

Expand Down Expand Up @@ -2144,15 +2148,19 @@ private static String getStringForByteString(ByteString bs) {
* @param message Message object {@link Message}
* @return SlowLogParams with regionName(for filter queries) and params
*/
public static SlowLogParams getSlowLogParams(Message message) {
public static SlowLogParams getSlowLogParams(Message message, boolean slowLogScanPayloadEnabled) {
if (message == null) {
return null;
}
if (message instanceof ScanRequest) {
ScanRequest scanRequest = (ScanRequest) message;
String regionName = getStringForByteString(scanRequest.getRegion().getValue());
String params = TextFormat.shortDebugString(message);
return new SlowLogParams(regionName, params);
if (slowLogScanPayloadEnabled) {
return new SlowLogParams(regionName, params, scanRequest.getScan());
} else {
return new SlowLogParams(regionName, params);
}
} else if (message instanceof MutationProto) {
MutationProto mutationProto = (MutationProto) message;
String params = "type= " + mutationProto.getMutateType().toString();
Expand Down Expand Up @@ -3367,7 +3375,7 @@ public static Set<String> toCompactedStoreFiles(byte[] bytes) throws IOException
* @return SlowLog Payload for client usecase
*/
private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLogPayload) {
OnlineLogRecord onlineLogRecord =
OnlineLogRecord.OnlineLogRecordBuilder onlineLogRecord =
new OnlineLogRecord.OnlineLogRecordBuilder().setCallDetails(slowLogPayload.getCallDetails())
.setClientAddress(slowLogPayload.getClientAddress())
.setMethodName(slowLogPayload.getMethodName())
Expand All @@ -3379,8 +3387,15 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog
.setResponseSize(slowLogPayload.getResponseSize())
.setBlockBytesScanned(slowLogPayload.getBlockBytesScanned())
.setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime())
.setUserName(slowLogPayload.getUserName()).build();
return onlineLogRecord;
.setUserName(slowLogPayload.getUserName());
if (slowLogPayload.hasScan()) {
try {
onlineLogRecord.setScan(ProtobufUtil.toScan(slowLogPayload.getScan()));
} catch (Exception e) {
LOG.warn("Failed to convert Scan proto {}", slowLogPayload.getScan(), e);
}
}
return onlineLogRecord.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ ClientTests.class, SmallTests.class })
public class TestOnlineLogRecord {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestOnlineLogRecord.class);

@Test
public void itSerializesScan() {
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(123));
scan.withStopRow(Bytes.toBytes(456));
String expectedOutput = "{\n" + " \"startTime\": 1,\n" + " \"processingTime\": 2,\n"
+ " \"queueTime\": 3,\n" + " \"responseSize\": 4,\n" + " \"blockBytesScanned\": 5,\n"
+ " \"multiGetsCount\": 6,\n" + " \"multiMutationsCount\": 7,\n" + " \"scan\": {\n"
+ " \"startRow\": \"\\\\x00\\\\x00\\\\x00{\",\n"
+ " \"stopRow\": \"\\\\x00\\\\x00\\\\x01\\\\xC8\",\n" + " \"batch\": -1,\n"
+ " \"cacheBlocks\": true,\n" + " \"totalColumns\": 0,\n"
+ " \"maxResultSize\": -1,\n" + " \"families\": {},\n" + " \"caching\": -1,\n"
+ " \"maxVersions\": 1,\n" + " \"timeRange\": [\n" + " 0,\n"
+ " 9223372036854775807\n" + " ]\n" + " }\n" + "}";
OnlineLogRecord o =
new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, 6, 7, 0, scan);
String actualOutput = o.toJsonPrettyPrint();
System.out.println(actualOutput);
Assert.assertEquals(actualOutput, expectedOutput);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,9 @@ public enum OperationStatusCode {
// Default 10 mins.
public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000;

public static final String SLOW_LOG_SCAN_PAYLOAD_ENABLED = "hbase.slowlog.scan.payload.enabled";
public static final boolean SLOW_LOG_SCAN_PAYLOAD_ENABLED_DEFAULT = false;

public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY =
"hbase.shell.timestamp.format.epoch";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ option java_outer_classname = "TooSlowLog";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "client/Client.proto";

message SlowLogPayload {
required int64 start_time = 1;
required int32 processing_time = 2;
Expand All @@ -45,6 +47,7 @@ message SlowLogPayload {
required Type type = 15;

optional int64 block_bytes_scanned = 16;
optional Scan scan = 17;

// SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large.
// Majority of times, slow logs are also large logs and hence, ALL is combination of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ public class SlowLogQueueService implements NamedQueueService {
private final boolean isSlowLogTableEnabled;
private final SlowLogPersistentService slowLogPersistentService;
private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
private final boolean slowLogScanPayloadEnabled;

public SlowLogQueueService(Configuration conf) {
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
this.slowLogScanPayloadEnabled = conf.getBoolean(HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED,
HConstants.SLOW_LOG_SCAN_PAYLOAD_ENABLED_DEFAULT);

if (!isOnlineLogProviderEnabled) {
this.isSlowLogTableEnabled = false;
Expand Down Expand Up @@ -129,7 +132,8 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
long endTime = EnvironmentEdgeManager.currentTime();
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
final SlowLogParams slowLogParams =
ProtobufUtil.getSlowLogParams(param, slowLogScanPayloadEnabled);
int numGets = 0;
int numMutations = 0;
int numServiceCalls = 0;
Expand All @@ -152,16 +156,19 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
final String methodDescriptorName =
methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder()
TooSlowLog.SlowLogPayload.Builder slowLogPayloadBuilder = TooSlowLog.SlowLogPayload.newBuilder()
.setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
.setClientAddress(clientAddress).setMethodName(methodDescriptorName).setMultiGets(numGets)
.setMultiMutations(numMutations).setMultiServiceCalls(numServiceCalls)
.setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
.setProcessingTime(processingTime).setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned)
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName)
.build();
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName);
if (slowLogParams != null && slowLogParams.getScan() != null) {
slowLogPayloadBuilder.setScan(slowLogParams.getScan());
}
TooSlowLog.SlowLogPayload slowLogPayload = slowLogPayloadBuilder.build();
slowLogQueue.add(slowLogPayload);
if (isSlowLogTableEnabled) {
if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
Expand Down
Loading

0 comments on commit 2b098b0

Please sign in to comment.