Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to disable filesystem caching of /_delta_log/ directory #23408

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -1224,3 +1224,22 @@ keep a backup of the original values if you change them.

The connector supports configuring and using [file system
caching](/object-storage/file-system-cache).

The following table describes file system cache properties specific to
the Delta Lake connector.

:::{list-table} Delta Lake file system cache configuration properties
:widths: 30, 50, 20
:header-rows: 1

* - Property name
- Description
- Default
* - `delta.fs.cache.disable-transaction-log-caching`
- Set to `true` to disable caching of the `_delta_log` directory of
Delta Tables. This is useful in those cases when Delta Tables are
destroyed and recreated, and the files inside the transaction log
directory get overwritten and cannot be safely cached. Effective
only when `fs.cache.enabled=true`.
- `false`
:::
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class DeltaLakeConfig
private boolean projectionPushdownEnabled = true;
private boolean queryPartitionFilterRequired;
private boolean deletionVectorsEnabled;
private boolean deltaLogFileSystemCacheDisabled;

public Duration getMetadataCacheTtl()
{
Expand Down Expand Up @@ -552,4 +553,17 @@ public DeltaLakeConfig setDeletionVectorsEnabled(boolean deletionVectorsEnabled)
this.deletionVectorsEnabled = deletionVectorsEnabled;
return this;
}

public boolean isDeltaLogFileSystemCacheDisabled()
{
return deltaLogFileSystemCacheDisabled;
}

@Config("delta.fs.cache.disable-transaction-log-caching")
@ConfigDescription("Disable filesystem caching of the _delta_log directory (effective only when fs.cache.enabled=true)")
public DeltaLakeConfig setDeltaLogFileSystemCacheDisabled(boolean deltaLogFileSystemCacheDisabled)
{
this.deltaLogFileSystemCacheDisabled = deltaLogFileSystemCacheDisabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,42 @@
*/
package io.trino.plugin.deltalake.cache;

import com.google.inject.Inject;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.cache.CacheKeyProvider;
import io.trino.plugin.deltalake.DeltaLakeConfig;

import java.util.Optional;

import static io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.STARBURST_META_DIR;
import static io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.STATISTICS_META_DIR;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;

public class DeltaLakeCacheKeyProvider
implements CacheKeyProvider
{
private final boolean deltaLogCacheDisabled;

@Inject
public DeltaLakeCacheKeyProvider(DeltaLakeConfig deltaLakeConfig)
{
// Disabling the delta log folder caching is useful in those scenarios when Delta Tables are deleted and re-created,
// and caching their _delta_log directories should be avoided.
this.deltaLogCacheDisabled = deltaLakeConfig.isDeltaLogFileSystemCacheDisabled();
}

/**
* Get the cache key of a TrinoInputFile. Returns Optional.empty() if the file is not cacheable.
*/
@Override
public Optional<String> getCacheKey(TrinoInputFile inputFile)
{
String path = inputFile.location().path();
// Explicitly exclude the files in the _delta_log directory when deltaLogFileSystemCacheDisabled is set to true,
// as they can change when the Delta Table is overwritten, https://github.com/trinodb/trino/issues/21451
if (deltaLogCacheDisabled && path.contains("/" + TRANSACTION_LOG_DIRECTORY + "/")) {
return Optional.empty();
}
if (path.endsWith(".trinoSchema") || path.contains("/.trinoPermissions/")) {
// Needed to avoid caching files from FileHiveMetastore on coordinator during tests
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.trino.testing.QueryRunner;

import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.CacheOperation;
import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_POSITION;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_SIZE;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toCollection;

public final class DeltaLakeAlluxioCacheTestUtils
{
private static final Pattern dataFilePattern = Pattern.compile(".*?/(?<partition>((\\w+)=[^/]*/)*)(?<queryId>\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?<uuid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

private DeltaLakeAlluxioCacheTestUtils()
{}

public static Multiset<CacheOperation> getCacheOperations(QueryRunner queryRunner)
{
return getCacheOperationSpans(queryRunner)
.stream()
.map(DeltaLakeAlluxioCacheTestUtils::createCacheOperation)
.collect(toCollection(HashMultiset::create));
}

private static CacheOperation createCacheOperation(SpanData span)
{
String operationName = span.getName();
Attributes attributes = span.getAttributes();
String path = getFileLocation(span);
String fileName = path.replaceFirst(".*/", "");

OptionalLong position = switch (operationName) {
case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_POSITION)));
case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_POSITION)));
case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_POSITION)));
default -> OptionalLong.empty();
};

OptionalLong length = switch (operationName) {
case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_SIZE)));
case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_SIZE)));
case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_SIZE)));
default -> OptionalLong.empty();
};

if (!path.contains("_delta_log") && !path.contains("/.trino")) {
Matcher matcher = dataFilePattern.matcher(path);
if (matcher.matches()) {
String changeData = path.contains("/_change_data/") ? "change_data/" : "";
if (!path.contains("=")) {
return new CacheOperation(operationName, "data", position, length);
}
return new CacheOperation(operationName, changeData + matcher.group("partition"), position, length);
}
if (path.contains("/part-00000-")) {
return new CacheOperation(operationName, "data", position, length);
}
if (path.contains("/deletion_vector_")) {
return new CacheOperation(operationName, "deletion_vector", position, length);
}
}
else {
return new CacheOperation(operationName, fileName, position, length);
}
throw new IllegalArgumentException("File not recognized: " + path);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
import com.google.common.io.Resources;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import org.intellij.lang.annotations.Language;
Expand All @@ -31,27 +28,15 @@
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.CacheOperation;
import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_POSITION;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_SIZE;
import static io.trino.plugin.deltalake.DeltaLakeAlluxioCacheTestUtils.getCacheOperations;
import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents;
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toCollection;

@Execution(ExecutionMode.SAME_THREAD)
public class TestDeltaLakeAlluxioCacheFileOperations
Expand Down Expand Up @@ -550,59 +535,6 @@ private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<Ca
assertUpdate("CALL system.flush_metadata_cache()");
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
assertMultisetsEqual(getCacheOperations(), expectedCacheAccesses);
}

private Multiset<CacheOperation> getCacheOperations()
{
return getCacheOperationSpans(getQueryRunner())
.stream()
.map(TestDeltaLakeAlluxioCacheFileOperations::createCacheOperation)
.collect(toCollection(HashMultiset::create));
}

private static Pattern dataFilePattern = Pattern.compile(".*?/(?<partition>((\\w+)=[^/]*/)*)(?<queryId>\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?<uuid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

private static CacheOperation createCacheOperation(SpanData span)
{
String operationName = span.getName();
Attributes attributes = span.getAttributes();
String path = getFileLocation(span);
String fileName = path.replaceFirst(".*/", "");

OptionalLong position = switch (operationName) {
case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_POSITION)));
case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_POSITION)));
case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_POSITION)));
default -> OptionalLong.empty();
};

OptionalLong length = switch (operationName) {
case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_SIZE)));
case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_SIZE)));
case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_SIZE)));
default -> OptionalLong.empty();
};

if (!path.contains("_delta_log") && !path.contains("/.trino")) {
Matcher matcher = dataFilePattern.matcher(path);
if (matcher.matches()) {
String changeData = path.contains("/_change_data/") ? "change_data/" : "";
if (!path.contains("=")) {
return new CacheOperation(operationName, "data", position, length);
}
return new CacheOperation(operationName, changeData + matcher.group("partition"), position, length);
}
if (path.contains("/part-00000-")) {
return new CacheOperation(operationName, "data", position, length);
}
if (path.contains("/deletion_vector_")) {
return new CacheOperation(operationName, "deletion_vector", position, length);
}
}
else {
return new CacheOperation(operationName, fileName, position, length);
}
throw new IllegalArgumentException("File not recognized: " + path);
assertMultisetsEqual(getCacheOperations(queryRunner), expectedCacheAccesses);
}
}
Loading