Skip to content

Commit

Permalink
[Improvement] Add iceberg metadata cache and support manifest file content cache (apache#22336)
Browse files Browse the repository at this point in the history

Cache the Iceberg table, so that when the same table is accessed repeatedly, its metadata is loaded only once.
Cache the table's snapshot to optimize the performance of the Iceberg table function.
Add cache support for Iceberg's manifest file content.
In a simple test, query time improved from 2.0 s to 0.8 s.

before
mysql> refresh table tb3;
Query OK, 0 rows affected (0.03 sec)

mysql> select * from tb3;
+------+------+------+
| id   | par  | data |
+------+------+------+
|    1 | a    | a    |
|    2 | a    | b    |
|    3 | a    | c    |
....
|   68 | a    | a    |
|   69 | a    | b    |
|   70 | a    | c    |
+------+------+------+
70 rows in set (2.10 sec)

mysql> select * from tb3;
+------+------+------+
| id   | par  | data |
+------+------+------+
|    1 | a    | a    |
|    2 | a    | b    |
|    3 | a    | c    |
...
|   68 | a    | a    |
|   69 | a    | b    |
|   70 | a    | c    |
+------+------+------+
70 rows in set (2.00 sec)

after
mysql> refresh table tb3;
Query OK, 0 rows affected (0.03 sec)

mysql> select * from tb3;
+------+------+------+
| id   | par  | data |
+------+------+------+
|    1 | a    | a    |
|    2 | a    | b    |
...
|   68 | a    | a    |
|   69 | a    | b    |
|   70 | a    | c    |
+------+------+------+
70 rows in set (2.05 sec)

mysql> select * from tb3;
+------+------+------+
| id   | par  | data |
+------+------+------+
|    1 | a    | a    |
|    2 | a    | b    |
|    3 | a    | c    |
...
|   68 | a    | a    |
|   69 | a    | b    |
|   70 | a    | c    |
+------+------+------+
70 rows in set (0.80 sec)
  • Loading branch information
wuwenchi authored and xiaokang committed Aug 8, 2023
1 parent 3c3f8b7 commit b1adc8c
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import shade.doris.hive.org.apache.thrift.TException;
Expand All @@ -90,7 +89,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -896,20 +894,6 @@ public static String showCreateTable(org.apache.hadoop.hive.metastore.api.Table
return output.toString();
}

public static org.apache.iceberg.Table getIcebergTable(HMSExternalTable table) {
String metastoreUri = table.getMetastoreUri();
org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
Configuration conf = getConfiguration(table);
hiveCatalog.setConf(conf);
// initialize hive catalog
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, metastoreUri);
catalogProperties.put("uri", metastoreUri);
hiveCatalog.initialize("hive", catalogProperties);

return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName()));
}

public static Schema getHudiTableSchema(HMSExternalTable table) {
HoodieTableMetaClient metaClient = getHudiClient(table);
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ public long estimatedRowCount() {
}

private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(this);
Table icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this);
Schema schema = icebergTable.schema();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
for (FieldSchema field : hmsSchema) {
Expand Down Expand Up @@ -470,7 +470,8 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
case HIVE:
return getHiveColumnStats(colName);
case ICEBERG:
return StatisticsUtil.getIcebergColumnStats(colName, HiveMetaStoreClientHelper.getIcebergTable(this));
return StatisticsUtil.getIcebergColumnStats(colName,
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
default:
LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.planner.external.hudi.HudiPartitionMgr;
import org.apache.doris.planner.external.hudi.HudiPartitionProcessor;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
import org.apache.doris.planner.external.iceberg.IcebergMetadataCacheMgr;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
Expand All @@ -52,6 +54,7 @@ public class ExternalMetaCacheMgr {
private ExecutorService executor;
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;

public ExternalMetaCacheMgr() {
executor = ThreadPoolManager.newDaemonFixedThreadPool(
Expand All @@ -60,6 +63,7 @@ public ExternalMetaCacheMgr() {
"ExternalMetaCacheMgr", 120, true);
hudiPartitionMgr = HudiPartitionMgr.get(executor);
fsCache = new FileSystemCache(executor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr();
}

public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
Expand Down Expand Up @@ -92,6 +96,10 @@ public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) {
return hudiPartitionMgr.getPartitionProcessor(catalog);
}

public IcebergMetadataCache getIcebergMetadataCache() {
return icebergMetadataCacheMgr.getIcebergMetadataCache();
}

public FileSystemCache getFsCache() {
return fsCache;
}
Expand All @@ -104,6 +112,7 @@ public void removeCache(long catalogId) {
LOG.info("remove schema cache for catalog {}", catalogId);
}
hudiPartitionMgr.removePartitionProcessor(catalogId);
icebergMetadataCacheMgr.removeCache(catalogId);
}

public void invalidateTableCache(long catalogId, String dbName, String tblName) {
Expand All @@ -117,6 +126,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName)
metaCache.invalidateTableCache(dbName, tblName);
}
hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId);
}

Expand All @@ -131,6 +141,7 @@ public void invalidateDbCache(long catalogId, String dbName) {
metaCache.invalidateDbCache(dbName);
}
hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
}

Expand All @@ -144,6 +155,7 @@ public void invalidateCatalogCache(long catalogId) {
metaCache.invalidateAll();
}
hudiPartitionMgr.cleanPartitionProcess(catalogId);
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
LOG.debug("invalid catalog cache for {}", catalogId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.Util;
Expand Down Expand Up @@ -48,9 +49,11 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
protected String icebergCatalogType;
protected Catalog catalog;
protected SupportsNamespaces nsCatalog;
private final long catalogId;

public IcebergExternalCatalog(long catalogId, String name, String comment) {
super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
this.catalogId = catalogId;
}

@Override
Expand Down Expand Up @@ -113,6 +116,9 @@ public List<String> listTableNames(SessionContext ctx, String dbName) {

public org.apache.iceberg.Table getIcebergTable(String dbName, String tblName) {
makeSureInitialized();
return catalog.loadTable(TableIdentifier.of(dbName, tblName));
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(catalog, catalogId, dbName, tblName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.planner.external.iceberg;

import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.MetaNotFoundException;
Expand Down Expand Up @@ -45,8 +46,13 @@ public class IcebergApiSource implements IcebergSource {
public IcebergApiSource(IcebergExternalTable table, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
this.icebergExtTable = table;
this.originTable = ((IcebergExternalCatalog) icebergExtTable.getCatalog())
.getIcebergTable(icebergExtTable.getDbName(), icebergExtTable.getName());

this.originTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
((IcebergExternalCatalog) icebergExtTable.getCatalog()).getCatalog(),
icebergExtTable.getCatalog().getId(),
icebergExtTable.getDbName(),
icebergExtTable.getName());

this.desc = desc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.doris.planner.external.iceberg;

import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
Expand All @@ -39,12 +39,15 @@ public class IcebergHMSSource implements IcebergSource {
private final HMSExternalTable hmsTable;
private final TupleDescriptor desc;
private final Map<String, ColumnRange> columnNameToRange;
private final org.apache.iceberg.Table icebergTable;

public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
this.hmsTable = hmsTable;
this.desc = desc;
this.columnNameToRange = columnNameToRange;
this.icebergTable =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(hmsTable);
}

@Override
Expand All @@ -59,7 +62,7 @@ public String getFileFormat() throws DdlException, MetaNotFoundException {
}

public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
return HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
return icebergTable;
}

@Override
Expand Down
Loading

0 comments on commit b1adc8c

Please sign in to comment.