HIVE-27991: Utilise FanoutWriters when inserting records in an Iceberg table when the records are unsorted
SourabhBadhya committed Jan 11, 2024
1 parent 0c3b822 commit bff4105
Showing 5 changed files with 37 additions and 4 deletions.
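
In outline, the change threads one bit of information from the Hive write path into the Iceberg writer selection: FileSinkOperator records in the JobConf whether the rows reaching the sink arrive sorted by custom sort expressions, and the Iceberg side reads that flag back and enables fanout writing when they do not. A compressed sketch of the rule in plain Java; chooseWriter is a hypothetical helper, not a Hive or Iceberg method, and the real plumbing is in the hunks below:

  // Toy distillation of the commit's decision rule -- not Hive or Iceberg API.
  // "isSorted" mirrors the file.sink.write.operation.sorted.<tableName> property.
  static String chooseWriter(boolean isSorted) {
    // Sorted input arrives clustered by partition, so one open file at a time suffices;
    // unsorted input needs a writer that keeps one open file per partition.
    return isSorted ? "ClusteredDataWriter" : "FanoutDataWriter";
  }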
HiveIcebergOutputFormat.java
@@ -77,6 +77,7 @@ private static HiveIcebergWriter writer(JobConf jc) {
         .attemptID(taskAttemptID)
         .poolSize(poolSize)
         .operation(HiveCustomStorageHandlerUtils.getWriteOperation(jc, tableName))
+        .isFanoutEnabled(!HiveCustomStorageHandlerUtils.getWriteOperationIsSorted(jc, tableName))
         .build();
   }

HiveIcebergRecordWriter.java
@@ -29,6 +29,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.ClusteredDataWriter;
 import org.apache.iceberg.io.DataWriteResult;
+import org.apache.iceberg.io.FanoutDataWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
@@ -41,9 +42,9 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {

   HiveIcebergRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
       FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io,
-      long targetFileSize) {
-    super(schema, specs, io,
-        new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
+      long targetFileSize, boolean fanoutEnabled) {
+    super(schema, specs, io, fanoutEnabled ? new FanoutDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize)
+        : new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
     this.currentSpecId = currentSpecId;
   }

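Why arrival order matters: a clustered writer closes a partition's file as soon as records for a different partition show up, so revisiting an earlier partition is an error, while a fanout writer keeps every partition's file open until the task finishes. A self-contained toy model of the two strategies (plain Java collections standing in for data files; Iceberg's ClusteredDataWriter and FanoutDataWriter add target-file-size rolling and spec handling on top of this idea):

  import java.util.ArrayList;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  public class ToyWriters {
    // Clustered strategy: only the current partition's "file" may be open.
    static Map<String, List<String>> clustered(List<String[]> rows) {
      Map<String, List<String>> files = new LinkedHashMap<>();
      String current = null;
      for (String[] r : rows) {               // r = {partitionKey, value}
        if (!r[0].equals(current)) {
          if (files.containsKey(r[0])) {      // partition seen before: its file is already closed
            throw new IllegalStateException("partition revisited: " + r[0]);
          }
          current = r[0];
          files.put(current, new ArrayList<>());
        }
        files.get(current).add(r[1]);
      }
      return files;
    }

    // Fanout strategy: one open "file" per partition, any arrival order.
    static Map<String, List<String>> fanout(List<String[]> rows) {
      Map<String, List<String>> files = new LinkedHashMap<>();
      for (String[] r : rows) {
        files.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
      }
      return files;
    }

    public static void main(String[] args) {
      List<String[]> unsorted = List.of(
          new String[] {"p=1", "a"}, new String[] {"p=2", "b"}, new String[] {"p=1", "c"});
      System.out.println(fanout(unsorted));   // {p=1=[a, c], p=2=[b]}
      try {
        clustered(unsorted);
      } catch (IllegalStateException e) {
        System.out.println(e.getMessage());   // partition revisited: p=1
      }
    }
  }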
WriterBuilder.java
@@ -45,6 +45,9 @@ public class WriterBuilder {
   private String queryId;
   private int poolSize;
   private Operation operation;
+
+  private boolean fanoutEnabled;
+
   // A task may write multiple output files using multiple writers. Each of them must have a unique operationId.
   private static AtomicInteger operationNum = new AtomicInteger(0);
@@ -85,6 +88,11 @@ public WriterBuilder operation(Operation newOperation) {
     return this;
   }

+  public WriterBuilder isFanoutEnabled(boolean isFanoutEnabled) {
+    this.fanoutEnabled = isFanoutEnabled;
+    return this;
+  }
+
   public HiveIcebergWriter build() {
     Map<String, String> properties = table.properties();
@@ -133,7 +141,7 @@ public HiveIcebergWriter build() {
         break;
       case OTHER:
         writer = new HiveIcebergRecordWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
-            io, targetFileSize);
+            io, targetFileSize, fanoutEnabled);
         break;
       default:
         // Update and Merge should be split into inserts and deletes
FileSinkOperator.java
@@ -20,6 +20,7 @@

 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
 import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperation;
+import static org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils.setWriteOperationIsSorted;

 import java.io.IOException;
 import java.io.Serializable;
@@ -633,6 +634,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {

     jc = new JobConf(hconf);
     setWriteOperation(jc, getConf().getTableInfo().getTableName(), getConf().getWriteOperation());
+    setWriteOperationIsSorted(jc, getConf().getTableInfo().getTableName(), dpCtx);

     try {
       createHiveOutputFormat(jc);
HiveCustomStorageHandlerUtils.java
@@ -24,13 +24,16 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.common.StatsSetupConst;

 public class HiveCustomStorageHandlerUtils {

   public static final String WRITE_OPERATION_CONFIG_PREFIX = "file.sink.write.operation.";

+  public static final String WRITE_OPERATION_IS_SORTED = "file.sink.write.operation.sorted.";
+
   public static String getTablePropsForCustomStorageHandler(Map<String, String> tableProperties) {
     StringBuilder properties = new StringBuilder();
@@ -71,4 +74,22 @@ public static void setWriteOperation(Configuration conf, String tableName, Conte

     conf.set(WRITE_OPERATION_CONFIG_PREFIX + tableName, operation.name());
   }
+
+  public static void setWriteOperationIsSorted(Configuration conf, String tableName, DynamicPartitionCtx dpCtx) {
+    if (conf == null || tableName == null) {
+      return;
+    }
+
+    // The insert is sorted only when the planner attached custom sort expressions.
+    boolean isSorted = dpCtx != null && dpCtx.getCustomSortExpressions() != null
+        && !dpCtx.getCustomSortExpressions().isEmpty();
+    conf.set(WRITE_OPERATION_IS_SORTED + tableName, Boolean.toString(isSorted));
+  }
+
+  public static boolean getWriteOperationIsSorted(Configuration conf, String tableName) {
+    if (conf == null || tableName == null) {
+      return false;
+    }
+
+    String isSortedString = conf.get(WRITE_OPERATION_IS_SORTED + tableName);
+    return Boolean.parseBoolean(isSortedString);
+  }
 }
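
The flag travels as an ordinary string property keyed by table name, so the round trip is easy to check in isolation. A minimal sketch, assuming only hadoop-common on the classpath; the key literal mirrors WRITE_OPERATION_IS_SORTED above, and exercising setWriteOperationIsSorted itself would require constructing a DynamicPartitionCtx:

  import org.apache.hadoop.conf.Configuration;

  public class SortedFlagRoundTrip {
    public static void main(String[] args) {
      Configuration conf = new Configuration(false);
      // What FileSinkOperator effectively stores for a table named "db.tbl":
      conf.set("file.sink.write.operation.sorted." + "db.tbl", Boolean.toString(false));
      // What the Iceberg output format reads back when building the writer:
      boolean isSorted = Boolean.parseBoolean(conf.get("file.sink.write.operation.sorted." + "db.tbl"));
      System.out.println(isSorted ? "use ClusteredDataWriter" : "use FanoutDataWriter");
    }
  }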
