Skip to content

Commit

Permalink
Make use of hive temp staging directory configurable
Browse files Browse the repository at this point in the history
For file systems that wrap S3, Presto should behave in the same way
as it does for S3 itself.
  • Loading branch information
kokosing committed Jan 30, 2019
1 parent fa65182 commit 68393b0
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ public class HiveClientConfig
private boolean s3SelectPushdownEnabled;
private int s3SelectPushdownMaxConnections = 500;

private boolean isTemporaryStagingDirectoryEnabled = true;

public int getMaxInitialSplits()
{
return maxInitialSplits;
Expand Down Expand Up @@ -1190,4 +1192,17 @@ public HiveClientConfig setS3SelectPushdownMaxConnections(int s3SelectPushdownMa
this.s3SelectPushdownMaxConnections = s3SelectPushdownMaxConnections;
return this;
}

/**
 * Enables or disables writing through a temporary staging directory
 * (data is written to a scratch path, then renamed into the target).
 *
 * <p>Bound to the {@code hive.temporary-staging-directory-enabled}
 * configuration property. Defaults to {@code true}.
 *
 * @param enabled whether staged writes should be used when the file system supports them
 * @return this config instance, for fluent chaining
 */
@Config("hive.temporary-staging-directory-enabled")
@ConfigDescription("Should use (if possible) temporary staging directory for write operations")
public HiveClientConfig setTemporaryStagingDirectoryEnabled(boolean enabled)
{
    this.isTemporaryStagingDirectoryEnabled = enabled;
    return this;
}

/**
 * Returns whether write operations should stage data in a temporary
 * directory before moving it to the target location. Default is {@code true}.
 */
public boolean isTemporaryStagingDirectoryEnabled()
{
    return isTemporaryStagingDirectoryEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;

import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
import static io.prestosql.plugin.hive.HiveSessionProperties.isTemporaryStagingDirectoryEnabled;
import static io.prestosql.plugin.hive.HiveWriteUtils.createTemporaryPath;
import static io.prestosql.plugin.hive.HiveWriteUtils.getTableDefaultLocation;
import static io.prestosql.plugin.hive.HiveWriteUtils.isS3FileSystem;
Expand Down Expand Up @@ -59,7 +60,7 @@ public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore, Conn
throw new PrestoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath));
}

if (shouldUseTemporaryDirectory(context, targetPath)) {
if (shouldUseTemporaryDirectory(session, context, targetPath)) {
Path writePath = createTemporaryPath(context, hdfsEnvironment, targetPath);
return new LocationHandle(targetPath, writePath, false, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
}
Expand All @@ -74,7 +75,7 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
HdfsContext context = new HdfsContext(session, table.getDatabaseName(), table.getTableName());
Path targetPath = new Path(table.getStorage().getLocation());

if (shouldUseTemporaryDirectory(context, targetPath)) {
if (shouldUseTemporaryDirectory(session, context, targetPath)) {
Path writePath = createTemporaryPath(context, hdfsEnvironment, targetPath);
return new LocationHandle(targetPath, writePath, true, STAGE_AND_MOVE_TO_TARGET_DIRECTORY);
}
Expand All @@ -83,10 +84,11 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore,
}
}

private boolean shouldUseTemporaryDirectory(HdfsContext context, Path path)
private boolean shouldUseTemporaryDirectory(ConnectorSession session, HdfsContext context, Path path)
{
// skip using temporary directory for S3
return !isS3FileSystem(context, hdfsEnvironment, path);
return isTemporaryStagingDirectoryEnabled(session)
// skip using temporary directory for S3
&& !isS3FileSystem(context, hdfsEnvironment, path);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public final class HiveSessionProperties
private static final String COLLECT_COLUMN_STATISTICS_ON_WRITE = "collect_column_statistics_on_write";
private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count";
private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -296,6 +297,11 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
S3_SELECT_PUSHDOWN_ENABLED,
"S3 Select pushdown enabled",
hiveClientConfig.isS3SelectPushdownEnabled(),
false),
booleanProperty(
TEMPORARY_STAGING_DIRECTORY_ENABLED,
"Should use temporary staging directory for write operations",
hiveClientConfig.isTemporaryStagingDirectoryEnabled(),
false));
}

Expand Down Expand Up @@ -495,6 +501,11 @@ public static boolean isOptimizedMismatchedBucketCount(ConnectorSession session)
return session.getProperty(OPTIMIZE_MISMATCHED_BUCKET_COUNT, Boolean.class);
}

/**
 * Reads the {@code temporary_staging_directory_enabled} session property,
 * which controls whether writes stage data in a temporary directory
 * before moving it to the target location.
 */
public static boolean isTemporaryStagingDirectoryEnabled(ConnectorSession session)
{
    return session.getProperty(TEMPORARY_STAGING_DIRECTORY_ENABLED, Boolean.class);
}

public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, String description, DataSize defaultValue, boolean hidden)
{
return new PropertyMetadata<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ public WriteMode getJsonSerializableWriteMode()
public enum WriteMode
{
/**
* common mode for new table or existing table (both new and existing partition)
* common mode for new table or existing table (both new and existing partition) and when staging directory is enabled
*/
STAGE_AND_MOVE_TO_TARGET_DIRECTORY(false),
/**
* for new table in S3
* for new table in S3 or when staging directory is disabled
*/
DIRECT_TO_TARGET_NEW_DIRECTORY(true),
/**
* for existing table in S3 (both new and existing partition)
* for existing table in S3 (both new and existing partition) or when staging directory is disabled
*/
DIRECT_TO_TARGET_EXISTING_DIRECTORY(true),
/**/;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public void testDefaults()
.setCollectColumnStatisticsOnWrite(false)
.setCollectColumnStatisticsOnWrite(false)
.setS3SelectPushdownEnabled(false)
.setS3SelectPushdownMaxConnections(500));
.setS3SelectPushdownMaxConnections(500)
.setTemporaryStagingDirectoryEnabled(true));
}

@Test
Expand Down Expand Up @@ -199,6 +200,7 @@ public void testExplicitPropertyMappings()
.put("hive.collect-column-statistics-on-write", "true")
.put("hive.s3select-pushdown.enabled", "true")
.put("hive.s3select-pushdown.max-connections", "1234")
.put("hive.temporary-staging-directory-enabled", "false")
.build();

HiveClientConfig expected = new HiveClientConfig()
Expand Down Expand Up @@ -279,7 +281,8 @@ public void testExplicitPropertyMappings()
.setCollectColumnStatisticsOnWrite(true)
.setCollectColumnStatisticsOnWrite(true)
.setS3SelectPushdownEnabled(true)
.setS3SelectPushdownMaxConnections(1234);
.setS3SelectPushdownMaxConnections(1234)
.setTemporaryStagingDirectoryEnabled(false);

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.prestosql.Session;
import io.prestosql.connector.ConnectorId;
import io.prestosql.cost.StatsAndCosts;
import io.prestosql.metadata.InsertTableHandle;
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.metadata.TableHandle;
Expand Down Expand Up @@ -3550,6 +3551,37 @@ public void testPrunePartitionFailure()
assertUpdate("DROP TABLE test_prune_failure");
}

@Test
public void testTemporaryStagingDirectorySessionProperty()
{
    // With staging disabled, an insert must write directly into the target
    // directory, i.e. write path and target path are the same location.
    Session stagingDisabled = Session.builder(getSession())
            .setCatalogSessionProperty("hive", "temporary_staging_directory_enabled", "false")
            .build();

    String tableName = "test_temporary_staging_directory_session_property";
    assertUpdate(format("CREATE TABLE %s(i int)", tableName));

    HiveInsertTableHandle insertHandle = getHiveInsertTableHandle(stagingDisabled, tableName);
    assertEquals(insertHandle.getLocationHandle().getWritePath(), insertHandle.getLocationHandle().getTargetPath());

    assertUpdate("DROP TABLE " + tableName);
}

/**
 * Begins (and immediately finishes) an insert into {@code tableName} inside a
 * fresh transaction, returning the connector-level insert handle so tests can
 * inspect the location handle chosen for the write.
 *
 * @param session session to run the insert under (session properties affect the location handle)
 * @param tableName unqualified table name in the TPCH schema
 */
private HiveInsertTableHandle getHiveInsertTableHandle(Session session, String tableName)
{
    Metadata metadata = ((DistributedQueryRunner) getQueryRunner()).getCoordinator().getMetadata();
    return transaction(getQueryRunner().getTransactionManager(), getQueryRunner().getAccessControl())
            .execute(session, transactionSession -> {
                QualifiedObjectName qualifiedName = new QualifiedObjectName(catalog, TPCH_SCHEMA, tableName);
                Optional<TableHandle> tableHandle = metadata.getTableHandle(transactionSession, qualifiedName);
                InsertTableHandle insert = metadata.beginInsert(transactionSession, tableHandle.get());
                HiveInsertTableHandle connectorHandle = (HiveInsertTableHandle) insert.getConnectorHandle();

                // Close out the insert (no data written) so the transaction completes cleanly.
                metadata.finishInsert(transactionSession, insert, ImmutableList.of(), ImmutableList.of());
                return connectorHandle;
            });
}

private Session getParallelWriteSession()
{
return Session.builder(getSession())
Expand Down

0 comments on commit 68393b0

Please sign in to comment.