[feature](multi-catalog) Support HMS catalog create and drop table/db
wsjz committed Jan 22, 2024
1 parent d6dcf96 commit 50d7cf9
Showing 11 changed files with 436 additions and 8 deletions.
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -2887,7 +2887,7 @@ public Frontend getFeByName(String name) {

// The interface which DdlExecutor needs.
public void createDb(CreateDbStmt stmt) throws DdlException {
getInternalCatalog().createDb(stmt);
getCurrentCatalog().createDb(stmt);
}

// For replaying edit log, no need to lock metadata
@@ -2900,7 +2900,7 @@ public void replayCreateDb(Database db) {
}

public void dropDb(DropDbStmt stmt) throws DdlException {
getInternalCatalog().dropDb(stmt);
getCurrentCatalog().dropDb(stmt);
}

public void replayDropDb(String dbName, boolean isForceDrop, Long recycleTime) throws DdlException {
@@ -2972,7 +2972,7 @@ public void replayRenameDatabase(String dbName, String newDbName) {
* 11. add this table to ColocateGroup if necessary
*/
public void createTable(CreateTableStmt stmt) throws UserException {
getInternalCatalog().createTable(stmt);
getCurrentCatalog().createTable(stmt);
}

public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {
@@ -3573,7 +3573,7 @@ public void replayAlterExternalTableSchema(String dbName, String tableName, List

// Drop table
public void dropTable(DropTableStmt stmt) throws DdlException {
getInternalCatalog().dropTable(stmt);
getCurrentCatalog().dropTable(stmt);
}

public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay,
@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;


import java.util.Map;

public interface CatalogDatabase {

String getDbName();

Map<String, String> getProperties();
}
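
A minimal sketch of an implementation, for illustration only (not part of this commit): the class and field names below are guesses based on the setters that HMSExternalCatalog.createDb() calls on HiveCatalogDatabase later in this diff (setDbName, setProperties, setLocationUri, setComment).

package org.apache.doris.datasource.hive;

import org.apache.doris.datasource.CatalogDatabase;

import java.util.HashMap;
import java.util.Map;

public class HiveCatalogDatabaseSketch implements CatalogDatabase {
    private String dbName;
    private String locationUri;
    private String comment;
    private Map<String, String> properties = new HashMap<>();

    @Override
    public String getDbName() {
        return dbName;
    }

    @Override
    public Map<String, String> getProperties() {
        return properties;
    }

    public void setDbName(String dbName) {
        this.dbName = dbName;
    }

    public void setProperties(Map<String, String> properties) {
        this.properties = properties;
    }

    public void setLocationUri(String locationUri) {
        this.locationUri = locationUri;
    }

    public void setComment(String comment) {
        this.comment = comment;
    }
}
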
@@ -17,13 +17,18 @@

package org.apache.doris.datasource;

import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -172,9 +177,17 @@ default CatalogLog constructEditLog() {
}

// Return a copy of all db collection.
public Collection<DatabaseIf<? extends TableIf>> getAllDbs();
Collection<DatabaseIf<? extends TableIf>> getAllDbs();

public boolean enableAutoAnalyze();
boolean enableAutoAnalyze();

public ConcurrentHashMap<Long, DatabaseIf> getIdToDb();
ConcurrentHashMap<Long, DatabaseIf> getIdToDb();

void createDb(CreateDbStmt stmt) throws DdlException;

void dropDb(DropDbStmt stmt) throws DdlException;

void createTable(CreateTableStmt stmt) throws UserException;

void dropTable(DropTableStmt stmt) throws DdlException;
}
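
Together with the Env changes above, these four new methods are what let DDL statements flow to whichever catalog is current instead of always hitting the internal catalog. A condensed, self-contained sketch of that dispatch pattern, using stand-in types rather than the real Doris classes:

import java.util.HashMap;
import java.util.Map;

// Stand-in for CatalogIf: every catalog now exposes its own DDL entry points.
interface SimpleCatalog {
    void createDb(String dbName) throws Exception;
    void dropDb(String dbName) throws Exception;
}

// Stand-in for Env: the facade delegates to the session's current catalog.
class SimpleEnv {
    private final Map<String, SimpleCatalog> catalogs = new HashMap<>();
    private String currentCatalogName = "internal";

    void register(String name, SimpleCatalog catalog) {
        catalogs.put(name, catalog);
    }

    void switchTo(String name) {
        currentCatalogName = name;
    }

    // Before this commit the equivalent method always used the internal catalog;
    // now it routes to whichever catalog is current, e.g. an HMS catalog.
    void createDb(String dbName) throws Exception {
        catalogs.get(currentCatalogName).createDb(dbName);
    }
}

The real interface methods take the parsed statements (CreateDbStmt, DropDbStmt, CreateTableStmt, DropTableStmt) rather than plain strings.
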
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import java.util.Map;

public interface CatalogTable {

String getDbName();

String getTableName();

Map<String, String> getProperties();
}
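
For orientation, a rough sketch (again, not part of this commit) of how a CatalogTable plus the input/output formats and partition keys seen later in createTable() might be mapped onto a Hive metastore Table; only the classes from the hive-metastore API are real, and the conversion itself is an assumption about how a client could use this interface.

import org.apache.doris.datasource.CatalogTable;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

import java.util.List;

public final class HiveTableConverterSketch {
    public static Table toHiveTable(CatalogTable catalogTable, String inputFormat,
            String outputFormat, List<FieldSchema> partitionKeys) {
        StorageDescriptor sd = new StorageDescriptor();
        sd.setInputFormat(inputFormat);   // e.g. org.apache.hadoop.mapred.TextInputFormat
        sd.setOutputFormat(outputFormat); // e.g. HiveIgnoreKeyTextOutputFormat
        // A real table also needs column definitions and serde info on the descriptor.

        Table table = new Table();
        table.setDbName(catalogTable.getDbName());
        table.setTableName(catalogTable.getTableName());
        table.setParameters(catalogTable.getProperties());
        table.setPartitionKeys(partitionKeys);
        table.setSd(sd);
        return table;
    }
}
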
@@ -17,6 +17,10 @@

package org.apache.doris.datasource;

import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@@ -33,6 +37,7 @@
import org.apache.doris.catalog.external.TestExternalDatabase;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Util;
@@ -550,6 +555,24 @@ public void addDatabaseForTest(ExternalDatabase<? extends ExternalTable> db) {
dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId());
}

@Override
public void createDb(CreateDbStmt stmt) throws DdlException {
throw new NotImplementedException("createDb not implemented");
}

@Override
public void dropDb(DropDbStmt stmt) throws DdlException {
throw new NotImplementedException("dropDb not implemented");
}

@Override
public void createTable(CreateTableStmt stmt) throws UserException {
throw new NotImplementedException("createTable not implemented");
}

@Override
public void dropTable(DropTableStmt stmt) throws DdlException {
throw new NotImplementedException("dropTable not implemented");
}

public void dropDatabaseForReplay(String dbName) {
throw new NotImplementedException("dropDatabase not implemented");
}
@@ -17,6 +17,10 @@

package org.apache.doris.datasource;

import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
@@ -25,8 +29,13 @@
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.QueryableReentrantLock;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HMSCachedClientFactory;
import org.apache.doris.datasource.hive.HiveCatalogDatabase;
import org.apache.doris.datasource.hive.HiveCatalogTable;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.property.PropertyConverter;
@@ -39,22 +48,24 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* External catalog for hive metastore compatible data sources.
*/
public class HMSExternalCatalog extends ExternalCatalog {
private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class);

private static final int MIN_CLIENT_POOL_SIZE = 8;
protected HMSCachedClient client;
// Record the latest synced event id when processing hive events
@@ -71,6 +82,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
public static final int FILE_META_CACHE_NO_TTL = -1;
// 0 means file cache is disabled; >0 means file cache with ttl;
public static final int FILE_META_CACHE_TTL_DISABLE_CACHE = 0;
private final QueryableReentrantLock lock = new QueryableReentrantLock(true);

public HMSExternalCatalog() {
catalogProperty = new CatalogProperty(null, null);
@@ -307,6 +319,125 @@ public void notifyPropertiesUpdated(Map<String, String> updatedProps) {
}
}

private boolean tryLock(boolean mustLock) {
while (true) {
try {
if (!lock.tryLock(Config.catalog_try_lock_timeout_ms, TimeUnit.MILLISECONDS)) {
// Dump the owner to see which thread has been holding this lock for a long time.
Thread owner = lock.getOwner();
if (owner != null) {
// Lock timeouts happen frequently during regression tests but should be rare otherwise,
// so INFO level is enough here.
LOG.info("catalog lock is held by: {}", Util.dumpThread(owner, 10));
}

if (mustLock) {
continue;
} else {
return false;
}
}
return true;
} catch (InterruptedException e) {
LOG.warn("got exception while getting catalog lock", e);
if (mustLock) {
continue;
} else {
return lock.isHeldByCurrentThread();
}
}
}
}

private void unlock() {
if (lock.isHeldByCurrentThread()) {
this.lock.unlock();
}
}

@Override
public void createDb(CreateDbStmt stmt) throws DdlException {
String fullDbName = stmt.getFullDbName();
Map<String, String> properties = stmt.getProperties();
long id = Env.getCurrentEnv().getNextId();

if (!tryLock(false)) {
throw new DdlException("Failed to acquire catalog lock. Try again");
}
try {
HiveCatalogDatabase catalogDatabase = new HiveCatalogDatabase();
catalogDatabase.setDbName(fullDbName);
catalogDatabase.setProperties(properties);
if (properties.containsKey("location_uri")) {
catalogDatabase.setLocationUri(properties.get("location_uri"));
}
catalogDatabase.setComment(properties.getOrDefault("comment", ""));
client.createDatabase(catalogDatabase);
createDatabaseForReplay(id, fullDbName);
} finally {
unlock();
}
LOG.info("createDb dbName = " + fullDbName + ", id = " + id);
}

@Override
public void dropDb(DropDbStmt stmt) throws DdlException {
if (!tryLock(false)) {
throw new DdlException("Failed to acquire catalog lock. Try again");
}
try {
client.dropDatabase(stmt.getDbName());
} finally {
unlock();
}
}

@Override
public void createTable(CreateTableStmt stmt) throws UserException {
if (!tryLock(false)) {
throw new DdlException("Failed to acquire catalog lock. Try again");
}
try {
HiveCatalogTable catalogTable = new HiveCatalogTable();
catalogTable.setDbName(stmt.getDbName());
catalogTable.setTableName(stmt.getTableName());
Map<String, String> props = stmt.getExtProperties();
catalogTable.setProperties(props);
String inputFormat = props.getOrDefault("input_format",
"org.apache.hadoop.mapred.TextInputFormat");
String outputFormat = props.getOrDefault("output_format",
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat");
catalogTable.setInputFormat(inputFormat);
catalogTable.setOutputFormat(outputFormat);
catalogTable.setPartitionKeys(parsePartitionKeys(props));
client.createTable(catalogTable, stmt.isSetIfNotExists());
} finally {
unlock();
}
}
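
// Property keys recognized by createDb and createTable above, as currently written:
//   createDb:    location_uri, comment
//   createTable: input_format (default TextInputFormat),
//                output_format (default HiveIgnoreKeyTextOutputFormat),
//                partition_keys (parsing still a TODO, see below)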

private static List<FieldSchema> parsePartitionKeys(Map<String, String> props) {
List<FieldSchema> parsedKeys = new ArrayList<>();
String pkStr = props.getOrDefault("partition_keys", "");
if (pkStr.isEmpty()) {
return parsedKeys;
} else {
// TODO: parse string to partition keys list
return parsedKeys;
}
}
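
// Illustrative sketch only, not part of this commit: the TODO above leaves the
// partition_keys format undefined. Assuming a simple "name:type,name:type" encoding,
// the parsing could look roughly like this (FieldSchema comes from the hive-metastore API):
private static List<FieldSchema> parsePartitionKeysSketch(String pkStr) {
    List<FieldSchema> keys = new ArrayList<>();
    for (String part : pkStr.split(",")) {
        String[] nameAndType = part.trim().split(":");
        if (nameAndType.length == 2) {
            keys.add(new FieldSchema(nameAndType[0].trim(), nameAndType[1].trim(), ""));
        }
    }
    return keys;
}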

@Override
public void dropTable(DropTableStmt stmt) throws DdlException {
if (!tryLock(false)) {
throw new DdlException("Failed to acquire catalog lock. Try again");
}
try {
client.dropTable(stmt.getDbName(), stmt.getTableName());
} finally {
unlock();
}
}

@Override
public void setDefaultPropsWhenCreating(boolean isReplay) {
if (isReplay) {
@@ -18,6 +18,8 @@
package org.apache.doris.datasource.hive;

import org.apache.doris.analysis.TableName;
import org.apache.doris.datasource.CatalogDatabase;
import org.apache.doris.datasource.CatalogTable;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;

import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -44,6 +46,14 @@ public interface HMSCachedClient {

List<String> getAllTables(String dbName);

void createDatabase(CatalogDatabase database);

void dropDatabase(String dbName);

void createTable(CatalogTable hiveTable, boolean ignoreIfExists);

void dropTable(String dbName, String tblName);

boolean tableExists(String dbName, String tblName);

List<String> listPartitionNames(String dbName, String tblName);
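
As a rough sketch (not from this commit) of what an implementation of the new DDL calls might look like, the snippet below maps them onto the standard Hive metastore API via IMetaStoreClient; the wrapper class and its exception handling are assumptions, only the metastore types and methods are real. createTable would similarly build an org.apache.hadoop.hive.metastore.api.Table (see the converter sketch after CatalogTable above) and pass it to IMetaStoreClient.createTable.

import org.apache.doris.datasource.CatalogDatabase;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;

// Sketch of the DDL half of a metastore-backed client; a real implementation would
// also cover the rest of HMSCachedClient and reuse its pooled connections.
public class HmsDdlSketch {
    private final IMetaStoreClient client;

    public HmsDdlSketch(IMetaStoreClient client) {
        this.client = client;
    }

    public void createDatabase(CatalogDatabase database) {
        try {
            Database hiveDb = new Database();
            hiveDb.setName(database.getDbName());
            hiveDb.setParameters(database.getProperties());
            client.createDatabase(hiveDb);
        } catch (Exception e) {
            throw new RuntimeException("failed to create database " + database.getDbName(), e);
        }
    }

    public void dropDatabase(String dbName) {
        try {
            client.dropDatabase(dbName);
        } catch (Exception e) {
            throw new RuntimeException("failed to drop database " + dbName, e);
        }
    }

    public void dropTable(String dbName, String tblName) {
        try {
            client.dropTable(dbName, tblName);
        } catch (Exception e) {
            throw new RuntimeException("failed to drop table " + dbName + "." + tblName, e);
        }
    }
}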