Add option to disable filesystem caching of /_delta_log/ directory

sdaberdaku committed Sep 23, 2024
1 parent 59d0181 commit 18211a4

Showing 7 changed files with 271 additions and 76 deletions.
19 changes: 19 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
@@ -1224,3 +1224,22 @@ keep a backup of the original values if you change them.

The connector supports configuring and using [file system
caching](/object-storage/file-system-cache).

The following table describes file system cache properties specific to
the Delta Lake connector.

:::{list-table} Delta Lake file system cache configuration properties
:widths: 30, 50, 20
:header-rows: 1

* - Property name
- Description
- Default
* - `delta.fs.cache.disable-transaction-log-caching`
- Set to `true` to disable caching of the `_delta_log` directory of
Delta Lake tables. This is useful when tables are dropped and
recreated, because the files in the transaction log directory are
then overwritten and can no longer be cached safely. Effective
only when `fs.cache.enabled=true`.
- `false`
:::
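
For example, assuming a catalog with file system caching already enabled as described in the linked caching documentation, the new property sits alongside `fs.cache.enabled` in the catalog properties file (the metastore setting below is an illustrative placeholder):

```properties
connector.name=delta_lake
hive.metastore.uri=thrift://metastore.example.net:9083
fs.cache.enabled=true
delta.fs.cache.disable-transaction-log-caching=true
```

Additional `fs.cache.*` properties for cache location and sizing are covered by the file system caching documentation and omitted here.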
DeltaLakeConfig.java
@@ -89,6 +89,7 @@ public class DeltaLakeConfig
private boolean projectionPushdownEnabled = true;
private boolean queryPartitionFilterRequired;
private boolean deletionVectorsEnabled;
private boolean deltaLogFileSystemCacheDisabled;

public Duration getMetadataCacheTtl()
{
@@ -552,4 +553,17 @@ public DeltaLakeConfig setDeletionVectorsEnabled(boolean deletionVectorsEnabled)
this.deletionVectorsEnabled = deletionVectorsEnabled;
return this;
}

public boolean isDeltaLogFileSystemCacheDisabled()
{
return deltaLogFileSystemCacheDisabled;
}

@Config("delta.fs.cache.disable-transaction-log-caching")
@ConfigDescription("Disable filesystem caching of the _delta_log directory (effective only when fs.cache.enabled=true)")
public DeltaLakeConfig setDeltaLogFileSystemCacheDisabled(boolean deltaLogFileSystemCacheDisabled)
{
this.deltaLogFileSystemCacheDisabled = deltaLogFileSystemCacheDisabled;
return this;
}
}
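
The new property follows the fluent getter/setter pattern used throughout `DeltaLakeConfig` and defaults to `false`. A minimal, hypothetical sketch of the resulting behavior (Trino config classes are typically also covered by airlift `ConfigAssertions` tests, not shown here):

```java
import io.trino.plugin.deltalake.DeltaLakeConfig;

// Hypothetical standalone check of the new property's default and fluent setter.
public final class DeltaLogCacheConfigSketch
{
    public static void main(String[] args)
    {
        DeltaLakeConfig config = new DeltaLakeConfig();
        // Transaction log caching stays enabled unless explicitly disabled.
        System.out.println(config.isDeltaLogFileSystemCacheDisabled()); // false
        // The setter returns the config instance, matching the other setters in the class.
        System.out.println(config.setDeltaLogFileSystemCacheDisabled(true).isDeltaLogFileSystemCacheDisabled()); // true
    }
}
```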
DeltaLakeCacheKeyProvider.java
@@ -13,24 +13,42 @@
*/
package io.trino.plugin.deltalake.cache;

import com.google.inject.Inject;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.cache.CacheKeyProvider;
import io.trino.plugin.deltalake.DeltaLakeConfig;

import java.util.Optional;

import static io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.STARBURST_META_DIR;
import static io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.STATISTICS_META_DIR;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;

public class DeltaLakeCacheKeyProvider
implements CacheKeyProvider
{
private final boolean deltaLogFileSystemCacheDisabled;

@Inject
public DeltaLakeCacheKeyProvider(DeltaLakeConfig deltaLakeConfig)
{
// Disabling the delta log folder caching is useful in those scenarios when Delta Tables are deleted and re-created,
// and caching their _delta_log directories should be avoided.
this.deltaLogFileSystemCacheDisabled = deltaLakeConfig.isDeltaLogFileSystemCacheDisabled();
}

/**
* Get the cache key of a TrinoInputFile. Returns Optional.empty() if the file is not cacheable.
*/
@Override
public Optional<String> getCacheKey(TrinoInputFile inputFile)
{
String path = inputFile.location().path();
// Explicitly exclude the files in the _delta_log directory when deltaLogFileSystemCacheDisabled is set to true,
// as they can change when the Delta Table is overwritten, https://github.com/trinodb/trino/issues/21451
if (deltaLogFileSystemCacheDisabled && path.contains("/" + TRANSACTION_LOG_DIRECTORY + "/")) {
return Optional.empty();
}
if (path.endsWith(".trinoSchema") || path.contains("/.trinoPermissions/")) {
// Needed to avoid caching files from FileHiveMetastore on coordinator during tests
return Optional.empty();
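To make the effect of the new check concrete, the following standalone sketch (not part of the commit) reproduces the path predicate with plain strings instead of `TrinoInputFile`: when the option is enabled, files under `_delta_log/` get no cache key and are therefore never cached, while other files are keyed as before.

```java
import java.util.Optional;

// Hypothetical sketch of the cache-key decision above, simplified to plain strings.
public final class DeltaLogCacheKeySketch
{
    private static final String TRANSACTION_LOG_DIRECTORY = "_delta_log";

    static Optional<String> cacheKey(boolean deltaLogCacheDisabled, String path)
    {
        // Skip files under _delta_log/ when transaction log caching is disabled,
        // because they can be overwritten if the table is dropped and recreated.
        if (deltaLogCacheDisabled && path.contains("/" + TRANSACTION_LOG_DIRECTORY + "/")) {
            return Optional.empty();
        }
        return Optional.of(path);
    }

    public static void main(String[] args)
    {
        System.out.println(cacheKey(true, "s3://bucket/sales/_delta_log/00000000000000000003.json")); // Optional.empty
        System.out.println(cacheKey(true, "s3://bucket/sales/part-00000-data.snappy.parquet"));       // Optional[...]
    }
}
```

The production implementation additionally excludes `.trinoSchema` and `.trinoPermissions` files, as shown in the diff above; this sketch only illustrates the new `_delta_log` branch.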
DeltaLakeAlluxioCacheTestUtils.java (new file)
@@ -0,0 +1,82 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.trace.data.SpanData;

import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.CacheOperation;
import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_POSITION;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_SIZE;
import static java.util.Objects.requireNonNull;

public final class DeltaLakeAlluxioCacheTestUtils
{
private static final Pattern dataFilePattern = Pattern.compile(".*?/(?<partition>((\\w+)=[^/]*/)*)(?<queryId>\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?<uuid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

private DeltaLakeAlluxioCacheTestUtils()
{}

public static CacheOperation createCacheOperation(SpanData span)
{
String operationName = span.getName();
Attributes attributes = span.getAttributes();
String path = getFileLocation(span);
String fileName = path.replaceFirst(".*/", "");

OptionalLong position = switch (operationName) {
case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_POSITION)));
case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_POSITION)));
case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_POSITION)));
default -> OptionalLong.empty();
};

OptionalLong length = switch (operationName) {
case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_SIZE)));
case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_SIZE)));
case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_SIZE)));
default -> OptionalLong.empty();
};

if (!path.contains("_delta_log") && !path.contains("/.trino")) {
Matcher matcher = dataFilePattern.matcher(path);
if (matcher.matches()) {
String changeData = path.contains("/_change_data/") ? "change_data/" : "";
if (!path.contains("=")) {
return new CacheOperation(operationName, "data", position, length);
}
return new CacheOperation(operationName, changeData + matcher.group("partition"), position, length);
}
if (path.contains("/part-00000-")) {
return new CacheOperation(operationName, "data", position, length);
}
if (path.contains("/deletion_vector_")) {
return new CacheOperation(operationName, "deletion_vector", position, length);
}
}
else {
return new CacheOperation(operationName, fileName, position, length);
}
throw new IllegalArgumentException("File not recognized: " + path);
}
}
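
The `dataFilePattern` above recognizes data files written by Trino queries and extracts the Hive-style partition prefix used in the expected cache operations. A self-contained sketch with an illustrative path (not taken from the commit) shows what the named groups capture:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demonstration of the data-file pattern used by DeltaLakeAlluxioCacheTestUtils.
public final class DataFilePatternSketch
{
    private static final Pattern DATA_FILE_PATTERN = Pattern.compile(
            ".*?/(?<partition>((\\w+)=[^/]*/)*)(?<queryId>\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?<uuid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

    public static void main(String[] args)
    {
        // Illustrative partitioned data file name in the Trino query-id + UUID format.
        String path = "s3://bucket/sales/year=2024/month=09/20240923_120000_00001_abcde_01234567-89ab-cdef-0123-456789abcdef";
        Matcher matcher = DATA_FILE_PATTERN.matcher(path);
        if (matcher.matches()) {
            System.out.println(matcher.group("partition")); // year=2024/month=09/
            System.out.println(matcher.group("queryId"));   // 20240923_120000_00001_abcde
        }
    }
}
```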
TestDeltaLakeAlluxioCacheFileOperations.java
@@ -18,8 +18,6 @@
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
import com.google.common.io.Resources;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import org.intellij.lang.annotations.Language;
@@ -31,26 +29,15 @@
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.CacheOperation;
import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_POSITION;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_SIZE;
import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents;
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toCollection;

@Execution(ExecutionMode.SAME_THREAD)
@@ -88,6 +75,22 @@ private void registerTable(String name, String resourcePath)
getQueryRunner().execute(format("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')", name, dataPath));
}

private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<CacheOperation> expectedCacheAccesses)
{
assertUpdate("CALL system.flush_metadata_cache()");
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
assertMultisetsEqual(getCacheOperations(), expectedCacheAccesses);
}

private Multiset<CacheOperation> getCacheOperations()
{
return getCacheOperationSpans(getQueryRunner())
.stream()
.map(DeltaLakeAlluxioCacheTestUtils::createCacheOperation)
.collect(toCollection(HashMultiset::create));
}

@Test
public void testCacheFileOperations()
{
@@ -544,65 +547,4 @@ public void testCreateOrReplaceTableAsSelect()

assertUpdate("DROP TABLE test_create_or_replace_as_select");
}

private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<CacheOperation> expectedCacheAccesses)
{
assertUpdate("CALL system.flush_metadata_cache()");
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
assertMultisetsEqual(getCacheOperations(), expectedCacheAccesses);
}

private Multiset<CacheOperation> getCacheOperations()
{
return getCacheOperationSpans(getQueryRunner())
.stream()
.map(TestDeltaLakeAlluxioCacheFileOperations::createCacheOperation)
.collect(toCollection(HashMultiset::create));
}

private static Pattern dataFilePattern = Pattern.compile(".*?/(?<partition>((\\w+)=[^/]*/)*)(?<queryId>\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?<uuid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

private static CacheOperation createCacheOperation(SpanData span)
{
String operationName = span.getName();
Attributes attributes = span.getAttributes();
String path = getFileLocation(span);
String fileName = path.replaceFirst(".*/", "");

OptionalLong position = switch (operationName) {
case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_POSITION)));
case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_POSITION)));
case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_POSITION)));
default -> OptionalLong.empty();
};

OptionalLong length = switch (operationName) {
case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_SIZE)));
case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_SIZE)));
case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_SIZE)));
default -> OptionalLong.empty();
};

if (!path.contains("_delta_log") && !path.contains("/.trino")) {
Matcher matcher = dataFilePattern.matcher(path);
if (matcher.matches()) {
String changeData = path.contains("/_change_data/") ? "change_data/" : "";
if (!path.contains("=")) {
return new CacheOperation(operationName, "data", position, length);
}
return new CacheOperation(operationName, changeData + matcher.group("partition"), position, length);
}
if (path.contains("/part-00000-")) {
return new CacheOperation(operationName, "data", position, length);
}
if (path.contains("/deletion_vector_")) {
return new CacheOperation(operationName, "deletion_vector", position, length);
}
}
else {
return new CacheOperation(operationName, fileName, position, length);
}
throw new IllegalArgumentException("File not recognized: " + path);
}
}