Skip to content

Commit

Permalink
[HUDI-7645] Optimize BQ sync tool for MDT (#11065)
Browse files Browse the repository at this point in the history
  • Loading branch information
wombatu-kun authored and yihua committed May 15, 2024
1 parent d42f399 commit 5007231
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
public class BigQuerySyncTool extends HoodieSyncTool {

private static final Logger LOG = LoggerFactory.getLogger(BigQuerySyncTool.class);
private static final String SUFFIX_MANIFEST = "_manifest";
private static final String SUFFIX_VERSIONS = "_versions";

private final BigQuerySyncConfig config;
private final String tableName;
Expand All @@ -70,8 +72,8 @@ public BigQuerySyncTool(Properties props) {
super(props);
this.config = new BigQuerySyncConfig(props);
this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);
this.manifestTableName = tableName + "_manifest";
this.versionsTableName = tableName + "_versions";
this.manifestTableName = tableName + SUFFIX_MANIFEST;
this.versionsTableName = tableName + SUFFIX_VERSIONS;
this.snapshotViewName = tableName;
this.bqSyncClient = new HoodieBigQuerySyncClient(config);
// reuse existing meta client if not provided (only test cases will provide their own meta client)
Expand All @@ -86,8 +88,8 @@ public BigQuerySyncTool(Properties props) {
super(properties);
this.config = new BigQuerySyncConfig(props);
this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);
this.manifestTableName = tableName + "_manifest";
this.versionsTableName = tableName + "_versions";
this.manifestTableName = tableName + SUFFIX_MANIFEST;
this.versionsTableName = tableName + SUFFIX_VERSIONS;
this.snapshotViewName = tableName;
this.bqSyncClient = bigQuerySyncClient;
this.metaClient = metaClient;
Expand Down Expand Up @@ -117,7 +119,7 @@ public void syncHoodieTable() {

private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String tableName) {
if (bqSyncClient.tableExists(tableName)) {
LOG.info(tableName + " already exists. Skip table creation.");
LOG.info("{} already exists. Skip table creation.", tableName);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public synchronized void writeManifestFile(boolean useAbsolutePath) {
LOG.warn("No base file to generate manifest file.");
return;
} else {
LOG.info("Writing base file names to manifest file: " + baseFiles.size());
LOG.info("Writing base file names to manifest file: {}", baseFiles.size());
}
final StoragePath manifestFilePath = getManifestFilePath(useAbsolutePath);
try (OutputStream outputStream = metaClient.getStorage().create(manifestFilePath, true);
Expand All @@ -87,15 +87,23 @@ public synchronized void writeManifestFile(boolean useAbsolutePath) {
public static Stream<String> fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient,
boolean useFileListingFromMetadata, boolean assumeDatePartitioning, boolean useAbsolutePath) {
try {
List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()),
metaClient.getBasePath(), useFileListingFromMetadata, assumeDatePartitioning);
LOG.info("Retrieve all partitions: " + partitions.size());
Configuration hadoopConf = metaClient.getHadoopConf();
HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf);
HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient,
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).withAssumeDatePartitioning(assumeDatePartitioning).build());
return partitions.parallelStream().flatMap(partition -> fsView.getLatestBaseFiles(partition).map(useAbsolutePath ? HoodieBaseFile::getPath : HoodieBaseFile::getFileName));
Stream<HoodieBaseFile> allLatestBaseFiles;
if (useFileListingFromMetadata) {
LOG.info("Fetching all base files from MDT.");
fsView.loadAllPartitions();
allLatestBaseFiles = fsView.getLatestBaseFiles();
} else {
List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()),
metaClient.getBasePathV2().toString(), false, assumeDatePartitioning);
LOG.info("Retrieve all partitions from fs: {}", partitions.size());
allLatestBaseFiles = partitions.parallelStream().flatMap(fsView::getLatestBaseFiles);
}
return allLatestBaseFiles.map(useAbsolutePath ? HoodieBaseFile::getPath : HoodieBaseFile::getFileName);
} catch (Exception e) {
throw new HoodieException("Error in fetching latest base files.", e);
}
Expand Down

0 comments on commit 5007231

Please sign in to comment.