Skip to content

Commit

Permalink
add retry and jmx means for the Thrift's delegation token generation
Browse files Browse the repository at this point in the history
  • Loading branch information
osscm authored and mmalhotra committed Oct 17, 2024
1 parent 6b97209 commit 3202480
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public ThriftMetastore createMetastore(Optional<ConnectorIdentity> identity)
fileSystemFactory,
metastoreClientFactory,
RetryDriver.DEFAULT_SCALE_FACTOR,
RetryDriver.DEFAULT_SLEEP_TIME,
RetryDriver.DEFAULT_SLEEP_TIME,
RetryDriver.DEFAULT_MIN_BACKOFF_DELAY,
RetryDriver.DEFAULT_MAX_BACKOFF_DELAY,
RetryDriver.DEFAULT_MAX_RETRY_TIME,
new Duration(10, MINUTES),
RetryDriver.DEFAULT_MAX_ATTEMPTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreAuthenticationConfig.ThriftMetastoreAuthenticationType.KERBEROS;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class ThriftMetastoreAuthenticationModule
extends AbstractConfigurationAwareModule
Expand All @@ -36,6 +37,8 @@ protected void setup(Binder binder)
{
newOptionalBinder(binder, IdentityAwareMetastoreClientFactory.class)
.setDefault().to(UgiBasedMetastoreClientFactory.class).in(SINGLETON);
newExporter(binder).export(IdentityAwareMetastoreClientFactory.class)
.as(generator -> generator.generatedNameOf(ThriftMetastoreStats.class));
newOptionalBinder(binder, HiveMetastoreAuthentication.class)
.setDefault().to(NoHiveMetastoreAuthentication.class).in(SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public class ThriftMetastoreConfig
private HostAndPort socksProxy;
private int maxRetries = RetryDriver.DEFAULT_MAX_ATTEMPTS - 1;
private double backoffScaleFactor = RetryDriver.DEFAULT_SCALE_FACTOR;
private Duration minBackoffDelay = RetryDriver.DEFAULT_SLEEP_TIME;
private Duration maxBackoffDelay = RetryDriver.DEFAULT_SLEEP_TIME;
private Duration minBackoffDelay = RetryDriver.DEFAULT_MIN_BACKOFF_DELAY;
private Duration maxBackoffDelay = RetryDriver.DEFAULT_MAX_BACKOFF_DELAY;
private Duration maxRetryTime = RetryDriver.DEFAULT_MAX_RETRY_TIME;
private boolean impersonationEnabled;
private boolean useSparkTableStatisticsFallback = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class ThriftMetastoreStats
private final ThriftMetastoreApiStats createFunction = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats alterFunction = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats dropFunction = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats getThriftDelegationToken = new ThriftMetastoreApiStats();

@Managed
@Nested
Expand Down Expand Up @@ -426,4 +427,11 @@ public ThriftMetastoreApiStats getDropFunction()
{
return dropFunction;
}

@Managed
@Nested
public ThriftMetastoreApiStats getThriftDelegationToken()
{
return getThriftDelegationToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
import com.google.common.cache.CacheLoader;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.inject.Inject;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import dev.failsafe.function.CheckedSupplier;
import io.trino.cache.NonEvictableLoadingCache;
import io.trino.plugin.base.security.UserNameProvider;
import io.trino.plugin.hive.ForHiveMetastore;
Expand All @@ -28,8 +32,10 @@
import java.util.Optional;

import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static io.trino.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand All @@ -41,6 +47,8 @@ public class TokenFetchingMetastoreClientFactory
private final boolean impersonationEnabled;
private final NonEvictableLoadingCache<String, DelegationToken> delegationTokenCache;
private final long refreshPeriod;
private final ThriftMetastoreStats stats = new ThriftMetastoreStats();
private final RetryPolicy retryPolicy;

@Inject
public TokenFetchingMetastoreClientFactory(
Expand All @@ -58,6 +66,12 @@ public TokenFetchingMetastoreClientFactory(
.maximumSize(thriftConfig.getDelegationTokenCacheMaximumSize()),
CacheLoader.from(this::loadDelegationToken));
this.refreshPeriod = Duration.ofMinutes(1).toNanos();
retryPolicy = RetryPolicy.builder()
.withMaxDuration(thriftConfig.getMaxRetryTime().toJavaTime())
.withMaxAttempts(thriftConfig.getMaxRetries() + 1)
.withBackoff(thriftConfig.getMinBackoffDelay().toMillis(), thriftConfig.getMaxBackoffDelay().toMillis(), MILLIS, thriftConfig.getBackoffScaleFactor())
.abortOn(TException.class)
.build();
}

private ThriftMetastoreClient createMetastoreClient()
Expand Down Expand Up @@ -106,11 +120,21 @@ private DelegationToken getDelegationToken(String username)

private DelegationToken loadDelegationToken(String username)
{
try (ThriftMetastoreClient client = createMetastoreClient()) {
return new DelegationToken(System.nanoTime(), client.getDelegationToken(username));
try {
// added retry and stats for the thrift delegation token
return (DelegationToken) Failsafe.with(retryPolicy).get((CheckedSupplier<DelegationToken>) () ->
stats.getThriftDelegationToken().wrap(() -> {
try (ThriftMetastoreClient client = createMetastoreClient()) {
return new DelegationToken(System.nanoTime(), client.getDelegationToken(username));
}
}
).call());
}
catch (TException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
catch (FailsafeException e) {
if (e.getCause() instanceof TException) {
throw new TrinoException(HIVE_METASTORE_ERROR, e.getCause());
}
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public class RetryDriver
{
private static final Logger log = Logger.get(RetryDriver.class);
public static final int DEFAULT_MAX_ATTEMPTS = 10;
public static final Duration DEFAULT_SLEEP_TIME = new Duration(1, SECONDS);
public static final Duration DEFAULT_MIN_BACKOFF_DELAY = new Duration(1, SECONDS);
public static final Duration DEFAULT_MAX_BACKOFF_DELAY = new Duration(2, SECONDS);
public static final Duration DEFAULT_MAX_RETRY_TIME = new Duration(30, SECONDS);
public static final double DEFAULT_SCALE_FACTOR = 2.0;

Expand Down Expand Up @@ -61,8 +62,8 @@ private RetryDriver(
private RetryDriver()
{
this(DEFAULT_MAX_ATTEMPTS,
DEFAULT_SLEEP_TIME,
DEFAULT_SLEEP_TIME,
DEFAULT_MIN_BACKOFF_DELAY,
DEFAULT_MAX_BACKOFF_DELAY,
DEFAULT_SCALE_FACTOR,
DEFAULT_MAX_RETRY_TIME,
ImmutableList.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testDefaults()
.setMaxRetries(9)
.setBackoffScaleFactor(2.0)
.setMinBackoffDelay(new Duration(1, SECONDS))
.setMaxBackoffDelay(new Duration(1, SECONDS))
.setMaxBackoffDelay(new Duration(2, SECONDS))
.setMaxRetryTime(new Duration(30, SECONDS))
.setTlsEnabled(false)
.setKeystorePath(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public void testJmxTablesExposedByDeltaLakeConnectorBackedByThriftMetastore()
row("io.trino.plugin.hive.metastore.thrift:name=delta,type=thrifthivemetastore"),
row("io.trino.plugin.hive:catalog=delta,name=delta,type=fileformatdatasourcestats"),
row("trino.plugin.deltalake.metastore:catalog=delta,name=delta,type=deltalaketablemetadatascheduler"),
row("trino.plugin.deltalake.transactionlog:catalog=delta,name=delta,type=transactionlogaccess"));
row("trino.plugin.deltalake.transactionlog:catalog=delta,name=delta,type=transactionlogaccess"),
row("trino.plugin.deltalake.transactionlog:catalog=delta,name=delta,type=transactionlogaccess"),
row("io.trino.plugin.hive.metastore.thrift:name=delta,type=thriftmetastorestats"));
}
}

0 comments on commit 3202480

Please sign in to comment.