Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Verifier] Wrap checksum queries with partition predicate for reused output table #22965

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions presto-verifier/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,27 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-hive</artifactId>
<exclusions>
<exclusion>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-cache</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-hive-common</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-hive-metastore</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-jdbc</artifactId>
Expand Down Expand Up @@ -198,6 +214,11 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

<!-- for testing -->
<dependency>
<groupId>com.facebook.presto</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.verifier.checksum;

import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.FunctionCall;
import com.facebook.presto.sql.tree.GroupBy;
import com.facebook.presto.sql.tree.GroupingElement;
Expand Down Expand Up @@ -52,17 +53,17 @@ public ChecksumValidator(Map<Category, Provider<ColumnValidator>> columnValidato
this.columnValidators = columnValidators;
}

public Query generateChecksumQuery(QualifiedName tableName, List<Column> columns)
public Query generateChecksumQuery(QualifiedName tableName, List<Column> columns, Optional<Expression> partitionPredicate)
{
ImmutableList.Builder<SelectItem> selectItems = ImmutableList.builder();
selectItems.add(new SingleColumn(new FunctionCall(QualifiedName.of("count"), ImmutableList.of())));
for (Column column : columns) {
selectItems.addAll(columnValidators.get(column.getCategory()).get().generateChecksumColumns(column));
}
return simpleQuery(new Select(false, selectItems.build()), new Table(tableName));
return simpleQuery(new Select(false, selectItems.build()), new Table(tableName), partitionPredicate, Optional.empty());
}

public Query generatePartitionChecksumQuery(QualifiedName tableName, List<Column> dataColumns, List<Column> partitionColumns)
public Query generatePartitionChecksumQuery(QualifiedName tableName, List<Column> dataColumns, List<Column> partitionColumns, Optional<Expression> partitionPredicate)
{
ImmutableList.Builder<SelectItem> selectItems = ImmutableList.builder();
selectItems.add(new SingleColumn(new FunctionCall(QualifiedName.of("count"), ImmutableList.of())));
Expand All @@ -79,15 +80,15 @@ public Query generatePartitionChecksumQuery(QualifiedName tableName, List<Column
return simpleQuery(
new Select(false, selectItems.build()),
new Table(tableName),
Optional.empty(),
partitionPredicate,
Optional.of(new GroupBy(false, groupByList.build())),
Optional.empty(),
Optional.of(new OrderBy(orderByList.build())),
Optional.empty(),
Optional.empty());
}

public Query generateBucketChecksumQuery(QualifiedName tableName, List<Column> partitionColumns, List<Column> dataColumns)
public Query generateBucketChecksumQuery(QualifiedName tableName, List<Column> partitionColumns, List<Column> dataColumns, Optional<Expression> partitionPredicate)
{
ImmutableList.Builder<SelectItem> selectItems = ImmutableList.builder();
selectItems.add(new SingleColumn(new FunctionCall(QualifiedName.of("count"), ImmutableList.of())));
Expand All @@ -106,7 +107,7 @@ public Query generateBucketChecksumQuery(QualifiedName tableName, List<Column> p
return simpleQuery(
new Select(false, selectItems.build()),
new Table(tableName),
Optional.empty(),
partitionPredicate,
Optional.of(new GroupBy(false, groupByList.build())),
Optional.empty(),
Optional.of(new OrderBy(orderByList.build())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ public DataMatchResult verify(
ChecksumQueryContext testChecksumQueryContext)
{
List<Column> testColumns = getColumns(getHelperAction(), typeManager, test.getObjectName());
Query testChecksumQuery = checksumValidator.generateChecksumQuery(test.getObjectName(), testColumns);
Query testChecksumQuery = checksumValidator.generateChecksumQuery(test.getObjectName(), testColumns, test.getPartitionsPredicate());
testChecksumQueryContext.setChecksumQuery(formatSql(testChecksumQuery));

List<Column> controlColumns = null;
ChecksumResult controlChecksumResult = null;

if (isControlEnabled()) {
controlColumns = getColumns(getHelperAction(), typeManager, control.getObjectName());
Query controlChecksumQuery = checksumValidator.generateChecksumQuery(control.getObjectName(), controlColumns);
Query controlChecksumQuery = checksumValidator.generateChecksumQuery(control.getObjectName(), controlColumns, control.getPartitionsPredicate());
controlChecksumQueryContext.setChecksumQuery(formatSql(controlChecksumQuery));

QueryResult<ChecksumResult> controlChecksum = callAndConsume(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private DeterminismAnalysis analyze(QueryObjectBundle control, ChecksumResult co
stats -> stats.getQueryStats().map(QueryStats::getQueryId).ifPresent(run::setQueryId));

// Run checksum query
Query checksumQuery = checksumValidator.generateChecksumQuery(queryBundle.getObjectName(), columns);
Query checksumQuery = checksumValidator.generateChecksumQuery(queryBundle.getObjectName(), columns, Optional.empty());
ChecksumResult testChecksum = getOnlyElement(callAndConsume(
() -> prestoAction.execute(checksumQuery, DETERMINISM_ANALYSIS_CHECKSUM, ChecksumResult::fromResultSet),
stats -> stats.getQueryStats().map(QueryStats::getQueryId).ifPresent(run::setChecksumQueryId)).getResults());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ private List<ChecksumResult> runPartitionChecksum(
ChecksumQueryContext checksumQueryContext,
QueryStage queryStage)
{
Query partitionChecksumQuery = checksumValidator.generatePartitionChecksumQuery(bundle.getObjectName(), dataColumns, partitionColumns);
Query partitionChecksumQuery = checksumValidator.generatePartitionChecksumQuery(bundle.getObjectName(), dataColumns, partitionColumns, bundle.getPartitionsPredicate());
checksumQueryContext.setPartitionChecksumQuery(formatSql(partitionChecksumQuery));
return callAndConsume(
() -> getHelperAction().execute(partitionChecksumQuery, queryStage, ChecksumResult::fromResultSet),
Expand All @@ -287,7 +287,7 @@ private List<ChecksumResult> runBucketChecksum(
ChecksumQueryContext checksumQueryContext,
QueryStage queryStage)
{
Query bucketChecksumQuery = checksumValidator.generateBucketChecksumQuery(bundle.getObjectName(), partitionColumns, dataColumns);
Query bucketChecksumQuery = checksumValidator.generateBucketChecksumQuery(bundle.getObjectName(), partitionColumns, dataColumns, bundle.getPartitionsPredicate());
List<ChecksumResult> checksumResults = callAndConsume(
() -> getHelperAction().execute(bucketChecksumQuery, queryStage, ChecksumResult::fromResultSet),
stats -> stats.getQueryStats().map(QueryStats::getQueryId).ifPresent(checksumQueryContext::setBucketChecksumQueryId)).getResults();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.verifier.framework;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.jdbi.v3.core.mapper.reflect.ColumnName;
import org.jdbi.v3.core.mapper.reflect.JdbiConstructor;
Expand All @@ -37,6 +38,7 @@ public class QueryConfiguration
private final Optional<String> password;
private final Map<String, String> sessionProperties;
private final boolean isReusableTable;
private final List<String> partitions;

@JdbiConstructor
public QueryConfiguration(
Expand All @@ -45,9 +47,10 @@ public QueryConfiguration(
@ColumnName("username") Optional<String> username,
@ColumnName("password") Optional<String> password,
@ColumnName("session_properties") Optional<Map<String, String>> sessionProperties,
@ColumnName("client_tags") Optional<List<String>> clientTags)
@ColumnName("client_tags") Optional<List<String>> clientTags,
@ColumnName("partitions") Optional<List<String>> partitions)
{
this(catalog, schema, username, password, sessionProperties, clientTags.filter(tags -> tags.contains(CLIENT_TAG_OUTPUT_RETAINED)).isPresent());
this(catalog, schema, username, password, sessionProperties, clientTags.filter(tags -> tags.contains(CLIENT_TAG_OUTPUT_RETAINED)).isPresent(), partitions);
}

public QueryConfiguration(
Expand All @@ -56,14 +59,16 @@ public QueryConfiguration(
Optional<String> username,
Optional<String> password,
Optional<Map<String, String>> sessionProperties,
boolean isReusableTable)
boolean isReusableTable,
Optional<List<String>> partitions)
{
this.catalog = requireNonNull(catalog, "catalog is null");
this.schema = requireNonNull(schema, "schema is null");
this.username = requireNonNull(username, "username is null");
this.password = requireNonNull(password, "password is null");
this.sessionProperties = ImmutableMap.copyOf(sessionProperties.orElse(ImmutableMap.of()));
this.isReusableTable = isReusableTable;
this.partitions = ImmutableList.copyOf(partitions.orElse(ImmutableList.of()));
}

public QueryConfiguration applyOverrides(QueryConfigurationOverrides overrides)
Expand All @@ -85,7 +90,8 @@ public QueryConfiguration applyOverrides(QueryConfigurationOverrides overrides)
Optional.ofNullable(overrides.getUsernameOverride().orElse(username.orElse(null))),
Optional.ofNullable(overrides.getPasswordOverride().orElse(password.orElse(null))),
Optional.of(sessionProperties),
isReusableTable);
isReusableTable,
Optional.of(partitions));
}

public String getCatalog()
Expand Down Expand Up @@ -118,6 +124,11 @@ public boolean isReusableTable()
return isReusableTable;
}

public List<String> getPartitions()
{
return partitions;
}

@Override
public boolean equals(Object obj)
{
Expand All @@ -133,13 +144,14 @@ public boolean equals(Object obj)
Objects.equals(username, o.username) &&
Objects.equals(password, o.password) &&
Objects.equals(sessionProperties, o.sessionProperties) &&
isReusableTable == o.isReusableTable;
isReusableTable == o.isReusableTable &&
Objects.equals(partitions, partitions);
}

@Override
public int hashCode()
{
return Objects.hash(catalog, schema, username, password, sessionProperties, isReusableTable);
return Objects.hash(catalog, schema, username, password, sessionProperties, isReusableTable, partitions);
}

@Override
Expand All @@ -152,6 +164,7 @@ public String toString()
.add("password", password)
.add("sessionProperties", sessionProperties)
.add("isReusableTable", isReusableTable)
.add("partitions", partitions)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.verifier.framework;

import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.QualifiedName;
import com.facebook.presto.sql.tree.Statement;

Expand All @@ -25,6 +26,7 @@ public class QueryObjectBundle
extends QueryBundle
{
private final QualifiedName objectName;
private final Optional<Expression> partitionsPredicate;
private final boolean reuseTable;
private final Optional<String> rewrittenFunctionCalls;

Expand All @@ -35,11 +37,13 @@ public QueryObjectBundle(
List<Statement> teardownQueries,
ClusterType cluster,
Optional<String> rewrittenFunctionCalls,
Optional<Expression> partitionsPredicate,
boolean reuseTable)
{
super(setupQueries, query, teardownQueries, cluster);
this.objectName = requireNonNull(objectName, "objectName is null");
this.rewrittenFunctionCalls = requireNonNull(rewrittenFunctionCalls, "rewrittenFunctionCalls is null");
this.partitionsPredicate = partitionsPredicate;
this.reuseTable = reuseTable;
}

Expand All @@ -57,4 +61,9 @@ public boolean isReuseTable()
{
return reuseTable;
}

public Optional<Expression> getPartitionsPredicate()
{
return partitionsPredicate;
}
}
Loading
Loading