diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java deleted file mode 100644 index 7bef1d75c15a4..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/DDLOperationConverter.java +++ /dev/null @@ -1,822 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.delegation.hive; - -import org.apache.flink.connectors.hive.FlinkHiveException; -import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils; -import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.AlterHiveDatabaseOp; -import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.api.WatermarkSpec; -import org.apache.flink.table.api.constraints.UniqueConstraint; -import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogDatabase; -import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.catalog.CatalogFunctionImpl; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.CatalogPartition; -import org.apache.flink.table.catalog.CatalogPartitionImpl; -import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; -import org.apache.flink.table.catalog.CatalogView; -import org.apache.flink.table.catalog.CatalogViewImpl; -import org.apache.flink.table.catalog.FunctionLanguage; -import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.catalog.UnresolvedIdentifier; -import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; -import org.apache.flink.table.catalog.hive.HiveCatalog; -import org.apache.flink.table.catalog.hive.client.HiveShim; -import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory; -import org.apache.flink.table.catalog.hive.util.HiveTableUtil; -import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; -import org.apache.flink.table.delegation.Parser; -import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.functions.FunctionDefinition; -import org.apache.flink.table.operations.DescribeTableOperation; -import 
org.apache.flink.table.operations.Operation; -import org.apache.flink.table.operations.ShowDatabasesOperation; -import org.apache.flink.table.operations.ShowFunctionsOperation; -import org.apache.flink.table.operations.ShowPartitionsOperation; -import org.apache.flink.table.operations.ShowTablesOperation; -import org.apache.flink.table.operations.ShowViewsOperation; -import org.apache.flink.table.operations.UseDatabaseOperation; -import org.apache.flink.table.operations.ddl.AddPartitionsOperation; -import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; -import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation; -import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation; -import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; -import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation; -import org.apache.flink.table.operations.ddl.AlterViewAsOperation; -import org.apache.flink.table.operations.ddl.AlterViewPropertiesOperation; -import org.apache.flink.table.operations.ddl.AlterViewRenameOperation; -import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation; -import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; -import org.apache.flink.table.operations.ddl.CreateTableOperation; -import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; -import org.apache.flink.table.operations.ddl.CreateViewOperation; -import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation; -import org.apache.flink.table.operations.ddl.DropDatabaseOperation; -import org.apache.flink.table.operations.ddl.DropPartitionsOperation; -import org.apache.flink.table.operations.ddl.DropTableOperation; -import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; -import org.apache.flink.table.operations.ddl.DropViewOperation; -import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer; -import org.apache.flink.table.planner.delegation.hive.copy.HiveParserStorageFormat; -import org.apache.flink.table.planner.delegation.hive.desc.DropPartitionDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserAlterDatabaseDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserAlterTableDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.NotNullConstraint; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.PrimaryKey; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropDatabaseDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropFunctionDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropTableDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserShowTablesDesc; -import org.apache.flink.table.planner.utils.OperationConverterUtils; - -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; -import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; -import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc; -import org.apache.hadoop.hive.ql.plan.DDLWork; -import org.apache.hadoop.hive.ql.plan.DescTableDesc; -import org.apache.hadoop.hive.ql.plan.FunctionWork; -import org.apache.hadoop.hive.ql.plan.PrincipalDesc; 
-import org.apache.hadoop.hive.ql.plan.ShowDatabasesDesc; -import org.apache.hadoop.hive.ql.plan.ShowFunctionsDesc; -import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc; -import org.apache.hadoop.hive.ql.plan.SwitchDatabaseDesc; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils.COL_DELIMITER; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_COL_CASCADE; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_TABLE_OP; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.ALTER_COLUMNS; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_FILE_FORMAT; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_LOCATION; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_SERDE_PROPS; -import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_TBL_PROPS; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase.DATABASE_LOCATION_URI; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.COLLECTION_DELIM; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.ESCAPE_CHAR; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.FIELD_DELIM; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.LINE_DELIM; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.MAPKEY_DELIM; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_INFO_PROP_PREFIX; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_LIB_CLASS_NAME; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERIALIZATION_NULL_FORMAT; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_INPUT_FORMAT; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_OUTPUT_FORMAT; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_COLS; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_CONSTRAINT_TRAITS; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.PK_CONSTRAINT_TRAIT; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL; -import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI; - -/** A converter to generate DDL operations. 
*/ -public class DDLOperationConverter { - - private final Parser parser; - private final CatalogManager catalogManager; - private final HiveFunctionDefinitionFactory funcDefFactory; - - public DDLOperationConverter(Parser parser, CatalogManager catalogManager, HiveShim hiveShim) { - this.parser = parser; - this.catalogManager = catalogManager; - this.funcDefFactory = new HiveFunctionDefinitionFactory(hiveShim); - } - - public Operation convert(Serializable work) { - if (work instanceof DDLWork) { - DDLWork ddlWork = (DDLWork) work; - if (ddlWork.getCreateDatabaseDesc() != null) { - return convertCreateDatabase(ddlWork.getCreateDatabaseDesc()); - } else if (ddlWork.getShowDatabasesDesc() != null) { - return convertShowDatabases(ddlWork.getShowDatabasesDesc()); - } else if (ddlWork.getSwitchDatabaseDesc() != null) { - return convertUseDatabase(ddlWork.getSwitchDatabaseDesc()); - } else if (ddlWork.getAddPartitionDesc() != null) { - return convertAddPartitions(ddlWork.getAddPartitionDesc()); - } else if (ddlWork.getShowPartsDesc() != null) { - return convertShowPartitions(ddlWork.getShowPartsDesc()); - } else if (ddlWork.getShowFuncsDesc() != null) { - return convertShowFunctions(ddlWork.getShowFuncsDesc()); - } else if (ddlWork.getDescTblDesc() != null) { - return convertDescTable(ddlWork.getDescTblDesc()); - } else { - throw new FlinkHiveException("Unsupported DDLWork"); - } - } else if (work instanceof HiveParserShowTablesDesc) { - if (((HiveParserShowTablesDesc) work).isExpectView()) { - return convertShowViews((HiveParserShowTablesDesc) work); - } else { - return convertShowTables((HiveParserShowTablesDesc) work); - } - } else if (work instanceof HiveParserAlterTableDesc) { - HiveParserAlterTableDesc alterTableDesc = (HiveParserAlterTableDesc) work; - if (alterTableDesc.expectView()) { - return convertAlterView(alterTableDesc); - } else { - return convertAlterTable(alterTableDesc); - } - } else if (work instanceof HiveParserCreateTableDesc) { - return convertCreateTable((HiveParserCreateTableDesc) work); - } else if (work instanceof HiveParserDropTableDesc) { - HiveParserDropTableDesc dropTableDesc = (HiveParserDropTableDesc) work; - if (dropTableDesc.isExpectView()) { - return convertDropView(dropTableDesc); - } else { - return convertDropTable(dropTableDesc); - } - } else if (work instanceof DropPartitionDesc) { - return convertDropPartitions((DropPartitionDesc) work); - } else if (work instanceof HiveParserCreateViewDesc) { - return convertCreateAlterView((HiveParserCreateViewDesc) work); - } else if (work instanceof HiveParserDropFunctionDesc) { - return convertDropFunction((HiveParserDropFunctionDesc) work); - } else if (work instanceof FunctionWork) { - FunctionWork functionWork = (FunctionWork) work; - if (functionWork.getCreateFunctionDesc() != null) { - return convertCreateFunction(functionWork.getCreateFunctionDesc()); - } - throw new FlinkHiveException("Unsupported FunctionWork"); - } else if (work instanceof HiveParserAlterDatabaseDesc) { - return convertAlterDatabase((HiveParserAlterDatabaseDesc) work); - } else if (work instanceof HiveParserDropDatabaseDesc) { - return convertDropDatabase((HiveParserDropDatabaseDesc) work); - } else { - throw new FlinkHiveException("Unsupported work class " + work.getClass().getName()); - } - } - - private Operation convertDescTable(DescTableDesc desc) { - ObjectIdentifier tableIdentifier = parseObjectIdentifier(desc.getTableName()); - return new DescribeTableOperation(tableIdentifier, desc.isExt() || desc.isFormatted()); - } - - private 
Operation convertShowFunctions(ShowFunctionsDesc desc) { - return new ShowFunctionsOperation(); - } - - private Operation convertShowPartitions(ShowPartitionsDesc desc) { - ObjectIdentifier tableIdentifier = parseObjectIdentifier(desc.getTabName()); - CatalogPartitionSpec spec = null; - if (desc.getPartSpec() != null && !desc.getPartSpec().isEmpty()) { - spec = new CatalogPartitionSpec(new HashMap<>(desc.getPartSpec())); - } - return new ShowPartitionsOperation(tableIdentifier, spec); - } - - private Operation convertCreateFunction(CreateFunctionDesc desc) { - if (desc.isTemp()) { - // hive's temporary function is more like flink's temp system function, e.g. doesn't - // belong to a catalog/db - // the DDL analyzer makes sure temp function name is not a compound one - FunctionDefinition funcDefinition = - funcDefFactory.createFunctionDefinition( - desc.getFunctionName(), - new CatalogFunctionImpl(desc.getClassName(), FunctionLanguage.JAVA)); - return new CreateTempSystemFunctionOperation( - desc.getFunctionName(), false, funcDefinition); - } else { - ObjectIdentifier identifier = parseObjectIdentifier(desc.getFunctionName()); - CatalogFunction catalogFunction = - new CatalogFunctionImpl(desc.getClassName(), FunctionLanguage.JAVA); - return new CreateCatalogFunctionOperation( - identifier, catalogFunction, false, desc.isTemp()); - } - } - - private Operation convertDropFunction(HiveParserDropFunctionDesc desc) { - if (desc.getDesc().isTemp()) { - return new DropTempSystemFunctionOperation( - desc.getDesc().getFunctionName(), desc.ifExists()); - } else { - ObjectIdentifier identifier = parseObjectIdentifier(desc.getDesc().getFunctionName()); - return new DropCatalogFunctionOperation( - identifier, desc.ifExists(), desc.getDesc().isTemp()); - } - } - - private Operation convertDropView(HiveParserDropTableDesc desc) { - ObjectIdentifier identifier = parseObjectIdentifier(desc.getCompoundName()); - CatalogBaseTable baseTable = getCatalogBaseTable(identifier, true); - if (baseTable instanceof CatalogTable) { - throw new ValidationException("DROP VIEW for a table is not allowed"); - } - return new DropViewOperation(identifier, desc.ifExists(), false); - } - - private Operation convertDropTable(HiveParserDropTableDesc desc) { - ObjectIdentifier identifier = parseObjectIdentifier(desc.getCompoundName()); - CatalogBaseTable baseTable = getCatalogBaseTable(identifier, true); - if (baseTable instanceof CatalogView) { - throw new ValidationException("DROP TABLE for a view is not allowed"); - } - return new DropTableOperation(identifier, desc.ifExists(), false); - } - - // handles both create view and alter view as - private Operation convertCreateAlterView(HiveParserCreateViewDesc desc) { - ObjectIdentifier viewIdentifier = parseObjectIdentifier(desc.getCompoundName()); - TableSchema schema = - HiveTableUtil.createTableSchema( - desc.getSchema(), Collections.emptyList(), Collections.emptySet(), null); - Map props = new HashMap<>(); - String comment; - if (desc.isAlterViewAs()) { - CatalogBaseTable baseTable = getCatalogBaseTable(viewIdentifier); - if (baseTable instanceof CatalogTable) { - throw new ValidationException("ALTER VIEW for a table is not allowed"); - } - props.putAll(baseTable.getOptions()); - comment = baseTable.getComment(); - } else { - comment = desc.getComment(); - if (desc.getTblProps() != null) { - props.putAll(desc.getTblProps()); - } - } - CatalogView catalogView = - new CatalogViewImpl( - desc.getOriginalText(), desc.getExpandedText(), schema, props, comment); - if 
(desc.isAlterViewAs()) { - return new AlterViewAsOperation(viewIdentifier, catalogView); - } else { - return new CreateViewOperation(viewIdentifier, catalogView, desc.ifNotExists(), false); - } - } - - private Operation convertDropPartitions(DropPartitionDesc desc) { - ObjectIdentifier tableIdentifier = - catalogManager.qualifyIdentifier( - UnresolvedIdentifier.of(desc.getDbName(), desc.getTableName())); - CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableIdentifier); - if (catalogBaseTable instanceof CatalogView) { - throw new ValidationException("DROP PARTITION for a view is not supported"); - } - List specs = - desc.getSpecs().stream() - .map(CatalogPartitionSpec::new) - .collect(Collectors.toList()); - return new DropPartitionsOperation(tableIdentifier, desc.ifExists(), specs); - } - - private Operation convertAddPartitions(AddPartitionDesc desc) { - ObjectIdentifier tableIdentifier = - desc.getDbName() == null - ? parseObjectIdentifier(desc.getTableName()) - : catalogManager.qualifyIdentifier( - UnresolvedIdentifier.of(desc.getDbName(), desc.getTableName())); - CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableIdentifier); - if (catalogBaseTable instanceof CatalogView) { - throw new ValidationException("ADD PARTITION for a view is not supported"); - } - List specs = new ArrayList<>(); - List partitions = new ArrayList<>(); - for (int i = 0; i < desc.getPartitionCount(); i++) { - specs.add(new CatalogPartitionSpec(desc.getPartition(i).getPartSpec())); - Map props = new HashMap<>(); - String location = desc.getPartition(i).getLocation(); - if (location != null) { - props.put(TABLE_LOCATION_URI, location); - } - partitions.add(new CatalogPartitionImpl(props, null)); - } - return new AddPartitionsOperation(tableIdentifier, desc.isIfNotExists(), specs, partitions); - } - - private Operation convertCreateTable(HiveParserCreateTableDesc desc) { - Map props = new HashMap<>(); - if (desc.getTblProps() != null) { - props.putAll(desc.getTblProps()); - } - markHiveConnector(props); - // external - if (desc.isExternal()) { - props.put(TABLE_IS_EXTERNAL, "true"); - } - // PK trait - UniqueConstraint uniqueConstraint = null; - if (desc.getPrimaryKeys() != null && !desc.getPrimaryKeys().isEmpty()) { - PrimaryKey primaryKey = desc.getPrimaryKeys().get(0); - byte trait = 0; - if (primaryKey.isEnable()) { - trait = HiveDDLUtils.enableConstraint(trait); - } - if (primaryKey.isValidate()) { - trait = HiveDDLUtils.validateConstraint(trait); - } - if (primaryKey.isRely()) { - trait = HiveDDLUtils.relyConstraint(trait); - } - props.put(PK_CONSTRAINT_TRAIT, String.valueOf(trait)); - List pkCols = - desc.getPrimaryKeys().stream() - .map(PrimaryKey::getPk) - .collect(Collectors.toList()); - String constraintName = primaryKey.getConstraintName(); - if (constraintName == null) { - constraintName = pkCols.stream().collect(Collectors.joining("_", "PK_", "")); - } - uniqueConstraint = UniqueConstraint.primaryKey(constraintName, pkCols); - } - // NOT NULL constraints - List notNullCols = new ArrayList<>(); - if (!desc.getNotNullConstraints().isEmpty()) { - List traits = new ArrayList<>(); - for (NotNullConstraint notNull : desc.getNotNullConstraints()) { - byte trait = 0; - if (notNull.isEnable()) { - trait = HiveDDLUtils.enableConstraint(trait); - } - if (notNull.isValidate()) { - trait = HiveDDLUtils.validateConstraint(trait); - } - if (notNull.isRely()) { - trait = HiveDDLUtils.relyConstraint(trait); - } - traits.add(String.valueOf(trait)); - notNullCols.add(notNull.getColName()); - } - 
props.put(NOT_NULL_CONSTRAINT_TRAITS, String.join(COL_DELIMITER, traits)); - props.put(NOT_NULL_COLS, String.join(COL_DELIMITER, notNullCols)); - } - // row format - if (desc.getRowFormatParams() != null) { - encodeRowFormat(desc.getRowFormatParams(), props); - } - // storage format - if (desc.getStorageFormat() != null) { - encodeStorageFormat(desc.getStorageFormat(), props); - } - // location - if (desc.getLocation() != null) { - props.put(TABLE_LOCATION_URI, desc.getLocation()); - } - ObjectIdentifier identifier = parseObjectIdentifier(desc.getCompoundName()); - Set notNullColSet = new HashSet<>(notNullCols); - if (uniqueConstraint != null) { - notNullColSet.addAll(uniqueConstraint.getColumns()); - } - TableSchema tableSchema = - HiveTableUtil.createTableSchema( - desc.getCols(), desc.getPartCols(), notNullColSet, uniqueConstraint); - return new CreateTableOperation( - identifier, - new CatalogTableImpl( - tableSchema, - HiveCatalog.getFieldNames(desc.getPartCols()), - props, - desc.getComment()), - desc.ifNotExists(), - desc.isTemporary()); - } - - private Operation convertAlterView(HiveParserAlterTableDesc desc) { - ObjectIdentifier viewIdentifier = parseObjectIdentifier(desc.getCompoundName()); - CatalogBaseTable baseTable = getCatalogBaseTable(viewIdentifier); - if (baseTable instanceof CatalogTable) { - throw new ValidationException("ALTER VIEW for a table is not allowed"); - } - CatalogView oldView = (CatalogView) baseTable; - switch (desc.getOp()) { - case RENAME: - return new AlterViewRenameOperation( - viewIdentifier, parseObjectIdentifier(desc.getNewName())); - case ADDPROPS: - Map props = new HashMap<>(oldView.getOptions()); - props.putAll(desc.getProps()); - CatalogView newView = - new CatalogViewImpl( - oldView.getOriginalQuery(), - oldView.getExpandedQuery(), - oldView.getSchema(), - props, - oldView.getComment()); - return new AlterViewPropertiesOperation(viewIdentifier, newView); - default: - throw new FlinkHiveException("Unsupported alter view operation " + desc.getOp()); - } - } - - private Operation convertAlterTable(HiveParserAlterTableDesc desc) { - ObjectIdentifier tableIdentifier = parseObjectIdentifier(desc.getCompoundName()); - CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableIdentifier); - if (catalogBaseTable instanceof CatalogView) { - throw new ValidationException("ALTER TABLE for a view is not allowed"); - } - CatalogTable oldTable = (CatalogTable) catalogBaseTable; - CatalogPartitionSpec partSpec = - desc.getPartSpec() != null ? new CatalogPartitionSpec(desc.getPartSpec()) : null; - CatalogPartition catalogPartition = - partSpec != null ? 
getPartition(tableIdentifier, partSpec) : null; - Map newProps = new HashMap<>(); - switch (desc.getOp()) { - case RENAME: - return new AlterTableRenameOperation( - tableIdentifier, parseObjectIdentifier(desc.getNewName())); - case ADDPROPS: - newProps.put(ALTER_TABLE_OP, CHANGE_TBL_PROPS.name()); - newProps.putAll(desc.getProps()); - return convertAlterTableProps( - tableIdentifier, oldTable, partSpec, catalogPartition, newProps); - case ALTERLOCATION: - newProps.put(ALTER_TABLE_OP, CHANGE_LOCATION.name()); - newProps.put(TABLE_LOCATION_URI, desc.getNewLocation()); - return convertAlterTableProps( - tableIdentifier, oldTable, partSpec, catalogPartition, newProps); - case ADDFILEFORMAT: - newProps.put(ALTER_TABLE_OP, CHANGE_FILE_FORMAT.name()); - newProps.put(STORED_AS_FILE_FORMAT, desc.getGenericFileFormatName()); - return convertAlterTableProps( - tableIdentifier, oldTable, partSpec, catalogPartition, newProps); - case ADDSERDE: - case ADDSERDEPROPS: - newProps.put(ALTER_TABLE_OP, CHANGE_SERDE_PROPS.name()); - if (desc.getSerdeName() != null) { - newProps.put(SERDE_LIB_CLASS_NAME, desc.getSerdeName()); - } - if (desc.getProps() != null) { - for (String key : desc.getProps().keySet()) { - newProps.put(SERDE_INFO_PROP_PREFIX + key, desc.getProps().get(key)); - } - } - return convertAlterTableProps( - tableIdentifier, oldTable, partSpec, catalogPartition, newProps); - case REPLACECOLS: - return convertAddReplaceColumns( - tableIdentifier, oldTable, desc.getNewCols(), true, desc.isCascade()); - case ADDCOLS: - return convertAddReplaceColumns( - tableIdentifier, oldTable, desc.getNewCols(), false, desc.isCascade()); - case RENAMECOLUMN: - return convertChangeColumn( - tableIdentifier, - oldTable, - desc.getOldColName(), - desc.getNewColName(), - desc.getNewColType(), - desc.getNewColComment(), - desc.getAfter(), - desc.isFirst(), - desc.isCascade()); - default: - throw new FlinkHiveException("Unsupported alter table operation " + desc.getOp()); - } - } - - private Operation convertChangeColumn( - ObjectIdentifier tableIdentifier, - CatalogTable oldTable, - String oldName, - String newName, - String newType, - String newComment, - String after, - boolean first, - boolean cascade) { - if (oldTable.getPartitionKeys().contains(oldName)) { - // disallow changing partition columns - throw new ValidationException("CHANGE COLUMN cannot be applied to partition columns"); - } - TableSchema oldSchema = oldTable.getSchema(); - TableColumn newTableColumn = - TableColumn.physical( - newName, - HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(newType))); - TableSchema newSchema = - OperationConverterUtils.changeColumn( - oldSchema, oldName, newTableColumn, first, after); - Map props = new HashMap<>(oldTable.getOptions()); - props.put(ALTER_TABLE_OP, ALTER_COLUMNS.name()); - if (cascade) { - props.put(ALTER_COL_CASCADE, "true"); - } - return new AlterTableSchemaOperation( - tableIdentifier, - new CatalogTableImpl( - newSchema, oldTable.getPartitionKeys(), props, oldTable.getComment())); - } - - private Operation convertAddReplaceColumns( - ObjectIdentifier tableIdentifier, - CatalogTable oldTable, - List newCols, - boolean replace, - boolean cascade) { - // prepare properties - Map props = new HashMap<>(oldTable.getOptions()); - props.put(ALTER_TABLE_OP, ALTER_COLUMNS.name()); - if (cascade) { - props.put(ALTER_COL_CASCADE, "true"); - } - TableSchema oldSchema = oldTable.getSchema(); - final int numPartCol = oldTable.getPartitionKeys().size(); - TableSchema.Builder builder = 
TableSchema.builder(); - // add existing non-part col if we're not replacing - if (!replace) { - List nonPartCols = - oldSchema.getTableColumns().subList(0, oldSchema.getFieldCount() - numPartCol); - for (TableColumn column : nonPartCols) { - builder.add(column); - } - setWatermarkAndPK(builder, oldSchema); - } - // add new cols - for (FieldSchema col : newCols) { - builder.add( - TableColumn.physical( - col.getName(), - HiveTypeUtil.toFlinkType( - TypeInfoUtils.getTypeInfoFromTypeString(col.getType())))); - } - // add part cols - List partCols = - oldSchema - .getTableColumns() - .subList(oldSchema.getFieldCount() - numPartCol, oldSchema.getFieldCount()); - for (TableColumn column : partCols) { - builder.add(column); - } - return new AlterTableSchemaOperation( - tableIdentifier, - new CatalogTableImpl( - builder.build(), - oldTable.getPartitionKeys(), - props, - oldTable.getComment())); - } - - private Operation convertAlterTableProps( - ObjectIdentifier tableIdentifier, - CatalogTable oldTable, - CatalogPartitionSpec partSpec, - CatalogPartition catalogPartition, - Map newProps) { - Map props = new HashMap<>(); - if (catalogPartition != null) { - props.putAll(catalogPartition.getProperties()); - props.putAll(newProps); - return new AlterPartitionPropertiesOperation( - tableIdentifier, - partSpec, - new CatalogPartitionImpl(props, catalogPartition.getComment())); - } else { - props.putAll(oldTable.getOptions()); - props.putAll(newProps); - return new AlterTableOptionsOperation(tableIdentifier, oldTable.copy(props)); - } - } - - private Operation convertShowViews(HiveParserShowTablesDesc desc) { - return new ShowViewsOperation(); - } - - private Operation convertShowTables(HiveParserShowTablesDesc desc) { - return new ShowTablesOperation(); - } - - private Operation convertUseDatabase(SwitchDatabaseDesc desc) { - return new UseDatabaseOperation(catalogManager.getCurrentCatalog(), desc.getDatabaseName()); - } - - private Operation convertShowDatabases(ShowDatabasesDesc desc) { - return new ShowDatabasesOperation(); - } - - private Operation convertDropDatabase(HiveParserDropDatabaseDesc desc) { - return new DropDatabaseOperation( - catalogManager.getCurrentCatalog(), - desc.getDatabaseName(), - desc.ifExists(), - desc.cascade()); - } - - private Operation convertAlterDatabase(HiveParserAlterDatabaseDesc desc) { - Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get(); - CatalogDatabase originDB; - try { - originDB = catalog.getDatabase(desc.getDatabaseName()); - } catch (DatabaseNotExistException e) { - throw new ValidationException( - String.format("Database %s not exists", desc.getDatabaseName()), e); - } - Map props = new HashMap<>(originDB.getProperties()); - switch (desc.getAlterType()) { - case ALTER_PROPERTY: - props.put(ALTER_DATABASE_OP, AlterHiveDatabaseOp.CHANGE_PROPS.name()); - props.putAll(desc.getDbProperties()); - break; - case ALTER_OWNER: - props.put(ALTER_DATABASE_OP, AlterHiveDatabaseOp.CHANGE_OWNER.name()); - PrincipalDesc principalDesc = desc.getOwnerPrincipal(); - props.put(DATABASE_OWNER_NAME, principalDesc.getName()); - props.put(DATABASE_OWNER_TYPE, principalDesc.getType().name().toLowerCase()); - break; - case ALTER_LOCATION: - props.put(ALTER_DATABASE_OP, AlterHiveDatabaseOp.CHANGE_LOCATION.name()); - props.put(DATABASE_LOCATION_URI, desc.getLocation()); - break; - default: - throw new FlinkHiveException("Unsupported alter database operation"); - } - CatalogDatabase newDB = new CatalogDatabaseImpl(props, originDB.getComment()); - 
return new AlterDatabaseOperation( - catalogManager.getCurrentCatalog(), desc.getDatabaseName(), newDB); - } - - private Operation convertCreateDatabase(CreateDatabaseDesc desc) { - Map props = new HashMap<>(); - if (desc.getDatabaseProperties() != null) { - props.putAll(desc.getDatabaseProperties()); - } - if (desc.getLocationUri() != null) { - props.put(DATABASE_LOCATION_URI, desc.getLocationUri()); - } - CatalogDatabase catalogDatabase = new CatalogDatabaseImpl(props, desc.getComment()); - return new CreateDatabaseOperation( - catalogManager.getCurrentCatalog(), - desc.getName(), - catalogDatabase, - desc.getIfNotExists()); - } - - private void markHiveConnector(Map props) { - props.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); - } - - private CatalogBaseTable getCatalogBaseTable(ObjectIdentifier tableIdentifier) { - return getCatalogBaseTable(tableIdentifier, false); - } - - private CatalogBaseTable getCatalogBaseTable( - ObjectIdentifier tableIdentifier, boolean ifExists) { - Optional optionalCatalogTable = - catalogManager.getTable(tableIdentifier); - if (!optionalCatalogTable.isPresent()) { - if (ifExists) { - return null; - } else { - throw new ValidationException( - String.format( - "Table or View %s doesn't exist.", tableIdentifier.toString())); - } - } - if (optionalCatalogTable.get().isTemporary()) { - throw new ValidationException( - String.format("Table or View %s is temporary.", tableIdentifier.toString())); - } - return optionalCatalogTable.get().getTable(); - } - - private CatalogPartition getPartition( - ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) { - return catalogManager - .getPartition(tableIdentifier, partitionSpec) - .orElseThrow( - () -> - new ValidationException( - String.format( - "Partition %s of table %s doesn't exist", - partitionSpec.getPartitionSpec(), - tableIdentifier))); - } - - private ObjectIdentifier parseObjectIdentifier(String compoundName) { - UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(compoundName); - return catalogManager.qualifyIdentifier(unresolvedIdentifier); - } - - private void encodeRowFormat( - HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams rowFormatParams, - Map props) { - if (rowFormatParams.getFieldDelim() != null) { - props.put(FIELD_DELIM, rowFormatParams.getFieldDelim()); - } - if (rowFormatParams.getCollItemDelim() != null) { - props.put(COLLECTION_DELIM, rowFormatParams.getCollItemDelim()); - } - if (rowFormatParams.getMapKeyDelim() != null) { - props.put(MAPKEY_DELIM, rowFormatParams.getMapKeyDelim()); - } - if (rowFormatParams.getFieldEscape() != null) { - props.put(ESCAPE_CHAR, rowFormatParams.getFieldEscape()); - } - if (rowFormatParams.getLineDelim() != null) { - props.put(LINE_DELIM, rowFormatParams.getLineDelim()); - } - if (rowFormatParams.getNullFormat() != null) { - props.put(SERIALIZATION_NULL_FORMAT, rowFormatParams.getNullFormat()); - } - } - - private void encodeStorageFormat( - HiveParserStorageFormat storageFormat, Map props) { - String serdeName = storageFormat.getSerde(); - if (serdeName != null) { - props.put(SERDE_LIB_CLASS_NAME, serdeName); - } - Map serdeProps = storageFormat.getSerdeProps(); - if (serdeProps != null) { - for (String serdeKey : serdeProps.keySet()) { - props.put(SERDE_INFO_PROP_PREFIX + serdeKey, serdeProps.get(serdeKey)); - } - } - if (storageFormat.getInputFormat() != null) { - props.put(STORED_AS_INPUT_FORMAT, storageFormat.getInputFormat()); - } - if (storageFormat.getOutputFormat() != null) { - 
            props.put(STORED_AS_OUTPUT_FORMAT, storageFormat.getOutputFormat());
-        }
-    }
-
-    private static void setWatermarkAndPK(TableSchema.Builder builder, TableSchema schema) {
-        for (WatermarkSpec watermarkSpec : schema.getWatermarkSpecs()) {
-            builder.watermark(watermarkSpec);
-        }
-        schema.getPrimaryKey()
-                .ifPresent(
-                        pk -> {
-                            builder.primaryKey(
-                                    pk.getName(), pk.getColumns().toArray(new String[0]));
-                        });
-    }
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 897bdb8c29ec7..f5511b78f5ebe 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -28,12 +28,9 @@
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.module.hive.udf.generic.HiveGenericUDFGrouping;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ddl.CreateTableASOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
@@ -42,10 +39,8 @@
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserContext;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserQueryState;
-import org.apache.flink.table.planner.delegation.hive.desc.CreateTableASDesc;
-import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc;
-import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
+import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import org.apache.flink.table.planner.parse.CalciteParser;
@@ -60,7 +55,6 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
@@ -68,7 +62,6 @@

 import java.io.File;
 import java.io.IOException;
-import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.Timestamp;
@@ -232,42 +225,15 @@ private List<Operation> processCmd(
             HiveParserQueryState queryState = new HiveParserQueryState(hiveConf);
             HiveParserDDLSemanticAnalyzer ddlAnalyzer =
                     new HiveParserDDLSemanticAnalyzer(
-                            queryState, hiveCatalog, getCatalogManager().getCurrentDatabase());
-            Serializable work = ddlAnalyzer.analyzeInternal(node);
-            DDLOperationConverter ddlConverter =
-                    new DDLOperationConverter(this, getCatalogManager(), hiveShim);
-            if (work instanceof HiveParserCreateViewDesc) {
-                // analyze and expand the view query
-                analyzeCreateView(
-                        (HiveParserCreateViewDesc) work, context, queryState, hiveShim);
-            } else if (work instanceof CreateTableASDesc) {
-                // analyze the query
-                CreateTableASDesc ctasDesc = (CreateTableASDesc) work;
-                HiveParserCalcitePlanner calcitePlanner =
-                        createCalcitePlanner(context, queryState, hiveShim);
-                calcitePlanner.setCtasDesc(ctasDesc);
-                RelNode queryRelNode = calcitePlanner.genLogicalPlan(ctasDesc.getQuery());
-                // create a table to represent the dest table
-                HiveParserCreateTableDesc createTableDesc = ctasDesc.getCreateTableDesc();
-                String[] dbTblName = createTableDesc.getCompoundName().split("\\.");
-                Table destTable = new Table(Table.getEmptyTable(dbTblName[0], dbTblName[1]));
-                destTable.getSd().setCols(createTableDesc.getCols());
-                // create the insert operation
-                CatalogSinkModifyOperation insertOperation =
-                        dmlHelper.createInsertOperation(
-                                queryRelNode,
-                                destTable,
-                                Collections.emptyMap(),
-                                Collections.emptyList(),
-                                false);
-                CreateTableOperation createTableOperation =
-                        (CreateTableOperation)
-                                ddlConverter.convert(
-                                        ((CreateTableASDesc) work).getCreateTableDesc());
-                return Collections.singletonList(
-                        new CreateTableASOperation(createTableOperation, insertOperation));
-            }
-            return Collections.singletonList(ddlConverter.convert(work));
+                            queryState,
+                            hiveCatalog,
+                            getCatalogManager(),
+                            this,
+                            hiveShim,
+                            context,
+                            dmlHelper);
+            operation = ddlAnalyzer.convertToOperation(node);
+            return Collections.singletonList(operation);
         } else {
             final boolean explain = node.getType() == HiveASTParser.TOK_EXPLAIN;
             // first child is the underlying explicandum
@@ -291,7 +257,7 @@ private List<Operation> processCmd(
         }
     }

-    private HiveParserCalcitePlanner createCalcitePlanner(
+    public HiveParserCalcitePlanner createCalcitePlanner(
             HiveParserContext context, HiveParserQueryState queryState, HiveShim hiveShim)
             throws SemanticException {
         HiveParserCalcitePlanner calciteAnalyzer =
@@ -307,16 +273,16 @@ private HiveParserCalcitePlanner createCalcitePlanner(
         return calciteAnalyzer;
     }

-    private void analyzeCreateView(
-            HiveParserCreateViewDesc desc,
+    public void analyzeCreateView(
+            HiveParserCreateViewInfo createViewInfo,
             HiveParserContext context,
             HiveParserQueryState queryState,
             HiveShim hiveShim)
             throws SemanticException {
         HiveParserCalcitePlanner calciteAnalyzer =
                 createCalcitePlanner(context, queryState, hiveShim);
-        calciteAnalyzer.setCreateViewDesc(desc);
-        calciteAnalyzer.genLogicalPlan(desc.getQuery());
+        calciteAnalyzer.setCreatViewInfo(createViewInfo);
+        calciteAnalyzer.genLogicalPlan(createViewInfo.getQuery());
     }

     private Operation analyzeSql(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
index c7b32615e91f0..c5a0329c0dfe8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
@@ -47,9 +47,8 @@
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserTypeCheckCtx;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserTypeConverter;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserWindowingSpec;
-import org.apache.flink.table.planner.delegation.hive.desc.CreateTableASDesc;
-import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
+import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg;
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
 import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution;
@@ -197,8 +196,8 @@ public class HiveParserCalcitePlanner {
     // this will be used in HiveParserRexNodeConverter to create cor var
     private int subqueryId = 0;

-    private HiveParserCreateViewDesc createViewDesc;
-    private CreateTableASDesc ctasDesc;
+    private HiveParserCreateViewInfo createViewInfo;
+    private List<FieldSchema> ctasCols;

     public HiveParserCalcitePlanner(
             HiveParserQueryState queryState,
@@ -224,15 +223,15 @@ public HiveParserCalcitePlanner(
                 cluster, frameworkConfig.getOperatorTable(), catalogReader.nameMatcher());
     }

-    public void setCtasDesc(CreateTableASDesc ctasDesc) {
-        this.ctasDesc = ctasDesc;
+    public void setCtasCols(List<FieldSchema> ctasCols) {
+        this.ctasCols = ctasCols;
     }

-    public void setCreateViewDesc(HiveParserCreateViewDesc createViewDesc) {
-        if (createViewDesc != null) {
+    public void setCreatViewInfo(HiveParserCreateViewInfo createViewInfo) {
+        if (createViewInfo != null) {
             semanticAnalyzer.unparseTranslator.enable();
         }
-        this.createViewDesc = createViewDesc;
+        this.createViewInfo = createViewInfo;
     }

     public void initCtx(HiveParserContext context) {
@@ -283,22 +282,22 @@ private RelNode logicalPlan() {
         try {
             RelNode plan = genLogicalPlan(getQB(), true, null, null);
-            if (createViewDesc != null) {
+            if (createViewInfo != null) {
                 semanticAnalyzer.resultSchema =
                         HiveParserUtils.convertRowSchemaToResultSetSchema(
                                 relToRowResolver.get(plan), false);
                 HiveParserUtils.saveViewDefinition(
                         semanticAnalyzer.resultSchema,
-                        createViewDesc,
+                        createViewInfo,
                         semanticAnalyzer.ctx.getTokenRewriteStream(),
                         semanticAnalyzer.unparseTranslator,
                         semanticAnalyzer.getConf());
-            } else if (ctasDesc != null) {
+            } else if (ctasCols != null) {
                 // CTAS doesn't allow specifying col list, so we set it according to result schema
                 semanticAnalyzer.resultSchema =
                         HiveParserUtils.convertRowSchemaToResultSetSchema(
                                 relToRowResolver.get(plan), false);
-                ctasDesc.getCreateTableDesc().getCols().addAll(semanticAnalyzer.resultSchema);
+                ctasCols.addAll(semanticAnalyzer.resultSchema);
             }
             return plan;
         } catch (SemanticException e) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
index 9b8c02a20a50f..073afb2d9563d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
@@ -43,8 +43,8 @@
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserTypeCheckCtx;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserTypeConverter;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserUnparseTranslator;
-import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
+import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction;
@@ -1263,7 +1263,7 @@ public static List<FieldSchema> convertRowSchemaToResultSetSchema(

     public static void saveViewDefinition(
             List<FieldSchema> resultSchema,
-            HiveParserCreateViewDesc desc,
+            HiveParserCreateViewInfo createViewInfo,
             TokenRewriteStream tokenRewriteStream,
             HiveParserUnparseTranslator unparseTranslator,
             HiveConf conf)
@@ -1273,23 +1273,24 @@ public static void saveViewDefinition(
         List<FieldSchema> derivedSchema = new ArrayList<>(resultSchema);
         ParseUtils.validateColumnNameUniqueness(derivedSchema);

-        List<FieldSchema> imposedSchema = desc.getSchema();
+        List<FieldSchema> imposedSchema = createViewInfo.getSchema();
         if (imposedSchema != null) {
             int explicitColCount = imposedSchema.size();
             int derivedColCount = derivedSchema.size();
             if (explicitColCount != derivedColCount) {
                 throw new SemanticException(
-                        generateErrorMessage(desc.getQuery(), ErrorMsg.VIEW_COL_MISMATCH.getMsg()));
+                        generateErrorMessage(
+                                createViewInfo.getQuery(), ErrorMsg.VIEW_COL_MISMATCH.getMsg()));
             }
         }

         // Preserve the original view definition as specified by the user.
-        if (desc.getOriginalText() == null) {
+        if (createViewInfo.getOriginalText() == null) {
             String originalText =
                     tokenRewriteStream.toString(
-                            desc.getQuery().getTokenStartIndex(),
-                            desc.getQuery().getTokenStopIndex());
-            desc.setOriginalText(originalText);
+                            createViewInfo.getQuery().getTokenStartIndex(),
+                            createViewInfo.getQuery().getTokenStopIndex());
+            createViewInfo.setOriginalText(originalText);
         }

         // Now expand the view definition with extras such as explicit column
@@ -1298,7 +1299,8 @@ public static void saveViewDefinition(
         unparseTranslator.applyTranslations(tokenRewriteStream);
         String expandedText =
                 tokenRewriteStream.toString(
-                        desc.getQuery().getTokenStartIndex(), desc.getQuery().getTokenStopIndex());
+                        createViewInfo.getQuery().getTokenStartIndex(),
+                        createViewInfo.getQuery().getTokenStopIndex());

         if (imposedSchema != null) {
             // Merge the names from the imposed schema into the types
@@ -1332,15 +1334,15 @@ public static void saveViewDefinition(
             sb.append(" FROM (");
             sb.append(expandedText);
             sb.append(") ");
-            sb.append(HiveUtils.unparseIdentifier(desc.getCompoundName(), conf));
+            sb.append(HiveUtils.unparseIdentifier(createViewInfo.getCompoundName(), conf));
             expandedText = sb.toString();
         }

-        desc.setSchema(derivedSchema);
-        if (!desc.isMaterialized()) {
+        createViewInfo.setSchema(derivedSchema);
+        if (!createViewInfo.isMaterialized()) {
             // materialized views don't store the expanded text as they won't be rewritten at query
             // time.
- desc.setExpandedText(expandedText); + createViewInfo.setExpandedText(expandedText); } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java index 2c4d3cb60c8fa..d1c9dfd3bb90c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserBaseSemanticAnalyzer.java @@ -30,8 +30,6 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveParserPTFInvocationSpec.PartitionExpression; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserPTFInvocationSpec.PartitionSpec; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserPTFInvocationSpec.PartitioningSpec; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.NotNullConstraint; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.PrimaryKey; import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg; @@ -101,6 +99,7 @@ import java.io.BufferedReader; import java.io.InputStreamReader; +import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.util.ArrayDeque; @@ -2394,4 +2393,122 @@ public void analyzeRowFormat(HiveParserASTNode child) throws SemanticException { } } } + + /** Counterpart of hive's SQLPrimaryKey. */ + public static class PrimaryKey implements Serializable { + + private static final long serialVersionUID = 3036210046732750293L; + + private final String dbName; + private final String tblName; + private final String pk; + private final String constraintName; + private final boolean enable; + private final boolean validate; + private final boolean rely; + + public PrimaryKey( + String dbName, + String tblName, + String pk, + String constraintName, + boolean enable, + boolean validate, + boolean rely) { + this.dbName = dbName; + this.tblName = tblName; + this.pk = pk; + this.constraintName = constraintName; + this.enable = enable; + this.validate = validate; + this.rely = rely; + } + + public String getDbName() { + return dbName; + } + + public String getTblName() { + return tblName; + } + + public String getPk() { + return pk; + } + + public String getConstraintName() { + return constraintName; + } + + public boolean isEnable() { + return enable; + } + + public boolean isValidate() { + return validate; + } + + public boolean isRely() { + return rely; + } + } + + /** Counterpart of hive's SQLNotNullConstraint. 
*/ + public static class NotNullConstraint implements Serializable { + + private static final long serialVersionUID = 7642343368203203950L; + + private final String dbName; + private final String tblName; + private final String colName; + private final String constraintName; + private final boolean enable; + private final boolean validate; + private final boolean rely; + + public NotNullConstraint( + String dbName, + String tblName, + String colName, + String constraintName, + boolean enable, + boolean validate, + boolean rely) { + this.dbName = dbName; + this.tblName = tblName; + this.colName = colName; + this.constraintName = constraintName; + this.enable = enable; + this.validate = validate; + this.rely = rely; + } + + public String getDbName() { + return dbName; + } + + public String getTblName() { + return tblName; + } + + public String getColName() { + return colName; + } + + public String getConstraintName() { + return constraintName; + } + + public boolean isEnable() { + return enable; + } + + public boolean isValidate() { + return validate; + } + + public boolean isRely() { + return rely; + } + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/CreateTableASDesc.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/CreateTableASDesc.java deleted file mode 100644 index abbe01310fb8d..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/CreateTableASDesc.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.delegation.hive.desc; - -import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode; - -import java.io.Serializable; - -/** A desc for CTAS. 
*/ -public class CreateTableASDesc implements Serializable { - - private static final long serialVersionUID = 1L; - - private final HiveParserCreateTableDesc createTableDesc; - private final HiveParserASTNode query; - - public CreateTableASDesc(HiveParserCreateTableDesc createTableDesc, HiveParserASTNode query) { - this.createTableDesc = createTableDesc; - this.query = query; - } - - public HiveParserCreateTableDesc getCreateTableDesc() { - return createTableDesc; - } - - public HiveParserASTNode getQuery() { - return query; - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/DropPartitionDesc.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/DropPartitionDesc.java deleted file mode 100644 index 5ea6d602f6b3c..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/DropPartitionDesc.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.delegation.hive.desc; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -/** Desc to represent DROP PARTITIONS. */ -public class DropPartitionDesc implements Serializable { - - private static final long serialVersionUID = 1L; - - private final String dbName; - private final String tableName; - private final List> specs; - private final boolean ifExists; - - public DropPartitionDesc( - String dbName, String tableName, List> specs, boolean ifExists) { - this.dbName = dbName; - this.tableName = tableName; - this.specs = specs; - this.ifExists = ifExists; - } - - public String getDbName() { - return dbName; - } - - public String getTableName() { - return tableName; - } - - public List> getSpecs() { - return specs; - } - - public boolean ifExists() { - return ifExists; - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserAlterDatabaseDesc.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserAlterDatabaseDesc.java deleted file mode 100644 index f1a3118b8b1b5..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserAlterDatabaseDesc.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.delegation.hive.desc; - -import org.apache.hadoop.hive.ql.plan.PrincipalDesc; - -import java.io.Serializable; -import java.util.Map; - -/** Desc for alter database. */ -public class HiveParserAlterDatabaseDesc implements Serializable { - private static final long serialVersionUID = 1L; - - /** Type of the alter db operation. */ - public enum AlterDBType { - ALTER_PROPERTY, - ALTER_OWNER, - ALTER_LOCATION - } - - private final AlterDBType alterType; - private final String databaseName; - private final Map<String, String> dbProperties; - private final PrincipalDesc ownerPrincipal; - private final String location; - - private HiveParserAlterDatabaseDesc( - AlterDBType alterType, - String databaseName, - Map<String, String> dbProperties, - PrincipalDesc ownerPrincipal, - String location) { - this.alterType = alterType; - this.databaseName = databaseName; - this.dbProperties = dbProperties; - this.ownerPrincipal = ownerPrincipal; - this.location = location; - } - - public AlterDBType getAlterType() { - return alterType; - } - - public String getDatabaseName() { - return databaseName; - } - - public Map<String, String> getDbProperties() { - return dbProperties; - } - - public PrincipalDesc getOwnerPrincipal() { - return ownerPrincipal; - } - - public String getLocation() { - return location; - } - - public static HiveParserAlterDatabaseDesc alterProps( - String databaseName, Map<String, String> dbProperties) { - return new Builder(AlterDBType.ALTER_PROPERTY, databaseName).newProps(dbProperties).build(); - } - - public static HiveParserAlterDatabaseDesc alterOwner( - String databaseName, PrincipalDesc ownerPrincipal) { - return new Builder(AlterDBType.ALTER_OWNER, databaseName).newOwner(ownerPrincipal).build(); - } - - public static HiveParserAlterDatabaseDesc alterLocation(String databaseName, String location) { - return new Builder(AlterDBType.ALTER_LOCATION, databaseName).newLocation(location).build(); - } - - private static class Builder { - private final AlterDBType alterType; - private final String databaseName; - private Map<String, String> dbProperties; - private PrincipalDesc ownerPrincipal; - private String location; - - Builder(AlterDBType alterType, String databaseName) { - this.alterType = alterType; - this.databaseName = databaseName; - } - - Builder newProps(Map<String, String> dbProperties) { - this.dbProperties = dbProperties; - return this; - } - - Builder newOwner(PrincipalDesc ownerPrincipal) { - this.ownerPrincipal = ownerPrincipal; - return this; - } - - Builder newLocation(String location) { - this.location = location; - return this; - } - - HiveParserAlterDatabaseDesc build() { - return new HiveParserAlterDatabaseDesc( - alterType, databaseName, dbProperties, ownerPrincipal, location); - } - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserAlterTableDesc.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserAlterTableDesc.java deleted file mode 100644 index bd21d9e266e9a..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserAlterTableDesc.java +++ /dev/null @@ -1,375 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.delegation.hive.desc; - -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.ql.plan.AlterTableDesc; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -import static org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes.ADDCOLS; -import static org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes.ADDFILEFORMAT; -import static org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes.ADDPROPS; -import static org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes.ADDSERDE; -import static org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes.ADDSERDEPROPS; -import static org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes.ALTERLOCATION; -import static org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes.RENAME; -import static org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes.RENAMECOLUMN; -import static org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes.REPLACECOLS; - -/** Desc for alter table. 
*/ -public class HiveParserAlterTableDesc implements Serializable { - private static final long serialVersionUID = 1L; - - private final AlterTableDesc.AlterTableTypes op; - private final String compoundName; - private final Map partSpec; - private final boolean expectView; - private final Map props; - private final String newName; - private final String serdeName; - private final String newLocation; - private final String oldColName; - private final String newColName; - private final String newColType; - private final String newColComment; - private final boolean first; - private final String after; - private final List newCols; - private final boolean cascade; - - private String genericFileFormatName; - - private HiveParserAlterTableDesc( - AlterTableDesc.AlterTableTypes op, - String compoundName, - Map partSpec, - boolean expectView, - Map props, - String newName, - String serdeName, - String newLocation, - String oldColName, - String newColName, - String newColType, - String newColComment, - boolean first, - String after, - List newCols, - boolean cascade) { - this.op = op; - this.compoundName = compoundName; - this.partSpec = partSpec; - this.expectView = expectView; - this.props = props; - this.newName = newName; - this.serdeName = serdeName; - this.newLocation = newLocation; - this.oldColName = oldColName; - this.newColName = newColName; - this.newColType = newColType; - this.newColComment = newColComment; - this.first = first; - this.after = after; - this.newCols = newCols; - this.cascade = cascade; - } - - public void setGenericFileFormatName(String genericFileFormatName) { - this.genericFileFormatName = genericFileFormatName; - } - - public String getGenericFileFormatName() { - return genericFileFormatName; - } - - public AlterTableDesc.AlterTableTypes getOp() { - return op; - } - - public String getCompoundName() { - return compoundName; - } - - public Map getPartSpec() { - return partSpec; - } - - public boolean expectView() { - return expectView; - } - - public Map getProps() { - return props; - } - - public String getNewName() { - return newName; - } - - public String getSerdeName() { - return serdeName; - } - - public String getNewLocation() { - return newLocation; - } - - public String getOldColName() { - return oldColName; - } - - public String getNewColName() { - return newColName; - } - - public String getNewColType() { - return newColType; - } - - public String getNewColComment() { - return newColComment; - } - - public boolean isFirst() { - return first; - } - - public String getAfter() { - return after; - } - - public List getNewCols() { - return newCols; - } - - public boolean isCascade() { - return cascade; - } - - public static HiveParserAlterTableDesc alterFileFormat( - String compoundName, Map partSpec) { - return new Builder() - .op(ADDFILEFORMAT) - .compoundName(compoundName) - .partSpec(partSpec) - .build(); - } - - public static HiveParserAlterTableDesc changeColumn( - String compoundName, - String oldColName, - String newColName, - String newColType, - String newColComment, - boolean first, - String after, - boolean cascade) { - return new Builder() - .op(RENAMECOLUMN) - .compoundName(compoundName) - .oldColName(oldColName) - .newColName(newColName) - .newColType(newColType) - .newColComment(newColComment) - .first(first) - .after(after) - .cascade(cascade) - .build(); - } - - public static HiveParserAlterTableDesc addReplaceColumns( - String compoundName, List newCols, boolean replace, boolean cascade) { - return new Builder() - .op(replace ? 
REPLACECOLS : ADDCOLS) - .compoundName(compoundName) - .newCols(newCols) - .cascade(cascade) - .build(); - } - - public static HiveParserAlterTableDesc rename( - String compoundName, String newName, boolean expectView) { - return new Builder() - .op(RENAME) - .compoundName(compoundName) - .newName(newName) - .expectView(expectView) - .build(); - } - - public static HiveParserAlterTableDesc alterTableProps( - String compoundName, - Map partSpec, - Map props, - boolean expectView) { - return new Builder() - .op(ADDPROPS) - .compoundName(compoundName) - .partSpec(partSpec) - .props(props) - .expectView(expectView) - .build(); - } - - public static HiveParserAlterTableDesc alterSerDe( - String compoundName, - Map partSpec, - String serdeName, - Map props) { - return new Builder() - .op(serdeName == null ? ADDSERDEPROPS : ADDSERDE) - .compoundName(compoundName) - .partSpec(partSpec) - .serdeName(serdeName) - .props(props) - .build(); - } - - public static HiveParserAlterTableDesc alterLocation( - String compoundName, Map partSpec, String newLocation) { - return new Builder() - .op(ALTERLOCATION) - .compoundName(compoundName) - .partSpec(partSpec) - .newLocation(newLocation) - .build(); - } - - private static class Builder { - private AlterTableDesc.AlterTableTypes op; - private String compoundName; - private Map partSpec; - private boolean expectView; - private Map props; - private String newName; - private String serdeName; - private String newLocation; - private String oldColName; - private String newColName; - private String newColType; - private String newColComment; - private boolean first; - private String after; - private List newCols; - private boolean cascade; - - Builder op(AlterTableDesc.AlterTableTypes op) { - this.op = op; - return this; - } - - Builder compoundName(String compoundName) { - this.compoundName = compoundName; - return this; - } - - Builder partSpec(Map partSpec) { - this.partSpec = partSpec; - return this; - } - - Builder expectView(boolean expectView) { - this.expectView = expectView; - return this; - } - - Builder props(Map props) { - this.props = props; - return this; - } - - Builder newName(String newName) { - this.newName = newName; - return this; - } - - Builder serdeName(String serdeName) { - this.serdeName = serdeName; - return this; - } - - Builder newLocation(String newLocation) { - this.newLocation = newLocation; - return this; - } - - Builder oldColName(String oldColName) { - this.oldColName = oldColName; - return this; - } - - Builder newColName(String newColName) { - this.newColName = newColName; - return this; - } - - Builder newColType(String newColType) { - this.newColType = newColType; - return this; - } - - Builder newColComment(String newColComment) { - this.newColComment = newColComment; - return this; - } - - Builder first(boolean first) { - this.first = first; - return this; - } - - Builder after(String after) { - this.after = after; - return this; - } - - Builder newCols(List newCols) { - this.newCols = newCols; - return this; - } - - Builder cascade(boolean cascade) { - this.cascade = cascade; - return this; - } - - HiveParserAlterTableDesc build() { - return new HiveParserAlterTableDesc( - op, - compoundName, - partSpec, - expectView, - props, - newName, - serdeName, - newLocation, - oldColName, - newColName, - newColType, - newColComment, - first, - after, - newCols, - cascade); - } - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserCreateTableDesc.java 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserCreateTableDesc.java deleted file mode 100644 index f925a05fe8b03..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserCreateTableDesc.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.delegation.hive.desc; - -import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer; -import org.apache.flink.table.planner.delegation.hive.copy.HiveParserStorageFormat; - -import org.apache.hadoop.hive.metastore.api.FieldSchema; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -/** Desc for CREATE TABLE operation. */ -public class HiveParserCreateTableDesc implements Serializable { - private static final long serialVersionUID = 1L; - - private final String compoundName; - private final boolean isExternal; - private final boolean ifNotExists; - private final boolean isTemporary; - private final List<FieldSchema> cols; - private final List<FieldSchema> partCols; - private final String comment; - private final String location; - private final Map<String, String> tblProps; - private final HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams rowFormatParams; - private final HiveParserStorageFormat storageFormat; - private final List<PrimaryKey> primaryKeys; - private final List<NotNullConstraint> notNullConstraints; - - public HiveParserCreateTableDesc( - String compoundName, - boolean isExternal, - boolean ifNotExists, - boolean isTemporary, - List<FieldSchema> cols, - List<FieldSchema> partCols, - String comment, - String location, - Map<String, String> tblProps, - HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams rowFormatParams, - HiveParserStorageFormat storageFormat, - List<PrimaryKey> primaryKeys, - List<NotNullConstraint> notNullConstraints) { - this.compoundName = compoundName; - this.isExternal = isExternal; - this.ifNotExists = ifNotExists; - this.isTemporary = isTemporary; - this.cols = cols; - this.partCols = partCols; - this.comment = comment; - this.location = location; - this.tblProps = tblProps; - this.rowFormatParams = rowFormatParams; - this.storageFormat = storageFormat; - this.primaryKeys = primaryKeys; - this.notNullConstraints = notNullConstraints; - } - - public String getCompoundName() { - return compoundName; - } - - public boolean isExternal() { - return isExternal; - } - - public boolean ifNotExists() { - return ifNotExists; - } - - public boolean isTemporary() { - return isTemporary; - } - - public List<FieldSchema> getCols() { - return cols; - } - - public List<FieldSchema> getPartCols() { - return partCols; - } - - public String getComment() { - return comment; - } - - public String getLocation() { - return location; - } - - public Map<String, String> getTblProps() { -
return tblProps; - } - - public HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams getRowFormatParams() { - return rowFormatParams; - } - - public HiveParserStorageFormat getStorageFormat() { - return storageFormat; - } - - public List getPrimaryKeys() { - return primaryKeys; - } - - public List getNotNullConstraints() { - return notNullConstraints; - } - - /** Counterpart of hive's SQLNotNullConstraint. */ - public static class NotNullConstraint implements Serializable { - - private static final long serialVersionUID = 7642343368203203950L; - - private final String dbName; - private final String tblName; - private final String colName; - private final String constraintName; - private final boolean enable; - private final boolean validate; - private final boolean rely; - - public NotNullConstraint( - String dbName, - String tblName, - String colName, - String constraintName, - boolean enable, - boolean validate, - boolean rely) { - this.dbName = dbName; - this.tblName = tblName; - this.colName = colName; - this.constraintName = constraintName; - this.enable = enable; - this.validate = validate; - this.rely = rely; - } - - public String getDbName() { - return dbName; - } - - public String getTblName() { - return tblName; - } - - public String getColName() { - return colName; - } - - public String getConstraintName() { - return constraintName; - } - - public boolean isEnable() { - return enable; - } - - public boolean isValidate() { - return validate; - } - - public boolean isRely() { - return rely; - } - } - - /** Counterpart of hive's SQLPrimaryKey. */ - public static class PrimaryKey implements Serializable { - - private static final long serialVersionUID = 3036210046732750293L; - - private final String dbName; - private final String tblName; - private final String pk; - private final String constraintName; - private final boolean enable; - private final boolean validate; - private final boolean rely; - - public PrimaryKey( - String dbName, - String tblName, - String pk, - String constraintName, - boolean enable, - boolean validate, - boolean rely) { - this.dbName = dbName; - this.tblName = tblName; - this.pk = pk; - this.constraintName = constraintName; - this.enable = enable; - this.validate = validate; - this.rely = rely; - } - - public String getDbName() { - return dbName; - } - - public String getTblName() { - return tblName; - } - - public String getPk() { - return pk; - } - - public String getConstraintName() { - return constraintName; - } - - public boolean isEnable() { - return enable; - } - - public boolean isValidate() { - return validate; - } - - public boolean isRely() { - return rely; - } - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserDropDatabaseDesc.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserDropDatabaseDesc.java deleted file mode 100644 index 9b4345a2a06b7..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserDropDatabaseDesc.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.delegation.hive.desc; - -import java.io.Serializable; - -/** Desc for drop database. */ -public class HiveParserDropDatabaseDesc implements Serializable { - private static final long serialVersionUID = 1L; - - private final String databaseName; - private final boolean ifExists; - private final boolean cascade; - - public HiveParserDropDatabaseDesc(String databaseName, boolean ifExists, boolean cascade) { - this.databaseName = databaseName; - this.ifExists = ifExists; - this.cascade = cascade; - } - - public String getDatabaseName() { - return databaseName; - } - - public boolean ifExists() { - return ifExists; - } - - public boolean cascade() { - return cascade; - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserDropFunctionDesc.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserDropFunctionDesc.java deleted file mode 100644 index c48608a6a3f0f..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserDropFunctionDesc.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.delegation.hive.desc; - -import org.apache.hadoop.hive.ql.plan.DropFunctionDesc; - -import java.io.Serializable; - -/** Desc for DROP FUNCTION. 
*/ -public class HiveParserDropFunctionDesc implements Serializable { - - private static final long serialVersionUID = 1L; - - private final DropFunctionDesc desc; - private final boolean ifExists; - - public HiveParserDropFunctionDesc(DropFunctionDesc desc, boolean ifExists) { - this.desc = desc; - this.ifExists = ifExists; - } - - public DropFunctionDesc getDesc() { - return desc; - } - - public boolean ifExists() { - return ifExists; - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserDropTableDesc.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserDropTableDesc.java deleted file mode 100644 index 11bb4eb092955..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserDropTableDesc.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.delegation.hive.desc; - -import java.io.Serializable; - -/** Desc for DROP TABLE/VIEW operation. */ -public class HiveParserDropTableDesc implements Serializable { - private static final long serialVersionUID = 7493000830476614290L; - - private final String compoundName; - private final boolean expectView; - private final boolean ifExists; - private final boolean purge; - - public HiveParserDropTableDesc( - String compoundName, boolean expectView, boolean ifExists, boolean purge) { - this.compoundName = compoundName; - this.expectView = expectView; - this.ifExists = ifExists; - this.purge = purge; - } - - public String getCompoundName() { - return compoundName; - } - - public boolean ifExists() { - return ifExists; - } - - public boolean isPurge() { - return purge; - } - - public boolean isExpectView() { - return expectView; - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserShowTablesDesc.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserShowTablesDesc.java deleted file mode 100644 index a89bccb214de4..0000000000000 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserShowTablesDesc.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.delegation.hive.desc; - -import java.io.Serializable; - -/** Desc for SHOW TABLES/VIEWS operation. */ -public class HiveParserShowTablesDesc implements Serializable { - private static final long serialVersionUID = -3381731226279052381L; - - private final String pattern; - private final String dbName; - private final boolean expectView; - - public HiveParserShowTablesDesc(String pattern, String dbName, boolean expectView) { - this.pattern = pattern; - this.dbName = dbName; - this.expectView = expectView; - } - - public String getPattern() { - return pattern; - } - - public String getDbName() { - return dbName; - } - - public boolean isExpectView() { - return expectView; - } -} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserCreateViewDesc.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserCreateViewInfo.java similarity index 66% rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserCreateViewDesc.java rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserCreateViewInfo.java index ac011fa8deb18..b158dafcb5cce 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/desc/HiveParserCreateViewDesc.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserCreateViewInfo.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.planner.delegation.hive.desc; +package org.apache.flink.table.planner.delegation.hive.parse; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode; @@ -24,38 +24,23 @@ import java.io.Serializable; import java.util.List; -import java.util.Map; -/** Desc for create view operation. */ -public class HiveParserCreateViewDesc implements Serializable { +/** Information for create view operation. 
*/ +public class HiveParserCreateViewInfo implements Serializable { private static final long serialVersionUID = 1L; private final String compoundName; - private final String comment; - private final Map tblProps; - private final boolean ifNotExists; - private final boolean isAlterViewAs; private final HiveParserASTNode query; private List schema; private String originalText; private String expandedText; - public HiveParserCreateViewDesc( - String compoundName, - List schema, - String comment, - Map tblProps, - boolean ifNotExists, - boolean isAlterViewAs, - HiveParserASTNode query) { + public HiveParserCreateViewInfo( + String compoundName, List schema, HiveParserASTNode query) { this.compoundName = compoundName; this.schema = schema; - this.comment = comment; - this.tblProps = tblProps; - this.ifNotExists = ifNotExists; - this.isAlterViewAs = isAlterViewAs; this.query = query; } @@ -71,22 +56,6 @@ public void setSchema(List schema) { this.schema = schema; } - public String getComment() { - return comment; - } - - public Map getTblProps() { - return tblProps; - } - - public boolean ifNotExists() { - return ifNotExists; - } - - public boolean isAlterViewAs() { - return isAlterViewAs; - } - public String getOriginalText() { return originalText; } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java index e9f6ef5723484..977ae7caf0dd2 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java @@ -18,34 +18,87 @@ package org.apache.flink.table.planner.delegation.hive.parse; +import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils; +import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase; +import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.WatermarkSpec; +import org.apache.flink.table.api.constraints.UniqueConstraint; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogFunctionImpl; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.CatalogViewImpl; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import 
org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.hive.HiveCatalog; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory; +import org.apache.flink.table.catalog.hive.util.HiveTableUtil; +import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.operations.CatalogSinkModifyOperation; +import org.apache.flink.table.operations.DescribeTableOperation; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.ShowDatabasesOperation; +import org.apache.flink.table.operations.ShowFunctionsOperation; +import org.apache.flink.table.operations.ShowPartitionsOperation; +import org.apache.flink.table.operations.ShowTablesOperation; +import org.apache.flink.table.operations.ShowViewsOperation; +import org.apache.flink.table.operations.UseDatabaseOperation; +import org.apache.flink.table.operations.ddl.AddPartitionsOperation; +import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; +import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation; +import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation; +import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; +import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation; +import org.apache.flink.table.operations.ddl.AlterViewAsOperation; +import org.apache.flink.table.operations.ddl.AlterViewPropertiesOperation; +import org.apache.flink.table.operations.ddl.AlterViewRenameOperation; +import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation; +import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; +import org.apache.flink.table.operations.ddl.CreateTableASOperation; +import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; +import org.apache.flink.table.operations.ddl.CreateViewOperation; +import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation; +import org.apache.flink.table.operations.ddl.DropDatabaseOperation; +import org.apache.flink.table.operations.ddl.DropPartitionsOperation; +import org.apache.flink.table.operations.ddl.DropTableOperation; +import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation; +import org.apache.flink.table.operations.ddl.DropViewOperation; +import org.apache.flink.table.planner.delegation.hive.HiveParser; +import org.apache.flink.table.planner.delegation.hive.HiveParserCalcitePlanner; import org.apache.flink.table.planner.delegation.hive.HiveParserConstants; +import org.apache.flink.table.planner.delegation.hive.HiveParserDMLHelper; import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseUtils; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserAuthorizationParseUtils; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer; +import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams; +import org.apache.flink.table.planner.delegation.hive.copy.HiveParserContext; import 
org.apache.flink.table.planner.delegation.hive.copy.HiveParserQueryState; import org.apache.flink.table.planner.delegation.hive.copy.HiveParserStorageFormat; -import org.apache.flink.table.planner.delegation.hive.desc.CreateTableASDesc; -import org.apache.flink.table.planner.delegation.hive.desc.DropPartitionDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserAlterDatabaseDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserAlterTableDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.NotNullConstraint; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.PrimaryKey; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropDatabaseDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropFunctionDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropTableDesc; -import org.apache.flink.table.planner.delegation.hive.desc.HiveParserShowTablesDesc; +import org.apache.flink.table.planner.utils.OperationConverterUtils; import org.antlr.runtime.tree.CommonTree; -import org.apache.hadoop.fs.Path; +import org.apache.calcite.rel.RelNode; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; @@ -53,37 +106,19 @@ import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.FunctionUtils; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.hooks.ReadEntity; -import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; -import org.apache.hadoop.hive.ql.plan.AlterTableDesc; -import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; -import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc; -import org.apache.hadoop.hive.ql.plan.DDLWork; -import org.apache.hadoop.hive.ql.plan.DescDatabaseDesc; -import org.apache.hadoop.hive.ql.plan.DescFunctionDesc; -import org.apache.hadoop.hive.ql.plan.DescTableDesc; -import org.apache.hadoop.hive.ql.plan.DropFunctionDesc; -import org.apache.hadoop.hive.ql.plan.FunctionWork; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.PrincipalDesc; -import org.apache.hadoop.hive.ql.plan.ShowDatabasesDesc; -import org.apache.hadoop.hive.ql.plan.ShowFunctionsDesc; -import org.apache.hadoop.hive.ql.plan.ShowPartitionsDesc; -import org.apache.hadoop.hive.ql.plan.SwitchDatabaseDesc; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -91,11 +126,45 @@ import java.util.LinkedHashMap; import java.util.List; import 
java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils.COL_DELIMITER; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_COL_CASCADE; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.ALTER_TABLE_OP; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.ALTER_COLUMNS; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_FILE_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_LOCATION; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_SERDE_PROPS; +import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTable.AlterTableOp.CHANGE_TBL_PROPS; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase.DATABASE_LOCATION_URI; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.COLLECTION_DELIM; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.ESCAPE_CHAR; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.FIELD_DELIM; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.LINE_DELIM; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.MAPKEY_DELIM; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_INFO_PROP_PREFIX; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERDE_LIB_CLASS_NAME; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableRowFormat.SERIALIZATION_NULL_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_FILE_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_INPUT_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.HiveTableStoredAs.STORED_AS_OUTPUT_FORMAT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_COLS; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.NOT_NULL_CONSTRAINT_TRAITS; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.PK_CONSTRAINT_TRAIT; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_IS_EXTERNAL; +import static org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable.TABLE_LOCATION_URI; +import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.NotNullConstraint; +import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.PrimaryKey; /** * Ported hive's org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer, and also incorporated - * functionalities from SemanticAnalyzer and FunctionSemanticAnalyzer. + * functionalities from SemanticAnalyzer and FunctionSemanticAnalyzer. It's mainly used to convert + * {@link HiveParserASTNode} to the corresponding {@link Operation}. 
*/ public class HiveParserDDLSemanticAnalyzer { private static final Logger LOG = LoggerFactory.getLogger(HiveParserDDLSemanticAnalyzer.class); @@ -105,7 +174,13 @@ public class HiveParserDDLSemanticAnalyzer { private final HiveConf conf; private final HiveParserQueryState queryState; private final HiveCatalog hiveCatalog; + private final CatalogManager catalogManager; private final String currentDB; + private final HiveParser hiveParser; + private final HiveFunctionDefinitionFactory funcDefFactory; + private final HiveShim hiveShim; + private final HiveParserContext context; + private final HiveParserDMLHelper dmlHelper; static { TokenToTypeName.put(HiveASTParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME); @@ -160,12 +235,24 @@ public static String getTypeName(HiveParserASTNode node) throws SemanticExceptio } public HiveParserDDLSemanticAnalyzer( - HiveParserQueryState queryState, HiveCatalog hiveCatalog, String currentDB) + HiveParserQueryState queryState, + HiveCatalog hiveCatalog, + CatalogManager catalogManager, + HiveParser hiveParser, + HiveShim hiveShim, + HiveParserContext context, + HiveParserDMLHelper dmlHelper) throws SemanticException { this.queryState = queryState; this.conf = queryState.getConf(); this.hiveCatalog = hiveCatalog; - this.currentDB = currentDB; + this.currentDB = catalogManager.getCurrentDatabase(); + this.catalogManager = catalogManager; + this.hiveParser = hiveParser; + this.funcDefFactory = new HiveFunctionDefinitionFactory(hiveShim); + this.hiveShim = hiveShim; + this.context = context; + this.dmlHelper = dmlHelper; reservedPartitionValues = new HashSet<>(); // Partition can't have this name reservedPartitionValues.add(HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME)); @@ -180,10 +267,6 @@ public HiveParserDDLSemanticAnalyzer( HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_INT_EXTRACTED)); } - private Table getTable(String tableName) throws SemanticException { - return getTable(toObjectPath(tableName)); - } - private Table getTable(ObjectPath tablePath) { try { return new Table(hiveCatalog.getHiveTable(tablePath)); @@ -192,195 +275,71 @@ private Table getTable(ObjectPath tablePath) { } } - private ObjectPath toObjectPath(String name) throws SemanticException { - String[] parts = Utilities.getDbTableName(currentDB, name); - return new ObjectPath(parts[0], parts[1]); - } - - private HashSet getInputs() { - return new HashSet<>(); - } - - private HashSet getOutputs() { - return new HashSet<>(); - } - - public Serializable analyzeInternal(HiveParserASTNode input) throws SemanticException { - - HiveParserASTNode ast = input; - Serializable res = null; + public Operation convertToOperation(HiveParserASTNode ast) throws SemanticException { + Operation res = null; switch (ast.getType()) { case HiveASTParser.TOK_ALTERTABLE: - { - ast = (HiveParserASTNode) input.getChild(1); - String[] qualified = - HiveParserBaseSemanticAnalyzer.getQualifiedTableName( - (HiveParserASTNode) input.getChild(0)); - String tableName = HiveParserBaseSemanticAnalyzer.getDotName(qualified); - HashMap partSpec = null; - HiveParserASTNode partSpecNode = (HiveParserASTNode) input.getChild(2); - if (partSpecNode != null) { - partSpec = getPartSpec(partSpecNode); - } - - if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_RENAME) { - res = analyzeAlterTableRename(qualified, ast, false); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_TOUCH) { - handleUnsupportedOperation(ast); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_ARCHIVE) { - 
handleUnsupportedOperation(ast); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_UNARCHIVE) { - handleUnsupportedOperation(ast); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_ADDCOLS) { - res = analyzeAlterTableModifyCols(qualified, ast, partSpec, false); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_REPLACECOLS) { - res = analyzeAlterTableModifyCols(qualified, ast, partSpec, true); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_RENAMECOL) { - res = analyzeAlterTableRenameCol(qualified, ast, partSpec); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_ADDPARTS) { - res = analyzeAlterTableAddParts(qualified, ast, false); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_DROPPARTS) { - res = analyzeAlterTableDropParts(qualified, ast, false); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_PARTCOLTYPE) { - handleUnsupportedOperation(ast); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_PROPERTIES) { - res = analyzeAlterTableProps(qualified, null, ast, false, false); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_DROPPROPERTIES) { - res = analyzeAlterTableProps(qualified, null, ast, false, true); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_UPDATESTATS) { - res = analyzeAlterTableProps(qualified, partSpec, ast, false, false); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_SKEWED) { - handleUnsupportedOperation(ast); - } else if (ast.getType() == HiveASTParser.TOK_ALTERTABLE_EXCHANGEPARTITION) { - handleUnsupportedOperation(ast); - } else if (ast.getToken().getType() - == HiveASTParser.TOK_ALTERTABLE_FILEFORMAT) { - res = analyzeAlterTableFileFormat(ast, tableName, partSpec); - } else if (ast.getToken().getType() == HiveASTParser.TOK_ALTERTABLE_LOCATION) { - res = analyzeAlterTableLocation(ast, tableName, partSpec); - } else if (ast.getToken().getType() - == HiveASTParser.TOK_ALTERTABLE_MERGEFILES) { - handleUnsupportedOperation(ast); - } else if (ast.getToken().getType() - == HiveASTParser.TOK_ALTERTABLE_SERIALIZER) { - res = analyzeAlterTableSerde(ast, tableName, partSpec); - } else if (ast.getToken().getType() - == HiveASTParser.TOK_ALTERTABLE_SERDEPROPERTIES) { - res = analyzeAlterTableSerdeProps(ast, tableName, partSpec); - } else if (ast.getToken().getType() - == HiveASTParser.TOK_ALTERTABLE_RENAMEPART) { - handleUnsupportedOperation(ast); - } else if (ast.getToken().getType() - == HiveASTParser.TOK_ALTERTABLE_SKEWED_LOCATION) { - handleUnsupportedOperation(ast); - } else if (ast.getToken().getType() == HiveASTParser.TOK_ALTERTABLE_BUCKETS) { - handleUnsupportedOperation(ast); - } else if (ast.getToken().getType() - == HiveASTParser.TOK_ALTERTABLE_CLUSTER_SORT) { - handleUnsupportedOperation(ast); - } else if (ast.getToken().getType() == HiveASTParser.TOK_ALTERTABLE_COMPACT) { - handleUnsupportedOperation(ast); - } else if (ast.getToken().getType() - == HiveASTParser.TOK_ALTERTABLE_UPDATECOLSTATS) { - handleUnsupportedOperation(ast); - } else if (ast.getToken().getType() - == HiveASTParser.TOK_ALTERTABLE_DROPCONSTRAINT) { - handleUnsupportedOperation(ast); - } else if (ast.getToken().getType() - == HiveASTParser.TOK_ALTERTABLE_ADDCONSTRAINT) { - handleUnsupportedOperation(ast); - } else { - throw new ValidationException("Unknown AST node for ALTER TABLE: " + ast); - } - break; - } + res = convertAlterTable(ast); + break; case HiveASTParser.TOK_DROPTABLE: - res = analyzeDropTable(ast, null); + res = convertDropTable(ast, null); break; case 
HiveASTParser.TOK_DESCTABLE: - res = analyzeDescribeTable(ast); + res = convertDescribeTable(ast); break; case HiveASTParser.TOK_SHOWDATABASES: - res = analyzeShowDatabases(ast); + res = convertShowDatabases(); break; case HiveASTParser.TOK_SHOWTABLES: - res = analyzeShowTables(ast, false); + res = convertShowTables(ast, false); break; case HiveASTParser.TOK_SHOWFUNCTIONS: - res = analyzeShowFunctions(ast); + res = convertShowFunctions(ast); break; case HiveASTParser.TOK_SHOWVIEWS: - res = analyzeShowTables(ast, true); - break; - case HiveASTParser.TOK_DESCFUNCTION: - res = analyzeDescFunction(ast); - break; - case HiveASTParser.TOK_DESCDATABASE: - res = analyzeDescDatabase(ast); + res = convertShowTables(ast, true); break; case HiveASTParser.TOK_DROPVIEW: - res = analyzeDropTable(ast, TableType.VIRTUAL_VIEW); + res = convertDropTable(ast, TableType.VIRTUAL_VIEW); break; case HiveASTParser.TOK_ALTERVIEW: - { - if (ast.getChild(1).getType() == HiveASTParser.TOK_QUERY) { - // alter view as - res = analyzeCreateView(ast); - } else { - String[] qualified = - HiveParserBaseSemanticAnalyzer.getQualifiedTableName( - (HiveParserASTNode) ast.getChild(0)); - ast = (HiveParserASTNode) ast.getChild(1); - if (ast.getType() == HiveASTParser.TOK_ALTERVIEW_PROPERTIES) { - res = analyzeAlterTableProps(qualified, null, ast, true, false); - } else if (ast.getType() == HiveASTParser.TOK_ALTERVIEW_DROPPROPERTIES) { - res = analyzeAlterTableProps(qualified, null, ast, true, true); - } else if (ast.getType() == HiveASTParser.TOK_ALTERVIEW_ADDPARTS) { - handleUnsupportedOperation("ADD PARTITION for view is not supported"); - } else if (ast.getType() == HiveASTParser.TOK_ALTERVIEW_DROPPARTS) { - handleUnsupportedOperation("DROP PARTITION for view is not supported"); - } else if (ast.getType() == HiveASTParser.TOK_ALTERVIEW_RENAME) { - res = analyzeAlterTableRename(qualified, ast, true); - } else { - throw new ValidationException( - "Unknown AST node for ALTER VIEW: " + ast); - } - } - break; - } + res = convertAlterView(ast); + break; case HiveASTParser.TOK_SHOWPARTITIONS: - res = analyzeShowPartitions(ast); + res = convertShowPartitions(ast); break; case HiveASTParser.TOK_CREATEDATABASE: - res = analyzeCreateDatabase(ast); + res = convertCreateDatabase(ast); break; case HiveASTParser.TOK_DROPDATABASE: - res = analyzeDropDatabase(ast); + res = convertDropDatabase(ast); break; case HiveASTParser.TOK_SWITCHDATABASE: - res = analyzeSwitchDatabase(ast); + res = convertSwitchDatabase(ast); break; case HiveASTParser.TOK_ALTERDATABASE_PROPERTIES: - res = analyzeAlterDatabaseProperties(ast); + res = convertAlterDatabaseProperties(ast); break; case HiveASTParser.TOK_ALTERDATABASE_OWNER: - res = analyzeAlterDatabaseOwner(ast); + res = convertAlterDatabaseOwner(ast); break; case HiveASTParser.TOK_ALTERDATABASE_LOCATION: - res = analyzeAlterDatabaseLocation(ast); + res = convertAlterDatabaseLocation(ast); break; case HiveASTParser.TOK_CREATETABLE: - res = analyzeCreateTable(ast); + res = convertCreateTable(ast); break; case HiveASTParser.TOK_CREATEVIEW: - res = analyzeCreateView(ast); + res = convertCreateView(ast); break; case HiveASTParser.TOK_CREATEFUNCTION: - res = analyzerCreateFunction(ast); + res = convertCreateFunction(ast); break; case HiveASTParser.TOK_DROPFUNCTION: - res = analyzeDropFunction(ast); + res = convertDropFunction(ast); break; + case HiveASTParser.TOK_DESCFUNCTION: + case HiveASTParser.TOK_DESCDATABASE: case HiveASTParser.TOK_TRUNCATETABLE: case HiveASTParser.TOK_CREATEINDEX: case 
HiveASTParser.TOK_DROPINDEX: @@ -422,20 +381,101 @@ public Serializable analyzeInternal(HiveParserASTNode input) throws SemanticExce return res; } - private Serializable analyzeDropFunction(HiveParserASTNode ast) { + private Operation convertAlterTable(HiveParserASTNode input) throws SemanticException { + Operation operation = null; + HiveParserASTNode ast = (HiveParserASTNode) input.getChild(1); + String[] qualified = + HiveParserBaseSemanticAnalyzer.getQualifiedTableName( + (HiveParserASTNode) input.getChild(0)); + String tableName = HiveParserBaseSemanticAnalyzer.getDotName(qualified); + HashMap partSpec = null; + HiveParserASTNode partSpecNode = (HiveParserASTNode) input.getChild(2); + if (partSpecNode != null) { + partSpec = getPartSpec(partSpecNode); + } + CatalogBaseTable alteredTable = getAlteredTable(tableName, false); + switch (ast.getType()) { + case HiveASTParser.TOK_ALTERTABLE_RENAME: + operation = convertAlterTableRename(tableName, ast, false); + break; + case HiveASTParser.TOK_ALTERTABLE_ADDCOLS: + operation = convertAlterTableModifyCols(alteredTable, tableName, ast, false); + break; + case HiveASTParser.TOK_ALTERTABLE_REPLACECOLS: + operation = convertAlterTableModifyCols(alteredTable, tableName, ast, true); + break; + case HiveASTParser.TOK_ALTERTABLE_RENAMECOL: + operation = convertAlterTableChangeCol(alteredTable, qualified, ast); + break; + case HiveASTParser.TOK_ALTERTABLE_ADDPARTS: + operation = convertAlterTableAddParts(qualified, ast); + break; + case HiveASTParser.TOK_ALTERTABLE_DROPPARTS: + operation = convertAlterTableDropParts(qualified, ast); + break; + case HiveASTParser.TOK_ALTERTABLE_PROPERTIES: + operation = + convertAlterTableProps(alteredTable, tableName, null, ast, false, false); + break; + case HiveASTParser.TOK_ALTERTABLE_DROPPROPERTIES: + operation = convertAlterTableProps(alteredTable, tableName, null, ast, false, true); + break; + case HiveASTParser.TOK_ALTERTABLE_UPDATESTATS: + operation = + convertAlterTableProps( + alteredTable, tableName, partSpec, ast, false, false); + break; + case HiveASTParser.TOK_ALTERTABLE_FILEFORMAT: + operation = convertAlterTableFileFormat(alteredTable, ast, tableName, partSpec); + break; + case HiveASTParser.TOK_ALTERTABLE_LOCATION: + operation = convertAlterTableLocation(alteredTable, ast, tableName, partSpec); + break; + case HiveASTParser.TOK_ALTERTABLE_SERIALIZER: + operation = convertAlterTableSerde(alteredTable, ast, tableName, partSpec); + break; + case HiveASTParser.TOK_ALTERTABLE_SERDEPROPERTIES: + operation = convertAlterTableSerdeProps(alteredTable, ast, tableName, partSpec); + break; + case HiveASTParser.TOK_ALTERTABLE_TOUCH: + case HiveASTParser.TOK_ALTERTABLE_ARCHIVE: + case HiveASTParser.TOK_ALTERTABLE_UNARCHIVE: + case HiveASTParser.TOK_ALTERTABLE_PARTCOLTYPE: + case HiveASTParser.TOK_ALTERTABLE_SKEWED: + case HiveASTParser.TOK_ALTERTABLE_EXCHANGEPARTITION: + case HiveASTParser.TOK_ALTERTABLE_MERGEFILES: + case HiveASTParser.TOK_ALTERTABLE_RENAMEPART: + case HiveASTParser.TOK_ALTERTABLE_SKEWED_LOCATION: + case HiveASTParser.TOK_ALTERTABLE_BUCKETS: + case HiveASTParser.TOK_ALTERTABLE_CLUSTER_SORT: + case HiveASTParser.TOK_ALTERTABLE_COMPACT: + case HiveASTParser.TOK_ALTERTABLE_UPDATECOLSTATS: + case HiveASTParser.TOK_ALTERTABLE_DROPCONSTRAINT: + case HiveASTParser.TOK_ALTERTABLE_ADDCONSTRAINT: + handleUnsupportedOperation(ast); + break; + default: + throw new ValidationException("Unknown AST node for ALTER TABLE: " + ast); + } + return operation; + } + + private Operation 
convertDropFunction(HiveParserASTNode ast) { // ^(TOK_DROPFUNCTION identifier ifExists? $temp?) String functionName = ast.getChild(0).getText(); boolean ifExists = (ast.getFirstChildWithType(HiveASTParser.TOK_IFEXISTS) != null); boolean isTemporaryFunction = (ast.getFirstChildWithType(HiveASTParser.TOK_TEMPORARY) != null); - DropFunctionDesc desc = new DropFunctionDesc(); - desc.setFunctionName(functionName); - desc.setTemp(isTemporaryFunction); - return new HiveParserDropFunctionDesc(desc, ifExists); + if (isTemporaryFunction) { + return new DropTempSystemFunctionOperation(functionName, ifExists); + } else { + ObjectIdentifier identifier = parseObjectIdentifier(functionName); + return new DropCatalogFunctionOperation(identifier, ifExists, false); + } } - private Serializable analyzerCreateFunction(HiveParserASTNode ast) { + private Operation convertCreateFunction(HiveParserASTNode ast) { // ^(TOK_CREATEFUNCTION identifier StringLiteral ({isTempFunction}? => TOK_TEMPORARY)) String functionName = ast.getChild(0).getText().toLowerCase(); boolean isTemporaryFunction = @@ -445,19 +485,62 @@ private Serializable analyzerCreateFunction(HiveParserASTNode ast) { // Temp functions are not allowed to have qualified names. if (isTemporaryFunction && FunctionUtils.isQualifiedFunctionName(functionName)) { + // hive's temporary function is more like flink's temp system function, e.g. doesn't + // belong to a catalog/db throw new ValidationException( "Temporary function cannot be created with a qualified name."); } - CreateFunctionDesc desc = new CreateFunctionDesc(); - desc.setFunctionName(functionName); - desc.setTemp(isTemporaryFunction); - desc.setClassName(className); - desc.setResources(Collections.emptyList()); - return new FunctionWork(desc); + if (isTemporaryFunction) { + FunctionDefinition funcDefinition = + funcDefFactory.createFunctionDefinition( + functionName, + new CatalogFunctionImpl(className, FunctionLanguage.JAVA)); + return new CreateTempSystemFunctionOperation(functionName, false, funcDefinition); + } else { + ObjectIdentifier identifier = parseObjectIdentifier(functionName); + CatalogFunction catalogFunction = + new CatalogFunctionImpl(className, FunctionLanguage.JAVA); + return new CreateCatalogFunctionOperation(identifier, catalogFunction, false, false); + } + } + + private Operation convertAlterView(HiveParserASTNode ast) throws SemanticException { + Operation operation = null; + String[] qualified = + HiveParserBaseSemanticAnalyzer.getQualifiedTableName( + (HiveParserASTNode) ast.getChild(0)); + String tableName = HiveParserBaseSemanticAnalyzer.getDotName(qualified); + CatalogBaseTable alteredTable = getAlteredTable(tableName, true); + if (ast.getChild(1).getType() == HiveASTParser.TOK_QUERY) { + // alter view as + operation = convertCreateView(ast); + } else { + ast = (HiveParserASTNode) ast.getChild(1); + switch (ast.getType()) { + case HiveASTParser.TOK_ALTERVIEW_PROPERTIES: + operation = + convertAlterTableProps(alteredTable, tableName, null, ast, true, false); + break; + case HiveASTParser.TOK_ALTERVIEW_DROPPROPERTIES: + operation = + convertAlterTableProps(alteredTable, tableName, null, ast, true, true); + break; + case HiveASTParser.TOK_ALTERVIEW_RENAME: + operation = convertAlterTableRename(tableName, ast, true); + break; + case HiveASTParser.TOK_ALTERVIEW_ADDPARTS: + case HiveASTParser.TOK_ALTERVIEW_DROPPARTS: + handleUnsupportedOperation("ADD/DROP PARTITION for view is not supported"); + break; + default: + throw new ValidationException("Unknown AST node for ALTER 
VIEW: " + ast); + } + } + return operation; } - private Serializable analyzeCreateView(HiveParserASTNode ast) throws SemanticException { + private Operation convertCreateView(HiveParserASTNode ast) throws SemanticException { String[] qualTabName = HiveParserBaseSemanticAnalyzer.getQualifiedTableName( (HiveParserASTNode) ast.getChild(0)); @@ -473,8 +556,6 @@ private Serializable analyzeCreateView(HiveParserASTNode ast) throws SemanticExc if (isMaterialized) { handleUnsupportedOperation("MATERIALIZED VIEW is not supported"); } - HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams rowFormatParams = - new HiveParserBaseSemanticAnalyzer.HiveParserRowFormatParams(); HiveParserStorageFormat storageFormat = new HiveParserStorageFormat(conf); LOG.info("Creating view " + dbDotTable + " position=" + ast.getCharPositionInLine()); @@ -532,11 +613,43 @@ private Serializable analyzeCreateView(HiveParserASTNode ast) throws SemanticExc } queryState.setCommandType(HiveOperation.CREATEVIEW); - return new HiveParserCreateViewDesc( - dbDotTable, cols, comment, tblProps, ifNotExists, isAlterViewAs, selectStmt); + + HiveParserCreateViewInfo createViewInfo = + new HiveParserCreateViewInfo(dbDotTable, cols, selectStmt); + hiveParser.analyzeCreateView(createViewInfo, context, queryState, hiveShim); + + ObjectIdentifier viewIdentifier = parseObjectIdentifier(createViewInfo.getCompoundName()); + TableSchema schema = + HiveTableUtil.createTableSchema( + createViewInfo.getSchema(), + Collections.emptyList(), + Collections.emptySet(), + null); + Map props = new HashMap<>(); + if (isAlterViewAs) { + CatalogBaseTable baseTable = getCatalogBaseTable(viewIdentifier); + props.putAll(baseTable.getOptions()); + comment = baseTable.getComment(); + } else { + if (tblProps != null) { + props.putAll(tblProps); + } + } + CatalogView catalogView = + new CatalogViewImpl( + createViewInfo.getOriginalText(), + createViewInfo.getExpandedText(), + schema, + props, + comment); + if (isAlterViewAs) { + return new AlterViewAsOperation(viewIdentifier, catalogView); + } else { + return new CreateViewOperation(viewIdentifier, catalogView, ifNotExists, false); + } } - private Serializable analyzeCreateTable(HiveParserASTNode ast) throws SemanticException { + private Operation convertCreateTable(HiveParserASTNode ast) throws SemanticException { String[] qualifiedTabName = HiveParserBaseSemanticAnalyzer.getQualifiedTableName( (HiveParserASTNode) ast.getChild(0)); @@ -694,7 +807,7 @@ private Serializable analyzeCreateTable(HiveParserASTNode ast) throws SemanticEx switch (commandType) { case createTable: // REGULAR CREATE TABLE DDL tblProps = addDefaultProperties(tblProps); - return new HiveParserCreateTableDesc( + return convertCreateTable( dbDotTab, isExt, ifNotExists, @@ -716,8 +829,26 @@ private Serializable analyzeCreateTable(HiveParserASTNode ast) throws SemanticEx case ctas: // create table as select tblProps = addDefaultProperties(tblProps); - HiveParserCreateTableDesc createTableDesc = - new HiveParserCreateTableDesc( + // analyze the query + HiveParserCalcitePlanner calcitePlanner = + hiveParser.createCalcitePlanner(context, queryState, hiveShim); + calcitePlanner.setCtasCols(cols); + RelNode queryRelNode = calcitePlanner.genLogicalPlan(selectStmt); + // create a table to represent the dest table + String[] dbTblName = dbDotTab.split("\\."); + Table destTable = new Table(Table.getEmptyTable(dbTblName[0], dbTblName[1])); + destTable.getSd().setCols(cols); + // create the insert operation + CatalogSinkModifyOperation insertOperation = 
+ dmlHelper.createInsertOperation( + queryRelNode, + destTable, + Collections.emptyMap(), + Collections.emptyList(), + false); + + CreateTableOperation createTableOperation = + convertCreateTable( dbDotTab, isExt, ifNotExists, @@ -731,33 +862,176 @@ private Serializable analyzeCreateTable(HiveParserASTNode ast) throws SemanticEx storageFormat, primaryKeys, notNulls); - return new CreateTableASDesc(createTableDesc, selectStmt); + + return new CreateTableASOperation(createTableOperation, insertOperation); default: throw new ValidationException("Unrecognized command."); } } - private Serializable analyzeAlterDatabaseProperties(HiveParserASTNode ast) { + private CreateTableOperation convertCreateTable( + String compoundName, + boolean isExternal, + boolean ifNotExists, + boolean isTemporary, + List cols, + List partCols, + String comment, + String location, + Map tblProps, + HiveParserRowFormatParams rowFormatParams, + HiveParserStorageFormat storageFormat, + List primaryKeys, + List notNullConstraints) { + Map props = new HashMap<>(); + if (tblProps != null) { + props.putAll(tblProps); + } + markHiveConnector(props); + // external + if (isExternal) { + props.put(TABLE_IS_EXTERNAL, "true"); + } + // PK trait + UniqueConstraint uniqueConstraint = null; + if (primaryKeys != null && !primaryKeys.isEmpty()) { + PrimaryKey primaryKey = primaryKeys.get(0); + byte trait = 0; + if (primaryKey.isEnable()) { + trait = HiveDDLUtils.enableConstraint(trait); + } + if (primaryKey.isValidate()) { + trait = HiveDDLUtils.validateConstraint(trait); + } + if (primaryKey.isRely()) { + trait = HiveDDLUtils.relyConstraint(trait); + } + props.put(PK_CONSTRAINT_TRAIT, String.valueOf(trait)); + List pkCols = + primaryKeys.stream().map(PrimaryKey::getPk).collect(Collectors.toList()); + String constraintName = primaryKey.getConstraintName(); + if (constraintName == null) { + constraintName = pkCols.stream().collect(Collectors.joining("_", "PK_", "")); + } + uniqueConstraint = UniqueConstraint.primaryKey(constraintName, pkCols); + } + // NOT NULL constraints + List notNullCols = new ArrayList<>(); + if (!notNullConstraints.isEmpty()) { + List traits = new ArrayList<>(); + for (NotNullConstraint notNull : notNullConstraints) { + byte trait = 0; + if (notNull.isEnable()) { + trait = HiveDDLUtils.enableConstraint(trait); + } + if (notNull.isValidate()) { + trait = HiveDDLUtils.validateConstraint(trait); + } + if (notNull.isRely()) { + trait = HiveDDLUtils.relyConstraint(trait); + } + traits.add(String.valueOf(trait)); + notNullCols.add(notNull.getColName()); + } + props.put(NOT_NULL_CONSTRAINT_TRAITS, String.join(COL_DELIMITER, traits)); + props.put(NOT_NULL_COLS, String.join(COL_DELIMITER, notNullCols)); + } + // row format + if (rowFormatParams != null) { + encodeRowFormat(rowFormatParams, props); + } + // storage format + if (storageFormat != null) { + encodeStorageFormat(storageFormat, props); + } + // location + if (location != null) { + props.put(TABLE_LOCATION_URI, location); + } + ObjectIdentifier identifier = parseObjectIdentifier(compoundName); + Set notNullColSet = new HashSet<>(notNullCols); + if (uniqueConstraint != null) { + notNullColSet.addAll(uniqueConstraint.getColumns()); + } + TableSchema tableSchema = + HiveTableUtil.createTableSchema(cols, partCols, notNullColSet, uniqueConstraint); + return new CreateTableOperation( + identifier, + new CatalogTableImpl( + tableSchema, HiveCatalog.getFieldNames(partCols), props, comment), + ifNotExists, + isTemporary); + } + private void markHiveConnector(Map props) { 
+ props.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); + } + + private void encodeRowFormat( + HiveParserRowFormatParams rowFormatParams, Map props) { + if (rowFormatParams.getFieldDelim() != null) { + props.put(FIELD_DELIM, rowFormatParams.getFieldDelim()); + } + if (rowFormatParams.getCollItemDelim() != null) { + props.put(COLLECTION_DELIM, rowFormatParams.getCollItemDelim()); + } + if (rowFormatParams.getMapKeyDelim() != null) { + props.put(MAPKEY_DELIM, rowFormatParams.getMapKeyDelim()); + } + if (rowFormatParams.getFieldEscape() != null) { + props.put(ESCAPE_CHAR, rowFormatParams.getFieldEscape()); + } + if (rowFormatParams.getLineDelim() != null) { + props.put(LINE_DELIM, rowFormatParams.getLineDelim()); + } + if (rowFormatParams.getNullFormat() != null) { + props.put(SERIALIZATION_NULL_FORMAT, rowFormatParams.getNullFormat()); + } + } + + private void encodeStorageFormat( + HiveParserStorageFormat storageFormat, Map props) { + String serdeName = storageFormat.getSerde(); + if (serdeName != null) { + props.put(SERDE_LIB_CLASS_NAME, serdeName); + } + Map serdeProps = storageFormat.getSerdeProps(); + if (serdeProps != null) { + for (String serdeKey : serdeProps.keySet()) { + props.put(SERDE_INFO_PROP_PREFIX + serdeKey, serdeProps.get(serdeKey)); + } + } + if (storageFormat.getInputFormat() != null) { + props.put(STORED_AS_INPUT_FORMAT, storageFormat.getInputFormat()); + } + if (storageFormat.getOutputFormat() != null) { + props.put(STORED_AS_OUTPUT_FORMAT, storageFormat.getOutputFormat()); + } + } + + private Operation convertAlterDatabaseProperties(HiveParserASTNode ast) { String dbName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(0).getText()); Map dbProps = null; for (int i = 1; i < ast.getChildCount(); i++) { HiveParserASTNode childNode = (HiveParserASTNode) ast.getChild(i); - switch (childNode.getToken().getType()) { - case HiveASTParser.TOK_DATABASEPROPERTIES: - dbProps = getProps((HiveParserASTNode) childNode.getChild(0)); - break; - default: - throw new ValidationException( - "Unknown AST node for ALTER DATABASE PROPERTIES: " + childNode); + if (childNode.getToken().getType() == HiveASTParser.TOK_DATABASEPROPERTIES) { + dbProps = getProps((HiveParserASTNode) childNode.getChild(0)); + } else { + throw new ValidationException( + "Unknown AST node for ALTER DATABASE PROPERTIES: " + childNode); } } - return HiveParserAlterDatabaseDesc.alterProps(dbName, dbProps); + CatalogDatabase originDB = getDatabase(dbName); + Map props = new HashMap<>(originDB.getProperties()); + props.put(ALTER_DATABASE_OP, SqlAlterHiveDatabase.AlterHiveDatabaseOp.CHANGE_PROPS.name()); + props.putAll(dbProps); + CatalogDatabase newDB = new CatalogDatabaseImpl(props, originDB.getComment()); + return new AlterDatabaseOperation(catalogManager.getCurrentCatalog(), dbName, newDB); } - private Serializable analyzeAlterDatabaseOwner(HiveParserASTNode ast) { + private Operation convertAlterDatabaseOwner(HiveParserASTNode ast) { String dbName = HiveParserBaseSemanticAnalyzer.getUnescapedName( (HiveParserASTNode) ast.getChild(0)); @@ -773,20 +1047,31 @@ private Serializable analyzeAlterDatabaseOwner(HiveParserASTNode ast) { if (principalDesc.getType() == null) { throw new ValidationException("Owner type " + nullCmdMsg); } - - return HiveParserAlterDatabaseDesc.alterOwner(dbName, principalDesc); + CatalogDatabase originDB = getDatabase(dbName); + Map props = new HashMap<>(originDB.getProperties()); + props.put(ALTER_DATABASE_OP, 
SqlAlterHiveDatabase.AlterHiveDatabaseOp.CHANGE_OWNER.name()); + props.put(DATABASE_OWNER_NAME, principalDesc.getName()); + props.put(DATABASE_OWNER_TYPE, principalDesc.getType().name().toLowerCase()); + CatalogDatabase newDB = new CatalogDatabaseImpl(props, originDB.getComment()); + return new AlterDatabaseOperation(catalogManager.getCurrentCatalog(), dbName, newDB); } - private Serializable analyzeAlterDatabaseLocation(HiveParserASTNode ast) { + private Operation convertAlterDatabaseLocation(HiveParserASTNode ast) { String dbName = HiveParserBaseSemanticAnalyzer.getUnescapedName( (HiveParserASTNode) ast.getChild(0)); String newLocation = HiveParserBaseSemanticAnalyzer.unescapeSQLString(ast.getChild(1).getText()); - return HiveParserAlterDatabaseDesc.alterLocation(dbName, newLocation); + CatalogDatabase originDB = getDatabase(dbName); + Map props = new HashMap<>(originDB.getProperties()); + props.put( + ALTER_DATABASE_OP, SqlAlterHiveDatabase.AlterHiveDatabaseOp.CHANGE_LOCATION.name()); + props.put(DATABASE_LOCATION_URI, newLocation); + CatalogDatabase newDB = new CatalogDatabaseImpl(props, originDB.getComment()); + return new AlterDatabaseOperation(catalogManager.getCurrentCatalog(), dbName, newDB); } - private Serializable analyzeCreateDatabase(HiveParserASTNode ast) { + private Operation convertCreateDatabase(HiveParserASTNode ast) { String dbName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(0).getText()); boolean ifNotExists = false; @@ -819,15 +1104,20 @@ private Serializable analyzeCreateDatabase(HiveParserASTNode ast) { } } - CreateDatabaseDesc createDatabaseDesc = - new CreateDatabaseDesc(dbName, dbComment, dbLocation, ifNotExists); + Map props = new HashMap<>(); if (dbProps != null) { - createDatabaseDesc.setDatabaseProperties(dbProps); + props.putAll(dbProps); } - return new DDLWork(getInputs(), getOutputs(), createDatabaseDesc); + + if (dbLocation != null) { + props.put(DATABASE_LOCATION_URI, dbLocation); + } + CatalogDatabase catalogDatabase = new CatalogDatabaseImpl(props, dbComment); + return new CreateDatabaseOperation( + catalogManager.getCurrentCatalog(), dbName, catalogDatabase, ifNotExists); } - private Serializable analyzeDropDatabase(HiveParserASTNode ast) { + private Operation convertDropDatabase(HiveParserASTNode ast) { String dbName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(0).getText()); boolean ifExists = false; @@ -841,67 +1131,53 @@ private Serializable analyzeDropDatabase(HiveParserASTNode ast) { ifCascade = true; } - return new HiveParserDropDatabaseDesc(dbName, ifExists, ifCascade); + return new DropDatabaseOperation( + catalogManager.getCurrentCatalog(), dbName, ifExists, ifCascade); } - private Serializable analyzeSwitchDatabase(HiveParserASTNode ast) { + private Operation convertSwitchDatabase(HiveParserASTNode ast) { String dbName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(0).getText()); - SwitchDatabaseDesc switchDatabaseDesc = new SwitchDatabaseDesc(dbName); - return new DDLWork(new HashSet<>(), new HashSet<>(), switchDatabaseDesc); + return new UseDatabaseOperation(catalogManager.getCurrentCatalog(), dbName); } - private Serializable analyzeDropTable(HiveParserASTNode ast, TableType expectedType) { + private Operation convertDropTable(HiveParserASTNode ast, TableType expectedType) { String tableName = HiveParserBaseSemanticAnalyzer.getUnescapedName( (HiveParserASTNode) ast.getChild(0)); boolean ifExists = (ast.getFirstChildWithType(HiveASTParser.TOK_IFEXISTS) != null); - boolean ifPurge 
= (ast.getFirstChildWithType(HiveASTParser.KW_PURGE) != null); - return new HiveParserDropTableDesc( - tableName, expectedType == TableType.VIRTUAL_VIEW, ifExists, ifPurge); - } + ObjectIdentifier identifier = parseObjectIdentifier(tableName); + CatalogBaseTable baseTable = getCatalogBaseTable(identifier, true); - private void validateAlterTableType( - Table tbl, AlterTableDesc.AlterTableTypes op, boolean expectView) { - if (tbl.isView()) { - if (!expectView) { - throw new ValidationException(ErrorMsg.ALTER_COMMAND_FOR_VIEWS.getMsg()); - } - - switch (op) { - case ADDPARTITION: - case DROPPARTITION: - case RENAMEPARTITION: - case ADDPROPS: - case DROPPROPS: - case RENAME: - // allow this form - break; - default: - throw new ValidationException( - ErrorMsg.ALTER_VIEW_DISALLOWED_OP.getMsg(op.toString())); + if (expectedType == TableType.VIRTUAL_VIEW) { + if (baseTable instanceof CatalogTable) { + throw new ValidationException("DROP VIEW for a table is not allowed"); } + return new DropViewOperation(identifier, ifExists, false); } else { - if (expectView) { - throw new ValidationException(ErrorMsg.ALTER_COMMAND_FOR_TABLES.getMsg()); + if (baseTable instanceof CatalogView) { + throw new ValidationException("DROP TABLE for a view is not allowed"); } + return new DropTableOperation(identifier, ifExists, false); } + } + + private void validateAlterTableType(Table tbl) { if (tbl.isNonNative()) { throw new ValidationException( ErrorMsg.ALTER_TABLE_NON_NATIVE.getMsg(tbl.getTableName())); } } - private Serializable analyzeAlterTableProps( - String[] qualified, + private Operation convertAlterTableProps( + CatalogBaseTable alteredTable, + String tableName, HashMap partSpec, HiveParserASTNode ast, boolean expectView, - boolean isUnset) - throws SemanticException { + boolean isUnset) { - String tableName = HiveParserBaseSemanticAnalyzer.getDotName(qualified); HashMap mapProp = getProps((HiveParserASTNode) (ast.getChild(0)).getChild(0)); // we need to check if the properties are valid, especially for stats. @@ -939,38 +1215,89 @@ private Serializable analyzeAlterTableProps( } } } - HiveParserAlterTableDesc alterTblDesc = null; if (isUnset) { handleUnsupportedOperation("Unset properties not supported"); + } + + if (expectView) { + return convertAlterViewProps(alteredTable, tableName, mapProp); } else { - alterTblDesc = - HiveParserAlterTableDesc.alterTableProps( - tableName, partSpec, mapProp, expectView); + Map newProps = new HashMap<>(); + newProps.put(ALTER_TABLE_OP, CHANGE_TBL_PROPS.name()); + newProps.putAll(mapProp); + return convertAlterTableProps(alteredTable, tableName, partSpec, newProps); } + } - return alterTblDesc; + private Operation convertAlterTableProps( + CatalogBaseTable oldBaseTable, + String tableName, + Map partSpec, + Map newProps) { + ObjectIdentifier tableIdentifier = parseObjectIdentifier(tableName); + CatalogTable oldTable = (CatalogTable) oldBaseTable; + CatalogPartitionSpec catalogPartitionSpec = + partSpec != null ? new CatalogPartitionSpec(partSpec) : null; + CatalogPartition catalogPartition = + partSpec != null ? 
getPartition(tableIdentifier, catalogPartitionSpec) : null; + + Map props = new HashMap<>(); + if (catalogPartition != null) { + props.putAll(catalogPartition.getProperties()); + props.putAll(newProps); + return new AlterPartitionPropertiesOperation( + tableIdentifier, + catalogPartitionSpec, + new CatalogPartitionImpl(props, catalogPartition.getComment())); + } else { + props.putAll(oldTable.getOptions()); + props.putAll(newProps); + return new AlterTableOptionsOperation(tableIdentifier, oldTable.copy(props)); + } } - private Serializable analyzeAlterTableSerdeProps( - HiveParserASTNode ast, String tableName, HashMap partSpec) { + private Operation convertAlterTableSerdeProps( + CatalogBaseTable alteredTable, + HiveParserASTNode ast, + String tableName, + HashMap partSpec) { HashMap mapProp = getProps((HiveParserASTNode) (ast.getChild(0)).getChild(0)); - return HiveParserAlterTableDesc.alterSerDe(tableName, partSpec, null, mapProp); + Map newProps = new HashMap<>(); + newProps.put(ALTER_TABLE_OP, CHANGE_SERDE_PROPS.name()); + for (String key : mapProp.keySet()) { + newProps.put(SERDE_INFO_PROP_PREFIX + key, mapProp.get(key)); + } + return convertAlterTableProps(alteredTable, tableName, partSpec, newProps); } - private Serializable analyzeAlterTableSerde( - HiveParserASTNode ast, String tableName, HashMap partSpec) { + private Operation convertAlterTableSerde( + CatalogBaseTable alteredTable, + HiveParserASTNode ast, + String tableName, + HashMap partSpec) { String serdeName = HiveParserBaseSemanticAnalyzer.unescapeSQLString(ast.getChild(0).getText()); HashMap mapProp = null; if (ast.getChildCount() > 1) { mapProp = getProps((HiveParserASTNode) (ast.getChild(1)).getChild(0)); } - return HiveParserAlterTableDesc.alterSerDe(tableName, partSpec, serdeName, mapProp); + Map newProps = new HashMap<>(); + newProps.put(ALTER_TABLE_OP, CHANGE_SERDE_PROPS.name()); + newProps.put(SERDE_LIB_CLASS_NAME, serdeName); + if (mapProp != null) { + for (String key : mapProp.keySet()) { + newProps.put(SERDE_INFO_PROP_PREFIX + key, mapProp.get(key)); + } + } + return convertAlterTableProps(alteredTable, tableName, partSpec, newProps); } - private Serializable analyzeAlterTableFileFormat( - HiveParserASTNode ast, String tableName, HashMap partSpec) + private Operation convertAlterTableFileFormat( + CatalogBaseTable alteredTable, + HiveParserASTNode ast, + String tableName, + HashMap partSpec) throws SemanticException { HiveParserStorageFormat format = new HiveParserStorageFormat(conf); @@ -980,18 +1307,24 @@ private Serializable analyzeAlterTableFileFormat( throw new ValidationException("Unknown AST node for ALTER TABLE FILEFORMAT: " + child); } - HiveParserAlterTableDesc alterTblDesc = - HiveParserAlterTableDesc.alterFileFormat(tableName, partSpec); - alterTblDesc.setGenericFileFormatName(format.getGenericName()); - - return alterTblDesc; + Map newProps = new HashMap<>(); + newProps.put(ALTER_TABLE_OP, CHANGE_FILE_FORMAT.name()); + newProps.put(STORED_AS_FILE_FORMAT, format.getGenericName()); + return convertAlterTableProps(alteredTable, tableName, partSpec, newProps); } - private Serializable analyzeAlterTableLocation( - HiveParserASTNode ast, String tableName, HashMap partSpec) { + private Operation convertAlterTableLocation( + CatalogBaseTable alteredTable, + HiveParserASTNode ast, + String tableName, + HashMap partSpec) { String newLocation = HiveParserBaseSemanticAnalyzer.unescapeSQLString(ast.getChild(0).getText()); - return HiveParserAlterTableDesc.alterLocation(tableName, partSpec, newLocation); + Map 
newProps = new HashMap<>(); + newProps.put(ALTER_TABLE_OP, CHANGE_LOCATION.name()); + newProps.put(TABLE_LOCATION_URI, newLocation); + + return convertAlterTableProps(alteredTable, tableName, partSpec, newProps); } public static HashMap getProps(HiveParserASTNode prop) { @@ -1053,7 +1386,7 @@ public static String getColPath( if (dbName == null) { return tableName + "." + QualifiedNameUtil.getFullyQualifiedName(columnNode); } else { - return tableName.substring(dbName.length() + 1, tableName.length()) + return tableName.substring(dbName.length() + 1) + "." + QualifiedNameUtil.getFullyQualifiedName(columnNode); } @@ -1063,9 +1396,7 @@ public static String getColPath( } // get partition metadata - public static Map getPartitionSpec( - HiveCatalog db, HiveParserASTNode ast, ObjectPath tablePath) - throws SemanticException { + public static Map getPartitionSpec(HiveParserASTNode ast) { HiveParserASTNode partNode = null; // if this ast has only one child, then no partition spec specified. if (ast.getChildCount() == 1) { @@ -1093,22 +1424,17 @@ public static Map getPartitionSpec( } } - private void validateTable(String tableName, Map partSpec) - throws SemanticException { - Table tab = getTable(tableName); - if (partSpec != null) { - getPartition(tab, partSpec); - } - } - - private void getPartition(Table table, Map partSpec) { - try { - hiveCatalog.getPartition( - new ObjectPath(table.getDbName(), table.getTableName()), - new CatalogPartitionSpec(partSpec)); - } catch (PartitionNotExistException e) { - throw new ValidationException("Partition not found", e); - } + private CatalogPartition getPartition( + ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) { + return catalogManager + .getPartition(tableIdentifier, partitionSpec) + .orElseThrow( + () -> + new ValidationException( + String.format( + "Partition %s of table %s doesn't exist", + partitionSpec.getPartitionSpec(), + tableIdentifier))); } /** @@ -1117,7 +1443,7 @@ private void getPartition(Table table, Map partSpec) { * specified default maptable TOK_PARTSPEC --> root node for partition spec. 
else columnName * TOK_PARTVAL b 100 id --> root node for columnName formatted */ - private Serializable analyzeDescribeTable(HiveParserASTNode ast) throws SemanticException { + private Operation convertDescribeTable(HiveParserASTNode ast) { HiveParserASTNode tableTypeExpr = (HiveParserASTNode) ast.getChild(0); String dbName = null; @@ -1143,9 +1469,7 @@ private Serializable analyzeDescribeTable(HiveParserASTNode ast) throws Semantic } // process the second child,if exists, node to get partition spec(s) - partSpec = - QualifiedNameUtil.getPartitionSpec( - hiveCatalog, tableTypeExpr, toObjectPath(tableName)); + partSpec = QualifiedNameUtil.getPartitionSpec(tableTypeExpr); // process the third child node,if exists, to get partition spec(s) colPath = QualifiedNameUtil.getColPath(tableTypeExpr, dbName, tableName, partSpec); @@ -1157,37 +1481,19 @@ private Serializable analyzeDescribeTable(HiveParserASTNode ast) throws Semantic handleUnsupportedOperation("DESCRIBE COLUMNS is not supported"); } - DescTableDesc descTblDesc = new DescTableDesc(getResFile(), tableName, partSpec, colPath); - + boolean isExt = false; + boolean isFormatted = false; if (ast.getChildCount() == 2) { int descOptions = ast.getChild(1).getType(); - descTblDesc.setFormatted(descOptions == HiveASTParser.KW_FORMATTED); - descTblDesc.setExt(descOptions == HiveASTParser.KW_EXTENDED); + isExt = descOptions == HiveASTParser.KW_EXTENDED; + isFormatted = descOptions == HiveASTParser.KW_FORMATTED; if (descOptions == HiveASTParser.KW_PRETTY) { handleUnsupportedOperation("DESCRIBE PRETTY is not supported."); } } - return new DDLWork(getInputs(), getOutputs(), descTblDesc); - } - - private Serializable analyzeDescDatabase(HiveParserASTNode ast) { - - boolean isExtended; - String dbName; - - if (ast.getChildCount() == 1) { - dbName = HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText()); - isExtended = false; - } else if (ast.getChildCount() == 2) { - dbName = HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText()); - isExtended = true; - } else { - throw new ValidationException("Unexpected Tokens at DESCRIBE DATABASE"); - } - - DescDatabaseDesc descDbDesc = new DescDatabaseDesc(getResFile(), dbName, isExtended); - return new DDLWork(getInputs(), getOutputs(), descDbDesc); + ObjectIdentifier tableIdentifier = parseObjectIdentifier(tableName); + return new DescribeTableOperation(tableIdentifier, isExt || isFormatted); } public static HashMap getPartSpec(HiveParserASTNode partspec) { @@ -1209,8 +1515,22 @@ public static HashMap getPartSpec(HiveParserASTNode partspec) { return partSpec; } - private Serializable analyzeShowPartitions(HiveParserASTNode ast) throws SemanticException { - ShowPartitionsDesc showPartsDesc; + // Get the partition specs from the tree + private List> getPartitionSpecs(CommonTree ast) { + List> partSpecs = new ArrayList<>(); + // get partition metadata if partition specified + for (int childIndex = 0; childIndex < ast.getChildCount(); childIndex++) { + HiveParserASTNode partSpecNode = (HiveParserASTNode) ast.getChild(childIndex); + // sanity check + if (partSpecNode.getType() == HiveASTParser.TOK_PARTSPEC) { + Map partSpec = getPartSpec(partSpecNode); + partSpecs.add(partSpec); + } + } + return partSpecs; + } + + private Operation convertShowPartitions(HiveParserASTNode ast) { String tableName = HiveParserBaseSemanticAnalyzer.getUnescapedName( (HiveParserASTNode) ast.getChild(0)); @@ -1222,25 +1542,19 @@ private Serializable analyzeShowPartitions(HiveParserASTNode ast) throws Semanti 
partSpec = partSpecs.get(0); } - validateTable(tableName, null); - - showPartsDesc = new ShowPartitionsDesc(tableName, getResFile(), partSpec); - return new DDLWork(getInputs(), getOutputs(), showPartsDesc); + ObjectIdentifier tableIdentifier = parseObjectIdentifier(tableName); + CatalogPartitionSpec spec = null; + if (partSpec != null && !partSpec.isEmpty()) { + spec = new CatalogPartitionSpec(new HashMap<>(partSpec)); + } + return new ShowPartitionsOperation(tableIdentifier, spec); } - private Serializable analyzeShowDatabases(HiveParserASTNode ast) { - ShowDatabasesDesc showDatabasesDesc; - if (ast.getChildCount() == 1) { - String databasePattern = - HiveParserBaseSemanticAnalyzer.unescapeSQLString(ast.getChild(0).getText()); - showDatabasesDesc = new ShowDatabasesDesc(getResFile(), databasePattern); - } else { - showDatabasesDesc = new ShowDatabasesDesc(getResFile()); - } - return new DDLWork(getInputs(), getOutputs(), showDatabasesDesc); + private Operation convertShowDatabases() { + return new ShowDatabasesOperation(); } - private Serializable analyzeShowTables(HiveParserASTNode ast, boolean expectView) { + private Operation convertShowTables(HiveParserASTNode ast, boolean expectView) { String dbName = currentDB; String pattern = null; @@ -1276,7 +1590,7 @@ private Serializable analyzeShowTables(HiveParserASTNode ast, boolean expectView if (pattern != null) { handleUnsupportedOperation("SHOW TABLES/VIEWS LIKE is not supported"); } - return new HiveParserShowTablesDesc(null, dbName, expectView); + return expectView ? new ShowViewsOperation() : new ShowTablesOperation(); } /** @@ -1285,59 +1599,31 @@ private Serializable analyzeShowTables(HiveParserASTNode ast, boolean expectView * * @param ast The parsed command tree. */ - private Serializable analyzeShowFunctions(HiveParserASTNode ast) { - ShowFunctionsDesc showFuncsDesc; - if (ast.getChildCount() == 1) { - String funcNames = - HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText()); - showFuncsDesc = new ShowFunctionsDesc(getResFile(), funcNames); - } else if (ast.getChildCount() == 2) { + private Operation convertShowFunctions(HiveParserASTNode ast) { + if (ast.getChildCount() == 2) { assert (ast.getChild(0).getType() == HiveASTParser.KW_LIKE); throw new ValidationException("SHOW FUNCTIONS LIKE is not supported yet"); - } else { - showFuncsDesc = new ShowFunctionsDesc(getResFile()); } - return new DDLWork(getInputs(), getOutputs(), showFuncsDesc); + return new ShowFunctionsOperation(); } - /** - * Add the task according to the parsed command tree. This is used for the CLI command "DESCRIBE - * FUNCTION;". - * - * @param ast The parsed command tree. 
- */ - private Serializable analyzeDescFunction(HiveParserASTNode ast) { - String funcName; - boolean isExtended; - - if (ast.getChildCount() == 1) { - funcName = HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText()); - isExtended = false; - } else if (ast.getChildCount() == 2) { - funcName = HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText()); - isExtended = true; - } else { - throw new ValidationException("Unexpected Tokens at DESCRIBE FUNCTION"); - } - - DescFunctionDesc descFuncDesc = new DescFunctionDesc(getResFile(), funcName, isExtended); - return new DDLWork(getInputs(), getOutputs(), descFuncDesc); - } - - private Serializable analyzeAlterTableRename( - String[] source, HiveParserASTNode ast, boolean expectView) throws SemanticException { + private Operation convertAlterTableRename( + String sourceName, HiveParserASTNode ast, boolean expectView) throws SemanticException { String[] target = HiveParserBaseSemanticAnalyzer.getQualifiedTableName( (HiveParserASTNode) ast.getChild(0)); - String sourceName = HiveParserBaseSemanticAnalyzer.getDotName(source); String targetName = HiveParserBaseSemanticAnalyzer.getDotName(target); + ObjectIdentifier objectIdentifier = parseObjectIdentifier(sourceName); - return HiveParserAlterTableDesc.rename(sourceName, targetName, expectView); + return expectView + ? new AlterViewRenameOperation(objectIdentifier, parseObjectIdentifier(targetName)) + : new AlterTableRenameOperation( + objectIdentifier, parseObjectIdentifier(targetName)); } - private Serializable analyzeAlterTableRenameCol( - String[] qualified, HiveParserASTNode ast, HashMap partSpec) + private Operation convertAlterTableChangeCol( + CatalogBaseTable alteredTable, String[] qualified, HiveParserASTNode ast) throws SemanticException { String newComment = null; boolean first = false; @@ -1388,25 +1674,39 @@ private Serializable analyzeAlterTableRenameCol( } String tblName = HiveParserBaseSemanticAnalyzer.getDotName(qualified); - return HiveParserAlterTableDesc.changeColumn( - tblName, - HiveParserBaseSemanticAnalyzer.unescapeIdentifier(oldColName), - HiveParserBaseSemanticAnalyzer.unescapeIdentifier(newColName), - newType, - newComment, - first, - flagCol, - isCascade); + + ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName); + CatalogTable oldTable = (CatalogTable) alteredTable; + String oldName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(oldColName); + String newName = HiveParserBaseSemanticAnalyzer.unescapeIdentifier(newColName); + + if (oldTable.getPartitionKeys().contains(oldName)) { + // disallow changing partition columns + throw new ValidationException("CHANGE COLUMN cannot be applied to partition columns"); + } + TableSchema oldSchema = oldTable.getSchema(); + TableColumn newTableColumn = + TableColumn.physical( + newName, + HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(newType))); + TableSchema newSchema = + OperationConverterUtils.changeColumn( + oldSchema, oldName, newTableColumn, first, flagCol); + Map props = new HashMap<>(oldTable.getOptions()); + props.put(ALTER_TABLE_OP, ALTER_COLUMNS.name()); + if (isCascade) { + props.put(ALTER_COL_CASCADE, "true"); + } + return new AlterTableSchemaOperation( + tableIdentifier, + new CatalogTableImpl( + newSchema, oldTable.getPartitionKeys(), props, oldTable.getComment())); } - private Serializable analyzeAlterTableModifyCols( - String[] qualified, - HiveParserASTNode ast, - HashMap partSpec, - boolean replace) + private Operation convertAlterTableModifyCols( + 
CatalogBaseTable alteredTable, String tblName, HiveParserASTNode ast, boolean replace) throws SemanticException { - String tblName = HiveParserBaseSemanticAnalyzer.getDotName(qualified); List newCols = HiveParserBaseSemanticAnalyzer.getColumns((HiveParserASTNode) ast.getChild(0)); boolean isCascade = false; @@ -1414,11 +1714,65 @@ private Serializable analyzeAlterTableModifyCols( isCascade = true; } - return HiveParserAlterTableDesc.addReplaceColumns(tblName, newCols, replace, isCascade); + ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName); + CatalogTable oldTable = (CatalogTable) alteredTable; + + // prepare properties + Map props = new HashMap<>(oldTable.getOptions()); + props.put(ALTER_TABLE_OP, ALTER_COLUMNS.name()); + if (isCascade) { + props.put(ALTER_COL_CASCADE, "true"); + } + TableSchema oldSchema = oldTable.getSchema(); + final int numPartCol = oldTable.getPartitionKeys().size(); + TableSchema.Builder builder = TableSchema.builder(); + // add existing non-part col if we're not replacing + if (!replace) { + List nonPartCols = + oldSchema.getTableColumns().subList(0, oldSchema.getFieldCount() - numPartCol); + for (TableColumn column : nonPartCols) { + builder.add(column); + } + setWatermarkAndPK(builder, oldSchema); + } + // add new cols + for (FieldSchema col : newCols) { + builder.add( + TableColumn.physical( + col.getName(), + HiveTypeUtil.toFlinkType( + TypeInfoUtils.getTypeInfoFromTypeString(col.getType())))); + } + // add part cols + List partCols = + oldSchema + .getTableColumns() + .subList(oldSchema.getFieldCount() - numPartCol, oldSchema.getFieldCount()); + for (TableColumn column : partCols) { + builder.add(column); + } + return new AlterTableSchemaOperation( + tableIdentifier, + new CatalogTableImpl( + builder.build(), + oldTable.getPartitionKeys(), + props, + oldTable.getComment())); + } + + private static void setWatermarkAndPK(TableSchema.Builder builder, TableSchema schema) { + for (WatermarkSpec watermarkSpec : schema.getWatermarkSpecs()) { + builder.watermark(watermarkSpec); + } + schema.getPrimaryKey() + .ifPresent( + pk -> { + builder.primaryKey( + pk.getName(), pk.getColumns().toArray(new String[0])); + }); } - private Serializable analyzeAlterTableDropParts( - String[] qualified, HiveParserASTNode ast, boolean expectView) { + private Operation convertAlterTableDropParts(String[] qualified, HiveParserASTNode ast) { boolean ifExists = ast.getFirstChildWithType(HiveASTParser.TOK_IFEXISTS) != null; // If the drop has to fail on non-existent partitions, we cannot batch expressions. @@ -1439,45 +1793,53 @@ private Serializable analyzeAlterTableDropParts( } } - validateAlterTableType(tab, AlterTableDesc.AlterTableTypes.DROPPARTITION, expectView); + validateAlterTableType(tab); - return new DropPartitionDesc(qualified[0], qualified[1], partSpecs, ifExists); + ObjectIdentifier tableIdentifier = + catalogManager.qualifyIdentifier( + UnresolvedIdentifier.of(qualified[0], qualified[1])); + List specs = + partSpecs.stream().map(CatalogPartitionSpec::new).collect(Collectors.toList()); + return new DropPartitionsOperation(tableIdentifier, ifExists, specs); } /** * Add one or more partitions to a table. Useful when the data has been copied to the right * location by some other process. */ - private Serializable analyzeAlterTableAddParts( - String[] qualified, CommonTree ast, boolean expectView) throws SemanticException { - + private Operation convertAlterTableAddParts(String[] qualified, CommonTree ast) { // ^(TOK_ALTERTABLE_ADDPARTS identifier ifNotExists? 
// alterStatementSuffixAddPartitionsElement+) boolean ifNotExists = ast.getChild(0).getType() == HiveASTParser.TOK_IFNOTEXISTS; Table tab = getTable(new ObjectPath(qualified[0], qualified[1])); boolean isView = tab.isView(); - validateAlterTableType(tab, AlterTableDesc.AlterTableTypes.ADDPARTITION, expectView); + validateAlterTableType(tab); int numCh = ast.getChildCount(); int start = ifNotExists ? 1 : 0; String currentLocation = null; - Map currentPart = null; + Map currentPartSpec = null; // Parser has done some verification, so the order of tokens doesn't need to be verified // here. - AddPartitionDesc addPartitionDesc = - new AddPartitionDesc(tab.getDbName(), tab.getTableName(), ifNotExists); + List specs = new ArrayList<>(); + List partitions = new ArrayList<>(); for (int num = start; num < numCh; num++) { HiveParserASTNode child = (HiveParserASTNode) ast.getChild(num); switch (child.getToken().getType()) { case HiveASTParser.TOK_PARTSPEC: - if (currentPart != null) { - addPartitionDesc.addPartition(currentPart, currentLocation); + if (currentPartSpec != null) { + specs.add(new CatalogPartitionSpec(currentPartSpec)); + Map props = new HashMap<>(); + if (currentLocation != null) { + props.put(TABLE_LOCATION_URI, currentLocation); + } + partitions.add(new CatalogPartitionImpl(props, null)); currentLocation = null; } - currentPart = getPartSpec(child); - validatePartitionValues(currentPart); // validate reserved values + currentPartSpec = getPartSpec(child); + validatePartitionValues(currentPartSpec); // validate reserved values break; case HiveASTParser.TOK_PARTITIONLOCATION: // if location specified, set in partition @@ -1494,26 +1856,92 @@ private Serializable analyzeAlterTableAddParts( } // add the last one - if (currentPart != null) { - addPartitionDesc.addPartition(currentPart, currentLocation); + if (currentPartSpec != null) { + specs.add(new CatalogPartitionSpec(currentPartSpec)); + Map props = new HashMap<>(); + if (currentLocation != null) { + props.put(TABLE_LOCATION_URI, currentLocation); + } + partitions.add(new CatalogPartitionImpl(props, null)); } - return new DDLWork(getInputs(), getOutputs(), addPartitionDesc); + ObjectIdentifier tableIdentifier = + tab.getDbName() == null + ? 
parseObjectIdentifier(tab.getTableName()) + : catalogManager.qualifyIdentifier( + UnresolvedIdentifier.of(tab.getDbName(), tab.getTableName())); + return new AddPartitionsOperation(tableIdentifier, ifNotExists, specs, partitions); } - // Get the partition specs from the tree - private List> getPartitionSpecs(CommonTree ast) { - List> partSpecs = new ArrayList<>(); - // get partition metadata if partition specified - for (int childIndex = 0; childIndex < ast.getChildCount(); childIndex++) { - HiveParserASTNode partSpecNode = (HiveParserASTNode) ast.getChild(childIndex); - // sanity check - if (partSpecNode.getType() == HiveASTParser.TOK_PARTSPEC) { - Map partSpec = getPartSpec(partSpecNode); - partSpecs.add(partSpec); + private Operation convertAlterViewProps( + CatalogBaseTable oldBaseTable, String tableName, Map newProps) { + ObjectIdentifier viewIdentifier = parseObjectIdentifier(tableName); + CatalogView oldView = (CatalogView) oldBaseTable; + Map props = new HashMap<>(oldView.getOptions()); + props.putAll(newProps); + CatalogView newView = + new CatalogViewImpl( + oldView.getOriginalQuery(), + oldView.getExpandedQuery(), + oldView.getSchema(), + props, + oldView.getComment()); + return new AlterViewPropertiesOperation(viewIdentifier, newView); + } + + private CatalogBaseTable getAlteredTable(String tableName, boolean expectView) { + ObjectIdentifier objectIdentifier = parseObjectIdentifier(tableName); + CatalogBaseTable catalogBaseTable = getCatalogBaseTable(objectIdentifier); + if (expectView) { + if (catalogBaseTable instanceof CatalogTable) { + throw new ValidationException("ALTER VIEW for a table is not allowed"); + } + } else { + if (catalogBaseTable instanceof CatalogView) { + throw new ValidationException("ALTER TABLE for a view is not allowed"); } } - return partSpecs; + return catalogBaseTable; + } + + private ObjectIdentifier parseObjectIdentifier(String compoundName) { + UnresolvedIdentifier unresolvedIdentifier = hiveParser.parseIdentifier(compoundName); + return catalogManager.qualifyIdentifier(unresolvedIdentifier); + } + + private CatalogDatabase getDatabase(String databaseName) { + Catalog catalog = catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get(); + CatalogDatabase database; + try { + database = catalog.getDatabase(databaseName); + } catch (DatabaseNotExistException e) { + throw new ValidationException(String.format("Database %s not exists", databaseName), e); + } + return database; + } + + private CatalogBaseTable getCatalogBaseTable(ObjectIdentifier tableIdentifier) { + return getCatalogBaseTable(tableIdentifier, false); + } + + private CatalogBaseTable getCatalogBaseTable( + ObjectIdentifier tableIdentifier, boolean ifExists) { + Optional optionalCatalogTable = + catalogManager.getTable(tableIdentifier); + if (!optionalCatalogTable.isPresent()) { + if (ifExists) { + return null; + } else { + throw new ValidationException( + String.format( + "Table or View %s doesn't exist.", tableIdentifier.toString())); + } + } + if (optionalCatalogTable.get().isTemporary()) { + throw new ValidationException( + String.format("Table or View %s is temporary.", tableIdentifier.toString())); + } + return optionalCatalogTable.get().getTable(); } /** @@ -1562,10 +1990,6 @@ private Map addDefaultProperties(Map tblProp) { return retValue; } - private Path getResFile() { - return SessionState.getLocalSessionPath(conf); - } - private static void handleUnsupportedOperation(HiveParserASTNode astNode) { throw new ValidationException( null, new 
UnsupportedOperationException("Unsupported operation: " + astNode));
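
The new function DDL branches hinge on one distinction: a Hive temporary function maps to a Flink temp system function (it belongs to no catalog or database), while everything else becomes a catalog function. A minimal, compile-oriented sketch of that split; the class name, the "myhive"/"default" names, and the resolveIdentifier helper are illustrative stand-ins for the analyzer's parseObjectIdentifier:

import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;

public class FunctionDdlSketch {

    /** DROP FUNCTION: temporary functions never live in a catalog. */
    static Operation dropFunction(String functionName, boolean ifExists, boolean isTemporary) {
        if (isTemporary) {
            return new DropTempSystemFunctionOperation(functionName, ifExists);
        }
        return new DropCatalogFunctionOperation(
                resolveIdentifier(functionName), ifExists, /* isTemporary */ false);
    }

    /** CREATE FUNCTION, non-temporary case: register a Java class as a catalog function. */
    static Operation createCatalogFunction(String functionName, String className) {
        CatalogFunction function = new CatalogFunctionImpl(className, FunctionLanguage.JAVA);
        return new CreateCatalogFunctionOperation(
                resolveIdentifier(functionName),
                function,
                /* ignoreIfExists */ false,
                /* isTemporary */ false);
    }

    // Stand-in for the analyzer's parseObjectIdentifier(), which qualifies the name
    // against the current catalog and database through the CatalogManager.
    private static ObjectIdentifier resolveIdentifier(String name) {
        return ObjectIdentifier.of("myhive", "default", name);
    }
}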
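In convertCreateTable, every Hive-specific detail (external flag, location, row format, storage format, constraint traits) is flattened into string properties of a CatalogTableImpl whose connector is pinned to the Hive identifier. A stripped-down sketch of that final assembly step, assuming a fixed two-column schema; the non-connector property keys here are illustrative placeholders, not the real constants from SqlCreateHiveTable:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CreateTableAssemblySketch {

    static CreateTableOperation assemble(
            ObjectIdentifier identifier, boolean isExternal, String location, String comment) {
        Map<String, String> props = new HashMap<>();
        // the converter always pins the connector to Hive ("connector" = FactoryUtil.CONNECTOR.key())
        props.put("connector", "hive");
        if (isExternal) {
            props.put("is-external", "true"); // illustrative key, not the real constant
        }
        if (location != null) {
            props.put("location-uri", location); // illustrative key, not the real constant
        }
        TableSchema schema =
                TableSchema.builder()
                        .add(TableColumn.physical("id", DataTypes.INT()))
                        .add(TableColumn.physical("name", DataTypes.STRING()))
                        .build();
        return new CreateTableOperation(
                identifier,
                new CatalogTableImpl(schema, Collections.emptyList(), props, comment),
                /* ignoreIfExists */ false,
                /* isTemporary */ false);
    }
}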
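All three ALTER DATABASE converters share one pattern: copy the existing database's properties, stamp an operation marker plus the changed values on top, and wrap the result in an AlterDatabaseOperation. A sketch of the CHANGE_PROPS case; the ALTER_DATABASE_OP key below is a placeholder for the constant defined in SqlAlterHiveDatabase:

import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;

import java.util.HashMap;
import java.util.Map;

public class AlterDatabasePropsSketch {

    // Placeholder for the SqlAlterHiveDatabase.ALTER_DATABASE_OP property key.
    private static final String ALTER_DATABASE_OP = "alter-database-op";

    static AlterDatabaseOperation changeProps(
            String catalogName,
            String dbName,
            CatalogDatabase originDB,
            Map<String, String> newProps) {
        Map<String, String> props = new HashMap<>(originDB.getProperties());
        props.put(ALTER_DATABASE_OP, "CHANGE_PROPS"); // records which ALTER variant was issued
        props.putAll(newProps);
        CatalogDatabase newDB = new CatalogDatabaseImpl(props, originDB.getComment());
        return new AlterDatabaseOperation(catalogName, dbName, newDB);
    }

    public static void main(String[] args) {
        CatalogDatabase existing = new CatalogDatabaseImpl(new HashMap<>(), "demo database");
        Map<String, String> changed = new HashMap<>();
        changed.put("createdBy", "flink");
        System.out.println(changeProps("myhive", "default", existing, changed).asSummaryString());
    }
}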
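convertDropTable keeps Hive's safety check: DROP TABLE must not target a view and DROP VIEW must not target a table; only then is the matching drop operation emitted. The same guard, sketched against an already resolved CatalogBaseTable (resolution itself is left to getCatalogBaseTable in the analyzer):

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;

public class DropTableOrViewSketch {

    static Operation convertDrop(
            ObjectIdentifier identifier,
            CatalogBaseTable resolved, // null when the object is absent and IF EXISTS was given
            boolean expectView,
            boolean ifExists) {
        if (expectView) {
            if (resolved instanceof CatalogTable) {
                throw new ValidationException("DROP VIEW for a table is not allowed");
            }
            return new DropViewOperation(identifier, ifExists, /* isTemporary */ false);
        } else {
            if (resolved instanceof CatalogView) {
                throw new ValidationException("DROP TABLE for a view is not allowed");
            }
            return new DropTableOperation(identifier, ifExists, /* isTemporary */ false);
        }
    }
}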
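The shared convertAlterTableProps helper is what TBLPROPERTIES, SERDE, SERDEPROPERTIES, FILEFORMAT and LOCATION changes all funnel into: overlay the new properties on the old ones and emit either a partition-level or a table-level operation. A condensed sketch with the catalog partition lookup replaced by an explicit argument:

import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation;

import java.util.HashMap;
import java.util.Map;

public class AlterTablePropsSketch {

    static Operation mergeProps(
            ObjectIdentifier table,
            CatalogTable oldTable,
            CatalogPartitionSpec partitionSpec, // null => alter the table itself
            CatalogPartition oldPartition, // resolved partition; null when partitionSpec is null
            Map<String, String> newProps) {
        Map<String, String> props = new HashMap<>();
        if (partitionSpec != null) {
            props.putAll(oldPartition.getProperties());
            props.putAll(newProps);
            return new AlterPartitionPropertiesOperation(
                    table,
                    partitionSpec,
                    new CatalogPartitionImpl(props, oldPartition.getComment()));
        }
        props.putAll(oldTable.getOptions());
        props.putAll(newProps);
        return new AlterTableOptionsOperation(table, oldTable.copy(props));
    }
}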
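convertAlterTableModifyCols rebuilds the TableSchema from scratch: keep the existing non-partition columns (unless this is REPLACE COLUMNS), append the new columns, re-append the partition columns, and carry the watermark and primary key over. The rebuild in isolation, assuming the partition columns are the trailing fields as the converter does:

import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.WatermarkSpec;

import java.util.List;

public class SchemaRebuildSketch {

    /**
     * Rebuild a schema for ADD/REPLACE COLUMNS. Partition columns are assumed to be the
     * trailing {@code numPartCols} fields, matching the Hive dialect converter.
     */
    static TableSchema addOrReplaceCols(
            TableSchema oldSchema, List<TableColumn> newCols, int numPartCols, boolean replace) {
        TableSchema.Builder builder = TableSchema.builder();
        List<TableColumn> allCols = oldSchema.getTableColumns();
        int numNonPart = oldSchema.getFieldCount() - numPartCols;
        if (!replace) {
            // keep the existing data columns plus their watermark/PK metadata
            for (TableColumn column : allCols.subList(0, numNonPart)) {
                builder.add(column);
            }
            for (WatermarkSpec watermarkSpec : oldSchema.getWatermarkSpecs()) {
                builder.watermark(watermarkSpec);
            }
            oldSchema
                    .getPrimaryKey()
                    .ifPresent(
                            pk ->
                                    builder.primaryKey(
                                            pk.getName(),
                                            pk.getColumns().toArray(new String[0])));
        }
        // new columns go before the partition columns
        newCols.forEach(builder::add);
        // partition columns always stay at the end
        for (TableColumn column : allCols.subList(numNonPart, oldSchema.getFieldCount())) {
            builder.add(column);
        }
        return builder.build();
    }
}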
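convertAlterTableAddParts pairs each TOK_PARTSPEC with an optional TOK_PARTITIONLOCATION and collects two parallel lists, one of CatalogPartitionSpec and one of CatalogPartition, for AddPartitionsOperation. The same pairing sketched without the Hive AST; the ParsedPartition holder and the location property key are illustrative:

import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AddPartitionsSketch {

    /** One parsed PARTITION (...) [LOCATION '...'] clause. */
    static final class ParsedPartition {
        final Map<String, String> spec;
        final String location; // may be null

        ParsedPartition(Map<String, String> spec, String location) {
            this.spec = spec;
            this.location = location;
        }
    }

    static AddPartitionsOperation toOperation(
            ObjectIdentifier table, boolean ifNotExists, List<ParsedPartition> parsed) {
        List<CatalogPartitionSpec> specs = new ArrayList<>();
        List<CatalogPartition> partitions = new ArrayList<>();
        for (ParsedPartition p : parsed) {
            specs.add(new CatalogPartitionSpec(p.spec));
            Map<String, String> props = new HashMap<>();
            if (p.location != null) {
                props.put("location-uri", p.location); // illustrative property key
            }
            partitions.add(new CatalogPartitionImpl(props, null));
        }
        return new AddPartitionsOperation(table, ifNotExists, specs, partitions);
    }

    public static void main(String[] args) {
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("dt", "2021-01-01");
        AddPartitionsOperation op =
                toOperation(
                        ObjectIdentifier.of("myhive", "default", "logs"),
                        true,
                        Collections.singletonList(
                                new ParsedPartition(spec, "/warehouse/logs/dt=2021-01-01")));
        System.out.println(op.asSummaryString());
    }
}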