Commit 8c4652f: Address review comments

karuppayya committed Jul 5, 2023
1 parent 268d9fe

Showing 7 changed files with 17 additions and 35 deletions.
InMemoryReadMetricReporter.java (org.apache.iceberg.metrics)

@@ -18,18 +18,20 @@
  */
 package org.apache.iceberg.metrics;
 
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
 public class InMemoryReadMetricReporter implements MetricsReporter {
 
-  private ScanReport metricsReport;
+  private MetricsReport metricsReport;
 
   @Override
   public void report(MetricsReport report) {
-    if (report instanceof ScanReport) {
-      this.metricsReport = (ScanReport) report;
-    }
+    this.metricsReport = (ScanReport) report;
   }
 
   public ScanReport scanReport() {
-    return metricsReport;
+    Preconditions.checkArgument(
+        metricsReport instanceof ScanReport, "Metric report is not a scan report");
+    return (ScanReport) metricsReport;
   }
 }
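With this change the reporter accepts any MetricsReport, and the ScanReport type check moves from report() into scanReport(), which now fails fast via Preconditions instead of silently returning null. A minimal usage sketch, not part of this commit: it assumes the Scan#metricsReporter wiring from the wider PR, and the reporter keeps only the last report it receives.

import java.io.IOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.InMemoryReadMetricReporter;
import org.apache.iceberg.metrics.ScanReport;

class ScanReportExample {
  static ScanReport planAndReport(Table table) throws IOException {
    InMemoryReadMetricReporter reporter = new InMemoryReadMetricReporter();
    try (CloseableIterable<FileScanTask> tasks =
        table.newScan().metricsReporter(reporter).planFiles()) {
      tasks.forEach(task -> {}); // exhaust planning so a ScanReport gets reported
    }
    // Throws IllegalArgumentException if the last report was not a ScanReport
    return reporter.scanReport();
  }
}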
SparkBatchQueryScan.java

@@ -76,8 +76,8 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
       SparkReadConf readConf,
       Schema expectedSchema,
       List<Expression> filters,
-      Supplier<ScanReport> metricsReportSupplier) {
-    super(spark, table, scan, readConf, expectedSchema, filters, metricsReportSupplier);
+      Supplier<ScanReport> scanReportSupplier) {
+    super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);
 
     this.snapshotId = readConf.snapshotId();
     this.startSnapshotId = readConf.startSnapshotId();
SparkScan.java

@@ -86,7 +86,6 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
       Schema expectedSchema,
       List<Expression> filters,
       Supplier<ScanReport> metricsReportSupplier) {
-    this.metricsReportSupplier = metricsReportSupplier;
     Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch());
     SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, expectedSchema);
 
@@ -97,6 +96,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
     this.expectedSchema = expectedSchema;
     this.filterExpressions = filters != null ? filters : Collections.emptyList();
     this.branch = readConf.branch();
+    this.metricsReportSupplier = metricsReportSupplier;
   }
 
   protected Table table() {
SparkStagedScan.java

@@ -20,12 +20,10 @@
 
 import java.util.List;
 import java.util.Objects;
-import java.util.function.Supplier;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.metrics.ScanReport;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkReadConf;
@@ -41,12 +39,8 @@ class SparkStagedScan extends SparkScan {
 
   private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of tasks
 
-  SparkStagedScan(
-      SparkSession spark,
-      Table table,
-      SparkReadConf readConf,
-      Supplier<ScanReport> scanReportSupplier) {
-    super(spark, table, readConf, table.schema(), ImmutableList.of(), scanReportSupplier);
+  SparkStagedScan(SparkSession spark, Table table, SparkReadConf readConf) {
+    super(spark, table, readConf, table.schema(), ImmutableList.of(), null);
 
     this.taskSetId = readConf.scanTaskSetId();
     this.splitSize = readConf.splitSize();
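SparkStagedScan reads tasks that were planned elsewhere and staged through ScanTaskSetManager, so no scan planning happens here and there is no ScanReport to surface; the constructor now passes null up to SparkScan instead of wiring a throwaway reporter. A sketch of the supplier pattern this relies on, with a hypothetical holder class standing in for SparkScan's internals (names below are illustrative, not from this commit):

import java.util.function.Supplier;
import org.apache.iceberg.metrics.ScanReport;

class LazyReportHolder {
  private final Supplier<ScanReport> scanReportSupplier;

  LazyReportHolder(Supplier<ScanReport> scanReportSupplier) {
    this.scanReportSupplier = scanReportSupplier; // may be null for staged scans
  }

  ScanReport reportOrNull() {
    // Defer reading the report until metrics are actually requested;
    // a null supplier simply means there are no planning metrics to report.
    return scanReportSupplier != null ? scanReportSupplier.get() : null;
  }
}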
SparkStagedScanBuilder.java (org.apache.iceberg.spark.source)

@@ -19,7 +19,6 @@
 package org.apache.iceberg.spark.source;
 
 import org.apache.iceberg.Table;
-import org.apache.iceberg.metrics.InMemoryReadMetricReporter;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
@@ -40,7 +39,6 @@ class SparkStagedScanBuilder implements ScanBuilder {
 
   @Override
   public Scan build() {
-    return new SparkStagedScan(
-        spark, table, readConf, (new InMemoryReadMetricReporter())::scanReport);
+    return new SparkStagedScan(spark, table, readConf);
   }
 }
TotalFileSize.java (org.apache.iceberg.spark.source.metrics)

@@ -18,10 +18,9 @@
  */
 package org.apache.iceberg.spark.source.metrics;
 
-import java.util.Arrays;
-import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class TotalFileSize implements CustomMetric {
+public class TotalFileSize extends CustomSumMetric {
 
   static final String NAME = "totalFileSize";
 
@@ -34,9 +33,4 @@ public String name() {
   public String description() {
     return "total file size";
   }
-
-  @Override
-  public String aggregateTaskMetrics(long[] taskMetrics) {
-    return String.valueOf(Arrays.stream(taskMetrics).sum());
-  }
 }
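Spark's CustomSumMetric already implements aggregateTaskMetrics by summing the per-task values, which is exactly what the removed override computed with Arrays.stream(taskMetrics).sum(); extending it leaves only name() and description() to define. The same simplification is applied to TotalPlanningDuration below. A sketch with a hypothetical metric showing the resulting pattern:

import org.apache.spark.sql.connector.metric.CustomSumMetric;

// Hypothetical metric, for illustration only
public class TotalRecordCount extends CustomSumMetric {

  @Override
  public String name() {
    return "totalRecordCount";
  }

  @Override
  public String description() {
    return "total record count";
  }

  // aggregateTaskMetrics(long[]) is inherited from CustomSumMetric, which
  // returns the sum of the per-task values as a String.
}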
TotalPlanningDuration.java (org.apache.iceberg.spark.source.metrics)

@@ -18,10 +18,9 @@
  */
 package org.apache.iceberg.spark.source.metrics;
 
-import java.util.Arrays;
-import org.apache.spark.sql.connector.metric.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomSumMetric;
 
-public class TotalPlanningDuration implements CustomMetric {
+public class TotalPlanningDuration extends CustomSumMetric {
 
   static final String NAME = "totalPlanningDuration";
 
@@ -34,9 +33,4 @@ public String name() {
   public String description() {
     return "total planning duration";
   }
-
-  @Override
-  public String aggregateTaskMetrics(long[] taskMetrics) {
-    return String.valueOf(Arrays.stream(taskMetrics).sum());
-  }
 }
