Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
HBASE-26474 Implement connection-level attributes
Browse files Browse the repository at this point in the history
Add support for `db.system`, `db.connection_string`, `db.user`.
ndimiduk committed Dec 8, 2021
1 parent 66ecb3f commit fd1c668
Showing 21 changed files with 501 additions and 164 deletions.
Original file line number Diff line number Diff line change
@@ -271,6 +271,11 @@ public CompletableFuture<ServerName> getActiveMaster() {
getClass().getSimpleName() + ".getClusterId");
}

@Override
public String getConnectionString() {
return "unimplemented";
}

@Override
public void close() {
trace(() -> {
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@
* The implementation of AsyncConnection.
*/
@InterfaceAudience.Private
class AsyncConnectionImpl implements AsyncConnection {
public class AsyncConnectionImpl implements AsyncConnection {

private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);

@@ -198,6 +198,14 @@ synchronized ChoreService getChoreService() {
return choreService;
}

public User getUser() {
return user;
}

public ConnectionRegistry getConnectionRegistry() {
return registry;
}

@Override
public Configuration getConfiguration() {
return conf;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,24 +20,27 @@
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder;
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -96,9 +99,12 @@ private boolean isMeta(TableName tableName) {
return TableName.isMetaTableName(tableName);
}

private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
Span span = createTableSpan("AsyncRegionLocator." + methodName, tableName);
private <T> CompletableFuture<T> tracedLocationFuture(
Supplier<CompletableFuture<T>> action,
Function<T, List<String>> getRegionNames,
Supplier<Span> spanSupplier
) {
final Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
FutureUtils.addListener(future, (resp, error) -> {
@@ -117,18 +123,29 @@ private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture
}
}

private List<String> getRegionName(RegionLocations locs) {
List<String> names = new ArrayList<>();
for (HRegionLocation loc : locs.getRegionLocations()) {
if (loc != null) {
names.add(loc.getRegion().getRegionNameAsString());
}
}
return names;
private static List<String> getRegionNames(RegionLocations locs) {
if (locs == null) { return Collections.emptyList(); }
if (locs.getRegionLocations() == null) { return Collections.emptyList(); }
return Arrays.stream(locs.getRegionLocations())
.filter(Objects::nonNull)
.map(HRegionLocation::getRegion)
.map(RegionInfo::getRegionNameAsString)
.collect(Collectors.toList());
}

private static List<String> getRegionNames(HRegionLocation location) {
return Optional.ofNullable(location)
.map(HRegionLocation::getRegion)
.map(RegionInfo::getRegionNameAsString)
.map(Collections::singletonList)
.orElseGet(Collections::emptyList);
}

CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
final Supplier<Span> supplier = new TableSpanBuilder<>(conn)
.setName("AsyncRegionLocator.getRegionLocations")
.setTableName(tableName);
return tracedLocationFuture(() -> {
CompletableFuture<RegionLocations> future = isMeta(tableName) ?
metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
@@ -138,11 +155,14 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region locations for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "'");
}, this::getRegionName, tableName, "getRegionLocations");
}, AsyncRegionLocator::getRegionNames, supplier);
}

CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
final Supplier<Span> supplier = new TableSpanBuilder<>(conn)
.setName("AsyncRegionLocator.getRegionLocation")
.setTableName(tableName);
return tracedLocationFuture(() -> {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one regions.
@@ -173,8 +193,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[]
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
}, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
"getRegionLocation");
}, AsyncRegionLocator::getRegionNames, supplier);
}

CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
@@ -202,31 +221,38 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
}

void clearCache(TableName tableName) {
Supplier<Span> supplier = new TableSpanBuilder<>(conn)
.setName("AsyncRegionLocator.clearCache")
.setTableName(tableName);
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", tableName);
if (tableName.equals(META_TABLE_NAME)) {
metaRegionLocator.clearCache();
} else {
nonMetaRegionLocator.clearCache(tableName);
}
}, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
}, supplier);
}

void clearCache(ServerName serverName) {
Supplier<Span> supplier = new ConnectionSpanBuilder<>(conn)
.setName("AsyncRegionLocator.clearCache")
.addAttribute(SERVER_NAME_KEY, serverName.getServerName());
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", serverName);
metaRegionLocator.clearCache(serverName);
nonMetaRegionLocator.clearCache(serverName);
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
}, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
serverName.getServerName()));
}, supplier);
}

void clearCache() {
Supplier<Span> supplier = new ConnectionSpanBuilder<>(conn)
.setName("AsyncRegionLocator.clearCache");
TraceUtil.trace(() -> {
metaRegionLocator.clearCache();
nonMetaRegionLocator.clearCache();
}, "AsyncRegionLocator.clearCache");
}, supplier);
}

AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@
* Internal use only.
*/
@InterfaceAudience.Private
interface ConnectionRegistry extends Closeable {
public interface ConnectionRegistry extends Closeable {

/**
* Get the location of meta region(s).
@@ -48,6 +48,11 @@ interface ConnectionRegistry extends Closeable {
*/
CompletableFuture<ServerName> getActiveMaster();

/**
* Return the connection string associated with this registry instance.
*/
String getConnectionString();

/**
* Closes this instance and releases any system resources associated with it
*/
Original file line number Diff line number Diff line change
@@ -87,9 +87,12 @@ public static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unknow
return masterAddrs;
}

private final String connectionString;

MasterRegistry(Configuration conf) throws IOException {
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
connectionString = getConnectionString(conf);
}

@Override
@@ -102,6 +105,15 @@ protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
return getMasters();
}

@Override
public String getConnectionString() {
return connectionString;
}

static String getConnectionString(Configuration conf) throws UnknownHostException {
return getMasterAddr(conf);
}

/**
* Builds the default master address end point if it is not specified in the configuration.
* <p/>
Loading

0 comments on commit fd1c668

Please sign in to comment.