[FLINK-22657][Connectors/Hive] HiveParserDDLSemanticAnalyzer directly return operations

This closes #16416

luoyuxia authored and lirui-apache committed Jul 28, 2021
1 parent 5c5e353 · commit c3088af
Showing 16 changed files with 1,033 additions and 2,420 deletions.
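In short: Hive DDL statements previously went through two stages — HiveParserDDLSemanticAnalyzer#analyzeInternal produced an intermediate Serializable descriptor, which DDLOperationConverter then mapped to a Flink Operation. After this change the analyzer returns Operations directly via convertToOperation, and the CTAS and CREATE VIEW special cases move behind that same entry point (which is why the analyzer now receives the catalog manager, the parser itself, the Hive shim, the parser context, and the DML helper). A minimal sketch of the control-flow change, using simplified hypothetical interfaces rather than the real signatures:

    import org.apache.flink.table.operations.Operation;

    import java.io.Serializable;

    // Minimal sketch of the refactoring; these nested interfaces are
    // hypothetical stand-ins for the real classes.
    class DdlFlowSketch {
        interface OldDdlAnalyzer { Serializable analyzeInternal(Object astNode); } // old: AST -> desc
        interface DdlConverter   { Operation convert(Serializable work); }         // old: desc -> Operation
        interface NewDdlAnalyzer { Operation convertToOperation(Object astNode); } // new: AST -> Operation

        // Old flow: two steps with an intermediate Serializable "work" descriptor.
        static Operation oldFlow(OldDdlAnalyzer analyzer, DdlConverter converter, Object node) {
            Serializable work = analyzer.analyzeInternal(node);
            return converter.convert(work);
        }

        // New flow: the DDL semantic analyzer returns the Operation directly.
        static Operation newFlow(NewDdlAnalyzer analyzer, Object node) {
            return analyzer.convertToOperation(node);
        }
    }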

This file was deleted.

@@ -28,12 +28,9 @@
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.module.hive.udf.generic.HiveGenericUDFGrouping;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ddl.CreateTableASOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
@@ -42,10 +39,8 @@
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserContext;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserQueryState;
-import org.apache.flink.table.planner.delegation.hive.desc.CreateTableASDesc;
-import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc;
-import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
+import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import org.apache.flink.table.planner.parse.CalciteParser;
@@ -60,15 +55,13 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.File;
 import java.io.IOException;
-import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.Timestamp;
@@ -232,42 +225,15 @@ private List<Operation> processCmd(
             HiveParserQueryState queryState = new HiveParserQueryState(hiveConf);
             HiveParserDDLSemanticAnalyzer ddlAnalyzer =
                     new HiveParserDDLSemanticAnalyzer(
-                            queryState, hiveCatalog, getCatalogManager().getCurrentDatabase());
-            Serializable work = ddlAnalyzer.analyzeInternal(node);
-            DDLOperationConverter ddlConverter =
-                    new DDLOperationConverter(this, getCatalogManager(), hiveShim);
-            if (work instanceof HiveParserCreateViewDesc) {
-                // analyze and expand the view query
-                analyzeCreateView(
-                        (HiveParserCreateViewDesc) work, context, queryState, hiveShim);
-            } else if (work instanceof CreateTableASDesc) {
-                // analyze the query
-                CreateTableASDesc ctasDesc = (CreateTableASDesc) work;
-                HiveParserCalcitePlanner calcitePlanner =
-                        createCalcitePlanner(context, queryState, hiveShim);
-                calcitePlanner.setCtasDesc(ctasDesc);
-                RelNode queryRelNode = calcitePlanner.genLogicalPlan(ctasDesc.getQuery());
-                // create a table to represent the dest table
-                HiveParserCreateTableDesc createTableDesc = ctasDesc.getCreateTableDesc();
-                String[] dbTblName = createTableDesc.getCompoundName().split("\\.");
-                Table destTable = new Table(Table.getEmptyTable(dbTblName[0], dbTblName[1]));
-                destTable.getSd().setCols(createTableDesc.getCols());
-                // create the insert operation
-                CatalogSinkModifyOperation insertOperation =
-                        dmlHelper.createInsertOperation(
-                                queryRelNode,
-                                destTable,
-                                Collections.emptyMap(),
-                                Collections.emptyList(),
-                                false);
-                CreateTableOperation createTableOperation =
-                        (CreateTableOperation)
-                                ddlConverter.convert(
-                                        ((CreateTableASDesc) work).getCreateTableDesc());
-                return Collections.singletonList(
-                        new CreateTableASOperation(createTableOperation, insertOperation));
-            }
-            return Collections.singletonList(ddlConverter.convert(work));
+                            queryState,
+                            hiveCatalog,
+                            getCatalogManager(),
+                            this,
+                            hiveShim,
+                            context,
+                            dmlHelper);
+            operation = ddlAnalyzer.convertToOperation(node);
+            return Collections.singletonList(operation);
         } else {
             final boolean explain = node.getType() == HiveASTParser.TOK_EXPLAIN;
             // first child is the underlying explicandum
@@ -291,7 +257,7 @@
         }
     }

-    private HiveParserCalcitePlanner createCalcitePlanner(
+    public HiveParserCalcitePlanner createCalcitePlanner(
             HiveParserContext context, HiveParserQueryState queryState, HiveShim hiveShim)
             throws SemanticException {
         HiveParserCalcitePlanner calciteAnalyzer =
@@ -307,16 +273,16 @@ private HiveParserCalcitePlanner createCalcitePlanner(
         return calciteAnalyzer;
     }

-    private void analyzeCreateView(
-            HiveParserCreateViewDesc desc,
+    public void analyzeCreateView(
+            HiveParserCreateViewInfo createViewInfo,
             HiveParserContext context,
             HiveParserQueryState queryState,
             HiveShim hiveShim)
             throws SemanticException {
         HiveParserCalcitePlanner calciteAnalyzer =
                 createCalcitePlanner(context, queryState, hiveShim);
-        calciteAnalyzer.setCreateViewDesc(desc);
-        calciteAnalyzer.genLogicalPlan(desc.getQuery());
+        calciteAnalyzer.setCreatViewInfo(createViewInfo);
+        calciteAnalyzer.genLogicalPlan(createViewInfo.getQuery());
     }

     private Operation analyzeSql(
@@ -47,9 +47,8 @@
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserTypeCheckCtx;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserTypeConverter;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserWindowingSpec;
-import org.apache.flink.table.planner.delegation.hive.desc.CreateTableASDesc;
-import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
+import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg;
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
 import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution;
@@ -197,8 +196,8 @@ public class HiveParserCalcitePlanner
     // this will be used in HiveParserRexNodeConverter to create cor var
     private int subqueryId = 0;

-    private HiveParserCreateViewDesc createViewDesc;
-    private CreateTableASDesc ctasDesc;
+    private HiveParserCreateViewInfo createViewInfo;
+    private List<FieldSchema> ctasCols;

     public HiveParserCalcitePlanner(
             HiveParserQueryState queryState,
@@ -224,15 +223,15 @@ public HiveParserCalcitePlanner(
                         cluster, frameworkConfig.getOperatorTable(), catalogReader.nameMatcher());
     }

-    public void setCtasDesc(CreateTableASDesc ctasDesc) {
-        this.ctasDesc = ctasDesc;
+    public void setCtasCols(List<FieldSchema> ctasCols) {
+        this.ctasCols = ctasCols;
     }

-    public void setCreateViewDesc(HiveParserCreateViewDesc createViewDesc) {
-        if (createViewDesc != null) {
+    public void setCreatViewInfo(HiveParserCreateViewInfo createViewInfo) {
+        if (createViewInfo != null) {
             semanticAnalyzer.unparseTranslator.enable();
         }
-        this.createViewDesc = createViewDesc;
+        this.createViewInfo = createViewInfo;
     }

     public void initCtx(HiveParserContext context) {
@@ -283,22 +282,22 @@ private RelNode logicalPlan() {

         try {
             RelNode plan = genLogicalPlan(getQB(), true, null, null);
-            if (createViewDesc != null) {
+            if (createViewInfo != null) {
                 semanticAnalyzer.resultSchema =
                         HiveParserUtils.convertRowSchemaToResultSetSchema(
                                 relToRowResolver.get(plan), false);
                 HiveParserUtils.saveViewDefinition(
                         semanticAnalyzer.resultSchema,
-                        createViewDesc,
+                        createViewInfo,
                         semanticAnalyzer.ctx.getTokenRewriteStream(),
                         semanticAnalyzer.unparseTranslator,
                         semanticAnalyzer.getConf());
-            } else if (ctasDesc != null) {
+            } else if (ctasCols != null) {
                 // CTAS doesn't allow specifying col list, so we set it according to result schema
                 semanticAnalyzer.resultSchema =
                         HiveParserUtils.convertRowSchemaToResultSetSchema(
                                 relToRowResolver.get(plan), false);
-                ctasDesc.getCreateTableDesc().getCols().addAll(semanticAnalyzer.resultSchema);
+                ctasCols.addAll(semanticAnalyzer.resultSchema);
             }
             return plan;
         } catch (SemanticException e) {
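One consequence visible above: HiveParserCalcitePlanner no longer carries a CreateTableASDesc — for CTAS it only needs somewhere to put the destination columns. Once the logical plan is generated, logicalPlan() appends the derived result schema to the list registered via setCtasCols, since CTAS doesn't allow specifying a column list. A toy, self-contained illustration of that contract (the column names and types are made up):

    import org.apache.hadoop.hive.metastore.api.FieldSchema;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CtasColsSketch {
        public static void main(String[] args) {
            // The caller registers an empty, mutable list via setCtasCols(...).
            List<FieldSchema> ctasCols = new ArrayList<>();

            // Hypothetical schema derived from the CTAS query's logical plan.
            List<FieldSchema> resultSchema = Arrays.asList(
                    new FieldSchema("id", "bigint", null),
                    new FieldSchema("name", "string", null));

            // What logicalPlan() does when ctasCols != null: the query's
            // result schema becomes the new table's column list.
            ctasCols.addAll(resultSchema);
            System.out.println(ctasCols);
        }
    }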
@@ -43,8 +43,8 @@
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserTypeCheckCtx;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserTypeConverter;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveParserUnparseTranslator;
-import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
+import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction;
@@ -1263,7 +1263,7 @@ public static List<FieldSchema> convertRowSchemaToResultSetSchema(

     public static void saveViewDefinition(
             List<FieldSchema> resultSchema,
-            HiveParserCreateViewDesc desc,
+            HiveParserCreateViewInfo createViewInfo,
             TokenRewriteStream tokenRewriteStream,
             HiveParserUnparseTranslator unparseTranslator,
             HiveConf conf)
@@ -1273,23 +1273,24 @@ public static void saveViewDefinition(
         List<FieldSchema> derivedSchema = new ArrayList<>(resultSchema);
         ParseUtils.validateColumnNameUniqueness(derivedSchema);

-        List<FieldSchema> imposedSchema = desc.getSchema();
+        List<FieldSchema> imposedSchema = createViewInfo.getSchema();
         if (imposedSchema != null) {
             int explicitColCount = imposedSchema.size();
             int derivedColCount = derivedSchema.size();
             if (explicitColCount != derivedColCount) {
                 throw new SemanticException(
-                        generateErrorMessage(desc.getQuery(), ErrorMsg.VIEW_COL_MISMATCH.getMsg()));
+                        generateErrorMessage(
+                                createViewInfo.getQuery(), ErrorMsg.VIEW_COL_MISMATCH.getMsg()));
             }
         }

         // Preserve the original view definition as specified by the user.
-        if (desc.getOriginalText() == null) {
+        if (createViewInfo.getOriginalText() == null) {
             String originalText =
                     tokenRewriteStream.toString(
-                            desc.getQuery().getTokenStartIndex(),
-                            desc.getQuery().getTokenStopIndex());
-            desc.setOriginalText(originalText);
+                            createViewInfo.getQuery().getTokenStartIndex(),
+                            createViewInfo.getQuery().getTokenStopIndex());
+            createViewInfo.setOriginalText(originalText);
         }

         // Now expand the view definition with extras such as explicit column
@@ -1298,7 +1299,8 @@ public static void saveViewDefinition(
         unparseTranslator.applyTranslations(tokenRewriteStream);
         String expandedText =
                 tokenRewriteStream.toString(
-                        desc.getQuery().getTokenStartIndex(), desc.getQuery().getTokenStopIndex());
+                        createViewInfo.getQuery().getTokenStartIndex(),
+                        createViewInfo.getQuery().getTokenStopIndex());

         if (imposedSchema != null) {
             // Merge the names from the imposed schema into the types
@@ -1332,15 +1334,15 @@
             sb.append(" FROM (");
             sb.append(expandedText);
             sb.append(") ");
-            sb.append(HiveUtils.unparseIdentifier(desc.getCompoundName(), conf));
+            sb.append(HiveUtils.unparseIdentifier(createViewInfo.getCompoundName(), conf));
             expandedText = sb.toString();
         }

-        desc.setSchema(derivedSchema);
-        if (!desc.isMaterialized()) {
+        createViewInfo.setSchema(derivedSchema);
+        if (!createViewInfo.isMaterialized()) {
             // materialized views don't store the expanded text as they won't be rewritten at query
             // time.
-            desc.setExpandedText(expandedText);
+            createViewInfo.setExpandedText(expandedText);
         }
     }
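For context on saveViewDefinition: it records two renditions of the view query on HiveParserCreateViewInfo — the user's original text, sliced out of the ANTLR token stream by the query subtree's token range, and an expanded text with unparse translations applied (and imposed column names merged in, when the view declares them). A minimal sketch of the original-text capture, assuming ANTLR 3's TokenRewriteStream and a CommonTree query node:

    import org.antlr.runtime.TokenRewriteStream;
    import org.antlr.runtime.tree.CommonTree;

    // Minimal sketch: recover the exact SQL the user wrote for the view by
    // slicing the token stream with the query subtree's token range.
    class ViewTextSketch {
        static String originalText(TokenRewriteStream tokens, CommonTree queryNode) {
            return tokens.toString(
                    queryNode.getTokenStartIndex(),  // first token of the query subtree
                    queryNode.getTokenStopIndex());  // last token of the query subtree
        }
    }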
(Diffs for the remaining changed files are not shown.)