From 46a6c1a0dd83d100898d4c339d959827b04a62ad Mon Sep 17 00:00:00 2001 From: Ray Mattingly <rmattingly@hubspot.com> Date: Thu, 14 Sep 2023 10:00:20 -0400 Subject: [PATCH 1/3] conflicts add connection and request headers to slowlog payload client side of slow log attributes. RS UI updates prefer toStringBinary in attribute deserialization rsOperationDetails UI fixes checkstyle spotless fix tests --- .../hadoop/hbase/client/OnlineLogRecord.java | 55 +++++++++++++-- .../hbase/shaded/protobuf/ProtobufUtil.java | 30 +++++++- .../hbase/client/TestOnlineLogRecord.java | 56 ++++++++++++++- .../protobuf/server/region/TooSlowLog.proto | 4 ++ .../namequeues/impl/SlowLogQueueService.java | 18 ++++- .../regionserver/rsOperationDetails.jsp | 9 +++ .../namequeues/TestNamedQueueRecorder.java | 70 ++++++++++++++++++- 7 files changed, 231 insertions(+), 11 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java index 65e2f58f4529..d9fd51e80a95 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OnlineLogRecord.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -29,6 +30,8 @@ import org.apache.hbase.thirdparty.com.google.gson.JsonObject; import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; + /** * Slow/Large Log payload for hbase-client, to be used by Admin API get_slow_responses and * get_large_responses @@ -53,6 +56,18 @@ final public class OnlineLogRecord extends LogEntry { if (slowLogPayload.getMultiServiceCalls() == 0) { jsonObj.remove("multiServiceCalls"); } + if (slowLogPayload.getRequestAttributes().isEmpty()) { + jsonObj.remove("requestAttributes"); + } else { + jsonObj.add("requestAttributes", gson + .toJsonTree(ProtobufUtil.deserializeAttributes(slowLogPayload.getRequestAttributes()))); + } + if (slowLogPayload.getConnectionAttributes().isEmpty()) { + jsonObj.remove("connectionAttributes"); + } else { + jsonObj.add("connectionAttributes", gson.toJsonTree( + ProtobufUtil.deserializeAttributes(slowLogPayload.getConnectionAttributes()))); + } if (slowLogPayload.getScan().isPresent()) { jsonObj.add("scan", gson.toJsonTree(slowLogPayload.getScan().get().toMap())); } else { @@ -79,6 +94,8 @@ final public class OnlineLogRecord extends LogEntry { private final int multiMutationsCount; private final int multiServiceCalls; private final Optional<Scan> scan; + private final Map<String, byte[]> requestAttributes; + private final Map<String, byte[]> connectionAttributes; public long getStartTime() { return startTime; @@ -152,11 +169,20 @@ public Optional<Scan> getScan() { return scan; } + public Map<String, byte[]> getRequestAttributes() { + return requestAttributes; + } + + public Map<String, byte[]> getConnectionAttributes() { + return connectionAttributes; + } + OnlineLogRecord(final long startTime, final int processingTime, final int queueTime, final long responseSize, final long blockBytesScanned, final String clientAddress, final String serverClass, final String methodName, final String callDetails, final String param, final String regionName, final String userName, final int multiGetsCount, - final int multiMutationsCount, final int multiServiceCalls, final Scan scan) { + final int multiMutationsCount, final int multiServiceCalls, final Scan scan, + final Map<String, byte[]> requestAttributes, final Map<String, byte[]> connectionAttributes) { this.startTime = startTime; this.processingTime = processingTime; this.queueTime = queueTime; @@ -173,6 +199,8 @@ public Optional<Scan> getScan() { this.multiMutationsCount = multiMutationsCount; this.multiServiceCalls = multiServiceCalls; this.scan = Optional.ofNullable(scan); + this.requestAttributes = requestAttributes; + this.connectionAttributes = connectionAttributes; } public static class OnlineLogRecordBuilder { @@ -192,6 +220,8 @@ public static class OnlineLogRecordBuilder { private int multiMutationsCount; private int multiServiceCalls; private Scan scan = null; + private Map<String, byte[]> requestAttributes; + private Map<String, byte[]> connectionAttributes; public OnlineLogRecordBuilder setStartTime(long startTime) { this.startTime = startTime; @@ -276,10 +306,22 @@ public OnlineLogRecordBuilder setScan(Scan scan) { return this; } + public OnlineLogRecordBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) { + this.requestAttributes = requestAttributes; + return this; + } + + public OnlineLogRecordBuilder + setConnectionAttributes(Map<String, byte[]> connectionAttributes) { + this.connectionAttributes = connectionAttributes; + return this; + } + public OnlineLogRecord build() { return new OnlineLogRecord(startTime, processingTime, queueTime, responseSize, blockBytesScanned, clientAddress, serverClass, methodName, callDetails, param, regionName, - userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan); + userName, multiGetsCount, multiMutationsCount, multiServiceCalls, scan, requestAttributes, + connectionAttributes); } } @@ -304,7 +346,8 @@ public boolean equals(Object o) { .append(serverClass, that.serverClass).append(methodName, that.methodName) .append(callDetails, that.callDetails).append(param, that.param) .append(regionName, that.regionName).append(userName, that.userName).append(scan, that.scan) - .isEquals(); + .append(requestAttributes, that.requestAttributes) + .append(connectionAttributes, that.connectionAttributes).isEquals(); } @Override @@ -313,7 +356,7 @@ public int hashCode() { .append(responseSize).append(blockBytesScanned).append(clientAddress).append(serverClass) .append(methodName).append(callDetails).append(param).append(regionName).append(userName) .append(multiGetsCount).append(multiMutationsCount).append(multiServiceCalls).append(scan) - .toHashCode(); + .append(requestAttributes).append(connectionAttributes).toHashCode(); } @Override @@ -330,7 +373,9 @@ public String toString() { .append("methodName", methodName).append("callDetails", callDetails).append("param", param) .append("regionName", regionName).append("userName", userName) .append("multiGetsCount", multiGetsCount).append("multiMutationsCount", multiMutationsCount) - .append("multiServiceCalls", multiServiceCalls).append("scan", scan).toString(); + .append("multiServiceCalls", multiServiceCalls).append("scan", scan) + .append("requestAttributes", requestAttributes) + .append("connectionAttributes", connectionAttributes).toString(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index aa3cb39c5971..c14a0d042823 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2196,6 +2196,25 @@ public static SlowLogParams getSlowLogParams(Message message, boolean slowLogSca return new SlowLogParams(params); } + /** + * Convert a list of NameBytesPair to a more readable CSV + */ + public static String convertAttributesToCsv(List<NameBytesPair> attributes) { + if (attributes.isEmpty()) { + return HConstants.EMPTY_STRING; + } + return deserializeAttributes(convertNameBytesPairsToMap(attributes)).entrySet().stream() + .map(entry -> entry.getKey() + " = " + entry.getValue()).collect(Collectors.joining(", ")); + } + + /** + * Convert a map of byte array attributes to a more readable map of binary string representations + */ + public static Map<String, String> deserializeAttributes(Map<String, byte[]> attributes) { + return attributes.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, entry -> Bytes.toStringBinary(entry.getValue()))); + } + /** * Print out some subset of a MutationProto rather than all of it and its data * @param proto Protobuf to print out @@ -3389,7 +3408,10 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog .setResponseSize(slowLogPayload.getResponseSize()) .setBlockBytesScanned(slowLogPayload.getBlockBytesScanned()) .setServerClass(slowLogPayload.getServerClass()).setStartTime(slowLogPayload.getStartTime()) - .setUserName(slowLogPayload.getUserName()); + .setUserName(slowLogPayload.getUserName()) + .setRequestAttributes(convertNameBytesPairsToMap(slowLogPayload.getRequestAttributeList())) + .setConnectionAttributes( + convertNameBytesPairsToMap(slowLogPayload.getConnectionAttributeList())); if (slowLogPayload.hasScan()) { try { onlineLogRecord.setScan(ProtobufUtil.toScan(slowLogPayload.getScan())); @@ -3400,6 +3422,12 @@ private static LogEntry getSlowLogRecord(final TooSlowLog.SlowLogPayload slowLog return onlineLogRecord.build(); } + private static Map<String, byte[]> + convertNameBytesPairsToMap(List<NameBytesPair> nameBytesPairs) { + return nameBytesPairs.stream().collect(Collectors.toMap(NameBytesPair::getName, + nameBytesPair -> nameBytesPair.getValue().toByteArray())); + } + /** * Convert AdminProtos#SlowLogResponses to list of {@link OnlineLogRecord} * @param logEntry slowlog response protobuf instance diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java index 846738d82987..fe753973ae20 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestOnlineLogRecord.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Collections; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -26,6 +29,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; + @Category({ ClientTests.class, SmallTests.class }) public class TestOnlineLogRecord { @@ -47,10 +53,56 @@ public void itSerializesScan() { + " \"maxResultSize\": -1,\n" + " \"families\": {},\n" + " \"caching\": -1,\n" + " \"maxVersions\": 1,\n" + " \"timeRange\": [\n" + " 0,\n" + " 9223372036854775807\n" + " ]\n" + " }\n" + "}"; - OnlineLogRecord o = - new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, 6, 7, 0, scan); + OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, + 6, 7, 0, scan, Collections.emptyMap(), Collections.emptyMap()); String actualOutput = o.toJsonPrettyPrint(); System.out.println(actualOutput); Assert.assertEquals(actualOutput, expectedOutput); } + + @Test + public void itSerializesRequestAttributes() { + Map<String, byte[]> requestAttributes = ImmutableMap.<String, byte[]> builder() + .put("r", Bytes.toBytes("1")).put("2", Bytes.toBytes(0.0)).build(); + Set<String> expectedOutputs = + ImmutableSet.<String> builder().add("requestAttributes").add("\"r\": \"1\"") + .add("\"2\": \"\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\"").build(); + OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, + 6, 7, 0, null, requestAttributes, Collections.emptyMap()); + String actualOutput = o.toJsonPrettyPrint(); + System.out.println(actualOutput); + expectedOutputs.forEach(expected -> Assert.assertTrue(actualOutput.contains(expected))); + } + + @Test + public void itOmitsEmptyRequestAttributes() { + OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, + 6, 7, 0, null, Collections.emptyMap(), Collections.emptyMap()); + String actualOutput = o.toJsonPrettyPrint(); + System.out.println(actualOutput); + Assert.assertFalse(actualOutput.contains("requestAttributes")); + } + + @Test + public void itSerializesConnectionAttributes() { + Map<String, byte[]> connectionAttributes = ImmutableMap.<String, byte[]> builder() + .put("c", Bytes.toBytes("1")).put("2", Bytes.toBytes(0.0)).build(); + Set<String> expectedOutputs = + ImmutableSet.<String> builder().add("connectionAttributes").add("\"c\": \"1\"") + .add("\"2\": \"\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\\\\x00\"").build(); + OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, + 6, 7, 0, null, Collections.emptyMap(), connectionAttributes); + String actualOutput = o.toJsonPrettyPrint(); + System.out.println(actualOutput); + expectedOutputs.forEach(expected -> Assert.assertTrue(actualOutput.contains(expected))); + } + + @Test + public void itOmitsEmptyConnectionAttributes() { + OnlineLogRecord o = new OnlineLogRecord(1, 2, 3, 4, 5, null, null, null, null, null, null, null, + 6, 7, 0, null, Collections.emptyMap(), Collections.emptyMap()); + String actualOutput = o.toJsonPrettyPrint(); + System.out.println(actualOutput); + Assert.assertFalse(actualOutput.contains("connectionAttributes")); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto index d0abdd1af75a..4c275948b277 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/TooSlowLog.proto @@ -27,6 +27,7 @@ option java_outer_classname = "TooSlowLog"; option java_generate_equals_and_hash = true; option optimize_for = SPEED; +import "HBase.proto"; import "client/Client.proto"; message SlowLogPayload { @@ -49,6 +50,9 @@ message SlowLogPayload { optional int64 block_bytes_scanned = 16; optional Scan scan = 17; + repeated NameBytesPair connection_attribute = 18; + repeated NameBytesPair request_attribute = 19; + // SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large. // Majority of times, slow logs are also large logs and hence, ALL is combination of // both diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java index 48121a8b066a..62e4c5d96696 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hbase.namequeues.impl; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -42,12 +44,14 @@ import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; import org.apache.hbase.thirdparty.com.google.common.collect.Queues; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog; /** @@ -164,7 +168,9 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { .setProcessingTime(processingTime).setQueueTime(qTime) .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) .setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned) - .setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName); + .setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName) + .addAllRequestAttribute(buildNameBytesPairs(rpcCall.getRequestAttributes())) + .addAllConnectionAttribute(buildNameBytesPairs(rpcCall.getConnectionAttributes())); if (slowLogParams != null && slowLogParams.getScan() != null) { slowLogPayloadBuilder.setScan(slowLogParams.getScan()); } @@ -177,6 +183,16 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { } } + private static Collection<HBaseProtos.NameBytesPair> + buildNameBytesPairs(Map<String, byte[]> attributes) { + if (attributes == null) { + return Collections.emptySet(); + } + return attributes.entrySet().stream().map(attr -> HBaseProtos.NameBytesPair.newBuilder() + .setName(attr.getKey()).setValue(ByteString.copyFrom(attr.getValue())).build()) + .collect(Collectors.toSet()); + } + @Override public boolean clearNamedQueue() { if (!isOnlineLogProviderEnabled) { diff --git a/hbase-server/src/main/resources/hbase-webapps/regionserver/rsOperationDetails.jsp b/hbase-server/src/main/resources/hbase-webapps/regionserver/rsOperationDetails.jsp index a1ff23143bad..e8944b63f435 100644 --- a/hbase-server/src/main/resources/hbase-webapps/regionserver/rsOperationDetails.jsp +++ b/hbase-server/src/main/resources/hbase-webapps/regionserver/rsOperationDetails.jsp @@ -26,6 +26,7 @@ import="org.apache.hadoop.hbase.regionserver.HRegionServer" import="org.apache.hadoop.hbase.HConstants" import="org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog" + import="org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil" import="org.apache.hadoop.hbase.namequeues.NamedQueueRecorder" import="org.apache.hadoop.hbase.namequeues.RpcLogDetails" import="org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest" @@ -108,6 +109,8 @@ <th>MultiService Calls</th> <th>Call Details</th> <th>Param</th> + <th>Request Attributes</th> + <th>Connection Attributes</th> </tr> <% if (slowLogs != null && !slowLogs.isEmpty()) {%> <% for (TooSlowLog.SlowLogPayload r : slowLogs) { %> @@ -127,6 +130,8 @@ <td><%=r.getMultiServiceCalls()%></td> <td><%=r.getCallDetails()%></td> <td><%=r.getParam()%></td> + <td><%=ProtobufUtil.convertAttributesToCsv(r.getRequestAttributeList())%></td> + <td><%=ProtobufUtil.convertAttributesToCsv(r.getConnectionAttributeList())%></td> </tr> <% } %> <% } %> @@ -151,6 +156,8 @@ <th>MultiService Calls</th> <th>Call Details</th> <th>Param</th> + <th>Request Attributes</th> + <th>Connection Attributes</th> </tr> <% if (largeLogs != null && !largeLogs.isEmpty()) {%> <% for (TooSlowLog.SlowLogPayload r : largeLogs) { %> @@ -170,6 +177,8 @@ <td><%=r.getMultiServiceCalls()%></td> <td><%=r.getCallDetails()%></td> <td><%=r.getParam()%></td> + <td><%=ProtobufUtil.convertAttributesToCsv(r.getRequestAttributeList())%></td> + <td><%=ProtobufUtil.convertAttributesToCsv(r.getConnectionAttributeList())%></td> </tr> <% } %> <% } %> diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index c24b364a2277..af6c51260fd5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -47,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; @@ -72,6 +74,20 @@ public class TestNamedQueueRecorder { private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class); private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil(); + private static final List<HBaseProtos.NameBytesPair> REQUEST_HEADERS = + ImmutableList.<HBaseProtos.NameBytesPair> builder() + .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") + .setValue(ByteString.copyFromUtf8("r")).build()) + .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") + .setValue(ByteString.copyFromUtf8("h")).build()) + .build(); + private static final List<HBaseProtos.NameBytesPair> CONNECTION_HEADERS = + ImmutableList.<HBaseProtos.NameBytesPair> builder() + .add(HBaseProtos.NameBytesPair.newBuilder().setName("1") + .setValue(ByteString.copyFromUtf8("c")).build()) + .add(HBaseProtos.NameBytesPair.newBuilder().setName("2") + .setValue(ByteString.copyFromUtf8("h")).build()) + .build(); private NamedQueueRecorder namedQueueRecorder; @@ -600,6 +616,54 @@ public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception { })); } + @Test + public void testOnlineSlowLogRequestAttributes() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + Constructor<NamedQueueRecorder> constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if (slowLogPayload.isPresent() && !slowLogPayload.get().getRequestAttributeList().isEmpty()) { + return slowLogPayload.get().getRequestAttributeList().containsAll(REQUEST_HEADERS); + } + return false; + })); + } + + @Test + public void testOnlineSlowLogConnectionAttributes() throws Exception { + Configuration conf = applySlowLogRecorderConf(1); + Constructor<NamedQueueRecorder> constructor = + NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class); + constructor.setAccessible(true); + namedQueueRecorder = constructor.newInstance(conf); + AdminProtos.SlowLogResponseRequest request = + AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build(); + + Assert.assertEquals(getSlowLogPayloads(request).size(), 0); + LOG.debug("Initially ringbuffer of Slow Log records is empty"); + RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan(); + namedQueueRecorder.addRecord(rpcLogDetails); + Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> { + Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny(); + if ( + slowLogPayload.isPresent() && !slowLogPayload.get().getConnectionAttributeList().isEmpty() + ) { + return slowLogPayload.get().getConnectionAttributeList().containsAll(CONNECTION_HEADERS); + } + return false; + })); + } + static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className, int forcedParamIndex) { RpcCall rpcCall = getRpcCall(userName, forcedParamIndex); @@ -697,12 +761,14 @@ public RPCProtos.RequestHeader getHeader() { @Override public Map<String, byte[]> getConnectionAttributes() { - return null; + return CONNECTION_HEADERS.stream().collect(Collectors + .toMap(HBaseProtos.NameBytesPair::getName, pair -> pair.getValue().toByteArray())); } @Override public Map<String, byte[]> getRequestAttributes() { - return null; + return REQUEST_HEADERS.stream().collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName, + pair -> pair.getValue().toByteArray())); } @Override From 67c4f4f15a871bf2c4f0b7c42ba1cea7daac9b8d Mon Sep 17 00:00:00 2001 From: Ray Mattingly <rmattingly@hubspot.com> Date: Mon, 18 Sep 2023 13:13:21 -0400 Subject: [PATCH 2/3] add attributes to RpcLogDetails --- .../hadoop/hbase/namequeues/RpcLogDetails.java | 16 +++++++++++++++- .../namequeues/impl/SlowLogQueueService.java | 4 ++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java index c0baf21e4340..01f229240cc1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.namequeues; +import java.util.Map; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.yetus.audience.InterfaceAudience; @@ -39,6 +40,8 @@ public class RpcLogDetails extends NamedQueuePayload { private final String className; private final boolean isSlowLog; private final boolean isLargeLog; + private final Map<String, byte[]> connectionAttributes; + private final Map<String, byte[]> requestAttributes; public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize, long blockBytesScanned, String className, boolean isSlowLog, boolean isLargeLog) { @@ -51,6 +54,8 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long this.className = className; this.isSlowLog = isSlowLog; this.isLargeLog = isLargeLog; + this.connectionAttributes = rpcCall.getConnectionAttributes(); + this.requestAttributes = rpcCall.getRequestAttributes(); } public RpcCall getRpcCall() { @@ -85,11 +90,20 @@ public Message getParam() { return param; } + public Map<String, byte[]> getConnectionAttributes() { + return connectionAttributes; + } + + public Map<String, byte[]> getRequestAttributes() { + return requestAttributes; + } + @Override public String toString() { return new ToStringBuilder(this).append("rpcCall", rpcCall).append("param", param) .append("clientAddress", clientAddress).append("responseSize", responseSize) .append("className", className).append("isSlowLog", isSlowLog) - .append("isLargeLog", isLargeLog).toString(); + .append("isLargeLog", isLargeLog).append("connectionAttributes", connectionAttributes) + .append("requestAttributes", requestAttributes).toString(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java index 62e4c5d96696..fb29b8563ef7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java @@ -169,8 +169,8 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY) .setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned) .setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName) - .addAllRequestAttribute(buildNameBytesPairs(rpcCall.getRequestAttributes())) - .addAllConnectionAttribute(buildNameBytesPairs(rpcCall.getConnectionAttributes())); + .addAllRequestAttribute(buildNameBytesPairs(rpcLogDetails.getRequestAttributes())) + .addAllConnectionAttribute(buildNameBytesPairs(rpcLogDetails.getConnectionAttributes())); if (slowLogParams != null && slowLogParams.getScan() != null) { slowLogPayloadBuilder.setScan(slowLogParams.getScan()); } From 089705dd16f722e912780f4bce89f4da8bce6d5d Mon Sep 17 00:00:00 2001 From: Ray Mattingly <rmattingly@hubspot.com> Date: Mon, 18 Sep 2023 13:19:20 -0400 Subject: [PATCH 3/3] include comment about attr evaluation --- .../org/apache/hadoop/hbase/namequeues/RpcLogDetails.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java index 01f229240cc1..eb35d886bbb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java @@ -54,6 +54,10 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long this.className = className; this.isSlowLog = isSlowLog; this.isLargeLog = isLargeLog; + + // it's important to call getConnectionAttributes and getRequestAttributes here + // because otherwise the buffers may get released before the log details are processed which + // would result in corrupted attributes this.connectionAttributes = rpcCall.getConnectionAttributes(); this.requestAttributes = rpcCall.getRequestAttributes(); }