
Add support for compacting small files for Hive tables #9398

Closed
wants to merge 42 commits

Conversation


@losipiuk losipiuk commented Sep 27, 2021

POC PR: High level review comments. No nit-picking at this phase please.

The PR adds support for the ALTER TABLE ... EXECUTE syntax.

On top of that, it adds support for compacting small files for non-transactional, non-bucketed Hive tables.

ALTER TABLE xxxxx EXECUTE compact_small_files WITH(file_size_threshold = ...)

Fixes #9466


@findepi findepi left a comment


Some initial thoughts.

Sorry for a bunch of low-level comments too.

}
}

Scope tableScope = analyze(table, scope);
Member

Is this equivalent to visitTable(table, scope)?
This seems to re-resolve the table again. Can we reuse the tableScope (relation type) creation without doing that?

Member Author

Yeah. I think we could if we recorded in the scope what type of relation was analyzed (view/materialized view/table). But it does not seem we are recording that information.

Member

We record that in io.trino.sql.analyzer.Analysis#registerTable

Would be good to reuse visitTable logic, since there is a lot going on here: tables, views, materialized views, redirections. Masks and filters -- we could pull them from Analysis too.

}
node.getWhere().ifPresent(where -> analyzeWhere(node, tableScope, where));

// analyze ORDER BY
Member

ORDER BY is nice because it allows us to tap into distributed sort when rewriting data.
However, a plain ORDER BY may be too limiting:

  • total ordering may not be required; sorting subsets of data can be sufficient (per file, per a certain amount of data, grouped execution)
  • different ordering schemes (e.g. z-order) may come in useful; we should think about how we will model them (an expression?)

@losipiuk losipiuk Sep 27, 2021

It is not used so far. And for local ordering (e.g. for Z-ordering) we can express the intention via WITH parameters,
e.g.

WITH (z_order_columns = ARRAY['a', 'b'])

Given that, maybe we should drop support for ORDER BY for now?

@alexjo2144 alexjo2144 left a comment

Slowly making my way through the commits, but had a question. Have you thought about how these procedures will interact with Access Control?

// TODO maybe refactor AbstractPropertyManager and use as a base so there is less code copied
public class TableProceduresPropertyManager
{
private final ConcurrentMap<Key, Map<String, PropertyMetadata<?>>> connectorProperties = new ConcurrentHashMap<>();
Member

Why does this need to be Concurrent?

Member Author

It is modelled after AbstractPropertyManager. I think theoretically the map can be modified by multiple threads in parallel as connectors are registered/unregistered. Not sure if that is really the case now.

Member

Not sure if that is really the case now.

It's currently not: the connectors are loaded serially during server startup, but it's conceivable that this becomes parallel.


losipiuk commented Oct 4, 2021

Slowly making my way through the commits, but had a question. Have you thought about how these procedures will interact with Access Control?

Great question. I left it for later and totally forgot about that. I guess the most straightforward approach would be to add checkCanExecuteTableProcedure(SecurityContext context, QualifiedObjectName tableName, QualifiedObjectName procedureName) to AccessControl. I will try to add that in.

Added "Add access control for table procedures" commit.

losipiuk commented Oct 4, 2021

@findepi, @electrum, @martint it would be great to get a review from one of you guys on this one :)

@findepi findepi left a comment

up to "Add support for table procedures SPI calls to Metadata"


import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

// TODO maybe refactor AbstractPropertyManager and use as a base so there is less code copied
Member

Seems like what you need is to make AbstractPropertyManager generic in its key type, AbstractPropertyManager<K>, and add a public method in each subclass that converts the API-level arguments into the internal key.
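A simplified, standalone illustration of that shape (hypothetical; the real classes use PropertyMetadata, live in io.trino.metadata, and differ in detail):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Base class is generic in the key type; property metadata is reduced to plain strings here.
abstract class AbstractPropertyManager<K>
{
    private final ConcurrentMap<K, Map<String, String>> connectorProperties = new ConcurrentHashMap<>();

    protected final void addProperties(K key, Map<String, String> properties)
    {
        connectorProperties.put(key, Map.copyOf(properties));
    }

    protected final Map<String, String> getProperties(K key)
    {
        return connectorProperties.getOrDefault(key, Map.of());
    }
}

// The subclass exposes a public method converting API-level arguments into its internal key.
final class TableProceduresPropertyManager
        extends AbstractPropertyManager<TableProceduresPropertyManager.Key>
{
    record Key(String catalogName, String procedureName) {}

    public Map<String, String> getProcedureProperties(String catalogName, String procedureName)
    {
        return getProperties(new Key(catalogName, procedureName));
    }
}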

Member Author

Yeah. The usage would be somewhat less nice, but it will work. Do you want me to update the PR towards that?

return properties.build();
}

private Object evaluatePropertyValue(
Member

This could be static and shared with AbstractPropertyManager.

@findepi findepi left a comment

"Add parser/analyzer support for ALTER TABLE ... EXECUTE"


@Override
protected Void visitTableExecute(TableExecute node, Integer indent)
{
builder.append("ALTER TABLE ");
Member

If you ignore indent (since it must be 0 here), add checkArgument(indent == 0, "...").

Same for the other, preexisting visitors (separate PR).
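Roughly along these lines, sketched against the snippet above (an illustrative fragment, not the PR code; checkArgument is Guava's Preconditions.checkArgument):

@Override
protected Void visitTableExecute(TableExecute node, Integer indent)
{
    // Fail loudly if the "indent is always 0 here" assumption ever stops holding.
    checkArgument(indent == 0, "unexpected indent: %s", indent);
    builder.append("ALTER TABLE ");
    // ... rest of the formatting ...
    return null;
}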

Member Author

I think this is true now, but it is not obvious to me that it must always be true.
E.g. EXPLAIN ALTER ... does not bump the indent, but it could do so.

@findepi findepi left a comment

"Add access control for table procedures"

losipiuk commented Oct 5, 2021

I sent out the first batch of fixups. Let me know if you prefer to keep it this way for a while, or should I squash those in?

@losipiuk losipiuk force-pushed the lo/distributed-dml-1 branch 3 times, most recently from eae2195 to ec74a7f on October 5, 2021 16:02
// todo: would be great to call getTableExecuteSplitsInfo if we are executing plan which requires that.
TableExecuteContext tableExecuteContext = tableExecuteContextManager.getTableExecuteContextForQuery(stage.getStageId().getQueryId());
Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
tableExecuteSplitsInfo.ifPresent(tableExecuteContext::setSplitsInfo);
Member Author

Doing it here seems wrong, as it looks like we can have multiple SourcePartitionedSchedulers per single query, e.g. they can be created via FixedSourcePartitionedScheduler. Needs a rework.
cc: @findepi

@losipiuk losipiuk force-pushed the lo/distributed-dml-1 branch 2 times, most recently from 39c46e2 to dd93cc4 on October 7, 2021 19:36

@findepi findepi left a comment

"fixups"

@@ -1034,6 +1034,8 @@ protected Scope visitTableExecute(TableExecute node, Optional<Scope> scope)
analysis.getParameters());
analysis.setTableExecuteProperties(tableProperties);

analysis.setUpdateType("EXECUTE", tableName, Optional.empty(), Optional.empty());
Member

Is this used for https://trino.io/docs/current/sql/execute.html ?
Can the two be mistaken for each other?

Member Author

I do not believe the EXECUTE you mentioned classifies as an update statement, and in the code we never pass "EXECUTE" to io.trino.execution.QueryStateMachine#setUpdateType for it.
I can put something else here (not sure how much it matters), but EXECUTE matches what we do for other ALTER ... statements, e.g. for ALTER TABLE ... ADD COLUMN we put ADD COLUMN as the update type.

@findepi findepi left a comment

"Add planner logic for TableExecute statement"

LGTM; however, you may want to consult this particular commit with @kasiafi too.

Comment on lines +887 to +841
// TODO support broader range.
throw new TrinoException(NOT_SUPPORTED, "only predicates expressible as TupleDomain can be used with ALTER TABLE ... EXECUTE");
Member

I am unsure about it. First, this is going to change (e.g. #7994), but it's OK to support less. Correctness first.
However, expressibility with a TupleDomain doesn't guarantee the predicate will be subsumed. For example, the Hive connector doesn't consume a TupleDomain over non-partition columns.
Thus it feels the check here, at this stage, doesn't guarantee anything and we need a different check in a different place. So I hope we can remove the check here.

@losipiuk losipiuk Oct 8, 2021

Yeah, that is a fair point: even with the check here we depend on the connector to actually consume the predicate instead of ignoring it, and if it cannot for some reason it should throw an exception.
To be honest, I am not sure we can provide any engine-side validation that the connector behaves up to the contract.

Member

If you kept the predicate in the form of an Expression through the Optimizer phase, some optimizations could be applied and transform an unsupported predicate into a supported one.

@losipiuk losipiuk Oct 11, 2021

Good point. Though then I would need to structure the SPI very differently: getTableHandleForExecute would not take the constraint parameter and we would depend on applyFilter to do the job. Yet then we would need some mechanism (a validation optimizer rule?) to verify that, in the end, there is no filter left between the TableScanNode and the TableWriterNode when we are executing ALTER TABLE EXECUTE.

It feels doable, though more complex, and I am not sure we would get a real benefit, given that the condition passed in the WHERE clause will most probably be simple (a conjunction of range predicates?).

@kasiafi do you feel strongly about that?

Member

we would depend on applyFilter to do the job. Yet then we would need to have some mechanism (validation optimizer rule?) to verify that at the end there is no filter

If I understand correctly, we still need some validation that the whole constraint is consumed? Maybe it would be good to use the existing mechanisms for pushing the predicate / handling the non-accepted part?

However, if the Constraint is built here, I was wondering whether this validation could go into the Analyzer.

Member Author

If I understand correctly, we still need some validation that the whole constraint is consumed

We cannot really validate that. The contract is that the connector should throw an exception if the constraint cannot be consumed fully. We can discuss whether that contract is nice when it comes to the SPI shape, but we cannot do anything if the connector does not obey it (does not consume, and does not throw). If I change the approach to depend on applyFilter we still cannot do any validation: if the connector does not consume the predicate, yet returns an empty remainingFilter in ConstraintApplicationResult, the engine will not know.

Maybe it would be good to use existing mechanisms for pushing predicate / handling the non-accepted part

It feels to me it would be a bit nicer, and (as you said) it would handle more predicate shapes. On the other hand, the logic of a single procedure would be even more spread around the codebase, and harder to follow. I would start with the proposed approach and refactor as a follow-up if we decide it is worth it.

However, if the Constraint is built here, I was thinking if this validation could go to the Analyzer

Not sure I fully understand what you suggest here. Let's chat on Slack.

assignments.build(),
false,
Optional.empty());
RelationPlan tableScanPlan = new RelationPlan(
Member

Could you use RelationPlanner to process the table and get the RelationPlan? The above mostly duplicates the code of RelationPlanner.visitTable().

Member Author

It is not that straightforward. visitTable in RelationPlanner would take the TableHandle to be used for the TableScanNode from the analysis, and that is not what I want.
I want to use the TableHandle from TableExecuteHandle.sourceTableHandle. Any suggestions on how I should proceed?

  • I guess I can modify the analysis object I have before calling out to RelationPlanner (or rather create a new, modified one based on the one we got in the call to LogicalPlanner.planStatement).
  • The other option would be to create a public helper RelationPlanner.planTableWithHandle(Table table, TableHandle handle) and make it share code with RelationPlanner.visitTable.

Leaning towards the latter. WDYT?

Member Author

Oh ... current code is actually messed up.
I am using TableExecuteHandle.sourceTableHandle in the TableScanOperator but I am using ColumnHandles taken from analysis; and the two may not be compatible.

        for (Field field : scope.getRelationType().getAllFields()) {
            Symbol symbol = symbolAllocator.newSymbol(field);

            outputSymbolsBuilder.add(symbol);
            assignments.put(symbol, analysis.getColumn(field));
        }

I need to wrap my head around it :/

@losipiuk losipiuk Oct 11, 2021

Oh ... current code is actually messed up.

Or maybe that is not a problem. We already have pieces of code where we change the TableHandle in the TableScanNode but still use the old ColumnHandles (e.g. after applyFilter).

Another question: is it fine to assume here that the order of symbols in the plan I got from planning the table scan for Table matches the order of ColumnHandles I got from ConnectorTableMetadata?


@findepi findepi left a comment

"Pass splits info to TableFinish operator for ALTER TABLE EXECUTE"

*/
-    default void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection<Slice> fragments)
+    default void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle, Collection<Slice> fragments, List<Object> tableExecuteState)
Member

Since io.trino.spi.connector.ConnectorSplitSource#getTableExecuteSplitsInfo returns an Optional, I think this should be Optional<List> too.

Member Author

The current contract is that for the TableExecute flow ConnectorSplitSource must return a non-empty Optional here. Hence, starting from TableExecuteContext, we do not have any Optionals. I think this is simpler this way.
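For context, a rough sketch (an assumption about the shape, not the PR code) of the non-Optional hand-off described above, where TableExecuteContext simply refuses to hand out splits info that was never set:

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

import java.util.List;

public class TableExecuteContext
{
    private List<Object> splitsInfo;

    public synchronized void setSplitsInfo(List<Object> splitsInfo)
    {
        checkState(this.splitsInfo == null, "splitsInfo already set");
        this.splitsInfo = requireNonNull(splitsInfo, "splitsInfo is null");
    }

    public synchronized List<Object> getSplitsInfo()
    {
        checkState(splitsInfo != null, "splitsInfo not set");
        return splitsInfo;
    }
}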

@findepi findepi left a comment

"Add tableExecuteSplitsInfo to FixedSplitSource"

Add support for compacting small files for non-transactional,
non-bucketed Hive tables.

ALTER TABLE xxxxx EXECUTE OPTIMIZE WITH(file_size_threshold = ...)

if (!someDeleted) {
    throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Error while deleting ", e);
}
log.error(e, "Error while deleting data files in FINISH phase of OPTIMIZE; remaining files need to be deleted manually: " + tableExecuteState);
Member

Throwing is bad, and not throwing isn't great either; this is irrecoverable by us and requires user intervention.

I think throwing is a better idea than logging, as it at least ensures the problem is surfaced to the person invoking the procedure (query).

Member Author

Yeah - I very much would prefer to throw. Let me see. Maybe we can throw and still skip cleanup.
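For illustration, a hypothetical fragment (not the PR code; filesToDelete and fileSystem are assumed variables) of the "attempt everything, then throw" approach discussed here, which still lists the leftover files for manual cleanup:

List<String> remainingFiles = new ArrayList<>();
for (String path : filesToDelete) {
    try {
        fileSystem.delete(new Path(path), false);
    }
    catch (IOException e) {
        remainingFiles.add(path);
    }
}
if (!remainingFiles.isEmpty()) {
    // Surface the problem to the caller instead of only logging it; the files that
    // could not be removed have to be deleted manually.
    throw new TrinoException(HIVE_FILESYSTEM_ERROR,
            "Error while deleting data files in FINISH phase of OPTIMIZE; remaining files need to be deleted manually: " + remainingFiles);
}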

Member Author

PTAL now (last 2 commits)

@losipiuk

Replaced with: #9665

@losipiuk losipiuk closed this Oct 19, 2021
Development

Successfully merging this pull request may close these issues.

Support distributed table-scoped connector-provided procedures
4 participants