Split metastore timeout config for connect and read #19390

Merged 1 commit on Oct 23, 2023

docs/src/main/sphinx/connector/metastores.md (6 additions & 3 deletions)
@@ -74,9 +74,6 @@ are also available. They are discussed later in this topic.
* - ``hive.metastore-refresh-max-threads``
- Maximum threads used to refresh cached metastore data.
- ``10``
* - ``hive.metastore-timeout``
- Timeout for Hive metastore requests.
- ``10s``
* - ``hive.hide-delta-lake-tables``
- Controls whether to hide Delta Lake tables in table listings. Currently
applies only when using the AWS Glue metastore.
@@ -113,6 +110,12 @@ properties:
- Hive metastore authentication type. Possible values are ``NONE`` or
``KERBEROS``.
- ``NONE``
* - ``hive.metastore.thrift.client.connect-timeout``
- Socket connect timeout for metastore client.
- ``10s``
* - ``hive.metastore.thrift.client.read-timeout``
- Socket read timeout for metastore client.
- ``10s``
* - ``hive.metastore.thrift.impersonation.enabled``
- Enable Hive metastore end user impersonation.
-
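For reference, a minimal sketch (not part of this diff) of passing the two new properties as connector properties, in the same ImmutableMap builder style the changed test query runners use; the metastore URI and the timeout values are placeholders, not recommendations:

```java
import com.google.common.collect.ImmutableMap;

import java.util.Map;

public class MetastoreTimeoutProperties
{
    public static Map<String, String> connectorProperties()
    {
        // Property names come from this PR; the endpoint and durations are hypothetical.
        return ImmutableMap.<String, String>builder()
                .put("hive.metastore.uri", "thrift://example-metastore:9083")
                .put("hive.metastore.thrift.client.connect-timeout", "10s") // bounds TCP connection setup
                .put("hive.metastore.thrift.client.read-timeout", "1m")     // bounds each blocking socket read
                .buildOrThrow();
    }
}
```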

@@ -223,7 +223,7 @@ private DistributedQueryRunner createDeltaLakeQueryRunner()
.put("delta.metadata.live-files.cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s")
.put("hive.metastore-cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s")
.put("delta.register-table-procedure.enabled", "true")
.put("hive.metastore-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.putAll(deltaStorageConfiguration())
.buildOrThrow(),
hiveHadoop,

@@ -170,7 +170,7 @@ public static DistributedQueryRunner createS3DeltaLakeQueryRunner(
.put("s3.endpoint", minioAddress)
.put("s3.path-style-access", "true")
.put("s3.streaming.part-size", "5MB") // minimize memory usage
.put("hive.metastore-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.putAll(connectorProperties)
.buildOrThrow(),
testingHadoop,

@@ -54,7 +54,8 @@ public class DefaultThriftMetastoreClientFactory
{
private final Optional<SSLContext> sslContext;
private final Optional<HostAndPort> socksProxy;
private final int timeoutMillis;
private final int connectTimeoutMillis;
private final int readTimeoutMillis;
private final HiveMetastoreAuthentication metastoreAuthentication;
private final String hostname;

@@ -70,13 +71,15 @@ public class DefaultThriftMetastoreClientFactory
public DefaultThriftMetastoreClientFactory(
Optional<SSLContext> sslContext,
Optional<HostAndPort> socksProxy,
Duration timeout,
Duration connectTimeout,
Duration readTimeout,
HiveMetastoreAuthentication metastoreAuthentication,
String hostname)
{
this.sslContext = requireNonNull(sslContext, "sslContext is null");
this.socksProxy = requireNonNull(socksProxy, "socksProxy is null");
this.timeoutMillis = toIntExact(timeout.toMillis());
this.connectTimeoutMillis = toIntExact(connectTimeout.toMillis());
this.readTimeoutMillis = toIntExact(readTimeout.toMillis());
this.metastoreAuthentication = requireNonNull(metastoreAuthentication, "metastoreAuthentication is null");
this.hostname = requireNonNull(hostname, "hostname is null");
}
@@ -95,7 +98,8 @@ public DefaultThriftMetastoreClientFactory(
config.getTruststorePath(),
Optional.ofNullable(config.getTruststorePassword())),
Optional.ofNullable(config.getSocksProxy()),
config.getMetastoreTimeout(),
config.getConnectTimeout(),
config.getReadTimeout(),
metastoreAuthentication,
nodeManager.getCurrentNode().getHost());
}
@@ -126,7 +130,7 @@ protected ThriftMetastoreClient create(TransportSupplier transportSupplier, Stri
private TTransport createTransport(HostAndPort address, Optional<String> delegationToken)
throws TTransportException
{
return Transport.create(address, sslContext, socksProxy, timeoutMillis, metastoreAuthentication, delegationToken);
return Transport.create(address, sslContext, socksProxy, connectTimeoutMillis, readTimeoutMillis, metastoreAuthentication, delegationToken);
}

private static Optional<SSLContext> buildSslContext(

@@ -30,7 +30,8 @@

public class ThriftMetastoreConfig
{
private Duration metastoreTimeout = new Duration(10, TimeUnit.SECONDS);
private Duration connectTimeout = new Duration(10, TimeUnit.SECONDS);
private Duration readTimeout = new Duration(10, TimeUnit.SECONDS);
private HostAndPort socksProxy;
private int maxRetries = RetryDriver.DEFAULT_MAX_ATTEMPTS - 1;
private double backoffScaleFactor = RetryDriver.DEFAULT_SCALE_FACTOR;
@@ -54,15 +55,32 @@ public class ThriftMetastoreConfig
private boolean batchMetadataFetchEnabled = true;

@NotNull
public Duration getMetastoreTimeout()
public Duration getConnectTimeout()
{
return metastoreTimeout;
return connectTimeout;
}

@Config("hive.metastore-timeout")
public ThriftMetastoreConfig setMetastoreTimeout(Duration metastoreTimeout)
@Config("hive.metastore.thrift.client.connect-timeout")
@LegacyConfig("hive.metastore-timeout")
@ConfigDescription("Socket connect timeout for metastore client")
public ThriftMetastoreConfig setConnectTimeout(Duration connectTimeout)
{
this.metastoreTimeout = metastoreTimeout;
this.connectTimeout = connectTimeout;
return this;
}

@NotNull
public Duration getReadTimeout()
{
return readTimeout;
}

@Config("hive.metastore.thrift.client.read-timeout")
@LegacyConfig("hive.metastore-timeout")
@ConfigDescription("Socket read timeout for metastore client")
public ThriftMetastoreConfig setReadTimeout(Duration readTimeout)
{
this.readTimeout = readTimeout;
return this;
}

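A usage sketch of the new setters, mirroring how the Iceberg catalog test further down overrides the read timeout; the package in the import and the chosen values are assumptions for illustration:

```java
import io.airlift.units.Duration;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreConfig; // assumed package as of this PR

import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

public class SplitTimeoutConfigExample
{
    public static ThriftMetastoreConfig splitTimeouts()
    {
        // Illustrative values; both timeouts still default to 10 seconds.
        return new ThriftMetastoreConfig()
                .setConnectTimeout(new Duration(10, SECONDS))
                .setReadTimeout(new Duration(1, MINUTES));
    }
}
```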

@@ -36,14 +36,15 @@ public static TTransport create(
HostAndPort address,
Optional<SSLContext> sslContext,
Optional<HostAndPort> socksProxy,
int timeoutMillis,
int connectTimeoutMillis,
int readTimeoutMillis,
HiveMetastoreAuthentication authentication,
Optional<String> delegationToken)
throws TTransportException
{
requireNonNull(address, "address is null");
try {
TTransport rawTransport = createRaw(address, sslContext, socksProxy, timeoutMillis);
TTransport rawTransport = createRaw(address, sslContext, socksProxy, connectTimeoutMillis, readTimeoutMillis);
TTransport authenticatedTransport = authentication.authenticate(rawTransport, address.getHost(), delegationToken);
if (!authenticatedTransport.isOpen()) {
authenticatedTransport.open();
@@ -57,7 +58,12 @@ public static TTransport create(

private Transport() {}

private static TTransport createRaw(HostAndPort address, Optional<SSLContext> sslContext, Optional<HostAndPort> socksProxy, int timeoutMillis)
private static TTransport createRaw(
HostAndPort address,
Optional<SSLContext> sslContext,
Optional<HostAndPort> socksProxy,
int connectTimeoutMillis,
int readTimeoutMillis)
throws TTransportException
{
Proxy proxy = socksProxy
@@ -66,8 +72,8 @@ private static TTransport createRaw(HostAndPort address, Optional<SSLContext> ss

Socket socket = new Socket(proxy);
try {
socket.connect(new InetSocketAddress(address.getHost(), address.getPort()), timeoutMillis);
socket.setSoTimeout(timeoutMillis);
socket.connect(new InetSocketAddress(address.getHost(), address.getPort()), connectTimeoutMillis);
socket.setSoTimeout(readTimeoutMillis);

if (sslContext.isPresent()) {
// SSL will connect to the SOCKS address when present
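The Transport change above is where the two settings diverge: the connect timeout bounds connection establishment, while SO_TIMEOUT bounds each subsequent blocking read. A standalone sketch with plain java.net sockets (hypothetical endpoint, illustrative durations):

```java
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class SocketTimeoutSketch
{
    public static void main(String[] args)
            throws Exception
    {
        try (Socket socket = new Socket()) {
            // Connect timeout: maximum time allowed to establish the TCP connection.
            socket.connect(new InetSocketAddress("example-metastore", 9083), 10_000);
            // Read timeout: maximum time each blocking read may wait for data.
            socket.setSoTimeout(60_000);
            InputStream in = socket.getInputStream();
            in.read(); // throws java.net.SocketTimeoutException if no byte arrives within 60 seconds
        }
    }
}
```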

@@ -23,6 +23,7 @@
import java.nio.file.Path;
import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertDeprecatedEquivalence;
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
@@ -37,7 +38,8 @@ public class TestThriftMetastoreConfig
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(ThriftMetastoreConfig.class)
.setMetastoreTimeout(new Duration(10, SECONDS))
.setConnectTimeout(new Duration(10, SECONDS))
.setReadTimeout(new Duration(10, SECONDS))
.setSocksProxy(null)
.setMaxRetries(9)
.setBackoffScaleFactor(2.0)
@@ -68,7 +70,8 @@ public void testExplicitPropertyMappings()
Path truststoreFile = Files.createTempFile(null, null);

Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("hive.metastore-timeout", "20s")
.put("hive.metastore.thrift.client.connect-timeout", "22s")
.put("hive.metastore.thrift.client.read-timeout", "44s")
.put("hive.metastore.thrift.client.socks-proxy", "localhost:1234")
.put("hive.metastore.thrift.client.max-retries", "15")
.put("hive.metastore.thrift.client.backoff-scale-factor", "3.0")
@@ -92,7 +95,8 @@ public void testExplicitPropertyMappings()
.buildOrThrow();

ThriftMetastoreConfig expected = new ThriftMetastoreConfig()
.setMetastoreTimeout(new Duration(20, SECONDS))
.setConnectTimeout(new Duration(22, SECONDS))
.setReadTimeout(new Duration(44, SECONDS))
.setSocksProxy(HostAndPort.fromParts("localhost", 1234))
.setMaxRetries(15)
.setBackoffScaleFactor(3.0)
@@ -116,4 +120,18 @@

assertFullMapping(properties, expected);
}

@Test
public void testLegacyPropertyMappings()
{
assertDeprecatedEquivalence(
ThriftMetastoreConfig.class,
Map.of(
"hive.metastore.thrift.client.connect-timeout", "42s",
"hive.metastore.thrift.client.read-timeout", "42s",
"hive.metastore.thrift.impersonation.enabled", "true"),
Map.of(
"hive.metastore-timeout", "42s",
"hive.metastore.impersonation-enabled", "true"));
}
}
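In practice, the @LegacyConfig mapping means an existing hive.metastore-timeout setting keeps working and applies the same duration to both the connect and the read timeout, which is what this equivalence test asserts.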

@@ -38,7 +38,7 @@ public TestingTokenAwareMetastoreClientFactory(Optional<HostAndPort> socksProxy,

public TestingTokenAwareMetastoreClientFactory(Optional<HostAndPort> socksProxy, HostAndPort address, Duration timeout)
{
this.factory = new DefaultThriftMetastoreClientFactory(Optional.empty(), socksProxy, timeout, AUTHENTICATION, "localhost");
this.factory = new DefaultThriftMetastoreClientFactory(Optional.empty(), socksProxy, timeout, timeout, AUTHENTICATION, "localhost");
this.address = requireNonNull(address, "address is null");
}


@@ -69,7 +69,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.file-format", format.name())
.put("iceberg.catalog.type", "HIVE_METASTORE")
.put("hive.metastore.uri", "thrift://" + hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.put("hive.metastore-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("fs.hadoop.enabled", "false")
.put("fs.native-s3.enabled", "true")
.put("s3.aws-access-key", MINIO_ACCESS_KEY)

@@ -90,7 +90,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.file-format", format.name())
.put("iceberg.catalog.type", "HIVE_METASTORE")
.put("hive.metastore.uri", "thrift://" + hiveHadoop.getHiveMetastoreEndpoint())
.put("hive.metastore-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("hive.azure.abfs-storage-account", account)
.put("hive.azure.abfs-access-key", accessKey)
.put("iceberg.register-table-procedure.enabled", "true")

@@ -99,7 +99,7 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations)
ThriftMetastore thriftMetastore = testingThriftHiveMetastoreBuilder()
.thriftMetastoreConfig(new ThriftMetastoreConfig()
// Read timed out sometimes happens with the default timeout
.setMetastoreTimeout(new Duration(1, MINUTES)))
.setReadTimeout(new Duration(1, MINUTES)))
.metastoreClient(dataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.build();
CachingHiveMetastore metastore = memoizeMetastore(new BridgingHiveMetastore(thriftMetastore), 1000);

@@ -34,6 +34,7 @@ public final class TestHiveMetastoreClientFactory
Optional.empty(),
Optional.empty(),
new Duration(10, SECONDS),
new Duration(10, SECONDS),
new NoHiveMetastoreAuthentication(),
"localhost");
