Skip to content

Commit

Permalink
Split metastore timeout config for connect and read
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Oct 23, 2023
1 parent 90f3ba1 commit 5c5c6e6
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 28 deletions.
9 changes: 6 additions & 3 deletions docs/src/main/sphinx/connector/metastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ are also available. They are discussed later in this topic.
* - ``hive.metastore-refresh-max-threads``
- Maximum threads used to refresh cached metastore data.
- ``10``
* - ``hive.metastore-timeout``
- Timeout for Hive metastore requests.
- ``10s``
* - ``hive.hide-delta-lake-tables``
- Controls whether to hide Delta Lake tables in table listings. Currently
applies only when using the AWS Glue metastore.
Expand Down Expand Up @@ -113,6 +110,12 @@ properties:
- Hive metastore authentication type. Possible values are ``NONE`` or
``KERBEROS``.
- ``NONE``
* - ``hive.metastore.thrift.client.connect-timeout``
- Socket connect timeout for metastore client.
- ``10s``
* - ``hive.metastore.thrift.client.read-timeout``
- Socket read timeout for metastore client.
- ``10s``
* - ``hive.metastore.thrift.impersonation.enabled``
- Enable Hive metastore end user impersonation.
-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private DistributedQueryRunner createDeltaLakeQueryRunner()
.put("delta.metadata.live-files.cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s")
.put("hive.metastore-cache-ttl", TEST_METADATA_CACHE_TTL_SECONDS + "s")
.put("delta.register-table-procedure.enabled", "true")
.put("hive.metastore-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.putAll(deltaStorageConfiguration())
.buildOrThrow(),
hiveHadoop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public static DistributedQueryRunner createS3DeltaLakeQueryRunner(
.put("s3.endpoint", minioAddress)
.put("s3.path-style-access", "true")
.put("s3.streaming.part-size", "5MB") // minimize memory usage
.put("hive.metastore-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.putAll(connectorProperties)
.buildOrThrow(),
testingHadoop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class DefaultThriftMetastoreClientFactory
{
private final Optional<SSLContext> sslContext;
private final Optional<HostAndPort> socksProxy;
private final int timeoutMillis;
private final int connectTimeoutMillis;
private final int readTimeoutMillis;
private final HiveMetastoreAuthentication metastoreAuthentication;
private final String hostname;

Expand All @@ -70,13 +71,15 @@ public class DefaultThriftMetastoreClientFactory
public DefaultThriftMetastoreClientFactory(
Optional<SSLContext> sslContext,
Optional<HostAndPort> socksProxy,
Duration timeout,
Duration connectTimeout,
Duration readTimeout,
HiveMetastoreAuthentication metastoreAuthentication,
String hostname)
{
this.sslContext = requireNonNull(sslContext, "sslContext is null");
this.socksProxy = requireNonNull(socksProxy, "socksProxy is null");
this.timeoutMillis = toIntExact(timeout.toMillis());
this.connectTimeoutMillis = toIntExact(connectTimeout.toMillis());
this.readTimeoutMillis = toIntExact(readTimeout.toMillis());
this.metastoreAuthentication = requireNonNull(metastoreAuthentication, "metastoreAuthentication is null");
this.hostname = requireNonNull(hostname, "hostname is null");
}
Expand All @@ -95,7 +98,8 @@ public DefaultThriftMetastoreClientFactory(
config.getTruststorePath(),
Optional.ofNullable(config.getTruststorePassword())),
Optional.ofNullable(config.getSocksProxy()),
config.getMetastoreTimeout(),
config.getConnectTimeout(),
config.getReadTimeout(),
metastoreAuthentication,
nodeManager.getCurrentNode().getHost());
}
Expand Down Expand Up @@ -126,7 +130,7 @@ protected ThriftMetastoreClient create(TransportSupplier transportSupplier, Stri
private TTransport createTransport(HostAndPort address, Optional<String> delegationToken)
throws TTransportException
{
return Transport.create(address, sslContext, socksProxy, timeoutMillis, metastoreAuthentication, delegationToken);
return Transport.create(address, sslContext, socksProxy, connectTimeoutMillis, readTimeoutMillis, metastoreAuthentication, delegationToken);
}

private static Optional<SSLContext> buildSslContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@

public class ThriftMetastoreConfig
{
private Duration metastoreTimeout = new Duration(10, TimeUnit.SECONDS);
private Duration connectTimeout = new Duration(10, TimeUnit.SECONDS);
private Duration readTimeout = new Duration(10, TimeUnit.SECONDS);
private HostAndPort socksProxy;
private int maxRetries = RetryDriver.DEFAULT_MAX_ATTEMPTS - 1;
private double backoffScaleFactor = RetryDriver.DEFAULT_SCALE_FACTOR;
Expand All @@ -54,15 +55,32 @@ public class ThriftMetastoreConfig
private boolean batchMetadataFetchEnabled = true;

@NotNull
public Duration getMetastoreTimeout()
public Duration getConnectTimeout()
{
return metastoreTimeout;
return connectTimeout;
}

@Config("hive.metastore-timeout")
public ThriftMetastoreConfig setMetastoreTimeout(Duration metastoreTimeout)
@Config("hive.metastore.thrift.client.connect-timeout")
@LegacyConfig("hive.metastore-timeout")
@ConfigDescription("Socket connect timeout for metastore client")
public ThriftMetastoreConfig setConnectTimeout(Duration connectTimeout)
{
this.metastoreTimeout = metastoreTimeout;
this.connectTimeout = connectTimeout;
return this;
}

@NotNull
public Duration getReadTimeout()
{
return readTimeout;
}

@Config("hive.metastore.thrift.client.read-timeout")
@LegacyConfig("hive.metastore-timeout")
@ConfigDescription("Socket read timeout for metastore client")
public ThriftMetastoreConfig setReadTimeout(Duration readTimeout)
{
this.readTimeout = readTimeout;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ public static TTransport create(
HostAndPort address,
Optional<SSLContext> sslContext,
Optional<HostAndPort> socksProxy,
int timeoutMillis,
int connectTimeoutMillis,
int readTimeoutMillis,
HiveMetastoreAuthentication authentication,
Optional<String> delegationToken)
throws TTransportException
{
requireNonNull(address, "address is null");
try {
TTransport rawTransport = createRaw(address, sslContext, socksProxy, timeoutMillis);
TTransport rawTransport = createRaw(address, sslContext, socksProxy, connectTimeoutMillis, readTimeoutMillis);
TTransport authenticatedTransport = authentication.authenticate(rawTransport, address.getHost(), delegationToken);
if (!authenticatedTransport.isOpen()) {
authenticatedTransport.open();
Expand All @@ -57,7 +58,12 @@ public static TTransport create(

private Transport() {}

private static TTransport createRaw(HostAndPort address, Optional<SSLContext> sslContext, Optional<HostAndPort> socksProxy, int timeoutMillis)
private static TTransport createRaw(
HostAndPort address,
Optional<SSLContext> sslContext,
Optional<HostAndPort> socksProxy,
int connectTimeoutMillis,
int readTimeoutMillis)
throws TTransportException
{
Proxy proxy = socksProxy
Expand All @@ -66,8 +72,8 @@ private static TTransport createRaw(HostAndPort address, Optional<SSLContext> ss

Socket socket = new Socket(proxy);
try {
socket.connect(new InetSocketAddress(address.getHost(), address.getPort()), timeoutMillis);
socket.setSoTimeout(timeoutMillis);
socket.connect(new InetSocketAddress(address.getHost(), address.getPort()), connectTimeoutMillis);
socket.setSoTimeout(readTimeoutMillis);

if (sslContext.isPresent()) {
// SSL will connect to the SOCKS address when present
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.file.Path;
import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertDeprecatedEquivalence;
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
Expand All @@ -37,7 +38,8 @@ public class TestThriftMetastoreConfig
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(ThriftMetastoreConfig.class)
.setMetastoreTimeout(new Duration(10, SECONDS))
.setConnectTimeout(new Duration(10, SECONDS))
.setReadTimeout(new Duration(10, SECONDS))
.setSocksProxy(null)
.setMaxRetries(9)
.setBackoffScaleFactor(2.0)
Expand Down Expand Up @@ -68,7 +70,8 @@ public void testExplicitPropertyMappings()
Path truststoreFile = Files.createTempFile(null, null);

Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("hive.metastore-timeout", "20s")
.put("hive.metastore.thrift.client.connect-timeout", "22s")
.put("hive.metastore.thrift.client.read-timeout", "44s")
.put("hive.metastore.thrift.client.socks-proxy", "localhost:1234")
.put("hive.metastore.thrift.client.max-retries", "15")
.put("hive.metastore.thrift.client.backoff-scale-factor", "3.0")
Expand All @@ -92,7 +95,8 @@ public void testExplicitPropertyMappings()
.buildOrThrow();

ThriftMetastoreConfig expected = new ThriftMetastoreConfig()
.setMetastoreTimeout(new Duration(20, SECONDS))
.setConnectTimeout(new Duration(22, SECONDS))
.setReadTimeout(new Duration(44, SECONDS))
.setSocksProxy(HostAndPort.fromParts("localhost", 1234))
.setMaxRetries(15)
.setBackoffScaleFactor(3.0)
Expand All @@ -116,4 +120,18 @@ public void testExplicitPropertyMappings()

assertFullMapping(properties, expected);
}

@Test
public void testLegacyPropertyMappings()
{
assertDeprecatedEquivalence(
ThriftMetastoreConfig.class,
Map.of(
"hive.metastore.thrift.client.connect-timeout", "42s",
"hive.metastore.thrift.client.read-timeout", "42s",
"hive.metastore.thrift.impersonation.enabled", "true"),
Map.of(
"hive.metastore-timeout", "42s",
"hive.metastore.impersonation-enabled", "true"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public TestingTokenAwareMetastoreClientFactory(Optional<HostAndPort> socksProxy,

public TestingTokenAwareMetastoreClientFactory(Optional<HostAndPort> socksProxy, HostAndPort address, Duration timeout)
{
this.factory = new DefaultThriftMetastoreClientFactory(Optional.empty(), socksProxy, timeout, AUTHENTICATION, "localhost");
this.factory = new DefaultThriftMetastoreClientFactory(Optional.empty(), socksProxy, timeout, timeout, AUTHENTICATION, "localhost");
this.address = requireNonNull(address, "address is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.file-format", format.name())
.put("iceberg.catalog.type", "HIVE_METASTORE")
.put("hive.metastore.uri", "thrift://" + hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.put("hive.metastore-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("fs.hadoop.enabled", "false")
.put("fs.native-s3.enabled", "true")
.put("s3.aws-access-key", MINIO_ACCESS_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.file-format", format.name())
.put("iceberg.catalog.type", "HIVE_METASTORE")
.put("hive.metastore.uri", "thrift://" + hiveHadoop.getHiveMetastoreEndpoint())
.put("hive.metastore-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("hive.metastore.thrift.client.read-timeout", "1m") // read timed out sometimes happens with the default timeout
.put("hive.azure.abfs-storage-account", account)
.put("hive.azure.abfs-access-key", accessKey)
.put("iceberg.register-table-procedure.enabled", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations)
ThriftMetastore thriftMetastore = testingThriftHiveMetastoreBuilder()
.thriftMetastoreConfig(new ThriftMetastoreConfig()
// Read timed out sometimes happens with the default timeout
.setMetastoreTimeout(new Duration(1, MINUTES)))
.setReadTimeout(new Duration(1, MINUTES)))
.metastoreClient(dataLake.getHiveHadoop().getHiveMetastoreEndpoint())
.build();
CachingHiveMetastore metastore = memoizeMetastore(new BridgingHiveMetastore(thriftMetastore), 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class TestHiveMetastoreClientFactory
Optional.empty(),
Optional.empty(),
new Duration(10, SECONDS),
new Duration(10, SECONDS),
new NoHiveMetastoreAuthentication(),
"localhost");

Expand Down

0 comments on commit 5c5c6e6

Please sign in to comment.