Add delete syntax #13674

Closed · wants to merge 3 commits
9 changes: 9 additions & 0 deletions docs/multi-stage-query/examples.md
@@ -297,6 +297,15 @@ PARTITIONED BY HOUR
CLUSTERED BY page
```

## DELETE rows based on a condition

<details><summary>Show the query</summary>
```sql
DELETE FROM w000
WHERE country = 'New Zealand'
PARTITIONED BY HOUR
CLUSTERED BY page
```
</details>
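
For context on the semantics: `DruidSqlDelete.create()` below rewrites a DELETE into a SELECT of the surviving rows, and `DeleteHandler` plans it with `SQL_REPLACE_TIME_CHUNKS` set to `ALL`, so this example behaves roughly like the following REPLACE (a sketch of the equivalence, not literal planner output):

```sql
REPLACE INTO w000 OVERWRITE ALL
SELECT * FROM w000
WHERE NOT (country = 'New Zealand')
PARTITIONED BY HOUR
CLUSTERED BY page
```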

## SELECT with EXTERN and JOIN
2 changes: 2 additions & 0 deletions sql/src/main/codegen/config.fmpp
@@ -390,6 +390,7 @@ data: {
"DruidSqlInsertEof()"
"DruidSqlExplain()"
"DruidSqlReplaceEof()"
"DruidSqlDeleteEof()"
]

# List of methods for parsing custom literals.
@@ -441,6 +442,7 @@
# "dataTypeParserMethods".
implementationFiles: [
"common.ftl"
"delete.ftl"
"insert.ftl"
"explain.ftl"
"replace.ftl"
74 changes: 74 additions & 0 deletions sql/src/main/codegen/includes/delete.ftl
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Adapted from the syntax of the SqlInsert statement in the Calcite parser, edited for DELETE syntax.
SqlNode DruidSqlDeleteEof() :
{
SqlNode table;
SqlNode deleteCondition = null;
final Span s;
// Using the fully qualified name for the Pair class, since Calcite's Parser.jj also uses a class with the same name.
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair<>(null, null);
SqlNodeList clusteredBy = null;
}
{
<DELETE> { s = span(); }
<FROM>
table = CompoundIdentifier()
[
<WHERE> deleteCondition = Expression(ExprContext.ACCEPT_SUB_QUERY)
{ s.add(deleteCondition); }
]
[
<PARTITIONED> <BY>
partitionedBy = PartitionGranularity()
Contributor:
Note that this syntax is changing a bit in this PR: #13686

Contributor (author):
Thanks for the heads up. I'm looking forward to the catalog changes; I think they would make the delete syntax (and all the others, actually) a lot more "SQL-like", since we could drop PARTITIONED BY as a required part of the query.

]
[
<CLUSTERED> <BY>
clusteredBy = ClusterItems()
Contributor:
Do we need clustering for delete? What does it mean to cluster deletions?

Contributor (author):
Since DELETE triggers a reingest of the datasource in the backend, this is the clustering for that reingest. I think this would be improved with the catalog.

Contributor:
If our code that does delete needs partitioning, then that code is, I would assert, broken and needs to be fixed. Let's not add SQL syntax that exposes partitioning which isn't actually needed.

Let's imagine some cases. I create segments with 1-hour partitioning. Then, I create more segments, in the same time period, with 1 day partitioning. Now, I realize I messed up and want to delete this entire group of segments. Quick: which partitioning do I use? Hour? Or Day? Or, do I need different delete tasks, one for the hour segments, one for Day? Is this the user experience we want?

Often, just writing down the proposed syntax, and thinking through the user experience, can save us lots of back-and-forth at the level of individual lines of code.

So, I propose we either fix the need for partitioning in the reingest, or we give it a fake partitioning. Remember: there is no new data to partition.
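
A sketch of the reviewer's scenario (hypothetical datasource contents and filter):

```sql
-- w000 holds segments at HOUR granularity and, for the same interval, at DAY granularity.
DELETE FROM w000
WHERE page = 'Spam'
PARTITIONED BY HOUR  -- or DAY? Neither choice matches all existing segments.
```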

]
{
// In the future, if the existing partitioning and clustering could be inferred, these parameters could be made optional.
if (clusteredBy != null && partitionedBy.lhs == null) {
throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause");
}
if (clusteredBy != null) {
s.addAll(clusteredBy);
}
}
// EOF is also present in SqlStmtEof, but EOF is a special case and a single EOF can be consumed multiple times.
// The reason for adding EOF here is to ensure that we create a DruidSqlDelete node only after the syntax has been
// validated, and that SQL syntax errors are thrown before the validations in DruidSqlDelete, which could overshadow
// the actual error message.
<EOF>
{
if (deleteCondition == null) {
throw new ParseException("WHERE clause must be present in DELETE statements. (To delete all rows, WHERE TRUE must be explicitly specified.)");
}
return DruidSqlDelete.create(
s.pos(),
table,
deleteCondition,
partitionedBy.lhs,
partitionedBy.rhs,
clusteredBy
);
}
}
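
A few statements against the grammar above (a sketch; w000 is the datasource from the docs example):

```sql
-- Parses: WHERE, PARTITIONED BY, and CLUSTERED BY in the required order.
DELETE FROM w000
WHERE country = 'New Zealand'
PARTITIONED BY HOUR
CLUSTERED BY page

-- Parses: deleting every row requires an explicit WHERE TRUE.
DELETE FROM w000 WHERE TRUE PARTITIONED BY DAY

-- Rejected at EOF: "WHERE clause must be present in DELETE statements."
DELETE FROM w000 PARTITIONED BY DAY

-- Rejected: CLUSTERED BY without PARTITIONED BY.
DELETE FROM w000 WHERE TRUE CLUSTERED BY page
```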

sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlDelete.java
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.parser;

import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.java.util.common.granularity.Granularity;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;

/**
* Represents a DELETE statement with the Druid-specific PARTITIONED BY and CLUSTERED BY clauses.
* This class extends {@link DruidSqlIngest} so that this SqlNode can be used in
* {@link org.apache.calcite.sql2rel.SqlToRelConverter} for conversion into a RelNode and further processing.
*/
public class DruidSqlDelete extends DruidSqlIngest
{
public static final SqlOperator OPERATOR = new SqlSpecialOperator("DELETE", SqlKind.OTHER);

final SqlNode deleteCondition;

public static DruidSqlDelete create(
SqlParserPos pos,
SqlNode targetTable,
SqlNode deleteCondition,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy
)
{
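// Invert the DELETE condition and select the surviving rows: SELECT * FROM target WHERE NOT (deleteCondition).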
SqlBasicCall invertedDeleteCondition = new SqlBasicCall(SqlStdOperatorTable.NOT, new SqlNode[] {deleteCondition}, pos);
SqlSelect undeletedRowsSelect = new SqlSelect(
pos,
SqlNodeList.EMPTY,
new SqlNodeList(Collections.singleton(SqlIdentifier.star(pos)), pos),
targetTable,
invertedDeleteCondition,
null,
null,
null,
null,
null,
null
);
return new DruidSqlDelete(pos, targetTable, undeletedRowsSelect, partitionedBy, partitionedByStringForUnparse, clusteredBy, deleteCondition);
}

/**
* While partitionedBy and partitionedByStringForUnparse can be null as constructor arguments, this is
* semantically disallowed, and the constructor performs checks to ensure it. This produces a friendly
* error when the custom PARTITIONED BY clause is missing, and keeps that error separate from the
* JavaCC/Calcite errors, which can be cryptic when someone accidentally forgets to specify PARTITIONED BY explicitly.
*/
public DruidSqlDelete(
SqlParserPos pos,
SqlNode targetTable,
SqlNode undeletedRowsSelect,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy,
SqlNode deleteCondition
)
{
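// The SELECT of surviving rows takes the place of the ingest source query in the DruidSqlIngest superclass.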
super(
pos,
SqlNodeList.EMPTY,
targetTable,
undeletedRowsSelect,
null,
partitionedBy,
partitionedByStringForUnparse,
clusteredBy
);
this.deleteCondition = deleteCondition;
}

@Nonnull
@Override
public SqlOperator getOperator()
{
return OPERATOR;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
{
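// Unparse as the original DELETE statement rather than the rewritten SELECT.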
writer.sep("DELETE FROM");
final int opLeft = getOperator().getLeftPrec();
final int opRight = getOperator().getRightPrec();
getTargetTable().unparse(writer, opLeft, opRight);

writer.newlineAndIndent();
writer.sep("WHERE");
deleteCondition.unparse(writer, 0, 0);

writer.newlineAndIndent();
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse);

writer.newlineAndIndent();
if (getClusteredBy() != null) {
writer.keyword("CLUSTERED BY");
SqlWriter.Frame frame = writer.startList("", "");
for (SqlNode clusterByOpts : getClusteredBy().getList()) {
clusterByOpts.unparse(writer, leftPrec, rightPrec);
}
writer.endList(frame);
}
}
}
@@ -33,6 +33,7 @@
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.parser.DruidSqlDelete;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.run.SqlEngine;
@@ -159,6 +160,8 @@ private SqlStatementHandler createHandler(final SqlNode node) throws ValidationE
return new IngestHandler.InsertHandler(handlerContext, (DruidSqlInsert) query, explain);
} else if (query instanceof DruidSqlReplace) {
return new IngestHandler.ReplaceHandler(handlerContext, (DruidSqlReplace) query, explain);
} else if (query instanceof DruidSqlDelete) {
return new IngestHandler.DeleteHandler(handlerContext, (DruidSqlDelete) query, explain);
}
}

@@ -42,6 +42,7 @@
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.parser.DruidSqlDelete;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
@@ -350,4 +351,49 @@ public ExplainAttributes explainAttributes()
);
}
}

/**
* Handler for the DELETE statement.
*/
protected static class DeleteHandler extends IngestHandler
{
private final DruidSqlDelete sqlNode;

public DeleteHandler(
SqlStatementHandler.HandlerContext handlerContext,
DruidSqlDelete sqlNode,
SqlExplain explain
) throws ValidationException
{
super(
handlerContext,
sqlNode,
convertQuery(sqlNode),
explain
);
this.sqlNode = sqlNode;
}

@Override
protected DruidSqlIngest ingestNode()
{
return sqlNode;
}

@Override
public void validate() throws ValidationException
{
if (!handlerContext.plannerContext().featureAvailable(EngineFeature.CAN_REPLACE)) {
throw new ValidationException(StringUtils.format(
"Cannot DELETE with SQL engine '%s'.",
handlerContext.engine().name())
);
}
super.validate();
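// DELETE replaces every time chunk, equivalent to REPLACE ... OVERWRITE ALL.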
handlerContext.queryContextMap().put(
DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
DruidSqlParserUtils.ALL
);
}
}
}