From 50d7cf9e35163c98b5641eee96693673904caf0a Mon Sep 17 00:00:00 2001 From: slothever Date: Mon, 22 Jan 2024 10:04:34 +0800 Subject: [PATCH] [feature](multi-catalog)support hms catalog create and drop table/db --- .../java/org/apache/doris/catalog/Env.java | 8 +- .../doris/datasource/CatalogDatabase.java | 28 ++++ .../apache/doris/datasource/CatalogIf.java | 19 ++- .../apache/doris/datasource/CatalogTable.java | 29 ++++ .../doris/datasource/ExternalCatalog.java | 23 +++ .../doris/datasource/HMSExternalCatalog.java | 133 +++++++++++++++++- .../datasource/hive/HMSCachedClient.java | 10 ++ .../datasource/hive/HiveCatalogDatabase.java | 32 +++++ .../datasource/hive/HiveCatalogTable.java | 37 +++++ .../hive/PostgreSQLJdbcHMSCachedClient.java | 19 +++ .../hive/ThriftHMSCachedClient.java | 106 ++++++++++++++ 11 files changed, 436 insertions(+), 8 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogDatabase.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogTable.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCatalogDatabase.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCatalogTable.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 1ad131abb0ae18..3ac2ef8e5bb10b 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -2887,7 +2887,7 @@ public Frontend getFeByName(String name) { // The interface which DdlExecutor needs. public void createDb(CreateDbStmt stmt) throws DdlException { - getInternalCatalog().createDb(stmt); + getCurrentCatalog().createDb(stmt); } // For replay edit log, need't lock metadata @@ -2900,7 +2900,7 @@ public void replayCreateDb(Database db) { } public void dropDb(DropDbStmt stmt) throws DdlException { - getInternalCatalog().dropDb(stmt); + getCurrentCatalog().dropDb(stmt); } public void replayDropDb(String dbName, boolean isForceDrop, Long recycleTime) throws DdlException { @@ -2972,7 +2972,7 @@ public void replayRenameDatabase(String dbName, String newDbName) { * 11. add this table to ColocateGroup if necessary */ public void createTable(CreateTableStmt stmt) throws UserException { - getInternalCatalog().createTable(stmt); + getCurrentCatalog().createTable(stmt); } public void createTableLike(CreateTableLikeStmt stmt) throws DdlException { @@ -3573,7 +3573,7 @@ public void replayAlterExternalTableSchema(String dbName, String tableName, List // Drop table public void dropTable(DropTableStmt stmt) throws DdlException { - getInternalCatalog().dropTable(stmt); + getCurrentCatalog().dropTable(stmt); } public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogDatabase.java new file mode 100644 index 00000000000000..27fa54fa5785db --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogDatabase.java @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + + +import java.util.Map; + +public interface CatalogDatabase { + + String getDbName(); + + Map getProperties(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java index c8f3033c6688ab..ffae9420edeb48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java @@ -17,6 +17,10 @@ package org.apache.doris.datasource; +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; @@ -24,6 +28,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -172,9 +177,17 @@ default CatalogLog constructEditLog() { } // Return a copy of all db collection. - public Collection> getAllDbs(); + Collection> getAllDbs(); - public boolean enableAutoAnalyze(); + boolean enableAutoAnalyze(); - public ConcurrentHashMap getIdToDb(); + ConcurrentHashMap getIdToDb(); + + void createDb(CreateDbStmt stmt) throws DdlException; + + void dropDb(DropDbStmt stmt) throws DdlException; + + void createTable(CreateTableStmt stmt) throws UserException; + + void dropTable(DropTableStmt stmt) throws DdlException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogTable.java new file mode 100644 index 00000000000000..98c97dda172ab8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogTable.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import java.util.Map; + +public interface CatalogTable { + + String getDbName(); + + String getTableName(); + + Map getProperties(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 0e12e942c9bb68..af72c087f51451 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -17,6 +17,10 @@ package org.apache.doris.datasource; +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; @@ -33,6 +37,7 @@ import org.apache.doris.catalog.external.TestExternalDatabase; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Util; @@ -550,6 +555,24 @@ public void addDatabaseForTest(ExternalDatabase db) { dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId()); } + public void createDb(CreateDbStmt stmt) throws DdlException { + throw new NotImplementedException("dropDatabase not implemented"); + } + + public void dropDb(DropDbStmt stmt) throws DdlException { + throw new NotImplementedException("dropDatabase not implemented"); + } + + @Override + public void createTable(CreateTableStmt stmt) throws UserException { + throw new NotImplementedException("createTable not implemented"); + } + + @Override + public void dropTable(DropTableStmt stmt) throws DdlException { + throw new NotImplementedException("dropTable not implemented"); + } + public void dropDatabaseForReplay(String dbName) { throw new NotImplementedException("dropDatabase not implemented"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index ae6b5f04738b07..ca979a11aa19d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -17,6 +17,10 @@ package org.apache.doris.datasource; +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.catalog.AuthType; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HdfsResource; @@ -25,8 +29,13 @@ import org.apache.doris.catalog.external.HMSExternalDatabase; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.QueryableReentrantLock; +import org.apache.doris.common.util.Util; import org.apache.doris.datasource.hive.HMSCachedClient; import org.apache.doris.datasource.hive.HMSCachedClientFactory; +import org.apache.doris.datasource.hive.HiveCatalogDatabase; +import org.apache.doris.datasource.hive.HiveCatalogTable; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.property.PropertyConverter; @@ -39,22 +48,24 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * External catalog for hive metastore compatible data sources. */ public class HMSExternalCatalog extends ExternalCatalog { private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class); - private static final int MIN_CLIENT_POOL_SIZE = 8; protected HMSCachedClient client; // Record the latest synced event id when processing hive events @@ -71,6 +82,7 @@ public class HMSExternalCatalog extends ExternalCatalog { public static final int FILE_META_CACHE_NO_TTL = -1; // 0 means file cache is disabled; >0 means file cache with ttl; public static final int FILE_META_CACHE_TTL_DISABLE_CACHE = 0; + private QueryableReentrantLock lock = new QueryableReentrantLock(true); public HMSExternalCatalog() { catalogProperty = new CatalogProperty(null, null); @@ -307,6 +319,125 @@ public void notifyPropertiesUpdated(Map updatedProps) { } } + private boolean tryLock(boolean mustLock) { + while (true) { + try { + if (!lock.tryLock(Config.catalog_try_lock_timeout_ms, TimeUnit.MILLISECONDS)) { + // to see which thread held this lock for long time. + Thread owner = lock.getOwner(); + if (owner != null) { + // There are many catalog timeout during regression test + // And this timeout should not happen very often, so it could be info log + LOG.info("catalog lock is held by: {}", Util.dumpThread(owner, 10)); + } + + if (mustLock) { + continue; + } else { + return false; + } + } + return true; + } catch (InterruptedException e) { + LOG.warn("got exception while getting catalog lock", e); + if (mustLock) { + continue; + } else { + return lock.isHeldByCurrentThread(); + } + } + } + } + + private void unlock() { + if (lock.isHeldByCurrentThread()) { + this.lock.unlock(); + } + } + + @Override + public void createDb(CreateDbStmt stmt) throws DdlException { + String fullDbName = stmt.getFullDbName(); + Map properties = stmt.getProperties(); + long id = Env.getCurrentEnv().getNextId(); + + if (!tryLock(false)) { + throw new DdlException("Failed to acquire catalog lock. Try again"); + } + try { + HiveCatalogDatabase catalogDatabase = new HiveCatalogDatabase(); + catalogDatabase.setDbName(fullDbName); + catalogDatabase.setProperties(properties); + if (properties.containsKey("location_uri")) { + catalogDatabase.setLocationUri(properties.get("location_uri")); + } + catalogDatabase.setComment(properties.getOrDefault("comment", "")); + client.createDatabase(catalogDatabase); + createDatabaseForReplay(id, fullDbName); + } finally { + unlock(); + } + LOG.info("createDb dbName = " + fullDbName + ", id = " + id); + } + + public void dropDb(DropDbStmt stmt) throws DdlException { + if (!tryLock(false)) { + throw new DdlException("Failed to acquire catalog lock. Try again"); + } + try { + client.dropDatabase(stmt.getDbName()); + } finally { + unlock(); + } + } + + @Override + public void createTable(CreateTableStmt stmt) throws UserException { + if (!tryLock(false)) { + throw new DdlException("Failed to acquire catalog lock. Try again"); + } + try { + HiveCatalogTable catalogTable = new HiveCatalogTable(); + catalogTable.setDbName(stmt.getDbName()); + catalogTable.setTableName(stmt.getTableName()); + Map props = stmt.getExtProperties(); + catalogTable.setProperties(props); + String inputFormat = props.getOrDefault("input_format", + "org.apache.hadoop.mapred.TextInputFormat"); + String outputFormat = props.getOrDefault("output_format", + "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"); + catalogTable.setInputFormat(inputFormat); + catalogTable.setOutputFormat(outputFormat); + catalogTable.setPartitionKeys(parsePartitionKeys(props)); + client.createTable(catalogTable, stmt.isSetIfNotExists()); + } finally { + unlock(); + } + } + + private static List parsePartitionKeys(Map props) { + List parsedKeys = new ArrayList<>(); + String pkStr = props.getOrDefault("partition_keys", ""); + if (pkStr.isEmpty()) { + return parsedKeys; + } else { + // TODO: parse string to partition keys list + return parsedKeys; + } + } + + @Override + public void dropTable(DropTableStmt stmt) throws DdlException { + if (!tryLock(false)) { + throw new DdlException("Failed to acquire catalog lock. Try again"); + } + try { + client.dropTable(stmt.getDbName(), stmt.getTableName()); + } finally { + unlock(); + } + } + @Override public void setDefaultPropsWhenCreating(boolean isReplay) { if (isReplay) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java index 6e7f45aaa415c6..a5050613ba6936 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java @@ -18,6 +18,8 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.TableName; +import org.apache.doris.datasource.CatalogDatabase; +import org.apache.doris.datasource.CatalogTable; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import org.apache.hadoop.hive.common.ValidWriteIdList; @@ -44,6 +46,14 @@ public interface HMSCachedClient { List getAllTables(String dbName); + void createDatabase(CatalogDatabase database); + + void dropDatabase(String dbName); + + void createTable(CatalogTable hiveTable, boolean ignoreIfExists); + + void dropTable(String dbName, String tblName); + boolean tableExists(String dbName, String tblName); List listPartitionNames(String dbName, String tblName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCatalogDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCatalogDatabase.java new file mode 100644 index 00000000000000..f2917996fba376 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCatalogDatabase.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.datasource.CatalogDatabase; + +import lombok.Data; + +import java.util.Map; + +@Data +public class HiveCatalogDatabase implements CatalogDatabase { + private String dbName; + private String locationUri; + private Map properties; + private String comment; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCatalogTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCatalogTable.java new file mode 100644 index 00000000000000..3cc535cbc59062 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCatalogTable.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.datasource.CatalogTable; + +import lombok.Data; +import org.apache.hadoop.hive.metastore.api.FieldSchema; + +import java.util.List; +import java.util.Map; + +@Data +public class HiveCatalogTable implements CatalogTable { + private String dbName; + private String tableName; + private List partitionKeys; + private String inputFormat; + private String outputFormat; + private Map properties; + private String viewSql; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java index afbd929c5a1417..9f83fb1bea2bb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java @@ -20,6 +20,8 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.CatalogDatabase; +import org.apache.doris.datasource.CatalogTable; import org.apache.doris.datasource.HMSClientException; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; @@ -30,6 +32,7 @@ import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -474,4 +477,20 @@ protected String getDatabaseQuery() { protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); } + + public void createDatabase(CatalogDatabase database) { + throw new NotImplementedException("PostgreSQL createDatabase not implemented"); + } + + public void dropDatabase(String dbName) { + throw new NotImplementedException("PostgreSQL dropDatabase not implemented"); + } + + public void createTable(CatalogTable hiveTable, boolean ignoreIfExists) { + throw new NotImplementedException("PostgreSQL createTable not implemented"); + } + + public void dropTable(String dbName, String tblName) { + throw new NotImplementedException("PostgreSQL dropTable not implemented"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index abb3fda24b3605..d23ba71edf6f3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -19,6 +19,8 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.common.Config; +import org.apache.doris.datasource.CatalogDatabase; +import org.apache.doris.datasource.CatalogTable; import org.apache.doris.datasource.HMSClientException; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import org.apache.doris.datasource.property.constants.HMSProperties; @@ -27,6 +29,7 @@ import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; @@ -50,6 +53,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -58,6 +62,7 @@ import java.util.BitSet; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -117,6 +122,107 @@ public List getAllTables(String dbName) { } } + @Override + public void createDatabase(CatalogDatabase db) { + try (ThriftHMSClient client = getClient()) { + try { + if (db instanceof HiveCatalogDatabase) { + HiveCatalogDatabase hiveDb = (HiveCatalogDatabase) db; + Database database = new Database(); + database.setName(hiveDb.getDbName()); + if (StringUtils.isNotEmpty(hiveDb.getLocationUri())) { + database.setLocationUri(hiveDb.getLocationUri()); + } + database.setParameters(hiveDb.getProperties()); + database.setDescription(hiveDb.getComment()); + client.client.createDatabase(database); + } + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("failed to create database from hms client", e); + } + } + + @Override + public void createTable(CatalogTable tbl, boolean ignoreIfExists) { + if (tableExists(tbl.getDbName(), tbl.getTableName())) { + return; + } + try (ThriftHMSClient client = getClient()) { + try { + // sd: List cols, + // String location, + // String inputFormat, + // String outputFormat, + // Map parameters + // parameters.put("", "doris created") + if (tbl instanceof HiveCatalogTable) { + client.client.createTable(toHiveTable((HiveCatalogTable) tbl)); + } + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("failed to create database from hms client", e); + } + } + + private static Table toHiveTable(HiveCatalogTable hiveTable) { + Table table = new Table(); + table.setDbName(hiveTable.getDbName()); + table.setTableName(hiveTable.getTableName()); + // table.setOwner(""); + int createTime = (int) System.currentTimeMillis() * 1000; + table.setCreateTime(createTime); + table.setLastAccessTime(createTime); + // table.setRetention(0); + StorageDescriptor sd = new StorageDescriptor(); + sd.setInputFormat(hiveTable.getInputFormat()); + sd.setOutputFormat(hiveTable.getOutputFormat()); + Map parameters = new HashMap<>(); + parameters.put("tag", "doris created"); + sd.setParameters(parameters); + table.setSd(sd); + table.setPartitionKeys(hiveTable.getPartitionKeys()); + // table.setViewOriginalText(hiveTable.getViewSql()); + // table.setViewExpandedText(hiveTable.getViewSql()); + table.setTableType("MANAGED_TABLE"); + table.setParameters(hiveTable.getProperties()); + return table; + } + + @Override + public void dropDatabase(String dbName) { + try (ThriftHMSClient client = getClient()) { + try { + client.client.dropDatabase(dbName); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("failed to drop database from hms client", e); + } + } + + @Override + public void dropTable(String dbName, String tblName) { + try (ThriftHMSClient client = getClient()) { + try { + client.client.dropTable(dbName, tblName); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("failed to drop database from hms client", e); + } + } + @Override public boolean tableExists(String dbName, String tblName) { try (ThriftHMSClient client = getClient()) {