Skip to content

Commit

Permalink
[HUDI-7645] Optimize BQ sync tool for MDT (apache#11065)
Browse files Browse the repository at this point in the history
  • Loading branch information
wombatu-kun authored Apr 25, 2024
1 parent 9d689fd commit 835d473
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
public class BigQuerySyncTool extends HoodieSyncTool {

private static final Logger LOG = LoggerFactory.getLogger(BigQuerySyncTool.class);
private static final String SUFFIX_MANIFEST = "_manifest";
private static final String SUFFIX_VERSIONS = "_versions";

private final BigQuerySyncConfig config;
private final String tableName;
Expand All @@ -69,8 +71,8 @@ public BigQuerySyncTool(Properties props) {
super(props);
this.config = new BigQuerySyncConfig(props);
this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);
this.manifestTableName = tableName + "_manifest";
this.versionsTableName = tableName + "_versions";
this.manifestTableName = tableName + SUFFIX_MANIFEST;
this.versionsTableName = tableName + SUFFIX_VERSIONS;
this.snapshotViewName = tableName;
this.bqSyncClient = new HoodieBigQuerySyncClient(config);
// reuse existing meta client if not provided (only test cases will provide their own meta client)
Expand All @@ -85,8 +87,8 @@ public BigQuerySyncTool(Properties props) {
super(properties);
this.config = new BigQuerySyncConfig(props);
this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);
this.manifestTableName = tableName + "_manifest";
this.versionsTableName = tableName + "_versions";
this.manifestTableName = tableName + SUFFIX_MANIFEST;
this.versionsTableName = tableName + SUFFIX_VERSIONS;
this.snapshotViewName = tableName;
this.bqSyncClient = bigQuerySyncClient;
this.metaClient = metaClient;
Expand Down Expand Up @@ -115,7 +117,7 @@ public void syncHoodieTable() {

private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String tableName) {
if (bqSyncClient.tableExists(tableName)) {
LOG.info(tableName + " already exists. Skip table creation.");
LOG.info("{} already exists. Skip table creation.", tableName);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public synchronized void writeManifestFile(boolean useAbsolutePath) {
LOG.warn("No base file to generate manifest file.");
return;
} else {
LOG.info("Writing base file names to manifest file: " + baseFiles.size());
LOG.info("Writing base file names to manifest file: {}", baseFiles.size());
}
final StoragePath manifestFilePath = getManifestFilePath(useAbsolutePath);
try (OutputStream outputStream = metaClient.getStorage().create(manifestFilePath, true);
Expand All @@ -85,16 +85,23 @@ public synchronized void writeManifestFile(boolean useAbsolutePath) {
public static Stream<String> fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient,
boolean useFileListingFromMetadata, boolean useAbsolutePath) {
try {
List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()),
metaClient.getBasePath(), useFileListingFromMetadata);
LOG.info("Retrieve all partitions: " + partitions.size());

Configuration hadoopConf = metaClient.getHadoopConf();
HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf);
HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient,
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).build());
return partitions.parallelStream().flatMap(partition -> fsView.getLatestBaseFiles(partition).map(useAbsolutePath ? HoodieBaseFile::getPath : HoodieBaseFile::getFileName));
Stream<HoodieBaseFile> allLatestBaseFiles;
if (useFileListingFromMetadata) {
LOG.info("Fetching all base files from MDT.");
fsView.loadAllPartitions();
allLatestBaseFiles = fsView.getLatestBaseFiles();
} else {
List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()),
metaClient.getBasePathV2().toString(), false);
LOG.info("Retrieve all partitions from fs: {}", partitions.size());
allLatestBaseFiles = partitions.parallelStream().flatMap(fsView::getLatestBaseFiles);
}
return allLatestBaseFiles.map(useAbsolutePath ? HoodieBaseFile::getPath : HoodieBaseFile::getFileName);
} catch (Exception e) {
throw new HoodieException("Error in fetching latest base files.", e);
}
Expand Down

0 comments on commit 835d473

Please sign in to comment.