
Extend the PARTITIONED BY clause to accept string literals for the time partitioning #15836

Merged · 12 commits · Feb 9, 2024
27 changes: 27 additions & 0 deletions docs/multi-stage-query/reference.md
@@ -287,6 +287,33 @@ The following ISO 8601 periods are supported for `TIME_FLOOR` and the string con
- P3M
- P1Y

The string constant can also be one of the following keywords:

- `HOUR` - Same as `'PT1H'`
- `DAY` - Same as `'P1D'`
- `MONTH` - Same as `'P1M'`
- `YEAR` - Same as `'P1Y'`
- `ALL TIME`
- `ALL` - Alias for `ALL TIME`

The `WEEK` granularity is deprecated and not supported in MSQ.

Examples:

```SQL
-- Keyword
PARTITIONED BY HOUR

-- String literal
PARTITIONED BY 'HOUR'

-- ISO 8601 period
PARTITIONED BY 'PT1H'

-- TIME_FLOOR function
PARTITIONED BY TIME_FLOOR(__time, 'PT1H')
```

For more information about partitioning, see [Partitioning](concepts.md#partitioning-by-time). <br /><br />
*Avoid partitioning by week, `P1W`, because weeks don't align neatly with months and years, making it difficult to partition by coarser granularities later.

1 change: 1 addition & 0 deletions sql/src/main/codegen/config.fmpp
@@ -54,6 +54,7 @@ data: {
"org.apache.calcite.sql.SqlNodeList"
"org.apache.calcite.sql.SqlBasicCall"
"org.apache.druid.java.util.common.granularity.Granularity"
"org.apache.druid.java.util.common.granularity.GranularityType"
"org.apache.druid.java.util.common.granularity.Granularities"
"org.apache.druid.sql.calcite.parser.DruidSqlInsert"
"org.apache.druid.sql.calcite.parser.DruidSqlParserUtils"
30 changes: 12 additions & 18 deletions sql/src/main/codegen/includes/common.ftl
@@ -17,59 +17,53 @@
* under the License.
*/

// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity() :
SqlNode PartitionGranularity() :
{
SqlNode e;
Granularity granularity;
String unparseString;
SqlNode result;
}
{
(
<HOUR>
{
granularity = Granularities.HOUR;
unparseString = "HOUR";
result = SqlLiteral.createSymbol(GranularityType.HOUR, getPos());
}
|
<DAY>
{
granularity = Granularities.DAY;
unparseString = "DAY";
result = SqlLiteral.createSymbol(GranularityType.DAY, getPos());
}
|
<MONTH>
{
granularity = Granularities.MONTH;
unparseString = "MONTH";
result = SqlLiteral.createSymbol(GranularityType.MONTH, getPos());
}
|
<YEAR>
{
granularity = Granularities.YEAR;
unparseString = "YEAR";
result = SqlLiteral.createSymbol(GranularityType.YEAR, getPos());
}
|
<ALL>
{
granularity = Granularities.ALL;
unparseString = "ALL";
result = SqlLiteral.createSymbol(GranularityType.ALL, getPos());
}
[
<TIME>
{
unparseString += " TIME";
result = SqlLiteral.createSymbol(GranularityType.ALL, getPos());
}
]
|
e = Expression(ExprContext.ACCEPT_SUB_QUERY)
{
granularity = DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
Member: I think some validation should happen here; I don't see a nice way to do it, but I've found this:

`result = new SqlLiteral(DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e), SYMBOL, getPos());`

which might work (but the typeName must be supplied - another candidate could be UNKNOWN).

Contributor (author): Added validation here in another way, let me know if ok.

Member: ok - but why not do the conversion here? I think you might also be able to do a similar thing on all the other branches:

`result = new SqlLiteral(DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e), SYMBOL, getPos());`

You could shortcut the transient String + Symbol stuff.

Contributor (author): Added a new SqlNode type, let me know if good now.

unparseString = e.toString();
// validate, the return Granularity value is not needed
DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions(e);
result = e;
}
)
{
return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
return result;
}
}
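The check the production above performs on the expression branch — accept a granularity keyword or anything that parses as an ISO 8601 period, reject everything else — can be sketched in plain Java. This is not the Calcite/Druid code; the class and method names are hypothetical:

```java
import java.time.Duration;
import java.time.Period;
import java.time.format.DateTimeParseException;
import java.util.Set;

// Plain-Java sketch (not the Calcite implementation) of the validation in
// the grammar above: a PARTITIONED BY argument is either a granularity
// keyword or a string that must parse as an ISO 8601 period.
class PartitionGranularitySketch {
    private static final Set<String> KEYWORDS =
            Set.of("HOUR", "DAY", "MONTH", "YEAR", "ALL", "ALL TIME");

    static String validate(String arg) {
        String upper = arg.trim().toUpperCase();
        if (KEYWORDS.contains(upper)) {
            return upper; // keyword branch: emitted as a symbol literal
        }
        try {
            Period.parse(upper); // date-based periods: P1D, P1M, P3M, P1Y
            return upper;
        } catch (DateTimeParseException ignored) {
            // fall through to time-based periods
        }
        try {
            Duration.parse(upper); // time-based periods: PT1H, PT5M
            return upper;
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Invalid granularity: " + arg);
        }
    }
}
```

The real grammar defers this to `DruidSqlParserUtils.convertSqlNodeToGranularityThrowingParseExceptions`; the sketch only shows the accept/reject shape of that check.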

6 changes: 3 additions & 3 deletions sql/src/main/codegen/includes/insert.ftl
@@ -33,7 +33,7 @@ SqlNode DruidSqlInsertEof() :
final SqlNodeList columnList;
final Span s;
final Pair<SqlNodeList, SqlNodeList> p;
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
SqlNode partitionedBy = null;
SqlNodeList clusteredBy = null;
String exportFileFormat = null;
}
@@ -93,7 +93,7 @@ SqlNode DruidSqlInsertEof() :
clusteredBy = ClusteredBy()
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
if (clusteredBy != null && partitionedBy == null) {
throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
@@ -112,6 +112,6 @@ SqlNode DruidSqlInsertEof() :
return insertNode;
}
SqlInsert sqlInsert = (SqlInsert) insertNode;
return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, exportFileFormat);
return DruidSqlInsert.create(sqlInsert, partitionedBy, clusteredBy, exportFileFormat);
}
}
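The clause-ordering rule enforced in the grammar above — CLUSTERED BY is only legal after a PARTITIONED BY — reduces to a null check once parsing reaches the end of the statement. A standalone sketch, with a hypothetical class name:

```java
// Standalone sketch of the ordering check in the grammar above:
// CLUSTERED BY without a preceding PARTITIONED BY is rejected.
class ClauseOrderCheck {
    static void check(Object partitionedBy, Object clusteredBy) {
        if (clusteredBy != null && partitionedBy == null) {
            throw new IllegalArgumentException(
                    "CLUSTERED BY found before PARTITIONED BY, "
                    + "CLUSTERED BY must come after the PARTITIONED BY clause");
        }
    }
}
```

Note how the PR simplifies this check: with `partitionedBy` now a single nullable `SqlNode` rather than a `Pair`, the condition tests `partitionedBy == null` instead of `partitionedBy.lhs == null`.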
7 changes: 3 additions & 4 deletions sql/src/main/codegen/includes/replace.ftl
@@ -26,8 +26,7 @@ SqlNode DruidSqlReplaceEof() :
final Span s;
SqlNode tableRef = null;
SqlInsert sqlInsert;
// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
SqlNode partitionedBy = null;
SqlNodeList clusteredBy = null;
final Pair<SqlNodeList, SqlNodeList> p;
SqlNode replaceTimeQuery = null;
@@ -78,7 +77,7 @@ SqlNode DruidSqlReplaceEof() :
clusteredBy = ClusteredBy()
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
if (clusteredBy != null && partitionedBy == null) {
throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
@@ -91,7 +90,7 @@ SqlNode DruidSqlReplaceEof() :
<EOF>
{
sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, destination, source, columnList);
return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, replaceTimeQuery, exportFileFormat);
return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, replaceTimeQuery, exportFileFormat);
}
}

@@ -23,9 +23,10 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.calcite.util.ImmutableNullableList;

import javax.annotation.Nullable;
import java.util.List;

/**
* Common base class to the two Druid "ingest" statements: INSERT and REPLACE.
Expand All @@ -37,10 +38,7 @@ public abstract class DruidSqlIngest extends SqlInsert
public static final String SQL_EXPORT_FILE_FORMAT = "__exportFileFormat";

@Nullable
protected final Granularity partitionedBy;

// Used in the unparse function to generate the original query since we convert the string to an enum
protected final String partitionedByStringForUnparse;
protected final SqlNode partitionedBy;

@Nullable
protected final SqlNodeList clusteredBy;
@@ -53,22 +51,20 @@ public DruidSqlIngest(
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNode partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
)
{
super(pos, keywords, targetTable, source, columnList);

this.partitionedByStringForUnparse = partitionedByStringForUnparse;
this.partitionedBy = partitionedBy;
this.clusteredBy = clusteredBy;
this.exportFileFormat = exportFileFormat;
}

@Nullable
public Granularity getPartitionedBy()
public SqlNode getPartitionedBy()
{
return partitionedBy;
}
@@ -84,4 +80,14 @@ public String getExportFileFormat()
{
return exportFileFormat;
}

@Override
public List<SqlNode> getOperandList()
{
return ImmutableNullableList.<SqlNode>builder()
.addAll(super.getOperandList())
.add(partitionedBy)
.add(clusteredBy)
.build();
}
}
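The point of the new `getOperandList()` override is that generic `SqlNode` utilities (deep copies, visitors, equality checks) only traverse the operands a node reports; omitting `partitionedBy` and `clusteredBy` would make generic walks silently drop them. A toy illustration, unrelated to Calcite's actual classes:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy tree (not Calcite) showing why a node must report all of its
// children: a generic walk only sees what the operand list returns.
class ToyNode {
    final String name;
    final List<ToyNode> operands;

    ToyNode(String name, ToyNode... operands) {
        this.name = name;
        this.operands = Arrays.asList(operands);
    }

    static List<String> collectNames(ToyNode root) {
        List<String> out = new ArrayList<>();
        out.add(root.name);
        for (ToyNode child : root.operands) {
            if (child != null) { // operand lists may contain nulls
                out.addAll(collectNames(child));
            }
        }
        return out;
    }
}
```

This is also why the PR uses `ImmutableNullableList` in the real override: either clause may legitimately be null, and the operand list must tolerate that.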
@@ -24,7 +24,7 @@
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.calcite.sql.parser.SqlParserPos;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -41,28 +41,49 @@ public class DruidSqlInsert extends DruidSqlIngest
// This allows reusing super.unparse
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;

public static DruidSqlInsert create(
@Nonnull SqlInsert insertNode,
@Nullable SqlNode partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
)
{
return new DruidSqlInsert(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList(),
partitionedBy,
clusteredBy,
exportFileFormat
);
}

/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* While partitionedBy can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
* errors when the PARTITIONED BY custom clause is not present, and keeps its error separate from JavaCC/Calcite's
* custom errors which can be cryptic when someone accidentally forgets to explicitly specify the PARTITIONED BY clause
*/
public DruidSqlInsert(
@Nonnull SqlInsert insertNode,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
SqlParserPos pos,
SqlNodeList keywords,
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
@Nullable SqlNode partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
)
{
super(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList(),
pos,
keywords,
targetTable,
source,
columnList,
partitionedBy,
partitionedByStringForUnparse,
clusteredBy,
exportFileFormat
);
@@ -95,9 +116,9 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
getSource().unparse(writer, 0, 0);
writer.newlineAndIndent();

if (partitionedByStringForUnparse != null) {
if (getPartitionedBy() != null) {
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse);
writer.keyword(partitionedBy.toString());
}

if (getClusteredBy() != null) {