HBASE-26366 Provide meaningful parent spans to ZK interactions
Signed-off-by: Andrew Purtell <[email protected]>
ndimiduk authored Jun 10, 2022
1 parent 6b21d3f commit ed7e15d
Showing 14 changed files with 812 additions and 337 deletions.
@@ -26,7 +26,6 @@
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.sdk.trace.data.EventData;
 import io.opentelemetry.sdk.trace.data.SpanData;
-import io.opentelemetry.sdk.trace.data.StatusData;
 import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
 import java.time.Duration;
 import java.util.Objects;
@@ -144,23 +143,20 @@ protected String featureValueOf(SpanData item) {
     };
   }
 
-  public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
-    final Matcher<StatusCode> matcher = is(equalTo(statusCode));
-    return new TypeSafeMatcher<SpanData>() {
-      @Override
-      protected boolean matchesSafely(SpanData item) {
-        final StatusData statusData = item.getStatus();
-        return statusData != null && statusData.getStatusCode() != null
-          && matcher.matches(statusData.getStatusCode());
-      }
-
+  public static Matcher<SpanData> hasStatusWithCode(Matcher<StatusCode> matcher) {
+    return new FeatureMatcher<SpanData, StatusCode>(matcher, "SpanData with StatusCode that",
+      "statusWithCode") {
       @Override
-      public void describeTo(Description description) {
-        description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher);
+      protected StatusCode featureValueOf(SpanData actual) {
+        return actual.getStatus().getStatusCode();
       }
     };
   }
 
+  public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
+    return hasStatusWithCode(is(equalTo(statusCode)));
+  }
+
   public static Matcher<SpanData> hasTraceId(String traceId) {
     return hasTraceId(is(equalTo(traceId)));
   }
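The Matcher-accepting overload above reduces the StatusCode form to a one-line delegate and lets tests compose richer status assertions. A minimal sketch of how it might be exercised, assuming Hamcrest's assertThat/is/oneOf static imports and a List<SpanData> named spans collected by an OpenTelemetry test exporter; the span name and variable names are illustrative, not part of this diff:

  // Illustrative usage, not part of this commit.
  SpanData cxtorSpan = spans.stream()
    .filter(s -> "HBaseServerBase.cxtor".equals(s.getName()))
    .findFirst()
    .orElseThrow(AssertionError::new);
  // Exact-code form, now implemented in terms of the Matcher form.
  assertThat(cxtorSpan, hasStatusWithCode(StatusCode.OK));
  // Matcher form, accepting any Matcher<StatusCode>.
  assertThat(cxtorSpan, hasStatusWithCode(is(oneOf(StatusCode.OK, StatusCode.UNSET))));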
@@ -26,6 +26,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,8 +164,9 @@ public boolean scheduleChore(ScheduledChore chore) {
         chore.getChoreService().cancelChore(chore);
       }
       chore.setChoreService(this);
-      ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
-        chore.getPeriod(), chore.getTimeUnit());
+      ScheduledFuture<?> future =
+        scheduler.scheduleAtFixedRate(TraceUtil.tracedRunnable(chore, chore.getName()),
+          chore.getInitialDelay(), chore.getPeriod(), chore.getTimeUnit());
       scheduledChores.put(chore, future);
       return true;
     } catch (Exception e) {
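With the wrapping above, each scheduled run of a chore now executes inside a span named after the chore, so ZooKeeper or RPC spans started during that run get a meaningful parent instead of surfacing as root spans. A sketch of what a caller sees, assuming the usual ScheduledChore(name, stopper, period) constructor; the chore and stopper below are illustrative, not part of this commit:

  ScheduledChore exampleChore = new ScheduledChore("exampleChore", stopper, 60_000) {
    @Override
    protected void chore() {
      // periodic work; spans created here become children of the per-run
      // "exampleChore" span produced by TraceUtil.tracedRunnable
    }
  };
  choreService.scheduleChore(exampleChore);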
@@ -135,6 +135,31 @@ private static void endSpan(CompletableFuture<?> future, Span span) {
     });
   }
 
+  /**
+   * Wrap the provided {@code runnable} in a {@link Runnable} that is traced.
+   */
+  public static Runnable tracedRunnable(final Runnable runnable, final String spanName) {
+    return tracedRunnable(runnable, () -> createSpan(spanName));
+  }
+
+  /**
+   * Wrap the provided {@code runnable} in a {@link Runnable} that is traced.
+   */
+  public static Runnable tracedRunnable(final Runnable runnable,
+    final Supplier<Span> spanSupplier) {
+    // N.B. This method name follows the convention of this class, i.e., tracedFuture, rather than
+    // the convention of the OpenTelemetry classes, i.e., Context#wrap.
+    return () -> {
+      final Span span = spanSupplier.get();
+      try (final Scope ignored = span.makeCurrent()) {
+        runnable.run();
+        span.setStatus(StatusCode.OK);
+      } finally {
+        span.end();
+      }
+    };
+  }
+
   /**
    * A {@link Runnable} that may also throw.
    * @param <T> the type of {@link Throwable} that can be produced.
@@ -144,11 +169,17 @@ public interface ThrowingRunnable<T extends Throwable> {
     void run() throws T;
   }
 
+  /**
+   * Trace the execution of {@code runnable}.
+   */
   public static <T extends Throwable> void trace(final ThrowingRunnable<T> runnable,
     final String spanName) throws T {
     trace(runnable, () -> createSpan(spanName));
   }
 
+  /**
+   * Trace the execution of {@code runnable}.
+   */
   public static <T extends Throwable> void trace(final ThrowingRunnable<T> runnable,
     final Supplier<Span> spanSupplier) throws T {
     Span span = spanSupplier.get();
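tracedRunnable is the primitive the ChoreService change relies on: unlike OpenTelemetry's Context#wrap, which only propagates an existing context, it starts a fresh span around every invocation and makes it current for the duration of the run. A small sketch of wrapping an arbitrary periodic task outside of ChoreService; the executor, task and span name are illustrative:

  ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
  Runnable task = () -> {
    // work that may talk to ZooKeeper; spans started here are parented to
    // the per-run "example.periodicTask" span
  };
  pool.scheduleAtFixedRate(TraceUtil.tracedRunnable(task, "example.periodicTask"), 0, 10,
    TimeUnit.SECONDS);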
@@ -21,6 +21,9 @@
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
 
 import com.google.errorprone.annotations.RestrictedApi;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.lang.management.MemoryType;
 import java.net.BindException;
@@ -57,6 +60,7 @@
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
 import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -231,60 +235,68 @@ protected final void initializeFileSystem() throws IOException {
       !canUpdateTableDescriptor(), cacheTableDescriptor());
   }
 
-  public HBaseServerBase(Configuration conf, String name)
-    throws ZooKeeperConnectionException, IOException {
+  public HBaseServerBase(Configuration conf, String name) throws IOException {
     super(name); // thread name
-    this.conf = conf;
-    this.eventLoopGroupConfig =
-      NettyEventLoopGroupConfig.setup(conf, getClass().getSimpleName() + "-EventLoopGroup");
-    this.startcode = EnvironmentEdgeManager.currentTime();
-    this.userProvider = UserProvider.instantiate(conf);
-    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
-    this.sleeper = new Sleeper(this.msgInterval, this);
-    this.namedQueueRecorder = createNamedQueueRecord();
-    this.rpcServices = createRpcServices();
-    useThisHostnameInstead = getUseThisHostnameInstead(conf);
-    InetSocketAddress addr = rpcServices.getSocketAddress();
-    String hostName = StringUtils.isBlank(useThisHostnameInstead)
-      ? addr.getHostName()
-      : this.useThisHostnameInstead;
-    serverName = ServerName.valueOf(hostName, addr.getPort(), this.startcode);
-    // login the zookeeper client principal (if using security)
-    ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
-      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
-    // login the server principal (if using secure Hadoop)
-    login(userProvider, hostName);
-    // init superusers and add the server principal (if using security)
-    // or process owner as default super user.
-    Superusers.initialize(conf);
-    zooKeeper =
-      new ZKWatcher(conf, getProcessName() + ":" + addr.getPort(), this, canCreateBaseZNode());
-
-    this.configurationManager = new ConfigurationManager();
-    setupWindows(conf, configurationManager);
-
-    initializeFileSystem();
-
-    this.choreService = new ChoreService(getName(), true);
-    this.executorService = new ExecutorService(getName());
-
-    this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
-
-    if (clusterMode()) {
-      if (
-        conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
-      ) {
-        csm = new ZkCoordinatedStateManager(this);
+    final Span span = TraceUtil.createSpan("HBaseServerBase.cxtor");
+    try (Scope ignored = span.makeCurrent()) {
+      this.conf = conf;
+      this.eventLoopGroupConfig =
+        NettyEventLoopGroupConfig.setup(conf, getClass().getSimpleName() + "-EventLoopGroup");
+      this.startcode = EnvironmentEdgeManager.currentTime();
+      this.userProvider = UserProvider.instantiate(conf);
+      this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
+      this.sleeper = new Sleeper(this.msgInterval, this);
+      this.namedQueueRecorder = createNamedQueueRecord();
+      this.rpcServices = createRpcServices();
+      useThisHostnameInstead = getUseThisHostnameInstead(conf);
+      InetSocketAddress addr = rpcServices.getSocketAddress();
+      String hostName = StringUtils.isBlank(useThisHostnameInstead)
+        ? addr.getHostName()
+        : this.useThisHostnameInstead;
+      serverName = ServerName.valueOf(hostName, addr.getPort(), this.startcode);
+      // login the zookeeper client principal (if using security)
+      ZKAuthentication.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
+        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
+      // login the server principal (if using secure Hadoop)
+      login(userProvider, hostName);
+      // init superusers and add the server principal (if using security)
+      // or process owner as default super user.
+      Superusers.initialize(conf);
+      zooKeeper =
+        new ZKWatcher(conf, getProcessName() + ":" + addr.getPort(), this, canCreateBaseZNode());
+
+      this.configurationManager = new ConfigurationManager();
+      setupWindows(conf, configurationManager);
+
+      initializeFileSystem();
+
+      this.choreService = new ChoreService(getName(), true);
+      this.executorService = new ExecutorService(getName());
+
+      this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
+
+      if (clusterMode()) {
+        if (
+          conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
+        ) {
+          csm = new ZkCoordinatedStateManager(this);
+        } else {
+          csm = null;
+        }
+        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
+        clusterStatusTracker.start();
       } else {
         csm = null;
+        clusterStatusTracker = null;
       }
-      clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
-      clusterStatusTracker.start();
-    } else {
-      csm = null;
-      clusterStatusTracker = null;
+      putUpWebUI();
+      span.setStatus(StatusCode.OK);
+    } catch (Throwable t) {
+      TraceUtil.setError(span, t);
+      throw t;
+    } finally {
+      span.end();
     }
-    putUpWebUI();
   }
 
   /**
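Wrapping the constructor body in an "HBaseServerBase.cxtor" span means the ZKWatcher, MetaRegionLocationCache and ClusterStatusTracker work performed during startup is parented to that span rather than appearing as unrelated root spans. The parenting itself is ordinary OpenTelemetry context propagation; a minimal illustration of the mechanism, independent of HBase, with illustrative tracer and child span names:

  Tracer tracer = GlobalOpenTelemetry.getTracer("example");
  Span parent = tracer.spanBuilder("HBaseServerBase.cxtor").startSpan();
  try (Scope ignored = parent.makeCurrent()) {
    // A span started here without an explicit parent picks up the current
    // context and becomes a child of "HBaseServerBase.cxtor".
    Span child = tracer.spanBuilder("zookeeper.getData").startSpan();
    child.end();
  } finally {
    parent.end();
  }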
@@ -24,6 +24,7 @@
 import java.util.concurrent.ThreadFactory;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
@@ -101,41 +102,43 @@ public MetaRegionLocationCache(ZKWatcher zkWatcher) {
    * @param retryCounter controls the number of retries and sleep between retries.
    */
   private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) {
-    List<String> znodes = null;
-    while (retryCounter.shouldRetry()) {
-      try {
-        znodes = watcher.getMetaReplicaNodesAndWatchChildren();
-        break;
-      } catch (KeeperException ke) {
-        LOG.debug("Error populating initial meta locations", ke);
-        if (!retryCounter.shouldRetry()) {
-          // Retries exhausted and watchers not set. This is not a desirable state since the cache
-          // could remain stale forever. Propagate the exception.
-          watcher.abort("Error populating meta locations", ke);
-          return;
-        }
+    TraceUtil.trace(() -> {
+      List<String> znodes = null;
+      while (retryCounter.shouldRetry()) {
         try {
-          retryCounter.sleepUntilNextRetry();
-        } catch (InterruptedException ie) {
-          LOG.error("Interrupted while loading meta locations from ZK", ie);
-          Thread.currentThread().interrupt();
-          return;
+          znodes = watcher.getMetaReplicaNodesAndWatchChildren();
+          break;
+        } catch (KeeperException ke) {
+          LOG.debug("Error populating initial meta locations", ke);
+          if (!retryCounter.shouldRetry()) {
+            // Retries exhausted and watchers not set. This is not a desirable state since the cache
+            // could remain stale forever. Propagate the exception.
+            watcher.abort("Error populating meta locations", ke);
+            return;
+          }
+          try {
+            retryCounter.sleepUntilNextRetry();
+          } catch (InterruptedException ie) {
+            LOG.error("Interrupted while loading meta locations from ZK", ie);
+            Thread.currentThread().interrupt();
+            return;
+          }
         }
       }
-    }
-    if (znodes == null || znodes.isEmpty()) {
-      // No meta znodes exist at this point but we registered a watcher on the base znode to listen
-      // for updates. They will be handled via nodeChildrenChanged().
-      return;
-    }
-    if (znodes.size() == cachedMetaLocations.size()) {
-      // No new meta znodes got added.
-      return;
-    }
-    for (String znode : znodes) {
-      String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
-      updateMetaLocation(path, opType);
-    }
+      if (znodes == null || znodes.isEmpty()) {
+        // No meta znodes exist at this point but we registered a watcher on the base znode to
+        // listen for updates. They will be handled via nodeChildrenChanged().
+        return;
+      }
+      if (znodes.size() == cachedMetaLocations.size()) {
+        // No new meta znodes got added.
+        return;
+      }
+      for (String znode : znodes) {
+        String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
+        updateMetaLocation(path, opType);
+      }
+    }, "MetaRegionLocationCache.loadMetaLocationsFromZk");
   }
 
   /**
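The change above applies the same TraceUtil.trace(runnable, spanName) pattern to the ZK retry loop, so getMetaReplicaNodesAndWatchChildren() and the follow-up cache updates run under "MetaRegionLocationCache.loadMetaLocationsFromZk". Because the lambda is a ThrowingRunnable, checked exceptions can still propagate to the caller. A hedged sketch of the same pattern on a one-off read; the span name is illustrative, and marking the span as an error on throw is assumed to be handled inside TraceUtil.trace:

  TraceUtil.trace(() -> {
    // Throws KeeperException to the caller if ZooKeeper is unavailable;
    // the span is still ended by the helper.
    List<String> replicas = watcher.getMetaReplicaNodesAndWatchChildren();
    LOG.debug("Found {} meta replica znodes", replicas.size());
  }, "MetaRegionLocationCache.exampleRead");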
(The diffs for the remaining changed files are not shown here.)