Commit
Fixing SI to not rely on RDD<WriteStatus> while preparing MDT records
nsivabalan committed Nov 20, 2024
1 parent a8f2488 commit 9ea6e3e
Showing 16 changed files with 478 additions and 72 deletions.
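The hunks below build record-level index (RLI) and secondary index (SI) metadata records from per-file-group record-key information rather than relying on the RDD<WriteStatus>. The two carrier types, RecordKeyInfo and PerFileGroupRecordKeyInfos, live in org.apache.hudi.metadata but appear to be defined in changed files that are not rendered on this page; the sketch below is inferred purely from how the diff uses them (getRecordKey, getPartition, getRecordStatus, isPureInserts, getRecordKeyInfoList), so the field names and constructors are assumptions and the committed classes may differ.

// Hypothetical sketch, not the committed definitions: fields and constructors are assumptions.
import java.io.Serializable;
import java.util.List;

class RecordKeyInfo implements Serializable {
  enum RecordStatus { INSERT, UPDATE, DELETE }

  private final String recordKey;          // Hudi record key
  private final String partition;          // data table partition the key belongs to
  private final RecordStatus recordStatus; // how the key was affected by this commit

  RecordKeyInfo(String recordKey, String partition, RecordStatus recordStatus) {
    this.recordKey = recordKey;
    this.partition = partition;
    this.recordStatus = recordStatus;
  }

  String getRecordKey() { return recordKey; }
  String getPartition() { return partition; }
  RecordStatus getRecordStatus() { return recordStatus; }
}

// One value per impacted (partition, fileId) pair in the commit metadata.
class PerFileGroupRecordKeyInfos implements Serializable {
  private final boolean pureInserts;                    // true when the file group only received new keys
  private final List<RecordKeyInfo> recordKeyInfoList;  // per-key status for the file group

  PerFileGroupRecordKeyInfos(boolean pureInserts, List<RecordKeyInfo> recordKeyInfoList) {
    this.pureInserts = pureInserts;
    this.recordKeyInfoList = recordKeyInfoList;
  }

  boolean isPureInserts() { return pureInserts; }
  List<RecordKeyInfo> getRecordKeyInfoList() { return recordKeyInfoList; }
}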
@@ -28,9 +28,11 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -108,6 +110,8 @@
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.DirectoryInfo;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordKeyInfo;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.generateRLIRecords;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForFunctionalIndex;
@@ -602,7 +606,8 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeSecondaryIndexPartitio
this.getClass().getSimpleName(),
dataMetaClient,
getEngineType(),
indexDefinition);
indexDefinition,
dataMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime());

// Initialize the file groups - using the same estimation logic as that of record index
final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(),
@@ -1061,11 +1066,26 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD
// Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
if (dataWriteConfig.isRecordIndexEnabled()) {
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()), commitMetadata);
int parallelism = Math.max(1, Math.min(commitMetadata.getWriteStats().size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()));
boolean isSecondaryIndexEnabled = dataMetaClient.getTableConfig().getMetadataPartitions()
.stream().anyMatch(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX));
// get info about every record key for impacted file groups so that we can use that to generate RLI and optionally SI records.
HoodiePairData<Pair<String, String>, PerFileGroupRecordKeyInfos> recordKeyInfoPairs = convertMetadataToRecordKeyInfo(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(),
dataMetaClient, isSecondaryIndexEnabled, parallelism);

recordKeyInfoPairs.persist("MEMORY_AND_DISK_SER", engineContext, HoodieData.HoodieDataCacheKey.of(dataWriteConfig.getBasePath(), instantTime));
// this will get cleaned up when metadata commit completes.
engineContext.putCachedDataIds(HoodieData.HoodieDataCacheKey.of(metadataWriteConfig.getBasePath(), instantTime), recordKeyInfoPairs.getId());

HoodieData<HoodieRecord> rliRecords = generateRLIRecords(engineContext, recordKeyInfoPairs, commitMetadata.getWriteStats(), instantTime,
dataWriteConfig.getWritesFileIdEncoding(), parallelism);
partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), rliRecords);
HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(rliRecords, commitMetadata);
partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
// prepare secondary index records.
updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, recordKeyInfoPairs);
}
updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus);
return partitionToRecordMap;
});
closeInternal();
@@ -1127,7 +1147,8 @@ private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFilePathPairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime);
}

private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<String, HoodieData<HoodieRecord>> partitionToRecordMap,
HoodiePairData<Pair<String, String>, PerFileGroupRecordKeyInfos> recordKeyInfoPairs) {
// If write operation type based on commit metadata is COMPACT or CLUSTER then no need to update,
// because these operations do not change the secondary key - record key mapping.
if (commitMetadata.getOperationType() == WriteOperationType.COMPACT
@@ -1141,28 +1162,29 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata,
.forEach(partition -> {
HoodieData<HoodieRecord> secondaryIndexRecords;
try {
secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, writeStatus);
secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, recordKeyInfoPairs);
} catch (Exception e) {
throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
}
partitionToRecordMap.put(partition, secondaryIndexRecords);
});
}

private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition,
HoodiePairData<Pair<String, String>, PerFileGroupRecordKeyInfos> recordKeyInfoPairs) throws Exception {
List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = getPartitionFilePairs(commitMetadata);
// Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
// the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this
// operation is going to be expensive (in CPU, memory and IO)
List<String> keysToRemove = new ArrayList<>();
writeStatus.collectAsList().forEach(status -> {
status.getWrittenRecordDelegates().forEach(recordDelegate -> {
// Consider those keys which were either updated or deleted in this commit
if (!recordDelegate.getNewLocation().isPresent() || (recordDelegate.getCurrentLocation().isPresent() && recordDelegate.getNewLocation().isPresent())) {
keysToRemove.add(recordDelegate.getRecordKey());
}
});
});

List<String> keysToRemove = recordKeyInfoPairs
.map((SerializableFunction<Pair<Pair<String, String>, PerFileGroupRecordKeyInfos>, PerFileGroupRecordKeyInfos>) v1
-> v1.getValue())
.filter((SerializableFunction<PerFileGroupRecordKeyInfos, Boolean>) v1 -> !v1.isPureInserts())
.flatMap((SerializableFunction<PerFileGroupRecordKeyInfos, Iterator<String>>) v1
-> v1.getRecordKeyInfoList().stream().filter(recordKeyInfo -> recordKeyInfo.getRecordStatus() != RecordKeyInfo.RecordStatus.INSERT) // keep only record keys that were updated or deleted in this commit.
.map(RecordKeyInfo::getRecordKey).iterator()).collectAsList();

HoodieIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexPartition);
// Fetch the secondary keys that each of the record keys ('keysToRemove') maps to
// This is obtained by scanning the entire secondary index partition in the metadata table
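As a concrete illustration of the key-selection rule above: only record keys whose status is not INSERT (i.e. keys that were updated or deleted by this commit) are emitted as secondary index deletes. The snippet below is a driver-side, in-memory illustration that reuses the hypothetical sketch classes from the note at the top of this page; the real code runs the same predicate on the engine-backed HoodiePairData.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class SecondaryIndexKeySelectionExample {
  public static void main(String[] args) {
    // Hypothetical file group that saw one insert, one update and one delete.
    PerFileGroupRecordKeyInfos fileGroupInfos = new PerFileGroupRecordKeyInfos(false, Arrays.asList(
        new RecordKeyInfo("key1", "2024/11/20", RecordKeyInfo.RecordStatus.INSERT),
        new RecordKeyInfo("key2", "2024/11/20", RecordKeyInfo.RecordStatus.UPDATE),
        new RecordKeyInfo("key3", "2024/11/20", RecordKeyInfo.RecordStatus.DELETE)));

    // Same predicate as the diff above: skip pure inserts, keep updated/deleted keys.
    List<String> keysToRemove = fileGroupInfos.isPureInserts()
        ? Collections.<String>emptyList()
        : fileGroupInfos.getRecordKeyInfoList().stream()
            .filter(info -> info.getRecordStatus() != RecordKeyInfo.RecordStatus.INSERT)
            .map(RecordKeyInfo::getRecordKey)
            .collect(Collectors.toList());

    System.out.println(keysToRemove); // prints [key2, key3]
  }
}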
@@ -24,6 +24,8 @@
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -93,6 +95,11 @@ public <T> HoodieData<T> emptyHoodieData() {
return HoodieListData.eager(Collections.emptyList());
}

@Override
public <K, V> HoodiePairData<K, V> emptyHoodiePairData() {
return HoodieListPairData.eager(Collections.emptyList());
}

@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
return HoodieListData.eager(data);
@@ -23,6 +23,8 @@
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -76,6 +78,11 @@ public <T> HoodieData<T> emptyHoodieData() {
return HoodieListData.eager(Collections.emptyList());
}

@Override
public <K, V> HoodiePairData<K, V> emptyHoodiePairData() {
return HoodieListPairData.eager(Collections.emptyList());
}

@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
return HoodieListData.eager(data);
@@ -22,6 +22,7 @@
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
@@ -33,6 +34,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.data.HoodieSparkLongAccumulator;
import org.apache.hudi.exception.HoodieException;
@@ -44,6 +46,7 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SQLContext;

import javax.annotation.concurrent.ThreadSafe;
@@ -108,6 +111,11 @@ public <T> HoodieData<T> emptyHoodieData() {
return HoodieJavaRDD.of(javaSparkContext.emptyRDD());
}

@Override
public <K, V> HoodiePairData<K, V> emptyHoodiePairData() {
return HoodieJavaPairRDD.of(javaSparkContext.emptyRDD().mapToPair((PairFunction<Object, K, V>) o -> new Tuple2(o, o)));
}

@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
return HoodieJavaRDD.of(javaSparkContext.parallelize(data, parallelism));
@@ -21,6 +21,7 @@

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -73,6 +74,11 @@ public static <K, V> JavaPairRDD<K, V> getJavaPairRDD(HoodiePairData<K, V> hoodi
return ((HoodieJavaPairRDD<K, V>) hoodiePairData).get();
}

@Override
public int getId() {
return pairRDDData.id();
}

@Override
public JavaPairRDD<K, V> get() {
return pairRDDData;
@@ -83,6 +89,12 @@ public void persist(String storageLevel) {
pairRDDData.persist(StorageLevel.fromString(storageLevel));
}

@Override
public void persist(String level, HoodieEngineContext engineContext, HoodieData.HoodieDataCacheKey cacheKey) {
pairRDDData.persist(StorageLevel.fromString(level));
engineContext.putCachedDataIds(cacheKey, pairRDDData.id());
}

@Override
public void unpersist() {
pairRDDData.unpersist();
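The persist overload added above pairs Spark-level caching with id bookkeeping in the engine context, matching what the metadata writer in this commit does by hand: persist the record-key pair data, then register its id under the commit's cache key so it can be released once the metadata commit completes. A hedged usage sketch follows; the variable names are placeholders, and the eventual cleanup is implied by the writer-side comment rather than shown in these hunks.

// Illustrative fragment only; recordKeyInfoPairs, writeConfig, engineContext and instantTime are placeholders.
HoodieData.HoodieDataCacheKey cacheKey =
    HoodieData.HoodieDataCacheKey.of(writeConfig.getBasePath(), instantTime);

// Caches the backing JavaPairRDD at MEMORY_AND_DISK_SER and records its RDD id
// against the cache key inside the engine context, so it can be unpersisted later.
recordKeyInfoPairs.persist("MEMORY_AND_DISK_SER", engineContext, cacheKey);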
@@ -22,7 +22,6 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -32,7 +31,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.BaseFileUtils;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.RecordKeyInfo;
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.junit.jupiter.params.ParameterizedTest;
@@ -89,12 +88,12 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro
fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1));
}

Iterator<HoodieRecord> rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage());
Iterator<RecordKeyInfo> rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(),
writeStatus.getStat(), metaClient.getStorage()).getRecordKeyInfoList().iterator();
while (rliRecordsItr.hasNext()) {
HoodieRecord rliRecord = rliRecordsItr.next();
RecordKeyInfo rliRecord = rliRecordsItr.next();
String key = rliRecord.getRecordKey();
String partition = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getPartitionPath();
String partition = rliRecord.getPartition();
recordKeyToPartitionMapping1.put(key, partition);
}
} catch (IOException e) {
@@ -154,12 +153,10 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro

compactionWriteStats.forEach(writeStat -> {
try {
Iterator<HoodieRecord> rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStat,
writeConfig.getWritesFileIdEncoding(), finalCommitTime3, metaClient.getStorage());
while (rliRecordsItr.hasNext()) {
HoodieRecord rliRecord = rliRecordsItr.next();
for (RecordKeyInfo rliRecord : BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStat,
metaClient.getStorage()).getRecordKeyInfoList()) {
String key = rliRecord.getRecordKey();
if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) {
if (rliRecord.getRecordStatus() == RecordKeyInfo.RecordStatus.DELETE) {
actualRLIDeletes.add(key);
}
}
@@ -188,12 +185,13 @@ private void generateRliRecordsAndAssert(List<WriteStatus> writeStatuses, Map<St
String writeStatFileId = writeStatus.getFileId();
assertEquals(writeStatus.getStat().getPrevBaseFile(), fileIdToFileNameMapping.get(writeStatFileId));

Iterator<HoodieRecord> rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(),
writeConfig.getWritesFileIdEncoding(), commitTime, metaClient.getStorage());
Iterator<RecordKeyInfo> rliRecordsItr =
BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(), metaClient.getStorage())
.getRecordKeyInfoList().iterator();
while (rliRecordsItr.hasNext()) {
HoodieRecord rliRecord = rliRecordsItr.next();
RecordKeyInfo rliRecord = rliRecordsItr.next();
String key = rliRecord.getRecordKey();
if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) {
if (rliRecord.getRecordStatus() == RecordKeyInfo.RecordStatus.DELETE) {
actualRLIDeletes.add(key);
} else {
actualRLIInserts.add(key);
@@ -233,7 +233,7 @@ public static HoodieDataCacheKey of(String basePath, String instantTime) {
private final String basePath;
private final String instantTime;

private HoodieDataCacheKey(String basePath, String instantTime) {
public HoodieDataCacheKey(String basePath, String instantTime) {
this.basePath = basePath;
this.instantTime = instantTime;
}
@@ -18,6 +18,7 @@

package org.apache.hudi.common.data;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -76,6 +77,11 @@ private HoodieListPairData(List<Pair<K, V>> data, boolean lazy) {
super(dataStream, lazy);
}

@Override
public int getId() {
return -1;
}

@Override
public List<Pair<K, V>> get() {
return collectAsList();
@@ -86,6 +92,11 @@ public void persist(String cacheConfig) {
// no-op
}

@Override
public void persist(String level, HoodieEngineContext engineContext, HoodieData.HoodieDataCacheKey cacheKey) {
// no op
}

@Override
public void unpersist() {
// no-op
@@ -19,6 +19,7 @@

package org.apache.hudi.common.data;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -37,6 +38,12 @@
* @param <V> type of value.
*/
public interface HoodiePairData<K, V> extends Serializable {

/**
* Get the {@link HoodiePairData}'s unique non-negative identifier. -1 indicates an invalid id.
*/
int getId();

/**
* @return the collection of pairs.
*/
@@ -49,6 +56,11 @@ public interface HoodiePairData<K, V> extends Serializable {
*/
void persist(String cacheConfig);

/**
* Persists the data with the provided {@code level} (if applicable), and caches the data's id within the {@code engineContext}.
*/
void persist(String level, HoodieEngineContext engineContext, HoodieData.HoodieDataCacheKey cacheKey);

/**
* Un-persists the data (if applicable)
*/
(diffs for the remaining changed files in this commit are not rendered above)