diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java index 6e064dd59c68..466627dc701c 100644 --- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java @@ -54,6 +54,8 @@ public class BigQuerySyncTool extends HoodieSyncTool { private static final Logger LOG = LoggerFactory.getLogger(BigQuerySyncTool.class); + private static final String SUFFIX_MANIFEST = "_manifest"; + private static final String SUFFIX_VERSIONS = "_versions"; private final BigQuerySyncConfig config; private final String tableName; @@ -70,8 +72,8 @@ public BigQuerySyncTool(Properties props) { super(props); this.config = new BigQuerySyncConfig(props); this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME); - this.manifestTableName = tableName + "_manifest"; - this.versionsTableName = tableName + "_versions"; + this.manifestTableName = tableName + SUFFIX_MANIFEST; + this.versionsTableName = tableName + SUFFIX_VERSIONS; this.snapshotViewName = tableName; this.bqSyncClient = new HoodieBigQuerySyncClient(config); // reuse existing meta client if not provided (only test cases will provide their own meta client) @@ -86,8 +88,8 @@ public BigQuerySyncTool(Properties props) { super(properties); this.config = new BigQuerySyncConfig(props); this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME); - this.manifestTableName = tableName + "_manifest"; - this.versionsTableName = tableName + "_versions"; + this.manifestTableName = tableName + SUFFIX_MANIFEST; + this.versionsTableName = tableName + SUFFIX_VERSIONS; this.snapshotViewName = tableName; this.bqSyncClient = bigQuerySyncClient; this.metaClient = metaClient; @@ -117,7 +119,7 @@ public void syncHoodieTable() { private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String tableName) { if (bqSyncClient.tableExists(tableName)) { - LOG.info(tableName + " already exists. Skip table creation."); + LOG.info("{} already exists. Skip table creation.", tableName); return true; } return false; diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java index ae7580fa9f3e..6f7f4bb2c1f1 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java @@ -69,7 +69,7 @@ public synchronized void writeManifestFile(boolean useAbsolutePath) { LOG.warn("No base file to generate manifest file."); return; } else { - LOG.info("Writing base file names to manifest file: " + baseFiles.size()); + LOG.info("Writing base file names to manifest file: {}", baseFiles.size()); } final StoragePath manifestFilePath = getManifestFilePath(useAbsolutePath); try (OutputStream outputStream = metaClient.getStorage().create(manifestFilePath, true); @@ -87,15 +87,23 @@ public synchronized void writeManifestFile(boolean useAbsolutePath) { public static Stream fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient, boolean useFileListingFromMetadata, boolean assumeDatePartitioning, boolean useAbsolutePath) { try { - List partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()), - metaClient.getBasePath(), useFileListingFromMetadata, assumeDatePartitioning); - LOG.info("Retrieve all partitions: " + partitions.size()); Configuration hadoopConf = metaClient.getHadoopConf(); HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf); HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).withAssumeDatePartitioning(assumeDatePartitioning).build()); - return partitions.parallelStream().flatMap(partition -> fsView.getLatestBaseFiles(partition).map(useAbsolutePath ? HoodieBaseFile::getPath : HoodieBaseFile::getFileName)); + Stream allLatestBaseFiles; + if (useFileListingFromMetadata) { + LOG.info("Fetching all base files from MDT."); + fsView.loadAllPartitions(); + allLatestBaseFiles = fsView.getLatestBaseFiles(); + } else { + List partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()), + metaClient.getBasePathV2().toString(), false, assumeDatePartitioning); + LOG.info("Retrieve all partitions from fs: {}", partitions.size()); + allLatestBaseFiles = partitions.parallelStream().flatMap(fsView::getLatestBaseFiles); + } + return allLatestBaseFiles.map(useAbsolutePath ? HoodieBaseFile::getPath : HoodieBaseFile::getFileName); } catch (Exception e) { throw new HoodieException("Error in fetching latest base files.", e); }