diff --git a/docs/src/main/sphinx/connector/delta-lake.md b/docs/src/main/sphinx/connector/delta-lake.md index c116f5ad88e067..89e07a17e41957 100644 --- a/docs/src/main/sphinx/connector/delta-lake.md +++ b/docs/src/main/sphinx/connector/delta-lake.md @@ -1224,3 +1224,22 @@ keep a backup of the original values if you change them. The connector supports configuring and using [file system caching](/object-storage/file-system-cache). + +The following table describes file system cache properties specific to +the Delta Lake connector. + +:::{list-table} Delta Lake file system cache configuration properties +:widths: 30, 50, 20 +:header-rows: 1 + +* - Property name + - Description + - Default +* - `delta.fs.cache.disable-transaction-log-caching` + - Set to `true` to disable caching of the `_delta_log` directory of + Delta Tables. This is useful in those cases when Delta Tables are + destroyed and recreated, and the files inside the transaction log + directory get overwritten and cannot be safely cached. Effective + only when `fs.cache.enabled=true`. + - `false` +::: diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 42a976cbf7a627..feeda0434d8a13 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -89,6 +89,7 @@ public class DeltaLakeConfig private boolean projectionPushdownEnabled = true; private boolean queryPartitionFilterRequired; private boolean deletionVectorsEnabled; + private boolean deltaLogCacheDisabled; public Duration getMetadataCacheTtl() { @@ -552,4 +553,17 @@ public DeltaLakeConfig setDeletionVectorsEnabled(boolean deletionVectorsEnabled) this.deletionVectorsEnabled = deletionVectorsEnabled; return this; } + + public boolean isDeltaLogCacheDisabled() + { + return deltaLogCacheDisabled; + } + + @Config("delta.fs.cache.disable-transaction-log-caching") + @ConfigDescription("Disable filesystem caching of the _delta_log directory (effective only when fs.cache.enabled=true)") + public DeltaLakeConfig setDeltaLogCacheDisabled(boolean deltaLogCacheDisabled) + { + this.deltaLogCacheDisabled = deltaLogCacheDisabled; + return this; + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java index 29382c3f9626b7..007ec8fd4dbc31 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java @@ -13,17 +13,30 @@ */ package io.trino.plugin.deltalake.cache; +import com.google.inject.Inject; import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.cache.CacheKeyProvider; +import io.trino.plugin.deltalake.DeltaLakeConfig; import java.util.Optional; import static io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.STARBURST_META_DIR; import static io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.STATISTICS_META_DIR; +import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY; public class DeltaLakeCacheKeyProvider implements CacheKeyProvider { + private final boolean deltaLogFileSystemCacheDisabled; + + @Inject + public DeltaLakeCacheKeyProvider(DeltaLakeConfig deltaLakeConfig) + { + // Disabling the delta log folder caching is useful in those scenarios when Delta Tables are deleted and re-created, + // and caching their _delta_log directories should be avoided. + this.deltaLogFileSystemCacheDisabled = deltaLakeConfig.isDeltaLogCacheDisabled(); + } + /** * Get the cache key of a TrinoInputFile. Returns Optional.empty() if the file is not cacheable. */ @@ -31,6 +44,11 @@ public class DeltaLakeCacheKeyProvider public Optional getCacheKey(TrinoInputFile inputFile) { String path = inputFile.location().path(); + // Explicitly exclude the files in the _delta_log directory when deltaLogFileSystemCacheDisabled is set to true, + // as they can change when the Delta Table is overwritten, https://github.com/trinodb/trino/issues/21451 + if (deltaLogFileSystemCacheDisabled && path.contains("/" + TRANSACTION_LOG_DIRECTORY + "/")) { + return Optional.empty(); + } if (path.endsWith(".trinoSchema") || path.contains("/.trinoPermissions/")) { // Needed to avoid caching files from FileHiveMetastore on coordinator during tests return Optional.empty(); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaLakeAlluxioCacheTestUtils.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaLakeAlluxioCacheTestUtils.java new file mode 100644 index 00000000000000..3f06ae944991b7 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/DeltaLakeAlluxioCacheTestUtils.java @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.trino.testing.QueryRunner; + +import java.util.OptionalLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.CacheOperation; +import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans; +import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE; +import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_POSITION; +import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_SIZE; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toCollection; + +public final class DeltaLakeAlluxioCacheTestUtils +{ + private static final Pattern dataFilePattern = Pattern.compile(".*?/(?((\\w+)=[^/]*/)*)(?\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); + + private DeltaLakeAlluxioCacheTestUtils() + {} + + public static Multiset getCacheOperations(QueryRunner queryRunner) + { + return getCacheOperationSpans(queryRunner) + .stream() + .map(DeltaLakeAlluxioCacheTestUtils::createCacheOperation) + .collect(toCollection(HashMultiset::create)); + } + + private static CacheOperation createCacheOperation(SpanData span) + { + String operationName = span.getName(); + Attributes attributes = span.getAttributes(); + String path = getFileLocation(span); + String fileName = path.replaceFirst(".*/", ""); + + OptionalLong position = switch (operationName) { + case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_POSITION))); + case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_POSITION))); + case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_POSITION))); + default -> OptionalLong.empty(); + }; + + OptionalLong length = switch (operationName) { + case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_SIZE))); + case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_SIZE))); + case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_SIZE))); + default -> OptionalLong.empty(); + }; + + if (!path.contains("_delta_log") && !path.contains("/.trino")) { + Matcher matcher = dataFilePattern.matcher(path); + if (matcher.matches()) { + String changeData = path.contains("/_change_data/") ? "change_data/" : ""; + if (!path.contains("=")) { + return new CacheOperation(operationName, "data", position, length); + } + return new CacheOperation(operationName, changeData + matcher.group("partition"), position, length); + } + if (path.contains("/part-00000-")) { + return new CacheOperation(operationName, "data", position, length); + } + if (path.contains("/deletion_vector_")) { + return new CacheOperation(operationName, "deletion_vector", position, length); + } + } + else { + return new CacheOperation(operationName, fileName, position, length); + } + throw new IllegalArgumentException("File not recognized: " + path); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java index cf9c5c3737b9e5..cc6902dc05785c 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java @@ -13,13 +13,10 @@ */ package io.trino.plugin.deltalake; -import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.Multiset; import com.google.common.io.Resources; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.sdk.trace.data.SpanData; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import org.intellij.lang.annotations.Language; @@ -31,27 +28,15 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; -import java.util.OptionalLong; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Stream; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.CacheOperation; -import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans; -import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation; -import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION; -import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE; -import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION; -import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE; -import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_POSITION; -import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_READ_SIZE; +import static io.trino.plugin.deltalake.DeltaLakeAlluxioCacheTestUtils.getCacheOperations; import static io.trino.plugin.deltalake.TestingDeltaLakeUtils.copyDirectoryContents; import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; import static java.lang.String.format; -import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.toCollection; @Execution(ExecutionMode.SAME_THREAD) public class TestDeltaLakeAlluxioCacheFileOperations @@ -550,59 +535,6 @@ private void assertFileSystemAccesses(@Language("SQL") String query, Multiset getCacheOperations() - { - return getCacheOperationSpans(getQueryRunner()) - .stream() - .map(TestDeltaLakeAlluxioCacheFileOperations::createCacheOperation) - .collect(toCollection(HashMultiset::create)); - } - - private static Pattern dataFilePattern = Pattern.compile(".*?/(?((\\w+)=[^/]*/)*)(?\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"); - - private static CacheOperation createCacheOperation(SpanData span) - { - String operationName = span.getName(); - Attributes attributes = span.getAttributes(); - String path = getFileLocation(span); - String fileName = path.replaceFirst(".*/", ""); - - OptionalLong position = switch (operationName) { - case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_POSITION))); - case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_POSITION))); - case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_POSITION))); - default -> OptionalLong.empty(); - }; - - OptionalLong length = switch (operationName) { - case "Alluxio.readCached", "Alluxio.readExternalStream" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_READ_SIZE))); - case "Alluxio.writeCache" -> OptionalLong.of(requireNonNull(attributes.get(CACHE_FILE_WRITE_SIZE))); - case "Input.readFully" -> OptionalLong.of(requireNonNull(attributes.get(FILE_READ_SIZE))); - default -> OptionalLong.empty(); - }; - - if (!path.contains("_delta_log") && !path.contains("/.trino")) { - Matcher matcher = dataFilePattern.matcher(path); - if (matcher.matches()) { - String changeData = path.contains("/_change_data/") ? "change_data/" : ""; - if (!path.contains("=")) { - return new CacheOperation(operationName, "data", position, length); - } - return new CacheOperation(operationName, changeData + matcher.group("partition"), position, length); - } - if (path.contains("/part-00000-")) { - return new CacheOperation(operationName, "data", position, length); - } - if (path.contains("/deletion_vector_")) { - return new CacheOperation(operationName, "deletion_vector", position, length); - } - } - else { - return new CacheOperation(operationName, fileName, position, length); - } - throw new IllegalArgumentException("File not recognized: " + path); + assertMultisetsEqual(getCacheOperations(queryRunner), expectedCacheAccesses); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java new file mode 100644 index 00000000000000..89cffc8f880675 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java @@ -0,0 +1,108 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultiset; +import com.google.common.collect.Multiset; +import io.trino.filesystem.tracing.CacheFileSystemTraceUtils; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; + +import java.nio.file.Files; +import java.nio.file.Path; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.deltalake.DeltaLakeAlluxioCacheTestUtils.getCacheOperations; +import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; + +@Execution(ExecutionMode.SAME_THREAD) +public class TestDeltaLakeAlluxioCacheMutableTransactionLog + extends AbstractTestQueryFramework +{ + @Override + protected DistributedQueryRunner createQueryRunner() + throws Exception + { + Path cacheDirectory = Files.createTempDirectory("cache"); + closeAfterClass(() -> deleteRecursively(cacheDirectory, ALLOW_INSECURE)); + + return DeltaLakeQueryRunner.builder() + .setCoordinatorProperties(ImmutableMap.of("node-scheduler.include-coordinator", "false")) + .setDeltaProperties(ImmutableMap.builder() + .put("fs.cache.enabled", "true") + .put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString()) + .put("fs.cache.max-sizes", "100MB") + .put("delta.enable-non-concurrent-writes", "true") + .put("delta.register-table-procedure.enabled", "true") + .put("delta.fs.cache.disable-transaction-log-caching", "true") + .buildOrThrow()) + .setWorkerCount(1) + .build(); + } + + /** + * Tests that querying a table twice results in the table data being cached, while the delta log data remains uncached. + *

+ * This test ensures that when a table is queried multiple times, the underlying data is retrieved from cache, improving performance. + * At the same time, it verifies that the transaction log is not cached, ensuring that updates or changes to the log are always fetched fresh. + */ + @Test + public void testTableDataCachedWhileTransactionLogNotCached() + { + assertUpdate("DROP TABLE IF EXISTS test_transaction_log_not_cached"); + assertUpdate("CREATE TABLE test_transaction_log_not_cached(key varchar, data varchar) with (partitioned_by=ARRAY['key'], checkpoint_interval = 2)"); + assertUpdate("INSERT INTO test_transaction_log_not_cached VALUES ('p1', '1-abc')", 1); + assertUpdate("INSERT INTO test_transaction_log_not_cached VALUES ('p2', '2-xyz')", 1); + assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'test_transaction_log_not_cached')"); + assertFileSystemAccesses( + "SELECT * FROM test_transaction_log_not_cached", + ImmutableMultiset.builder() + .addCopies(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000002.checkpoint.parquet"), 2) + .addCopies(new CacheFileSystemTraceUtils.CacheOperation("Input.readTail", "00000000000000000002.checkpoint.parquet"), 2) + .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "00000000000000000003.json")) + .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "_last_checkpoint")) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 220)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 220)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p1/", 0, 220)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p2/", 0, 220)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p1/", 0, 220)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p2/", 0, 220)) + .build()); + assertFileSystemAccesses( + "SELECT * FROM test_transaction_log_not_cached", + ImmutableMultiset.builder() + .addCopies(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000002.checkpoint.parquet"), 2) + .addCopies(new CacheFileSystemTraceUtils.CacheOperation("Input.readTail", "00000000000000000002.checkpoint.parquet"), 2) + .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "00000000000000000003.json")) + .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "_last_checkpoint")) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 220)) + .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 220)) + .build()); + } + + private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedCacheAccesses) + { + assertUpdate("CALL system.flush_metadata_cache()"); + QueryRunner queryRunner = getQueryRunner(); + queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); + assertMultisetsEqual(getCacheOperations(queryRunner), expectedCacheAccesses); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index 3e180436d61597..ae25172817ec7d 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -73,7 +73,8 @@ public void testDefaults() .setRegisterTableProcedureEnabled(false) .setProjectionPushdownEnabled(true) .setQueryPartitionFilterRequired(false) - .setDeletionVectorsEnabled(false)); + .setDeletionVectorsEnabled(false) + .setDeltaLogCacheDisabled(false)); } @Test @@ -114,6 +115,7 @@ public void testExplicitPropertyMappings() .put("delta.projection-pushdown-enabled", "false") .put("delta.query-partition-filter-required", "true") .put("delta.deletion-vectors-enabled", "true") + .put("delta.fs.cache.disable-transaction-log-caching", "true") .buildOrThrow(); DeltaLakeConfig expected = new DeltaLakeConfig() @@ -150,7 +152,8 @@ public void testExplicitPropertyMappings() .setRegisterTableProcedureEnabled(true) .setProjectionPushdownEnabled(false) .setQueryPartitionFilterRequired(true) - .setDeletionVectorsEnabled(true); + .setDeletionVectorsEnabled(true) + .setDeltaLogCacheDisabled(true); assertFullMapping(properties, expected); }