HIVE-27991: Utilise FanoutWriters when inserting records in an Iceberg table when the records are unsorted
SourabhBadhya committed Jan 16, 2024
1 parent 0c3b822 commit 47cfb3b
Showing 6 changed files with 51 additions and 4 deletions.
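
The gist of the change: Iceberg's ClusteredDataWriter assumes incoming records arrive clustered by partition; it keeps a single data file open and fails once a partition it has already closed shows up again. FanoutDataWriter instead keeps one open writer per partition, so it accepts unsorted input at the cost of more open files and buffers. A minimal, self-contained sketch of that behavioral difference (stand-in code, not Hive or Iceberg classes):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class WriterChoiceSketch {

      // Clustered-style writing: one file open at a time; a partition's file is
      // closed as soon as the key changes, so revisiting a partition is an error.
      static void clusteredWrite(List<String> partitionKeys) {
        Set<String> closed = new HashSet<>();
        String current = null;
        for (String key : partitionKeys) {
          if (!key.equals(current)) {
            if (closed.contains(key)) {
              throw new IllegalStateException("input not clustered: revisited " + key);
            }
            if (current != null) {
              closed.add(current); // close the previous partition's file
            }
            current = key;
          }
        }
      }

      // Fanout-style writing: one writer stays open per distinct partition, so
      // arrival order does not matter, but memory grows with partition count.
      static void fanoutWrite(List<String> partitionKeys) {
        Map<String, Integer> openWriters = new HashMap<>();
        for (String key : partitionKeys) {
          openWriters.merge(key, 1, Integer::sum);
        }
        System.out.println("fanout kept " + openWriters.size() + " writers open");
      }

      public static void main(String[] args) {
        List<String> unsorted = Arrays.asList("p=1", "p=2", "p=1");
        fanoutWrite(unsorted);    // fine: two writers stay open
        clusteredWrite(unsorted); // throws: p=1 seen again after its file was closed
      }
    }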

@@ -77,6 +77,7 @@ private static HiveIcebergWriter writer(JobConf jc) {
         .attemptID(taskAttemptID)
         .poolSize(poolSize)
         .operation(HiveCustomStorageHandlerUtils.getWriteOperation(jc, tableName))
+        .isFanoutEnabled(!HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(jc, tableName))
         .build();
   }

@@ -29,9 +29,11 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.ClusteredDataWriter;
 import org.apache.iceberg.io.DataWriteResult;
+import org.apache.iceberg.io.FanoutDataWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.mr.hive.FilesForCommit;
 import org.apache.iceberg.mr.mapred.Container;

@@ -41,9 +43,8 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {

   HiveIcebergRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
       FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io,
-      long targetFileSize) {
-    super(schema, specs, io,
-        new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
+      long targetFileSize, boolean fanoutEnabled) {
+    super(schema, specs, io, getIcebergDataWriter(fileWriterFactory, fileFactory, io, targetFileSize, fanoutEnabled));
     this.currentSpecId = currentSpecId;
   }

@@ -58,4 +59,11 @@ public FilesForCommit files() {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
     return FilesForCommit.onlyData(dataFiles);
   }
+
+  private static PartitioningWriter getIcebergDataWriter(FileWriterFactory<Record> fileWriterFactory,
+      OutputFileFactory fileFactory, FileIO io,
+      long targetFileSize, boolean fanoutEnabled) {
+    return fanoutEnabled ? new FanoutDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize)
+        : new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize);
+  }
 }
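
The helper above returns the raw PartitioningWriter type. Both writer flavors implement Iceberg's generic PartitioningWriter<T, R> interface and produce a DataWriteResult, so a typed variant would look like the sketch below (illustrative only, not part of this commit). The choice is purely operational: fanout trades extra open files and memory for tolerance of unsorted input.

    // Illustrative typed signature (not from this commit), assuming Iceberg's
    // PartitioningWriter<T, R> interface from org.apache.iceberg.io:
    private static PartitioningWriter<Record, DataWriteResult> getIcebergDataWriter(
        FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory,
        FileIO io, long targetFileSize, boolean fanoutEnabled) {
      return fanoutEnabled
          ? new FanoutDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize)     // one open file per partition
          : new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize); // single open file, clustered input
    }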

@@ -45,6 +45,9 @@ public class WriterBuilder {
   private String queryId;
   private int poolSize;
   private Operation operation;
+
+  private boolean fanoutEnabled;
+
   // A task may write multiple output files using multiple writers. Each of them must have a unique operationId.
   private static AtomicInteger operationNum = new AtomicInteger(0);

@@ -85,6 +88,11 @@ public WriterBuilder operation(Operation newOperation) {
     return this;
   }

+  public WriterBuilder isFanoutEnabled(boolean isFanoutEnabled) {
+    this.fanoutEnabled = isFanoutEnabled;
+    return this;
+  }
+
   public HiveIcebergWriter build() {
     Map<String, String> properties = table.properties();

@@ -133,7 +141,7 @@ public HiveIcebergWriter build() {
         break;
       case OTHER:
         writer = new HiveIcebergRecordWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
-            io, targetFileSize);
+            io, targetFileSize, fanoutEnabled);
         break;
       default:
         // Update and Merge should be split into inserts and deletes
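
A hypothetical call site for the new builder flag; builderFor(...) and the surrounding variables are assumed, since only attemptID, poolSize, operation, isFanoutEnabled and build appear in this diff. Because fanoutEnabled is a plain boolean field, it defaults to false, so callers that never set the flag keep the old clustered-writer behavior:

    // Hypothetical usage; context (table, taskAttemptID, poolSize, operation,
    // recordsAreSorted) is assumed rather than shown in this commit.
    HiveIcebergWriter writer = WriterBuilder.builderFor(table)
        .attemptID(taskAttemptID)
        .poolSize(poolSize)
        .operation(operation)
        .isFanoutEnabled(!recordsAreSorted) // fanout only when input is unsorted
        .build();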

@@ -20,6 +20,7 @@

 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
 import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperation;
+import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperationIsSorted;

 import java.io.IOException;
 import java.io.Serializable;

@@ -633,6 +634,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {

     jc = new JobConf(hconf);
     setWriteOperation(jc, getConf().getTableInfo().getTableName(), getConf().getWriteOperation());
+    setWriteOperationIsSorted(jc, getConf().getTableInfo().getTableName(), getConf().getIsSorted());

     try {
       createHiveOutputFormat(jc);
8 changes: 8 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -135,6 +135,8 @@ public enum DPSortState {

   private boolean isCTASorCM = false;

+  private boolean isSorted = false;
+
   public FileSinkDesc() {
   }

@@ -156,6 +158,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo, final boolean
     this.totalFiles = totalFiles;
     this.partitionCols = partitionCols;
     this.dpCtx = dpCtx;
+    this.isSorted = dpCtx != null && dpCtx.getCustomSortExpressions() != null && !dpCtx.getCustomSortExpressions().isEmpty();
     this.dpSortState = DPSortState.NONE;
     this.destPath = destPath;
     this.mmWriteId = mmWriteId;

@@ -472,6 +475,7 @@ public void setNumFiles(int numFiles) {

   public void setDynPartCtx(DynamicPartitionCtx dpc) {
     this.dpCtx = dpc;
+    this.isSorted = dpCtx != null && dpCtx.getCustomSortExpressions() != null && !dpCtx.getCustomSortExpressions().isEmpty();
   }

   public DynamicPartitionCtx getDynPartCtx() {

@@ -752,4 +756,8 @@ public void setDynPartitionValues(Map<String, List<Path>> dynPartitionValues) {
     this.dynPartitionValues = dynPartitionValues;
   }

+  public boolean getIsSorted() {
+    return isSorted;
+  }
+
 }
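
isSorted is derived state: it is recomputed in the constructor and in setDynPartCtx from whichever DynamicPartitionCtx is current, and exposed read-only through getIsSorted(). A non-empty list of custom sort expressions serves as a proxy for "the plan already orders records before the file sink", which is the invariant the clustered writer needs. A self-contained sketch of the predicate, using a stand-in for the Hive class (the element type of the list does not matter to the check):

    import java.util.Collections;
    import java.util.List;

    // Minimal stand-in for DynamicPartitionCtx, just to exercise the predicate.
    class DpCtxStandIn {
      List<Object> customSortExpressions;
      List<Object> getCustomSortExpressions() { return customSortExpressions; }
    }

    class IsSortedSketch {
      // Mirrors the derivation in FileSinkDesc above.
      static boolean isSorted(DpCtxStandIn dpCtx) {
        return dpCtx != null
            && dpCtx.getCustomSortExpressions() != null
            && !dpCtx.getCustomSortExpressions().isEmpty();
      }

      public static void main(String[] args) {
        System.out.println(isSorted(null));  // false: no dynamic-partition context
        DpCtxStandIn ctx = new DpCtxStandIn();
        System.out.println(isSorted(ctx));   // false: no custom sort expressions
        ctx.customSortExpressions = Collections.singletonList(new Object());
        System.out.println(isSorted(ctx));   // true: plan sorts before the sink
      }
    }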

@@ -24,13 +24,16 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.common.StatsSetupConst;

 public class HiveCustomStorageHandlerUtils {

   public static final String WRITE_OPERATION_CONFIG_PREFIX = "file.sink.write.operation.";

+  public static final String WRITE_OPERATION_IS_SORTED = "file.sink.write.operation.sorted.";
+
   public static String getTablePropsForCustomStorageHandler(Map<String, String> tableProperties) {
     StringBuilder properties = new StringBuilder();

@@ -71,4 +74,21 @@ public static void setWriteOperation(Configuration conf, String tableName, Conte

     conf.set(WRITE_OPERATION_CONFIG_PREFIX + tableName, operation.name());
   }
+
+  public static void setWriteOperationIsSorted(Configuration conf, String tableName, boolean isSorted) {
+    if (conf == null || tableName == null) {
+      return;
+    }
+
+    conf.set(WRITE_OPERATION_IS_SORTED + tableName, Boolean.toString(isSorted));
+  }
+
+  public static boolean getWriteOperationIsSorted(Configuration conf, String tableName) {
+    if (conf == null || tableName == null) {
+      return false;
+    }
+
+    String isSortedString = conf.get(WRITE_OPERATION_IS_SORTED + tableName);
+    return Boolean.parseBoolean(isSortedString);
+  }
 }
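
An illustrative round trip of the two helpers added above (not part of the commit): keys are namespaced by table name, so several tables can carry independent flags in one configuration, and an unset key parses as false, i.e. records are treated as unsorted and the fanout path is selected:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;

    public class IsSortedFlagDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        HiveCustomStorageHandlerUtils.setWriteOperationIsSorted(conf, "db.tbl", true);

        // Stored under "file.sink.write.operation.sorted.db.tbl"
        System.out.println(HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(conf, "db.tbl")); // true
        System.out.println(HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(conf, "other"));  // false (unset)
      }
    }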
