From 96b4ce899699f715963374e1f6a4b3ec65cdc63a Mon Sep 17 00:00:00 2001 From: Ray Mattingly Date: Thu, 20 Jul 2023 14:19:04 -0400 Subject: [PATCH] add connection and request headers to slowlog payload --- .../hbase/shaded/protobuf/ProtobufUtil.java | 1 + .../shaded/protobuf/TestProtobufUtil.java | 1 + .../protobuf/server/region/TooSlowLog.proto | 4 ++ .../namequeues/impl/SlowLogQueueService.java | 4 +- .../namequeues/TestNamedQueueRecorder.java | 69 ++++++++++++++++++- 5 files changed, 76 insertions(+), 3 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index aa3cb39c5971..f2b20fba07fb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -134,6 +134,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; +import org.apache.hbase.thirdparty.com.google.gson.Gson; import org.apache.hbase.thirdparty.com.google.gson.JsonArray; import org.apache.hbase.thirdparty.com.google.gson.JsonElement; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java index 2b4380dfbb6d..91f5e8edc644 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.ArrayBackedTag; diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto index d0abdd1af75a..4c275948b277 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto @@ -27,6 +27,7 @@ option java_outer_classname = "TooSlowLog"; option java_generate_equals_and_hash = true; option optimize_for = SPEED; +import "HBase.proto"; import "client/Client.proto"; message SlowLogPayload { @@ -49,6 +50,9 @@ message SlowLogPayload { optional int64 block_bytes_scanned = 16; optional Scan scan = 17; + repeated NameBytesPair connection_attribute = 18; + repeated NameBytesPair request_attribute = 19; + // SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large. // Majority of times, slow logs are also large logs and hence, ALL is combination of // both diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java index 48121a8b066a..372a9a5d6494 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java @@ -164,7 +164,9 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { .setProcessingTime(processingTime).setQueueTime(qTime) .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) .setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned) - .setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName); + .setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName) + .addAllRequestAttribute(rpcCall.getHeader().getAttributeList()) + .addAllConnectionAttribute(rpcCall.getConnectionHeader().getAttributeList()); if (slowLogParams != null && slowLogParams.getScan() != null) { slowLogPayloadBuilder.setScan(slowLogParams.getScan()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 7a3ca0b7cf9f..fd6591802bd4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -46,6 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; @@ -71,6 +72,20 @@ public class TestNamedQueueRecorder { private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil(); + private static final List REQUEST_HEADERS = + ImmutableList. builder() + .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") + .setValue(ByteString.copyFromUtf8("r")).build()) + .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") + .setValue(ByteString.copyFromUtf8("h")).build()) + .build(); + private static final List CONNECTION_HEADERS = + ImmutableList. builder() + .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") + .setValue(ByteString.copyFromUtf8("c")).build()) + .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") + .setValue(ByteString.copyFromUtf8("h")).build()) + .build(); private NamedQueueRecorder namedQueueRecorder; @@ -599,6 +614,52 @@ public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception { })); } + @Test + public void testOnlineSlowLogRequestAttributes() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + Constructor constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if (slowLogPayload.isPresent() && !slowLogPayload.get().getRequestAttributeList().isEmpty()) { + return slowLogPayload.get().getRequestAttributeList().containsAll(REQUEST_HEADERS); + } + return false; + })); + } + + @Test + public void testOnlineSlowLogConnectionAttributes() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + Constructor constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if (slowLogPayload.isPresent() && !slowLogPayload.get().getConnectionAttributeList().isEmpty()) { + return slowLogPayload.get().getConnectionAttributeList().containsAll(CONNECTION_HEADERS); + } + return false; + })); + } + static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, int forcedParamIndex) { RpcCall rpcCall = getRpcCall(userName, forcedParamIndex); @@ -691,12 +752,16 @@ public long getSize() { @Override public RPCProtos.RequestHeader getHeader() { - return null; + RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder(); + REQUEST_HEADERS.forEach(builder::addAttribute); + return builder.build(); } @Override public RPCProtos.ConnectionHeader getConnectionHeader() { - return null; + RPCProtos.ConnectionHeader.Builder builder = RPCProtos.ConnectionHeader.newBuilder(); + CONNECTION_HEADERS.forEach(builder::addAttribute); + return builder.build(); } @Override