Add delete syntax #13674

Closed
wants to merge 3 commits
Changes from 1 commit
2 changes: 2 additions & 0 deletions sql/src/main/codegen/config.fmpp
@@ -390,6 +390,7 @@ data: {
"DruidSqlInsertEof()"
"DruidSqlExplain()"
"DruidSqlReplaceEof()"
"DruidSqlDeleteEof()"
]

# List of methods for parsing custom literals.
@@ -440,6 +441,7 @@ data: {
# "dataTypeParserMethods".
implementationFiles: [
"common.ftl"
"delete.ftl"
"insert.ftl"
"explain.ftl"
"replace.ftl"
69 changes: 69 additions & 0 deletions sql/src/main/codegen/includes/delete.ftl
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Taken from the syntax of the SqlInsert statement in the Calcite parser, edited for the DELETE syntax
SqlNode DruidSqlDeleteEof() :
{
SqlNode table;
SqlNode source = null;
final Span s;
// Using the fully qualified name for the Pair class, since Calcite also uses a class with the same name in Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair<>(null, null);
SqlNodeList clusteredBy = null;
}
{
<DELETE> { s = span(); }
<FROM>
table = CompoundIdentifier()
[
<WHERE> source = Expression(ExprContext.ACCEPT_SUB_QUERY)
]
[
<PARTITIONED> <BY>
partitionedBy = PartitionGranularity()
Contributor: Note that this syntax is changing a bit in this PR: #13686

Contributor Author: Thanks for the heads up. I'm looking forward to the catalog changes; I think they would make the DELETE syntax (and all the others, actually) a lot more "SQL-like", since we could drop PARTITIONED BY as a required part of the query.

]
[
<CLUSTERED> <BY>
clusteredBy = ClusterItems()
Contributor: Do we need clustering for delete? What does it mean to cluster deletions?

Contributor Author: Since, in the backend, delete triggers a reingest of the datasource, the above is the clustering for that reingest. I think this would be improved with the catalog.

Contributor: If our code that does delete needs partitioning, then that code is, I would assert, broken and needs to be fixed. Let's not add SQL syntax that exposes partitioning that isn't actually needed.

Let's imagine some cases. I create segments with 1-hour partitioning. Then I create more segments, in the same time period, with 1-day partitioning. Now I realize I messed up and want to delete this entire group of segments. Quick: which partitioning do I use? Hour? Day? Or do I need two different delete tasks, one for the hour segments and one for the day segments? Is this the user experience we want?

Often, just writing down the proposed syntax and thinking through the user experience can save us lots of back-and-forth at the level of individual lines of code.

So I propose we either fix the need for partitioning in the reingest, or we give it a fake partitioning. Remember: there is no new data to partition.

]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause");
}
}
// EOF is also present in SqlStmtEof, but EOF is a special case and a single EOF can be consumed multiple times.
// The reason for adding EOF here is to ensure that we create a DruidSqlDelete node only after the syntax has been
// validated, and that SQL syntax errors are thrown before the validations in DruidSqlDelete, which can overshadow
// the actual error message.
<EOF>
{
if (source == null) {
throw new ParseException("WHERE clause must be present in DELETE statements. (To delete all rows, WHERE TRUE must be explicitly specified.)");
}
return DruidSqlDelete.create(
s.end(source),
table,
source,
partitionedBy.lhs,
partitionedBy.rhs,
clusteredBy
);
}
}
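For reference, a few statements this rule accepts and rejects, a sketch based on the grammar above (the foo datasource and dim1 column are borrowed from the tests later in this PR):

-- Accepted: WHERE is mandatory, and PARTITIONED BY must precede CLUSTERED BY.
DELETE FROM foo WHERE dim1 = 'New' PARTITIONED BY ALL TIME
DELETE FROM foo WHERE TRUE PARTITIONED BY DAY CLUSTERED BY dim1

-- Rejected at parse time: no WHERE clause.
DELETE FROM foo PARTITIONED BY ALL TIME

-- Rejected at parse time: CLUSTERED BY without PARTITIONED BY.
DELETE FROM foo WHERE TRUE CLUSTERED BY dim1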

111 changes: 111 additions & 0 deletions sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlDelete.java
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.parser;

import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.java.util.common.granularity.Granularity;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;

public class DruidSqlDelete extends DruidSqlIngest
{
public static final SqlOperator OPERATOR = new SqlSpecialOperator("DELETE", SqlKind.OTHER);

public static DruidSqlDelete create(
SqlParserPos pos,
SqlNode targetTable,
SqlNode source,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy
)
{
SqlBasicCall invertedSource = new SqlBasicCall(SqlStdOperatorTable.NOT, new SqlNode[] {source}, pos);
SqlSelect sqlSelect = new SqlSelect(
pos,
SqlNodeList.EMPTY,
new SqlNodeList(Collections.singleton(SqlIdentifier.star(pos)), pos),
targetTable,
invertedSource,
null,
null,
null,
null,
null,
null
);
return new DruidSqlDelete(pos, targetTable, sqlSelect, partitionedBy, partitionedByStringForUnparse, clusteredBy);
}

public DruidSqlDelete(SqlParserPos pos,
SqlNode targetTable,
SqlNode source,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy
)
{
super(
pos,
SqlNodeList.EMPTY,
targetTable,
source,
null,
partitionedBy,
partitionedByStringForUnparse,
clusteredBy
);
}

@Nonnull
@Override
public SqlOperator getOperator()
{
return OPERATOR;
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
{
super.unparse(writer, leftPrec, rightPrec);
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse);
if (getClusteredBy() != null) {
writer.keyword("CLUSTERED BY");
SqlWriter.Frame frame = writer.startList("", "");
for (SqlNode clusterByOpts : getClusteredBy().getList()) {
clusterByOpts.unparse(writer, leftPrec, rightPrec);
}
writer.endList(frame);
}
}
}
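A note on the create() factory above: it wraps the DELETE's WHERE condition in NOT and builds a SELECT * over the target table, so the source query returns exactly the rows that survive the delete. Sketching the equivalence in SQL (the actual transformation happens on the SqlNode tree, not on SQL text):

-- This statement ...
DELETE FROM foo WHERE dim1 = 'New' PARTITIONED BY ALL TIME

-- ... is given a source equivalent to:
SELECT * FROM foo WHERE NOT (dim1 = 'New')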
@@ -33,6 +33,7 @@
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.parser.DruidSqlDelete;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.run.SqlEngine;
@@ -158,6 +159,8 @@ private SqlStatementHandler createHandler(final SqlNode node) throws ValidationException
return new IngestHandler.InsertHandler(handlerContext, (DruidSqlInsert) query, explain);
} else if (query instanceof DruidSqlReplace) {
return new IngestHandler.ReplaceHandler(handlerContext, (DruidSqlReplace) query, explain);
} else if (query instanceof DruidSqlDelete) {
return new IngestHandler.DeleteHandler(handlerContext, (DruidSqlDelete) query, explain);
}
}

@@ -42,6 +42,7 @@
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.parser.DruidSqlDelete;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
@@ -344,4 +345,55 @@ public void validate() throws ValidationException
}
}
}

/**
* Handler for the DELETE statement.
*/
protected static class DeleteHandler extends IngestHandler
{
private final DruidSqlDelete sqlNode;

public DeleteHandler(
SqlStatementHandler.HandlerContext handlerContext,
DruidSqlDelete sqlNode,
SqlExplain explain
) throws ValidationException
{
super(
handlerContext,
sqlNode,
convertQuery(sqlNode),
explain
);
this.sqlNode = sqlNode;
}

@Override
public SqlNode sqlNode()
{
return sqlNode;
}

@Override
protected DruidSqlIngest ingestNode()
{
return sqlNode;
}

@Override
public void validate() throws ValidationException
{
if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_REPLACE)) {
throw new ValidationException(StringUtils.format(
"Cannot execute REPLACE with SQL engine '%s'.",
adarshsanjeev marked this conversation as resolved.
Show resolved Hide resolved
handlerContext.engine().name())
);
}
super.validate();
handlerContext.queryContextMap().put(
DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
DruidSqlParserUtils.ALL
);
}
}
}
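Taken together, DeleteHandler.validate() requires the CAN_REPLACE engine feature and pins SQL_REPLACE_TIME_CHUNKS to ALL, so a DELETE effectively executes as a REPLACE of the whole table with the surviving rows. A sketch of that equivalence, assuming an engine that supports REPLACE (for example, the MSQ task engine):

-- This statement ...
DELETE FROM foo WHERE dim1 = 'New' PARTITIONED BY ALL TIME

-- ... behaves like:
REPLACE INTO foo OVERWRITE ALL
SELECT * FROM foo WHERE NOT (dim1 = 'New')
PARTITIONED BY ALL TIME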
83 changes: 83 additions & 0 deletions sql/src/test/java/org/apache/druid/sql/calcite/CalciteDeleteDmlTest.java
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite;

import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.junit.Test;

import java.util.Collections;

public class CalciteDeleteDmlTest extends CalciteIngestionDmlTest
{
@Test
public void testDeleteFromTableWithCondition()
{
testIngestionQuery()
.sql("DELETE FROM foo WHERE dim1 = 'New' PARTITIONED BY ALL TIME")
.expectTarget("foo", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource("foo")
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(new NotDimFilter(new SelectorDimFilter("dim1", "New", null)))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(REPLACE_ALL_TIME_CHUNKS)
.build()
)
.verify();
}

@Test
public void testDeleteFromTableWithoutWhereClause()
{
testIngestionQuery()
.sql("DELETE FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
"WHERE clause must be present in DELETE statements. (To delete all rows, WHERE TRUE must be explicitly specified.)"
)
.verify();
}

@Test
public void testDeleteAllFromTable()
{
testIngestionQuery()
.sql("DELETE FROM foo WHERE TRUE PARTITIONED BY ALL TIME")
.expectTarget("foo", FOO_TABLE_SIGNATURE)
.expectResources(dataSourceRead("foo"), dataSourceWrite("foo"))
.expectQuery(
newScanQueryBuilder()
.dataSource(InlineDataSource.fromIterable(Collections.emptyList(), FOO_TABLE_SIGNATURE))
.intervals(querySegmentSpec(Filtration.eternity()))
.filters(null)
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.context(REPLACE_ALL_TIME_CHUNKS)
.build()
)
.verify();
}
}
@@ -52,6 +52,8 @@
import org.apache.druid.sql.calcite.external.InlineOperatorConversion;
import org.apache.druid.sql.calcite.external.LocalOperatorConversion;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.util.CalciteTests;
@@ -75,6 +77,13 @@ public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
.put(QueryContexts.CTX_SQL_QUERY_ID, DUMMY_SQL_ID)
.build();

protected static final Map<String, Object> REPLACE_ALL_TIME_CHUNKS = ImmutableMap.of(
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
"{\"type\":\"all\"}",
DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
DruidSqlParserUtils.ALL
);

protected static final RowSignature FOO_TABLE_SIGNATURE =
RowSignature.builder()
.addTimeColumn()