Skip to content

Commit

Permalink
[Core] support scan metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
schnappi17 committed Oct 25, 2023
1 parent d010f2d commit c825b34
Show file tree
Hide file tree
Showing 9 changed files with 397 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
Expand All @@ -51,6 +53,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
Expand Down Expand Up @@ -80,6 +83,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private ManifestCacheFilter manifestCacheFilter = null;
private final Integer scanManifestParallelism;

private ScanMetrics scanMetrics = null;

public AbstractFileStoreScan(
RowType partitionType,
ScanBucketFilter bucketKeyFilter,
Expand Down Expand Up @@ -188,6 +193,12 @@ public FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter)
return this;
}

/**
 * Stores the {@link ScanMetrics} that planning reports its statistics to.
 *
 * <p>The backing field starts out as {@code null}; planning only reports a scan when a
 * non-null metrics instance has been set through this method.
 *
 * @param metrics the metrics instance to keep on this scan
 * @return this scan, so that configuration calls can be chained
 */
@Override
public FileStoreScan withMetrics(ScanMetrics metrics) {
this.scanMetrics = metrics;
return this;
}

@Override
public Plan plan() {

Expand Down Expand Up @@ -223,6 +234,7 @@ public List<ManifestEntry> files() {

private Pair<Snapshot, List<ManifestEntry>> doPlan(
Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
long started = System.nanoTime();
List<ManifestFileMeta> manifests = specifiedManifests;
Snapshot snapshot = null;
if (manifests == null) {
Expand All @@ -237,6 +249,9 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
}
}

long startDataFiles =
manifests.stream().mapToLong(f -> f.numAddedFiles() + f.numDeletedFiles()).sum();

Iterable<ManifestEntry> entries =
ParallellyExecuteUtils.parallelismBatchIterable(
files ->
Expand All @@ -248,8 +263,12 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
manifests,
scanManifestParallelism);

long skippedByPartitionAndStats =
startDataFiles - StreamSupport.stream(entries.spliterator(), false).count();

List<ManifestEntry> files = new ArrayList<>();
for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
Collection<ManifestEntry> mergedEntries = ManifestEntry.mergeEntries(entries);
for (ManifestEntry file : mergedEntries) {
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
Expand Down Expand Up @@ -278,6 +297,8 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
}
}

long afterBucketFilter = files.size();
long skippedByBucketAndLevelFilter = mergedEntries.size() - files.size();
// We group files by bucket here, and filter them by the whole bucket filter.
// Why do this: because in primary key table, we can't just filter the value
// by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
Expand All @@ -296,6 +317,18 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
.flatMap(Collection::stream)
.collect(Collectors.toList());

long skippedByWholeBucketFiles = afterBucketFilter - files.size();
long scanDuration = (System.nanoTime() - started) / 1_000_000;
if (scanMetrics != null) {
scanMetrics.reportScan(
new ScanStats(
scanDuration,
manifests.size(),
skippedByPartitionAndStats,
skippedByBucketAndLevelFilter,
skippedByWholeBucketFiles,
files.size()));
}
return Pair.of(snapshot, files);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.Filter;
Expand Down Expand Up @@ -62,6 +63,8 @@ public interface FileStoreScan {

FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);

/**
 * Supplies the {@link ScanMetrics} that this scan should report its scan statistics to.
 *
 * @param metrics the metrics instance to report to
 * @return a {@link FileStoreScan} to continue configuring
 */
FileStoreScan withMetrics(ScanMetrics metrics);

/** Produce a {@link Plan}. */
Plan plan();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation.metrics;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.MetricGroup;
import org.apache.paimon.metrics.MetricRegistry;

/**
 * Metrics to measure scan operation.
 *
 * <p>On construction this class obtains a table-level metric group named {@link #GROUP_NAME}
 * from the given registry and registers one histogram of scan durations plus a set of gauges
 * that expose the values of the most recently reported {@link ScanStats}. Every gauge reports
 * {@code 0} until the first scan has been reported through {@link #reportScan(ScanStats)}.
 */
public class ScanMetrics {

    @VisibleForTesting protected static final String GROUP_NAME = "scan";

    // Names under which the individual metrics are registered.
    @VisibleForTesting static final String LAST_SCAN_DURATION = "lastScanDuration";
    @VisibleForTesting static final String SCAN_DURATION = "scanDuration";
    @VisibleForTesting static final String LAST_SCANNED_MANIFESTS = "lastScannedManifests";

    @VisibleForTesting
    static final String LAST_SKIPPED_BY_PARTITION_AND_STATS = "lastSkippedByPartitionAndStats";

    @VisibleForTesting
    static final String LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER =
            "lastSkippedByBucketAndLevelFilter";

    @VisibleForTesting
    static final String LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER =
            "lastSkippedByWholeBucketFilesFilter";

    @VisibleForTesting
    static final String LAST_SCAN_SKIPPED_TABLE_FILES = "lastScanSkippedTableFiles";

    @VisibleForTesting
    static final String LAST_SCAN_RESULTED_TABLE_FILES = "lastScanResultedTableFiles";

    /** Window size handed to the metric group when creating the duration histogram. */
    private static final int HISTOGRAM_WINDOW_SIZE = 10_000;

    private final MetricGroup metricGroup;

    /** Receives the duration of every reported scan. */
    private final Histogram durationHistogram;

    /**
     * Statistics of the most recently reported scan, or {@code null} before the first report.
     *
     * <p>NOTE(review): this field is written by {@link #reportScan(ScanStats)} and read by the
     * gauges; if gauges are polled from another thread it is not guaranteed to be visible
     * without {@code volatile} - confirm against the metric reporter's threading model.
     */
    private ScanStats latestScan;

    /**
     * Creates the scan metric group for a table and registers all scan metrics in it.
     *
     * @param registry registry used to obtain the table metric group
     * @param tableName name of the table the metric group is created for
     */
    public ScanMetrics(MetricRegistry registry, String tableName) {
        metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);

        // Registration order is kept: duration gauge, duration histogram, remaining gauges.
        metricGroup.gauge(LAST_SCAN_DURATION, () -> latestOrZero(ScanStats::getDuration));
        durationHistogram = metricGroup.histogram(SCAN_DURATION, HISTOGRAM_WINDOW_SIZE);
        metricGroup.gauge(
                LAST_SCANNED_MANIFESTS, () -> latestOrZero(ScanStats::getScannedManifests));
        metricGroup.gauge(
                LAST_SKIPPED_BY_PARTITION_AND_STATS,
                () -> latestOrZero(ScanStats::getSkippedByPartitionAndStats));
        metricGroup.gauge(
                LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER,
                () -> latestOrZero(ScanStats::getSkippedByBucketAndLevelFilter));
        metricGroup.gauge(
                LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER,
                () -> latestOrZero(ScanStats::getSkippedByWholeBucketFiles));
        metricGroup.gauge(
                LAST_SCAN_SKIPPED_TABLE_FILES,
                () -> latestOrZero(ScanStats::getSkippedTableFiles));
        metricGroup.gauge(
                LAST_SCAN_RESULTED_TABLE_FILES,
                () -> latestOrZero(ScanStats::getResultedTableFiles));
    }

    /** Returns the metric group all scan metrics are registered in. */
    @VisibleForTesting
    public MetricGroup getMetricGroup() {
        return metricGroup;
    }

    /**
     * Records a finished scan: it becomes the source of all "last ..." gauges and its duration
     * is added to the duration histogram.
     *
     * @param scanStats statistics of the finished scan
     */
    public void reportScan(ScanStats scanStats) {
        latestScan = scanStats;
        durationHistogram.update(scanStats.getDuration());
    }

    /**
     * Reads one value from the latest reported scan.
     *
     * @param reader extracts the wanted value from a {@link ScanStats}
     * @return the extracted value, or {@code 0} when no scan has been reported yet
     */
    private long latestOrZero(java.util.function.ToLongFunction<ScanStats> reader) {
        ScanStats stats = latestScan;
        if (stats == null) {
            return 0L;
        }
        return reader.applyAsLong(stats);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation.metrics;

import org.apache.paimon.annotation.VisibleForTesting;

/**
 * Statistics for a scan operation.
 *
 * <p>An immutable holder of the counters collected while planning one scan. The total number of
 * skipped table files is not passed in; it is derived in the constructor as the sum of the
 * three individual "skipped by ..." counters.
 */
public class ScanStats {

    // Timing and manifest-level figures.
    private final long duration;
    private final long scannedManifests;

    // Files dropped by each filtering stage.
    private final long skippedByPartitionAndStats;
    private final long skippedByBucketAndLevelFilter;
    private final long skippedByWholeBucketFiles;

    // Totals.
    private final long skippedTableFiles;
    private final long resultedTableFiles;

    /**
     * Creates the statistics of one scan.
     *
     * @param duration how long the scan took (the unit is chosen by the caller)
     * @param scannedManifests number of manifests that were scanned
     * @param skippedByPartitionAndStats files skipped by the partition and stats filter
     * @param skippedByBucketAndLevelFilter files skipped by the bucket and level filter
     * @param skippedByWholeBucketFiles files skipped by the whole-bucket files filter
     * @param resultedTableFiles number of table files remaining in the scan result
     */
    public ScanStats(
            long duration,
            long scannedManifests,
            long skippedByPartitionAndStats,
            long skippedByBucketAndLevelFilter,
            long skippedByWholeBucketFiles,
            long resultedTableFiles) {
        long totalSkipped =
                skippedByPartitionAndStats
                        + skippedByBucketAndLevelFilter
                        + skippedByWholeBucketFiles;

        this.duration = duration;
        this.scannedManifests = scannedManifests;
        this.skippedByPartitionAndStats = skippedByPartitionAndStats;
        this.skippedByBucketAndLevelFilter = skippedByBucketAndLevelFilter;
        this.skippedByWholeBucketFiles = skippedByWholeBucketFiles;
        this.skippedTableFiles = totalSkipped;
        this.resultedTableFiles = resultedTableFiles;
    }

    /** Returns the duration of the scan as given to the constructor. */
    @VisibleForTesting
    protected long getDuration() {
        return duration;
    }

    /** Returns the number of scanned manifests. */
    @VisibleForTesting
    protected long getScannedManifests() {
        return scannedManifests;
    }

    /** Returns the number of files skipped by the partition and stats filter. */
    @VisibleForTesting
    protected long getSkippedByPartitionAndStats() {
        return skippedByPartitionAndStats;
    }

    /** Returns the number of files skipped by the bucket and level filter. */
    @VisibleForTesting
    protected long getSkippedByBucketAndLevelFilter() {
        return skippedByBucketAndLevelFilter;
    }

    /** Returns the number of files skipped by the whole-bucket files filter. */
    @VisibleForTesting
    protected long getSkippedByWholeBucketFiles() {
        return skippedByWholeBucketFiles;
    }

    /** Returns the sum of the three "skipped by ..." counters. */
    @VisibleForTesting
    protected long getSkippedTableFiles() {
        return skippedTableFiles;
    }

    /** Returns the number of table files remaining in the scan result. */
    @VisibleForTesting
    protected long getResultedTableFiles() {
        return resultedTableFiles;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public SnapshotReader newSnapshotReader() {
snapshotManager(),
splitGenerator(),
nonPartitionFilterConsumer(),
DefaultValueAssigner.create(tableSchema));
DefaultValueAssigner.create(tableSchema),
name());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
Expand Down Expand Up @@ -57,6 +58,8 @@ public interface SnapshotReader {

SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);

/**
 * Supplies the {@link MetricRegistry} that scan metrics may be registered in.
 *
 * <p>Implementations are free to ignore the registry and register nothing.
 *
 * @param registry the registry offered to this reader
 * @return a {@link SnapshotReader} to continue configuring
 */
SnapshotReader withMetricRegistry(MetricRegistry registry);

/** Get splits plan from snapshot. */
Plan read();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -72,14 +74,18 @@ public class SnapshotReaderImpl implements SnapshotReader {
private ScanMode scanMode = ScanMode.ALL;
private RecordComparator lazyPartitionComparator;

private final String tableName;

public SnapshotReaderImpl(
FileStoreScan scan,
TableSchema tableSchema,
CoreOptions options,
SnapshotManager snapshotManager,
SplitGenerator splitGenerator,
BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
DefaultValueAssigner defaultValueAssigner) {
DefaultValueAssigner defaultValueAssigner,
String tableName) {
this.tableName = tableName;
this.scan = scan;
this.tableSchema = tableSchema;
this.options = options;
Expand Down Expand Up @@ -173,6 +179,12 @@ public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

/**
 * Creates a {@link ScanMetrics} for this reader's table in the given registry and hands it
 * to the underlying scan.
 *
 * <p>Every call builds a new {@link ScanMetrics} instance.
 *
 * @param registry registry the scan metrics are created in
 * @return this reader, for call chaining
 */
@Override
public SnapshotReader withMetricRegistry(MetricRegistry registry) {
    ScanMetrics tableScanMetrics = new ScanMetrics(registry, tableName);
    scan.withMetrics(tableScanMetrics);
    return this;
}

/** Get splits from {@link FileKind#ADD} files. */
@Override
public Plan read() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
Expand Down Expand Up @@ -247,6 +248,12 @@ public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

/**
 * Accepts a {@link MetricRegistry} but deliberately ignores it: this reader registers no
 * metrics and simply returns itself.
 *
 * <p>NOTE(review): the registry is not forwarded to the wrapped {@code snapshotReader}
 * either - presumably intentional; confirm.
 *
 * @param registry the offered registry (unused)
 * @return this reader, for call chaining
 */
@Override
public SnapshotReader withMetricRegistry(MetricRegistry registry) {
// Intentionally a no-op: no metric is registered for this reader.
return this;
}

@Override
public Plan read() {
return snapshotReader.read();
Expand Down
Loading

0 comments on commit c825b34

Please sign in to comment.