diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 07313150ead1..443b1f052e78 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -28,9 +28,11 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -108,6 +110,8 @@ import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.DirectoryInfo; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordKeyInfo; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.generateRLIRecords; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForFunctionalIndex; @@ -602,7 +606,8 @@ private Pair> initializeSecondaryIndexPartitio this.getClass().getSimpleName(), dataMetaClient, getEngineType(), - indexDefinition); + indexDefinition, + dataMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime()); // Initialize the file groups - using the same estimation logic as that of record index final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(), @@ -1061,11 +1066,26 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD // Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code // to the HoodieTableMetadataUtil class in hudi-common. if (dataWriteConfig.isRecordIndexEnabled()) { - HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()), commitMetadata); + int parallelism = Math.max(1, Math.min(commitMetadata.getWriteStats().size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism())); + boolean isSecondaryIndexEnabled = dataMetaClient.getTableConfig().getMetadataPartitions() + .stream().anyMatch(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX)); + // get info about every record key for impacted file groups so that we can use that to generate RLI and optionally SI records. 
+ HoodiePairData, PerFileGroupRecordKeyInfos> recordKeyInfoPairs = convertMetadataToRecordKeyInfo(engineContext, commitMetadata, dataWriteConfig.getMetadataConfig(), + dataMetaClient, isSecondaryIndexEnabled, parallelism); + + recordKeyInfoPairs.persist("MEMORY_AND_DISK_SER", engineContext, HoodieData.HoodieDataCacheKey.of(dataWriteConfig.getBasePath(), instantTime)); + // this will get cleaned up when metadata commit completes. + engineContext.putCachedDataIds(HoodieData.HoodieDataCacheKey.of(metadataWriteConfig.getBasePath(), instantTime), recordKeyInfoPairs.getId()); + + HoodieData rliRecords = generateRLIRecords(engineContext, recordKeyInfoPairs, commitMetadata.getWriteStats(), instantTime, + dataWriteConfig.getWritesFileIdEncoding(), parallelism); + partitionToRecordMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), rliRecords); + HoodieData additionalUpdates = getRecordIndexAdditionalUpserts(rliRecords, commitMetadata); partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates)); + // prepare secondary index records. + updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, recordKeyInfoPairs); } updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap); - updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus); return partitionToRecordMap; }); closeInternal(); @@ -1127,7 +1147,8 @@ private HoodieData getFunctionalIndexUpdates(HoodieCommitMetadata return getFunctionalIndexRecords(partitionFilePathPairs, indexDefinition, dataMetaClient, parallelism, readerSchema, storageConf, instantTime); } - private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map> partitionToRecordMap, HoodieData writeStatus) { + private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map> partitionToRecordMap, + HoodiePairData, PerFileGroupRecordKeyInfos> recordKeyInfoPairs) { // If write operation type based on commit metadata is COMPACT or CLUSTER then no need to update, // because these operations do not change the secondary key - record key mapping. if (commitMetadata.getOperationType() == WriteOperationType.COMPACT @@ -1141,7 +1162,7 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, .forEach(partition -> { HoodieData secondaryIndexRecords; try { - secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, writeStatus); + secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, recordKeyInfoPairs); } catch (Exception e) { throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e); } @@ -1149,20 +1170,21 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, }); } - private HoodieData getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, HoodieData writeStatus) throws Exception { + private HoodieData getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, + HoodiePairData, PerFileGroupRecordKeyInfos> recordKeyInfoPairs) throws Exception { List>>> partitionFilePairs = getPartitionFilePairs(commitMetadata); // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of // the secondary index partition for each of these keys. 
For a commit which is deleting/updating a lot of records, this // operation is going to be expensive (in CPU, memory and IO) - List keysToRemove = new ArrayList<>(); - writeStatus.collectAsList().forEach(status -> { - status.getWrittenRecordDelegates().forEach(recordDelegate -> { - // Consider those keys which were either updated or deleted in this commit - if (!recordDelegate.getNewLocation().isPresent() || (recordDelegate.getCurrentLocation().isPresent() && recordDelegate.getNewLocation().isPresent())) { - keysToRemove.add(recordDelegate.getRecordKey()); - } - }); - }); + + List keysToRemove = recordKeyInfoPairs + .map((SerializableFunction, PerFileGroupRecordKeyInfos>, PerFileGroupRecordKeyInfos>) v1 + -> v1.getValue()) + .filter((SerializableFunction) v1 -> !v1.isPureInserts()) + .flatMap((SerializableFunction>) v1 + -> v1.getRecordKeyInfoList().stream().filter(recordKeyInfo -> recordKeyInfo.getRecordStatus() != RecordKeyInfo.RecordStatus.INSERT) // keep only records that were updated or deleted, i.e. exclude inserts. + .map(RecordKeyInfo::getRecordKey).iterator()).collectAsList(); + HoodieIndexDefinition indexDefinition = getFunctionalIndexDefinition(indexPartition); // Fetch the secondary keys that each of the record keys ('keysToRemove') maps to // This is obtained by scanning the entire secondary index partition in the metadata table diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index be73658e0e24..9b3308695de9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -24,6 +24,8 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; import org.apache.hudi.common.data.HoodieListData; +import org.apache.hudi.common.data.HoodieListPairData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; @@ -93,6 +95,11 @@ public HoodieData emptyHoodieData() { return HoodieListData.eager(Collections.emptyList()); } + @Override + public HoodiePairData emptyHoodiePairData() { + return HoodieListPairData.eager(Collections.emptyList()); + } + @Override public HoodieData parallelize(List data, int parallelism) { return HoodieListData.eager(data); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java index f542d5a0f7f9..a127089958b1 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java @@ -23,6 +23,8 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; import org.apache.hudi.common.data.HoodieListData; +import org.apache.hudi.common.data.HoodieListPairData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import 
org.apache.hudi.common.engine.TaskContextSupplier; @@ -76,6 +78,11 @@ public HoodieData emptyHoodieData() { return HoodieListData.eager(Collections.emptyList()); } + @Override + public HoodiePairData emptyHoodiePairData() { + return HoodieListPairData.eager(Collections.emptyList()); + } + @Override public HoodieData parallelize(List data, int parallelism) { return HoodieListData.eager(data); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java index b1763634bc06..e82b5a5bf513 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableBiFunction; @@ -33,6 +34,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.data.HoodieJavaPairRDD; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.data.HoodieSparkLongAccumulator; import org.apache.hudi.exception.HoodieException; @@ -44,6 +46,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.SQLContext; import javax.annotation.concurrent.ThreadSafe; @@ -108,6 +111,11 @@ public HoodieData emptyHoodieData() { return HoodieJavaRDD.of(javaSparkContext.emptyRDD()); } + @Override + public HoodiePairData emptyHoodiePairData() { + return HoodieJavaPairRDD.of(javaSparkContext.emptyRDD().mapToPair((PairFunction) o -> new Tuple2(o, o))); + } + @Override public HoodieData parallelize(List data, int parallelism) { return HoodieJavaRDD.of(javaSparkContext.parallelize(data, parallelism)); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java index 5b422b7fe8ae..5ff458eb7942 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodiePairData; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; @@ -73,6 +74,11 @@ public static JavaPairRDD getJavaPairRDD(HoodiePairData hoodi return ((HoodieJavaPairRDD) hoodiePairData).get(); } + @Override + public int getId() { + return pairRDDData.id(); + } + @Override public JavaPairRDD get() { return pairRDDData; @@ -83,6 +89,12 @@ public void persist(String storageLevel) { 
pairRDDData.persist(StorageLevel.fromString(storageLevel)); } + @Override + public void persist(String level, HoodieEngineContext engineContext, HoodieData.HoodieDataCacheKey cacheKey) { + pairRDDData.persist(StorageLevel.fromString(level)); + engineContext.putCachedDataIds(cacheKey, pairRDDData.id()); + } + @Override public void unpersist() { pairRDDData.unpersist(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java index 9ad4003525b7..88416690b37f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRLIRecordGeneration.java @@ -22,7 +22,6 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; @@ -32,7 +31,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.metadata.BaseFileUtils; -import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.RecordKeyInfo; import org.apache.hudi.testutils.HoodieClientTestBase; import org.junit.jupiter.params.ParameterizedTest; @@ -89,12 +88,12 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro fileIdToFileNameMapping1.put(writeStatFileId, writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf("/") + 1)); } - Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), - writeStatus.getStat(), writeConfig.getWritesFileIdEncoding(), finalCommitTime, metaClient.getStorage()); + Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), + writeStatus.getStat(), metaClient.getStorage()).getRecordKeyInfoList().iterator(); while (rliRecordsItr.hasNext()) { - HoodieRecord rliRecord = rliRecordsItr.next(); + RecordKeyInfo rliRecord = rliRecordsItr.next(); String key = rliRecord.getRecordKey(); - String partition = ((HoodieMetadataPayload) rliRecord.getData()).getRecordGlobalLocation().getPartitionPath(); + String partition = rliRecord.getPartition(); recordKeyToPartitionMapping1.put(key, partition); } } catch (IOException e) { @@ -154,12 +153,10 @@ public void testGeneratingRLIRecordsFromBaseFile(HoodieTableType tableType) thro compactionWriteStats.forEach(writeStat -> { try { - Iterator rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStat, - writeConfig.getWritesFileIdEncoding(), finalCommitTime3, metaClient.getStorage()); - while (rliRecordsItr.hasNext()) { - HoodieRecord rliRecord = rliRecordsItr.next(); + for (RecordKeyInfo rliRecord : BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStat, + metaClient.getStorage()).getRecordKeyInfoList()) { String key = rliRecord.getRecordKey(); - if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) { + if (rliRecord.getRecordStatus() == 
RecordKeyInfo.RecordStatus.DELETE) { actualRLIDeletes.add(key); } } @@ -188,12 +185,13 @@ private void generateRliRecordsAndAssert(List writeStatuses, Map rliRecordsItr = BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(), - writeConfig.getWritesFileIdEncoding(), commitTime, metaClient.getStorage()); + Iterator rliRecordsItr = + BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(metaClient.getBasePath().toString(), writeStatus.getStat(), metaClient.getStorage()) + .getRecordKeyInfoList().iterator(); while (rliRecordsItr.hasNext()) { - HoodieRecord rliRecord = rliRecordsItr.next(); + RecordKeyInfo rliRecord = rliRecordsItr.next(); String key = rliRecord.getRecordKey(); - if (rliRecord.getData() instanceof EmptyHoodieRecordPayload) { + if (rliRecord.getRecordStatus() == RecordKeyInfo.RecordStatus.DELETE) { actualRLIDeletes.add(key); } else { actualRLIInserts.add(key); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java index e65a8f426bd3..bd8b88e75391 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java @@ -233,7 +233,7 @@ public static HoodieDataCacheKey of(String basePath, String instantTime) { private final String basePath; private final String instantTime; - private HoodieDataCacheKey(String basePath, String instantTime) { + public HoodieDataCacheKey(String basePath, String instantTime) { this.basePath = basePath; this.instantTime = instantTime; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java index b55d2f5be98f..95b73b138d2f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.data; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; @@ -76,6 +77,11 @@ private HoodieListPairData(List> data, boolean lazy) { super(dataStream, lazy); } + @Override + public int getId() { + return -1; + } + @Override public List> get() { return collectAsList(); @@ -86,6 +92,11 @@ public void persist(String cacheConfig) { // no-op } + @Override + public void persist(String level, HoodieEngineContext engineContext, HoodieData.HoodieDataCacheKey cacheKey) { + // no op + } + @Override public void unpersist() { // no-op diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java index d9815063b86f..af83e1fad181 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.data; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; @@ -37,6 +38,12 @@ * @param type of value. 
*/ public interface HoodiePairData extends Serializable { + + /** + * Get the {@link HoodieData}'s unique non-negative identifier. -1 indicates invalid id. + */ + int getId(); + /** * @return the collection of pairs. */ @@ -49,6 +56,11 @@ public interface HoodiePairData extends Serializable { */ void persist(String cacheConfig); + /** + * Persists the data w/ provided {@code level} (if applicable), and cache the data's ids within the {@code engineContext}. + */ + void persist(String level, HoodieEngineContext engineContext, HoodieData.HoodieDataCacheKey cacheKey); + /** * Un-persists the data (if applicable) */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java index b16f93302920..43b14d61c0b8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; @@ -67,6 +68,8 @@ public TaskContextSupplier getTaskContextSupplier() { public abstract HoodieData emptyHoodieData(); + public abstract HoodiePairData emptyHoodiePairData(); + public HoodieData parallelize(List data) { if (data.isEmpty()) { return emptyHoodieData(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java index d605349b30d3..979a65413e5e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java @@ -23,6 +23,8 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; import org.apache.hudi.common.data.HoodieListData; +import org.apache.hudi.common.data.HoodieListPairData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; @@ -73,6 +75,11 @@ public HoodieData emptyHoodieData() { return HoodieListData.eager(Collections.emptyList()); } + @Override + public HoodiePairData emptyHoodiePairData() { + return HoodieListPairData.eager(Collections.emptyList()); + } + @Override public HoodieData parallelize(List data, int parallelism) { return HoodieListData.eager(data); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java index 20038fdf840d..31b83301b412 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileUtils.java @@ -33,59 +33,83 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.stream.Collectors.toList; public class BaseFileUtils { 
+ public static PerFileGroupRecordKeyInfos generateRLIMetadataHoodieRecordsForBaseFile(String basePath, + HoodieWriteStat writeStat, + HoodieStorage storage) throws IOException { + return generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, storage, false); + } + /** * Generates RLI Metadata records for base files. * If base file is a added to a new file group, all record keys are treated as inserts. * If a base file is added to an existing file group, we read previous base file in addition to the latest base file of interest. Find deleted records and generate RLI Metadata records * for the same in addition to new insert records. - * @param basePath base path of the table. + * + * @param basePath base path of the table. * @param writeStat {@link HoodieWriteStat} of interest. - * @param writesFileIdEncoding fileID encoding for the table. - * @param instantTime instant time of interest. - * @param storage instance of {@link HoodieStorage}. + * @param storage instance of {@link HoodieStorage}. * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition. * @throws IOException */ - public static Iterator generateRLIMetadataHoodieRecordsForBaseFile(String basePath, - HoodieWriteStat writeStat, - Integer writesFileIdEncoding, - String instantTime, - HoodieStorage storage) throws IOException { + public static PerFileGroupRecordKeyInfos generateRLIMetadataHoodieRecordsForBaseFile(String basePath, + HoodieWriteStat writeStat, + HoodieStorage storage, + boolean bothRLIAndSIEnabled) throws IOException { String partition = writeStat.getPartitionPath(); String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath()); String previousFileName = writeStat.getPrevBaseFile(); String fileId = FSUtils.getFileId(latestFileName); Set recordKeysFromLatestBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName); - if (writeStat.getNumDeletes() == 0) { // if there are no deletes, reading only the file added as part of current commit metadata would suffice. + if (writeStat.getNumDeletes() == 0 && !bothRLIAndSIEnabled) { // if there are no deletes, reading only the file added as part of current commit metadata would suffice. // this means that both inserts and updates from latest base file might result in RLI records. - return new ArrayList(recordKeysFromLatestBaseFile).stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexUpdate((String) recordKey, partition, fileId, - instantTime, writesFileIdEncoding)).iterator(); + return new PerFileGroupRecordKeyInfos(true, (List) new ArrayList(recordKeysFromLatestBaseFile).stream() + .map(recordKey -> new RecordKeyInfo((String) recordKey, RecordKeyInfo.RecordStatus.INSERT, partition, fileId)).collect(toList())); } else { - // read from previous base file and find difference to also generate delete records. 
- // we will return new inserts and deletes from this code block - Set recordKeysFromPreviousBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, previousFileName); - List toReturn = recordKeysFromPreviousBaseFile.stream() - .filter(recordKey -> { - // deleted record - return !recordKeysFromLatestBaseFile.contains(recordKey); - }).map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()); + if (previousFileName == null) { + // all records are inserts + return new PerFileGroupRecordKeyInfos(true, (List) new ArrayList(recordKeysFromLatestBaseFile).stream() + .map(recordKey -> new RecordKeyInfo((String) recordKey, RecordKeyInfo.RecordStatus.INSERT, partition, fileId)).collect(toList())); + } else { + // read from previous base file and find difference to also generate delete records. + // we will return new inserts and deletes from this code block + Set recordKeysFromPreviousBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, previousFileName); + AtomicBoolean pureInserts = new AtomicBoolean(true); + List toReturn = recordKeysFromPreviousBaseFile.stream() + .filter(recordKey -> { + // deleted record + return !recordKeysFromLatestBaseFile.contains(recordKey); + }).map(recordKey -> { + pureInserts.set(false); + return new RecordKeyInfo(recordKey, RecordKeyInfo.RecordStatus.DELETE, partition, fileId); + }).collect(toList()); - toReturn.addAll(recordKeysFromLatestBaseFile.stream() - .filter(recordKey -> { - // new inserts - return !recordKeysFromPreviousBaseFile.contains(recordKey); - }).map(recordKey -> - HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId, - instantTime, writesFileIdEncoding)).collect(toList())); - return toReturn.iterator(); + if (!bothRLIAndSIEnabled) { + // when only RLI is enabled, we only care about new inserts + toReturn.addAll(recordKeysFromLatestBaseFile.stream() + .filter(recordKey -> { + // new inserts + return !recordKeysFromPreviousBaseFile.contains(recordKey); + }).map(recordKey -> new RecordKeyInfo((String) recordKey, RecordKeyInfo.RecordStatus.INSERT, partition, fileId)).collect(toList())); + } else { + // we need info about both inserts and updates if both rli and si are enabled + toReturn.addAll(recordKeysFromLatestBaseFile.stream().map(recordKey -> { + if (recordKeysFromPreviousBaseFile.contains(recordKey)) { + pureInserts.set(false); // updates + } + return new RecordKeyInfo((String) recordKey, + !recordKeysFromPreviousBaseFile.contains(recordKey) ? 
RecordKeyInfo.RecordStatus.INSERT : RecordKeyInfo.RecordStatus.UPDATE, partition, fileId); + }).collect(toList())); + } + return new PerFileGroupRecordKeyInfos(pureInserts.get(), toReturn); + } } } @@ -94,4 +118,5 @@ private static Set getRecordKeysFromBaseFile(HoodieStorage storage, Stri FileFormatUtils fileFormatUtils = HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.PARQUET); return fileFormatUtils.readRowKeys(storage, dataFilePath); } + } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 5626e88b0fe6..ad16d5402785 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -43,10 +43,12 @@ import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.data.HoodieAtomicLongAccumulator; import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.function.SerializableBiFunction; +import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.FileSlice; @@ -127,6 +129,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -393,10 +396,6 @@ public static Map> convertMetadataToRecords(Hoo final HoodieData partitionStatsRDD = convertMetadataToPartitionStatsRecords(commitMetadata, context, dataMetaClient, metadataConfig); partitionToRecordsMap.put(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), partitionStatsRDD); } - if (enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX)) { - partitionToRecordsMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), convertMetadataToRecordIndexRecords(context, commitMetadata, metadataConfig, - dataMetaClient, writesFileIdEncoding, instantTime)); - } return partitionToRecordsMap; } @@ -769,7 +768,7 @@ public static HoodieData convertMetadataToColumnStatsRecords(Hoodi }); } - static HoodieData convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext, + /*static HoodieData convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext, HoodieCommitMetadata commitMetadata, HoodieMetadataConfig metadataConfig, HoodieTableMetaClient dataTableMetaClient, @@ -806,7 +805,17 @@ static HoodieData convertMetadataToRecordIndexRecords(HoodieEngine HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration); // handle base files if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { - return BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage); + return BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, storage) + .stream().map(recordKeyInfo -> { + if (recordKeyInfo.getRecordStatus() == RecordKeyInfo.RecordStatus.INSERT) { + return HoodieMetadataPayload.createRecordIndexUpdate(recordKeyInfo.getRecordKey(), recordKeyInfo.getPartition(), + 
recordKeyInfo.getFileId(), instantTime, writesFileIdEncoding); + } else if (recordKeyInfo.getRecordStatus() == RecordKeyInfo.RecordStatus.DELETE) { + return HoodieMetadataPayload.createRecordIndexDelete(recordKeyInfo.getRecordKey()); + } else { + throw new HoodieMetadataException("Unknown RecordKey status " + recordKeyInfo.getRecordStatus() +" for " + recordKeyInfo.getRecordKey()); + } + }).iterator(); } else { // for logs, we only need to process log files containing deletes if (writeStat.getNumDeletes() > 0) { @@ -839,6 +848,146 @@ static HoodieData convertMetadataToRecordIndexRecords(HoodieEngine } catch (Exception e) { throw new HoodieException("Failed to generate column stats records for metadata table", e); } + }*/ + + static HoodiePairData, PerFileGroupRecordKeyInfos> convertMetadataToRecordKeyInfo(HoodieEngineContext engineContext, + HoodieCommitMetadata commitMetadata, + HoodieMetadataConfig metadataConfig, + HoodieTableMetaClient dataTableMetaClient, + boolean bothRLIAndSIEnabled, + int parallelism) { + + List allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream() + .flatMap(Collection::stream).collect(Collectors.toList()); + + if (allWriteStats.isEmpty()) { + return engineContext.emptyHoodiePairData(); + } + + try { + String basePath = dataTableMetaClient.getBasePath().toString(); + // we might need to set some additional variables if we need to process log files. + boolean hasLogFiles = allWriteStats.stream().anyMatch(writeStat -> { + String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath()); + return FSUtils.isLogFile(fileName); + }); + Option latestCommitTimestamp = Option.empty(); + Option writerSchemaOpt = Option.empty(); + if (hasLogFiles) { // if we have log files, we need to populate a few additional supporting variables + latestCommitTimestamp = Option.of(dataTableMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime()); + writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient); + } + int maxBufferSize = metadataConfig.getMaxReaderBufferSize(); + StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf(); + Option finalWriterSchemaOpt = writerSchemaOpt; + Option finalLatestCommitTimestamp = latestCommitTimestamp; + + // FIXME: we need to ensure we have one return value per combination of partition and fileId. If there are N log files added to the same file slice, we could have N writeStats. + return engineContext.parallelize(allWriteStats, parallelism).mapToPair(new SerializablePairFunction, PerFileGroupRecordKeyInfos>() { + @Override + public Pair, PerFileGroupRecordKeyInfos> call(HoodieWriteStat writeStat) throws Exception { + HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration); + String partition = writeStat.getPartitionPath(); + String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath()); + // handle base files + if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + String fileId = FSUtils.getFileId(latestFileName); + return Pair.of(Pair.of(partition, fileId), BaseFileUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, storage, bothRLIAndSIEnabled)); + } else { + String fileId = FSUtils.getFileIdFromLogPath(new StoragePath(writeStat.getPath())); + if (!bothRLIAndSIEnabled) { // only RLI enabled. 
+ // for logs, we only need to process delete blocks + if (writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0) { + Set deletedRecordKeys = getDeletedRecordKeys(dataTableMetaClient.getBasePath().toString() + "/" + writeStat.getPath(), dataTableMetaClient, + finalWriterSchemaOpt, maxBufferSize, finalLatestCommitTimestamp.get()); + return Pair.of(Pair.of(partition, fileId), + new PerFileGroupRecordKeyInfos(false, + (deletedRecordKeys.stream().map(recordKey -> new RecordKeyInfo(recordKey, RecordKeyInfo.RecordStatus.DELETE, partition, fileId))) + .collect(toList()))); + } else { + // ignore log file data blocks. + return Pair.of(Pair.of(partition, fileId), new PerFileGroupRecordKeyInfos(false, Collections.emptyList())); + } + } else { + // in case both RLI and SI are enabled, we have to process regular data blocks and delete blocks as well. + if (writeStat.getNumInserts() == 0 && writeStat.getNumUpdateWrites() == 0 && writeStat.getNumDeletes() > 0) { + Set deletedRecordKeys = getDeletedRecordKeys(dataTableMetaClient.getBasePath().toString() + "/" + writeStat.getPath(), dataTableMetaClient, + finalWriterSchemaOpt, maxBufferSize, finalLatestCommitTimestamp.get()); + return Pair.of(Pair.of(partition, fileId), + new PerFileGroupRecordKeyInfos(false, (deletedRecordKeys.stream().map(recordKey + -> new RecordKeyInfo(recordKey, RecordKeyInfo.RecordStatus.DELETE, partition, fileId))).collect(toList()))); + } else { + // data block. generate update records. + Set recordKeysFromLogFile = getRecordKeysFromLogFile(dataTableMetaClient.getBasePath().toString() + "/" + writeStat.getPath(), + dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, finalLatestCommitTimestamp.get()); + return Pair.of(Pair.of(partition, fileId), + new PerFileGroupRecordKeyInfos(false, (recordKeysFromLogFile.stream().map(recordKey + -> new RecordKeyInfo(recordKey, RecordKeyInfo.RecordStatus.UPDATE, partition, fileId))).collect(toList()))); + } + } + } + } + }); + } catch (Exception e) { + throw new HoodieException("Failed to generate record key info to assist w/ RLI and SI record generation for metadata table", e); + } + } + + static HoodieData generateRLIRecords(HoodieEngineContext engineContext, + HoodiePairData, PerFileGroupRecordKeyInfos> recordKeyInfoPairs, + List writeStatList, + String instantTime, Integer writesFileIdEncoding, + int parallelism) { + + HoodieData recordIndexRecords = engineContext.parallelize(writeStatList, parallelism) + .mapToPair( + (SerializablePairFunction, HoodieWriteStat>) writeStat -> { + String partition = writeStat.getPartitionPath(); + String latestFileName = getFileNameFromPath(writeStat.getPath()); + String fileId = (latestFileName.contains("log")) ? FSUtils.getFileIdFromLogPath(new StoragePath(writeStat.getPath())) + : FSUtils.getFileId(latestFileName); + return Pair.of(Pair.of(partition, fileId), writeStat); + }) + .leftOuterJoin(recordKeyInfoPairs) // recordKeyInfoPairs may not have entries for log files. So, let's do a left outer join of the two HoodiePairData + .map((SerializableFunction, Pair>>, List>) v1 -> { + if (v1.getValue().getValue().isPresent()) { + // already generated record key info. 
We can re-use that to generate HoodieRecords + List recordKeyInfoList = v1.getValue().getValue().get().getRecordKeyInfoList(); + return recordKeyInfoList.stream().map(recordKeyInfo -> { + if (recordKeyInfo.getRecordStatus() == RecordKeyInfo.RecordStatus.INSERT) { + // add or update a record to RLI + return HoodieMetadataPayload.createRecordIndexUpdate(recordKeyInfo.getRecordKey(), recordKeyInfo.getPartition(), recordKeyInfo.getFileId(), + instantTime, writesFileIdEncoding); + } else if (recordKeyInfo.getRecordStatus() == RecordKeyInfo.RecordStatus.DELETE) { + // delete a record from RLI. + return HoodieMetadataPayload.createRecordIndexDelete(recordKeyInfo.getRecordKey()); + } else { + // ignore update records. + return null; + } + }).filter(record -> !Objects.isNull(record)).collect(toList()); + } else { + // represents a log file. For RLI, we don't need to process log files w/ data blocks. + return Collections.emptyList(); + } + }).flatMap((SerializableFunction, Iterator>) List::iterator); + + // there are chances that the same record key from the data table has 2 entries (1 delete from an older partition and 1 insert to a newer partition) + // let's reduce by key to ignore the deleted entry. + return recordIndexRecords.mapToPair( + (SerializablePairFunction) t -> Pair.of(t.getKey(), t)) + .reduceByKey((SerializableBiFunction) (record1, record2) -> { + boolean isRecord1Deleted = record1.getData() instanceof EmptyHoodieRecordPayload; + boolean isRecord2Deleted = record2.getData() instanceof EmptyHoodieRecordPayload; + if (isRecord1Deleted && !isRecord2Deleted) { + return record2; + } else if (!isRecord1Deleted && isRecord2Deleted) { + return record1; + } else { + throw new HoodieIOException("Two HoodieRecord updates to RLI are seen for the same record key " + record2.getRecordKey() + ", record 1 : " + + record1.getData().toString() + ", record 2 : " + record2.getData().toString()); + } + }, parallelism).values(); } private static void reAddLogFilesFromRollbackPlan(HoodieTableMetaClient dataTableMetaClient, String instantTime, @@ -1415,6 +1564,29 @@ private static Set getDeletedRecordKeys(String filePath, HoodieTableMeta return Collections.emptySet(); } + private static Set getRecordKeysFromLogFile(String filePath, HoodieTableMetaClient datasetMetaClient, + Option writerSchemaOpt, int maxBufferSize, + String latestCommitTimestamp) throws IOException { + if (writerSchemaOpt.isPresent()) { + // read log file records without merging + List records = new ArrayList<>(); + HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder() + .withStorage(datasetMetaClient.getStorage()) + .withBasePath(datasetMetaClient.getBasePath()) + .withLogFilePaths(Collections.singletonList(filePath)) + .withBufferSize(maxBufferSize) + .withLatestInstantTime(latestCommitTimestamp) + .withReaderSchema(writerSchemaOpt.get()) + .withLogRecordScannerCallback(records::add) + .withTableMetaClient(datasetMetaClient) + .build(); + scanner.scan(); + // HoodieUnMergedLogRecordScanner exposes all record keys from the log file without merging + return records.stream().map(record -> record.getRecordKey()).collect(Collectors.toSet()); + } + return Collections.emptySet(); + } + /** * Does an upcast for {@link BigDecimal} instance to align it with scale/precision expected by * the {@link LogicalTypes.Decimal} Avro logical type @@ -2073,6 +2245,7 @@ public static HoodieData readSecondaryKeysFromBaseFiles(HoodieEngi readerSchema = tableSchema; } return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, readerSchema, partition, 
dataFilePath, indexDefinition, + metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(""), metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse("")); }); } @@ -2081,7 +2254,8 @@ public static HoodieData readSecondaryKeysFromFileSlices(HoodieEng List> partitionFileSlicePairs, int secondaryIndexMaxParallelism, String activeModule, HoodieTableMetaClient metaClient, EngineType engineType, - HoodieIndexDefinition indexDefinition) { + HoodieIndexDefinition indexDefinition, + String latestCommitTimestamp) { if (partitionFileSlicePairs.isEmpty()) { return engineContext.emptyHoodieData(); } @@ -2109,7 +2283,8 @@ public static HoodieData readSecondaryKeysFromFileSlices(HoodieEng readerSchema = tableSchema; } return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, readerSchema, partition, dataFilePath, indexDefinition, - metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse("")); + metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(""), + latestCommitTimestamp); }); } @@ -2118,6 +2293,7 @@ private static ClosableIterator createSecondaryIndexGenerator(Hood Schema tableSchema, String partition, Option dataFilePath, HoodieIndexDefinition indexDefinition, + String latestInstantTime, String instantTime) throws Exception { final String basePath = metaClient.getBasePath().toString(); final StorageConfiguration storageConf = metaClient.getStorageConf(); @@ -2133,7 +2309,7 @@ private static ClosableIterator createSecondaryIndexGenerator(Hood .withBasePath(metaClient.getBasePath()) .withLogFilePaths(logFilePaths) .withReaderSchema(tableSchema) - .withLatestInstantTime(instantTime) + .withLatestInstantTime(latestInstantTime) .withReverseReader(false) .withMaxMemorySizeInBytes(storageConf.getLong(MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)) .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue()) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/PerFileGroupRecordKeyInfos.java b/hudi-common/src/main/java/org/apache/hudi/metadata/PerFileGroupRecordKeyInfos.java new file mode 100644 index 000000000000..d3ce898853ef --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/PerFileGroupRecordKeyInfos.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.List; + +public class PerFileGroupRecordKeyInfos implements Serializable { + private boolean pureInserts; + private List recordKeyInfoList; + + public PerFileGroupRecordKeyInfos(boolean pureInserts, List recordKeyInfoList) { + this.pureInserts = pureInserts; + this.recordKeyInfoList = recordKeyInfoList; + } + + public boolean isPureInserts() { + return pureInserts; + } + + public List getRecordKeyInfoList() { + return recordKeyInfoList; + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/RecordKeyInfo.java b/hudi-common/src/main/java/org/apache/hudi/metadata/RecordKeyInfo.java new file mode 100644 index 000000000000..fd3ea28ae997 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/RecordKeyInfo.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import java.io.Serializable; +import java.util.Objects; + +public class RecordKeyInfo implements Serializable { + private final String recordKey; + private final RecordStatus recordStatus; + private String partition; + private String fileId; + + public RecordKeyInfo(String recordKey, RecordStatus recordStatus, String partition, String fileId) { + this.recordKey = recordKey; + this.recordStatus = recordStatus; + this.partition = partition; + this.fileId = fileId; + } + + public String getRecordKey() { + return recordKey; + } + + public RecordStatus getRecordStatus() { + return recordStatus; + } + + public String getPartition() { + return partition; + } + + public String getFileId() { + return fileId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof RecordKeyInfo)) { + return false; + } + RecordKeyInfo that = (RecordKeyInfo) o; + return getRecordKey().equals(that.getRecordKey()) && getRecordStatus() == that.getRecordStatus() + && Objects.equals(getPartition(), that.getPartition()) && Objects.equals(getFileId(), that.getFileId()); + } + + @Override + public int hashCode() { + return Objects.hash(getRecordKey(), getRecordStatus(), getPartition(), getFileId()); + } + + public static enum RecordStatus { + INSERT, + UPDATE, + DELETE + } +} + diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala index 606a5186faf6..9db14994a6c0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala @@ -1101,13 +1101,13 @@ 
class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness { Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false)) // update the secondary index by delete. - spark.sql(s"delete from $tableName where record_key_col = 'row3'") + spark.sql(s"delete from $tableName where record_key_col = 'row1'") confirmLastCommitType(ActionType.replacecommit) // validate the secondary index records themselves checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")( + Seq(s"def${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row3", false), Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), - Seq(s"fgh${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false), - Seq(s"xyz${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row1", false) + Seq(s"fgh${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false) ) } }