Skip to content

Commit

Permalink
HBASE-28010 Connection attributes can become corrupted on the server …
Browse files Browse the repository at this point in the history
…side (#5366)

Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
rmdmattingly authored and bbeaudreault committed Aug 23, 2023
1 parent 7dfc2f3 commit 2b46973
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -27,7 +29,6 @@
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

/**
Expand Down Expand Up @@ -83,7 +84,18 @@ public interface RpcCall extends RpcCallContext {
/** Returns The request header of this call. */
RequestHeader getHeader();

ConnectionHeader getConnectionHeader();
/**
* Returns the map of attributes specified when building the Connection.
* @see org.apache.hadoop.hbase.client.ConnectionFactory#createConnection(Configuration,
* ExecutorService, User, Map)
*/
Map<String, byte[]> getConnectionAttributes();

/**
* Returns the map of attributes specified when building the request.
* @see org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[])
*/
Map<String, byte[]> getRequestAttributes();

/** Returns Port of remote address in this call */
int getRemotePort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner;
Expand All @@ -43,14 +45,15 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
Expand Down Expand Up @@ -100,6 +103,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
// cumulative size of serialized exceptions
private long exceptionSize = 0;
private final boolean retryImmediatelySupported;
private volatile Map<String, byte[]> requestAttributes;

// This is a dirty hack to address HBASE-22539. The highest bit is for rpc ref and cleanup, and
// the rest of the bits are for WAL reference count. We can only call release if all of them are
Expand Down Expand Up @@ -209,8 +213,25 @@ public RequestHeader getHeader() {
}

@Override
public RPCProtos.ConnectionHeader getConnectionHeader() {
return this.connection.connectionHeader;
public Map<String, byte[]> getConnectionAttributes() {
return this.connection.connectionAttributes;
}

@Override
public Map<String, byte[]> getRequestAttributes() {
if (this.requestAttributes == null) {
if (header.getAttributeList().isEmpty()) {
this.requestAttributes = Collections.emptyMap();
} else {
Map<String, byte[]> requestAttributes =
Maps.newHashMapWithExpectedSize(header.getAttributeList().size());
for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
requestAttributes.put(nameBytesPair.getName(), nameBytesPair.getValue().toByteArray());
}
this.requestAttributes = requestAttributes;
}
}
return this.requestAttributes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
Expand Down Expand Up @@ -65,6 +67,7 @@
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
Expand All @@ -75,6 +78,7 @@
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
Expand Down Expand Up @@ -103,6 +107,7 @@ abstract class ServerRpcConnection implements Closeable {
protected int remotePort;
protected InetAddress addr;
protected ConnectionHeader connectionHeader;
protected Map<String, byte[]> connectionAttributes;

/**
* Codec the client asked use.
Expand Down Expand Up @@ -405,6 +410,19 @@ private CodedInputStream createCis(ByteBuff buf) {
// Reads the connection header following version
private void processConnectionHeader(ByteBuff buf) throws IOException {
this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf));

// we want to copy the attributes prior to releasing the buffer so that they don't get corrupted
// eventually
if (connectionHeader.getAttributeList().isEmpty()) {
this.connectionAttributes = Collections.emptyMap();
} else {
this.connectionAttributes =
Maps.newHashMapWithExpectedSize(connectionHeader.getAttributeList().size());
for (HBaseProtos.NameBytesPair nameBytesPair : connectionHeader.getAttributeList()) {
this.connectionAttributes.put(nameBytesPair.getName(),
nameBytesPair.getValue().toByteArray());
}
}
String serviceName = connectionHeader.getServiceName();
if (serviceName == null) {
throw new EmptyServiceNameException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -59,8 +60,6 @@

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

@Category({ ClientTests.class, MediumTests.class })
public class TestRequestAndConnectionAttributes {

Expand Down Expand Up @@ -101,15 +100,22 @@ public void setup() {
}

@Test
public void testConnectionAttributes() throws IOException {
public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException {
TableName tableName = TableName.valueOf("testConnectionAttributes");
TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1,
HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
byte[] cf = Bytes.toBytes("0");
TEST_UTIL.createTable(tableName, new byte[][] { cf }, 1, HConstants.DEFAULT_BLOCKSIZE,
AttributesCoprocessor.class.getName());

Configuration conf = TEST_UTIL.getConfiguration();
try (Connection conn = ConnectionFactory.createConnection(conf, null,
AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) {
Result result = table.get(new Get(Bytes.toBytes(0)));

// submit a 300 byte rowkey here to encourage netty's allocator to overwrite the connection
// header
byte[] bytes = new byte[300];
new Random().nextBytes(bytes);
Result result = table.get(new Get(bytes));

assertEquals(CONNECTION_ATTRIBUTES.size(), result.size());
for (Map.Entry<String, byte[]> attr : CONNECTION_ATTRIBUTES.entrySet()) {
byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey()));
Expand Down Expand Up @@ -291,15 +297,15 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,

// for connection attrs test
RpcCall rpcCall = RpcServer.getCurrentCall().get();
for (HBaseProtos.NameBytesPair attr : rpcCall.getHeader().getAttributeList()) {
for (Map.Entry<String, byte[]> attr : rpcCall.getRequestAttributes().entrySet()) {
result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
.setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getName()))
.setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build());
.setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getKey()))
.setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build());
}
for (HBaseProtos.NameBytesPair attr : rpcCall.getConnectionHeader().getAttributeList()) {
for (Map.Entry<String, byte[]> attr : rpcCall.getConnectionAttributes().entrySet()) {
result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
.setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getName()))
.setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build());
.setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getKey()))
.setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build());
}
result.sort(CellComparator.getInstance());
c.bypass();
Expand All @@ -320,15 +326,15 @@ public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WAL

private void validateRequestAttributes() {
RpcCall rpcCall = RpcServer.getCurrentCall().get();
List<HBaseProtos.NameBytesPair> attrs = rpcCall.getHeader().getAttributeList();
Map<String, byte[]> attrs = rpcCall.getRequestAttributes();
if (attrs.size() != REQUEST_ATTRIBUTES.size()) {
return;
}
for (HBaseProtos.NameBytesPair attr : attrs) {
if (!REQUEST_ATTRIBUTES.containsKey(attr.getName())) {
for (Map.Entry<String, byte[]> attr : attrs.entrySet()) {
if (!REQUEST_ATTRIBUTES.containsKey(attr.getKey())) {
return;
}
if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getName()), attr.getValue().toByteArray())) {
if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getKey()), attr.getValue())) {
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -695,7 +696,12 @@ public RPCProtos.RequestHeader getHeader() {
}

@Override
public RPCProtos.ConnectionHeader getConnectionHeader() {
public Map<String, byte[]> getConnectionAttributes() {
return null;
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.hbase.CellScanner;
Expand Down Expand Up @@ -222,7 +223,12 @@ public RPCProtos.RequestHeader getHeader() {
}

@Override
public RPCProtos.ConnectionHeader getConnectionHeader() {
public Map<String, byte[]> getConnectionAttributes() {
return null;
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return null;
}

Expand Down

0 comments on commit 2b46973

Please sign in to comment.