diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index cc97a39c7ee4..0555202f88b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; @@ -27,7 +29,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; /** @@ -83,7 +84,18 @@ public interface RpcCall extends RpcCallContext { /** Returns The request header of this call. */ RequestHeader getHeader(); - ConnectionHeader getConnectionHeader(); + /** + * Returns the map of attributes specified when building the Connection. + * @see org.apache.hadoop.hbase.client.ConnectionFactory#createConnection(Configuration, + * ExecutorService, User, Map) + */ + Map<String, byte[]> getConnectionAttributes(); + + /** + * Returns the map of attributes specified when building the request. 
+ * @see org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[]) + */ + Map<String, byte[]> getRequestAttributes(); /** Returns Port of remote address in this call */ int getRemotePort(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index bfd9e2091502..70c4e8f7ef7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -25,7 +25,9 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.CellScanner; @@ -43,14 +45,15 @@ import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; @@ -100,6 +103,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa // cumulative size of serialized exceptions private long exceptionSize 
= 0; private final boolean retryImmediatelySupported; + private volatile Map<String, byte[]> requestAttributes; // This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and // the rest of the bits are for WAL reference count. We can only call release if all of them are @@ -209,8 +213,25 @@ public RequestHeader getHeader() { } @Override - public RPCProtos.ConnectionHeader getConnectionHeader() { - return this.connection.connectionHeader; + public Map<String, byte[]> getConnectionAttributes() { + return this.connection.connectionAttributes; + } + + @Override + public Map<String, byte[]> getRequestAttributes() { + if (this.requestAttributes == null) { + if (header.getAttributeList().isEmpty()) { + this.requestAttributes = Collections.emptyMap(); + } else { + Map<String, byte[]> requestAttributes = + Maps.newHashMapWithExpectedSize(header.getAttributeList().size()); + for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) { + requestAttributes.put(nameBytesPair.getName(), nameBytesPair.getValue().toByteArray()); + } + this.requestAttributes = requestAttributes; + } + } + return this.requestAttributes; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index b09f33c47f9a..e0f69e4b84c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -31,6 +31,8 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; +import java.util.Collections; +import java.util.Map; import java.util.Objects; import java.util.Properties; import org.apache.commons.crypto.cipher.CryptoCipherFactory; @@ -65,6 +67,7 @@ import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; @@ -75,6 +78,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; @@ -103,6 +107,7 @@ abstract class ServerRpcConnection implements Closeable { protected int remotePort; protected InetAddress addr; protected ConnectionHeader connectionHeader; + protected Map<String, byte[]> connectionAttributes; /** * Codec the client asked use. @@ -405,6 +410,19 @@ private CodedInputStream createCis(ByteBuff buf) { // Reads the connection header following version private void processConnectionHeader(ByteBuff buf) throws IOException { this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf)); + + // we want to copy the attributes prior to releasing the buffer so that they don't get corrupted + // eventually + if (connectionHeader.getAttributeList().isEmpty()) { + this.connectionAttributes = Collections.emptyMap(); + } else { + this.connectionAttributes = + Maps.newHashMapWithExpectedSize(connectionHeader.getAttributeList().size()); + for (HBaseProtos.NameBytesPair nameBytesPair : connectionHeader.getAttributeList()) { + this.connectionAttributes.put(nameBytesPair.getName(), + nameBytesPair.getValue().toByteArray()); + } + } String serviceName = connectionHeader.getServiceName(); if (serviceName == null) { throw new EmptyServiceNameException(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java index 793fdc5a1f91..ca29f7bd0e31 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -59,8 +60,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; - @Category({ ClientTests.class, MediumTests.class }) public class TestRequestAndConnectionAttributes { @@ -101,15 +100,22 @@ public void setup() { } @Test - public void testConnectionAttributes() throws IOException { + public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException { TableName tableName = TableName.valueOf("testConnectionAttributes"); - TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1, - HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName()); + byte[] cf = Bytes.toBytes("0"); + TEST_UTIL.createTable(tableName, new byte[][] { cf }, 1, HConstants.DEFAULT_BLOCKSIZE, + AttributesCoprocessor.class.getName()); Configuration conf = TEST_UTIL.getConfiguration(); try (Connection conn = ConnectionFactory.createConnection(conf, null, AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) { - Result result = table.get(new Get(Bytes.toBytes(0))); + + // submit a 300 byte rowkey here to encourage netty's allocator to overwrite the connection + // header + byte[] bytes = new byte[300]; + new Random().nextBytes(bytes); + Result result = table.get(new Get(bytes)); + assertEquals(CONNECTION_ATTRIBUTES.size(), result.size()); for (Map.Entry<String, byte[]> attr : 
CONNECTION_ATTRIBUTES.entrySet()) { byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey())); @@ -291,15 +297,15 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, // for connection attrs test RpcCall rpcCall = RpcServer.getCurrentCall().get(); - for (HBaseProtos.NameBytesPair attr : rpcCall.getHeader().getAttributeList()) { + for (Map.Entry<String, byte[]> attr : rpcCall.getRequestAttributes().entrySet()) { result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) - .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getName())) - .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build()); + .setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getKey())) + .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build()); } - for (HBaseProtos.NameBytesPair attr : rpcCall.getConnectionHeader().getAttributeList()) { + for (Map.Entry<String, byte[]> attr : rpcCall.getConnectionAttributes().entrySet()) { result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow()) - .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getName())) - .setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build()); + .setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getKey())) + .setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build()); } result.sort(CellComparator.getInstance()); c.bypass(); @@ -320,15 +326,15 @@ public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WAL private void validateRequestAttributes() { RpcCall rpcCall = RpcServer.getCurrentCall().get(); - List<HBaseProtos.NameBytesPair> attrs = rpcCall.getHeader().getAttributeList(); + Map<String, byte[]> attrs = rpcCall.getRequestAttributes(); if (attrs.size() != REQUEST_ATTRIBUTES.size()) { return; } - for (HBaseProtos.NameBytesPair attr : attrs) { - if (!REQUEST_ATTRIBUTES.containsKey(attr.getName())) { + for (Map.Entry<String, byte[]> attr : attrs.entrySet()) { + if (!REQUEST_ATTRIBUTES.containsKey(attr.getKey())) { return; } - if 
(!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getName()), attr.getValue().toByteArray())) { + if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getKey()), attr.getValue())) { return; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index 6b4bc8fd39fa..0135062de408 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -24,6 +24,7 @@ import java.security.PrivilegedExceptionAction; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -695,7 +696,12 @@ public RPCProtos.RequestHeader getHeader() { } @Override - public RPCProtos.ConnectionHeader getConnectionHeader() { + public Map<String, byte[]> getConnectionAttributes() { + return null; + } + + @Override + public Map<String, byte[]> getRequestAttributes() { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index dd49d00ac3a1..83f788ba1518 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.net.InetAddress; import java.util.HashSet; +import java.util.Map; import java.util.Optional; import java.util.Set; import org.apache.hadoop.hbase.CellScanner; @@ -222,7 +223,12 @@ public RPCProtos.RequestHeader getHeader() { } @Override - public RPCProtos.ConnectionHeader getConnectionHeader() { + public Map<String, byte[]> 
getConnectionAttributes() { + return null; + } + + @Override + public Map<String, byte[]> getRequestAttributes() { return null; }