Skip to content

Commit

Permalink
Add option to disable filesystem caching of /_delta_log/ directory
Browse files Browse the repository at this point in the history
  • Loading branch information
sdaberdaku committed Sep 13, 2024
1 parent 652445b commit 7c4524f
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 3 deletions.
19 changes: 19 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -1221,3 +1221,22 @@ keep a backup of the original values if you change them.

The connector supports configuring and using [file system
caching](/object-storage/file-system-cache).

The following table describes file system cache properties specific to
the Delta Lake connector.

:::{list-table} Delta Lake file system cache configuration properties
:widths: 30, 50, 20
:header-rows: 1

* - Property name
- Description
- Default
* - `delta.fs.cache.delta-log-caching-enable`
- Set to `false` to disable caching of the `_delta_log` directory of
Delta Tables. This is useful in those cases when Delta Tables are
destroyed and recreated, and the files inside the transaction log
directory get overwritten and cannot be safely cached. Effective
only when `fs.cache.enabled=true`.
- `true`
:::
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class DeltaLakeConfig
private boolean projectionPushdownEnabled = true;
private boolean queryPartitionFilterRequired;
private boolean deletionVectorsEnabled;
private boolean deltaLogFileSystemCacheEnabled = true;

public Duration getMetadataCacheTtl()
{
Expand Down Expand Up @@ -532,4 +533,17 @@ public DeltaLakeConfig setDeletionVectorsEnabled(boolean deletionVectorsEnabled)
this.deletionVectorsEnabled = deletionVectorsEnabled;
return this;
}

/**
 * Returns whether file system caching of the {@code _delta_log} directory is enabled.
 *
 * @return the current value of the {@code deltaLogFileSystemCacheEnabled} setting
 */
public boolean isDeltaLogFileSystemCacheEnabled()
{
return deltaLogFileSystemCacheEnabled;
}

/**
 * Enables or disables file system caching of the {@code _delta_log} directory.
 * Per the config description, the setting is effective only when {@code fs.cache.enabled=true}.
 *
 * @param deltaLogFileSystemCacheEnabled {@code false} to disable caching of the {@code _delta_log} directory
 * @return this config instance, to allow call chaining
 */
@Config("delta.fs.cache.delta-log-caching-enable")
@ConfigDescription("Enable object storage caching of the _delta_log directory (effective only when fs.cache.enabled=true)")
public DeltaLakeConfig setDeltaLogFileSystemCacheEnabled(boolean deltaLogFileSystemCacheEnabled)
{
this.deltaLogFileSystemCacheEnabled = deltaLogFileSystemCacheEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.trino.plugin.base.security.ConnectorAccessControlModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.deltalake.cache.DeltaLakeCacheKeyProvider;
import io.trino.plugin.deltalake.cache.MutableDeltaLogDeltaLakeCacheKeyProvider;
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunctionProvider;
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider;
import io.trino.plugin.deltalake.metastore.DeltaLakeTableMetadataScheduler;
Expand Down Expand Up @@ -151,7 +152,12 @@ public void setup(Binder binder)
binder.bind(FunctionProvider.class).to(DeltaLakeFunctionProvider.class).in(Scopes.SINGLETON);
binder.bind(TableChangesProcessorProvider.class).in(Scopes.SINGLETON);

newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(DeltaLakeCacheKeyProvider.class).in(Scopes.SINGLETON);
// Choose the cache key provider from the connector config: when caching of the _delta_log
// directory is disabled, bind the provider that returns no cache key for files under _delta_log.
if (buildConfigObject(DeltaLakeConfig.class).isDeltaLogFileSystemCacheEnabled()) {
newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(DeltaLakeCacheKeyProvider.class).in(Scopes.SINGLETON);
}
else {
newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(MutableDeltaLogDeltaLakeCacheKeyProvider.class).in(Scopes.SINGLETON);
}

closingBinder(binder).registerExecutor(ExecutorService.class);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.cache;

import io.trino.filesystem.TrinoInputFile;

import java.util.Optional;

import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;

/**
 * Extends {@link DeltaLakeCacheKeyProvider} by returning {@code Optional.empty()} for all files
 * in the {@code _delta_log} directory, so that those files are never cached.
 * <p>
 * This implementation is useful in those scenarios when Delta Tables are deleted and re-created,
 * and caching their {@code _delta_log} directories should be avoided, see
 * https://github.com/trinodb/trino/issues/21451
 */
public class MutableDeltaLogDeltaLakeCacheKeyProvider
        extends DeltaLakeCacheKeyProvider
{
    // Matches a transaction log directory located at the very start of the location path
    private static final String TRANSACTION_LOG_PREFIX = TRANSACTION_LOG_DIRECTORY + "/";
    // Matches a transaction log directory nested below one or more parent directories
    private static final String TRANSACTION_LOG_SEGMENT = "/" + TRANSACTION_LOG_PREFIX;

    /**
     * Returns the cache key for the given file, or {@code Optional.empty()} when the file
     * is located inside a transaction log directory.
     *
     * @param inputFile the file for which a cache key is requested
     * @return empty for transaction log files, otherwise whatever the parent provider returns
     */
    @Override
    public Optional<String> getCacheKey(TrinoInputFile inputFile)
    {
        // Explicitly exclude the files in the _delta_log directory as they can change when the Delta Table is overwritten
        if (isTransactionLogPath(inputFile.location().path())) {
            return Optional.empty();
        }
        return super.getCacheKey(inputFile);
    }

    /**
     * Checks whether the path points inside a transaction log directory.
     * The location path may not start with a slash, so a table stored at the root of the
     * storage location yields a path beginning directly with the transaction log directory name;
     * that case is checked separately from the nested one.
     */
    private static boolean isTransactionLogPath(String path)
    {
        return path.startsWith(TRANSACTION_LOG_PREFIX) || path.contains(TRANSACTION_LOG_SEGMENT);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public void testDefaults()
.setRegisterTableProcedureEnabled(false)
.setProjectionPushdownEnabled(true)
.setQueryPartitionFilterRequired(false)
.setDeletionVectorsEnabled(false));
.setDeletionVectorsEnabled(false)
.setDeltaLogFileSystemCacheEnabled(true));
}

@Test
Expand Down Expand Up @@ -112,6 +113,7 @@ public void testExplicitPropertyMappings()
.put("delta.projection-pushdown-enabled", "false")
.put("delta.query-partition-filter-required", "true")
.put("delta.deletion-vectors-enabled", "true")
.put("delta.fs.cache.delta-log-caching-enable", "false")
.buildOrThrow();

DeltaLakeConfig expected = new DeltaLakeConfig()
Expand Down Expand Up @@ -147,7 +149,8 @@ public void testExplicitPropertyMappings()
.setRegisterTableProcedureEnabled(true)
.setProjectionPushdownEnabled(false)
.setQueryPartitionFilterRequired(true)
.setDeletionVectorsEnabled(true);
.setDeletionVectorsEnabled(true)
.setDeltaLogFileSystemCacheEnabled(false);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.cache;

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.cache.CacheKeyProvider;
import io.trino.filesystem.memory.MemoryInputFile;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;

import java.io.IOException;
import java.util.Optional;

import static io.airlift.slice.Slices.EMPTY_SLICE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;

/**
 * Verifies that {@link MutableDeltaLogDeltaLakeCacheKeyProvider} yields a cache key for a data file
 * and no cache key for a file inside the {@code _delta_log} directory.
 */
@TestInstance(PER_CLASS)
@Execution(CONCURRENT)
public class TestMutableDeltaLogDeltaLakeCacheKeyProvider
{
    private CacheKeyProvider cacheKeyProvider;

    @BeforeAll
    public void setUp()
    {
        cacheKeyProvider = new MutableDeltaLogDeltaLakeCacheKeyProvider();
    }

    @Test
    public void testCacheableFileReturnsCacheKey()
            throws IOException
    {
        String dataFilePath = "prefix/test_delta_table/part-00001-0161445f-38f8-46be-a0dc-d49ca3af30b2-c000.snappy.parquet";

        Optional<String> key = cacheKeyProvider.getCacheKey(emptyMemoryFile(dataFilePath));

        // The key of a cacheable file is expected to be its location path
        assertThat(key).hasValue(dataFilePath);
    }

    @Test
    public void testUncacheableFileReturnsEmptyCacheKey()
            throws IOException
    {
        String transactionLogPath = "prefix/test_delta_table/_delta_log/00000000000000000001.json";

        Optional<String> key = cacheKeyProvider.getCacheKey(emptyMemoryFile(transactionLogPath));

        assertThat(key).isEmpty();
    }

    // Builds an empty in-memory input file located at the given path
    private static TrinoInputFile emptyMemoryFile(String path)
    {
        return new MemoryInputFile(Location.of("memory:///" + path), EMPTY_SLICE);
    }
}

0 comments on commit 7c4524f

Please sign in to comment.