Respect target-max-file-size in Iceberg
homar authored and findepi committed Feb 10, 2022
1 parent 47a3850 commit 5edca27
Showing 3 changed files with 87 additions and 18 deletions.
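The change teaches IcebergPageSink to respect a target maximum data-file size: while pages are appended, a writer whose file has grown past the target is committed through the new closeWriter() and replaced with a fresh writer (and therefore a fresh data file), and finish() and abort() now account for the closed writers as well. The default target comes from HiveConfig and can be overridden per session via the new target_max_file_size property. Because the size check runs only when the next row arrives for that partition, a file is closed only after it has already crossed the target, which is why the property description and the test below allow the actual files to be somewhat larger. The standalone sketch below shows just that rollover pattern; every type in it is a toy stand-in invented for the illustration, not Trino code, and only the shape of the size check mirrors the diff that follows.

import java.util.ArrayList;
import java.util.List;

// Toy illustration of the rollover pattern: keep appending to the current writer
// while it is at or below the target size, otherwise "close" it and start a new file.
public class RollingWriterSketch
{
    private static final long TARGET_MAX_FILE_SIZE = 64; // deliberately tiny for the demo

    private record ClosedFile(String name, long writtenBytes) {}

    private static final class ToyWriter
    {
        private final String name;
        private final StringBuilder buffer = new StringBuilder();

        private ToyWriter(String name)
        {
            this.name = name;
        }

        private long getWrittenBytes()
        {
            return buffer.length();
        }

        private void append(String row)
        {
            buffer.append(row).append('\n');
        }
    }

    public static void main(String[] args)
    {
        List<ClosedFile> closedFiles = new ArrayList<>();
        ToyWriter current = null;
        int nextFileIndex = 0;

        for (int row = 0; row < 1000; row++) {
            // same idea as the new check in getWriterIndexes(): a writer is reused
            // only while its written bytes do not exceed the target
            if (current != null && current.getWrittenBytes() > TARGET_MAX_FILE_SIZE) {
                closedFiles.add(new ClosedFile(current.name, current.getWrittenBytes()));
                current = null;
            }
            if (current == null) {
                current = new ToyWriter("data-file-" + nextFileIndex++);
            }
            current.append("row " + row);
        }
        if (current != null) {
            // like finish(): close the writer that is still open and account for it
            closedFiles.add(new ClosedFile(current.name, current.getWrittenBytes()));
        }

        long totalBytes = closedFiles.stream().mapToLong(ClosedFile::writtenBytes).sum();
        System.out.printf("%d files, %d bytes total%n", closedFiles.size(), totalBytes);
    }
}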
@@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.trino.plugin.hive.HdfsEnvironment;
@@ -97,8 +98,11 @@ public class IcebergPageSink
private final IcebergFileFormat fileFormat;
private final MetricsConfig metricsConfig;
private final PagePartitioner pagePartitioner;
private final long targetMaxFileSize;

private final List<WriteContext> writers = new ArrayList<>();
private final List<WriteContext> closedWriters = new ArrayList<>();
private final Collection<Slice> commitTasks = new ArrayList<>();

private long writtenBytes;
private long memoryUsage;
@@ -133,6 +137,7 @@ public IcebergPageSink(
this.metricsConfig = MetricsConfig.fromProperties(requireNonNull(storageProperties, "storageProperties is null"));
this.maxOpenWriters = maxOpenWriters;
this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec));
this.targetMaxFileSize = IcebergSessionProperties.getTargetMaxFileSize(session);
}

@Override
@@ -164,24 +169,14 @@ public CompletableFuture<?> appendPage(Page page)
@Override
public CompletableFuture<Collection<Slice>> finish()
{
Collection<Slice> commitTasks = new ArrayList<>();

for (WriteContext context : writers) {
context.getWriter().commit();

CommitTaskData task = new CommitTaskData(
context.getPath().toString(),
context.getWriter().getWrittenBytes(),
new MetricsWrapper(context.getWriter().getMetrics()),
context.getPartitionData().map(PartitionData::toJson));

commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
closeWriter(context);
}

writtenBytes = writers.stream()
writtenBytes = closedWriters.stream()
.mapToLong(writer -> writer.getWriter().getWrittenBytes())
.sum();
validationCpuNanos = writers.stream()
validationCpuNanos = closedWriters.stream()
.mapToLong(writer -> writer.getWriter().getValidationCpuNanos())
.sum();

@@ -192,7 +187,7 @@ public CompletableFuture<Collection<Slice>> finish()
public void abort()
{
RuntimeException error = null;
for (WriteContext context : writers) {
for (WriteContext context : Iterables.concat(writers, closedWriters)) {
try {
if (context != null) {
context.getWriter().rollback();
@@ -288,12 +283,16 @@ private int[] getWriterIndexes(Page page)
// create missing writers
for (int position = 0; position < page.getPositionCount(); position++) {
int writerIndex = writerIndexes[position];
if (writers.get(writerIndex) != null) {
continue;
WriteContext writer = writers.get(writerIndex);
if (writer != null) {
if (writer.getWrittenBytes() <= targetMaxFileSize) {
continue;
}
closeWriter(writer);
}

Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
WriteContext writer = createWriter(partitionData);
writer = createWriter(partitionData);

writers.set(writerIndex, writer);
}
@@ -303,6 +302,25 @@ private int[] getWriterIndexes(Page page)
return writerIndexes;
}

private void closeWriter(WriteContext writeContext)
{
long currentWritten = writeContext.getWriter().getWrittenBytes();
long currentMemory = writeContext.getWriter().getMemoryUsage();
writeContext.getWriter().commit();
writtenBytes += (writeContext.getWriter().getWrittenBytes() - currentWritten);
memoryUsage += (writeContext.getWriter().getMemoryUsage() - currentMemory);

CommitTaskData task = new CommitTaskData(
writeContext.getPath().toString(),
writeContext.getWriter().getWrittenBytes(),
new MetricsWrapper(writeContext.getWriter().getMetrics()),
writeContext.getPartitionData().map(PartitionData::toJson));

commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));

closedWriters.add(writeContext);
}

private WriteContext createWriter(Optional<PartitionData> partitionData)
{
String fileName = fileFormat.toIceberg().addExtension(randomUUID().toString());
@@ -433,6 +451,11 @@ public Optional<PartitionData> getPartitionData()
{
return partitionData;
}

public long getWrittenBytes()
{
return writer.getWrittenBytes();
}
}

private static class PagePartitioner
@@ -19,6 +19,7 @@
import io.trino.orc.OrcWriteValidation.OrcWriteValidationMode;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.orc.OrcReaderConfig;
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
@@ -69,6 +70,8 @@ public final class IcebergSessionProperties
private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";
private static final String STATISTICS_ENABLED = "statistics_enabled";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";

private final List<PropertyMetadata<?>> sessionProperties;

@Inject
@@ -77,7 +80,8 @@ public IcebergSessionProperties(
OrcReaderConfig orcReaderConfig,
OrcWriterConfig orcWriterConfig,
ParquetReaderConfig parquetReaderConfig,
ParquetWriterConfig parquetWriterConfig)
ParquetWriterConfig parquetWriterConfig,
HiveConfig hiveConfig)
{
sessionProperties = ImmutableList.<PropertyMetadata<?>>builder()
.add(enumProperty(
@@ -210,6 +214,11 @@ public IcebergSessionProperties(
"Read only required fields from a struct",
icebergConfig.isProjectionPushdownEnabled(),
false))
.add(dataSizeProperty(
TARGET_MAX_FILE_SIZE,
"Target maximum size of written files; the actual size may be larger",
hiveConfig.getTargetMaxFileSize(),
false))
.build();
}

@@ -345,4 +354,9 @@ public static boolean isProjectionPushdownEnabled(ConnectorSession session)
{
return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class);
}

public static long getTargetMaxFileSize(ConnectorSession session)
{
return session.getProperty(TARGET_MAX_FILE_SIZE, DataSize.class).toBytes();
}
}
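Because target_max_file_size is registered above as an Iceberg catalog session property, it can be overridden per session once this change is deployed. The sketch below shows one hedged way to do that through the Trino JDBC driver; the connection URL, user, schema, and table names are placeholders invented for the example, and only the property name and its data-size value format come from this commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class TargetMaxFileSizeUsageSketch
{
    public static void main(String[] args)
            throws Exception
    {
        // placeholder coordinates: any Trino cluster with an Iceberg catalog named "iceberg"
        String url = "jdbc:trino://localhost:8080/iceberg/demo";
        try (Connection connection = DriverManager.getConnection(url, "admin", null);
                Statement statement = connection.createStatement()) {
            // catalog session properties are addressed as <catalog>.<property>
            statement.execute("SET SESSION iceberg.target_max_file_size = '100MB'");

            // writes in this session should now roll over to a new data file once the
            // current one grows past (roughly) 100MB
            statement.execute("CREATE TABLE lineitem_copy AS SELECT * FROM tpch.sf1.lineitem");
        }
    }
}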
@@ -3337,4 +3337,36 @@ public void testOptimizeForPartitionedTable()
assertThat(updatedFiles).hasSize(3);
assertThat(getAllDataFilesFromTableDirectory(tableName)).containsExactlyInAnyOrderElementsOf(concat(initialFiles, updatedFiles));
}

@Test
public void testTargetMaxFileSize()
{
String tableName = "test_default_max_file_size" + randomTableSuffix();
@Language("SQL") String createTableSql = format("CREATE TABLE %s AS SELECT * FROM tpch.sf1.lineitem LIMIT 100000", tableName);

Session session = Session.builder(getSession())
.setSystemProperty("task_writer_count", "1")
.build();
assertUpdate(session, createTableSql, 100000);
List<String> initialFiles = getActiveFiles(tableName);
assertThat(initialFiles.size()).isLessThanOrEqualTo(3);
assertUpdate(format("DROP TABLE %s", tableName));

DataSize maxSize = DataSize.of(40, DataSize.Unit.KILOBYTE);
session = Session.builder(getSession())
.setSystemProperty("task_writer_count", "1")
.setCatalogSessionProperty("iceberg", "target_max_file_size", maxSize.toString())
.build();

assertUpdate(session, createTableSql, 100000);
assertThat(query(format("SELECT count(*) FROM %s", tableName))).matches("VALUES BIGINT '100000'");
List<String> updatedFiles = getActiveFiles(tableName);
assertThat(updatedFiles.size()).isGreaterThan(10);

computeActual(format("SELECT file_size_in_bytes FROM \"%s$files\"", tableName))
.getMaterializedRows()
// since target_max_file_size is set to quite a low value, the created files can end up bigger,
// so just to be safe we check that they are not much bigger
.forEach(row -> assertThat((Long) row.getField(0)).isBetween(1L, maxSize.toBytes() * 3));
}
}
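The same "$files" metadata-table check the test performs can also be run ad hoc against a real table. A minimal sketch, assuming the JDBC statement and placeholder table name from the sketch after the session-properties file above:

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

final class FileSizeCheckSketch
{
    private FileSizeCheckSketch() {}

    // ad-hoc version of the test's assertion, meant to run on the JDBC statement from
    // the earlier sketch; the table name is the same placeholder used there
    static void printDataFileSizes(Statement statement)
            throws SQLException
    {
        try (ResultSet files = statement.executeQuery(
                "SELECT file_size_in_bytes FROM \"lineitem_copy$files\"")) {
            while (files.next()) {
                // as in the test, expect sizes near the configured target rather than exactly at it
                System.out.println(files.getLong(1));
            }
        }
    }
}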
