Skip to content

Commit

Permalink
[Core] support scan metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
schnappi17 committed Oct 23, 2023
1 parent dbec0c3 commit 995590b
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.metrics.scan;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.metrics.AbstractMetricGroup;
import org.apache.paimon.metrics.DescriptiveStatisticsHistogram;
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.groups.GenericMetricGroup;

/** Metrics to measure scan operation. */
public class ScanMetrics {
private static final int HISTOGRAM_WINDOW_SIZE = 10_000;
protected static final String GROUP_NAME = "scan";
private final AbstractMetricGroup genericMetricGroup;

public ScanMetrics(String table) {
this.genericMetricGroup = GenericMetricGroup.createGenericMetricGroup(table, GROUP_NAME);
registerGenericScanMetrics();
}

public AbstractMetricGroup getMetricGroup() {
return genericMetricGroup;
}

private final Histogram durationHistogram =
new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE);

private ScanStats latestScan;

@VisibleForTesting static final String LAST_SCAN_DURATION = "lastScanDuration";
@VisibleForTesting static final String SCAN_DURATION = "scanDuration";
@VisibleForTesting static final String LAST_SCANNED_MANIFESTS = "lastScannedManifests";

@VisibleForTesting
static final String LAST_SCAN_SKIPPED_TABLE_FILES = "lastScanSkippedTableFiles";

@VisibleForTesting
static final String LAST_SCAN_RESULTED_TABLE_FILES = "lastScanResultedTableFiles";

private void registerGenericScanMetrics() {
genericMetricGroup.gauge(
LAST_SCAN_DURATION, () -> latestScan == null ? 0L : latestScan.getDuration());
genericMetricGroup.histogram(SCAN_DURATION, durationHistogram);
genericMetricGroup.gauge(
LAST_SCANNED_MANIFESTS,
() -> latestScan == null ? 0L : latestScan.getScannedManifests());
genericMetricGroup.gauge(
LAST_SCAN_SKIPPED_TABLE_FILES,
() -> latestScan == null ? 0L : latestScan.getSkippedTableFiles());
genericMetricGroup.gauge(
LAST_SCAN_RESULTED_TABLE_FILES,
() -> latestScan == null ? 0L : latestScan.getResultedTableFiles());
}

public void reportScan(ScanStats scanStats) {
latestScan = scanStats;
durationHistogram.update(scanStats.getDuration());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.metrics.scan;

import org.apache.paimon.annotation.VisibleForTesting;

/** Statistics for a scan operation. */
public class ScanStats {
private final long duration;
private final long scannedManifests;
private final long skippedTableFiles;
private final long resultedTableFiles;

public ScanStats(
long duration, long scannedManifests, long skippedTableFiles, long resultedTableFiles) {
this.duration = duration;
this.scannedManifests = scannedManifests;
this.skippedTableFiles = skippedTableFiles;
this.resultedTableFiles = resultedTableFiles;
}

@VisibleForTesting
protected long getScannedManifests() {
return scannedManifests;
}

@VisibleForTesting
protected long getSkippedTableFiles() {
return skippedTableFiles;
}

@VisibleForTesting
protected long getResultedTableFiles() {
return resultedTableFiles;
}

@VisibleForTesting
protected long getDuration() {
return duration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metrics.scan.ScanMetrics;
import org.apache.paimon.metrics.scan.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
Expand All @@ -40,6 +42,9 @@
import org.apache.paimon.utils.ParallellyExecuteUtils;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
Expand All @@ -51,13 +56,15 @@
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;

/** Default implementation of {@link FileStoreScan}. */
public abstract class AbstractFileStoreScan implements FileStoreScan {

private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreScan.class);
private final FieldStatsArraySerializer partitionStatsConverter;
private final RowType partitionType;
private final SnapshotManager snapshotManager;
Expand All @@ -80,6 +87,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private ManifestCacheFilter manifestCacheFilter = null;
private final Integer scanManifestParallelism;

private ScanMetrics scanMetrics;

public AbstractFileStoreScan(
RowType partitionType,
ScanBucketFilter bucketKeyFilter,
Expand All @@ -90,6 +99,7 @@ public AbstractFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism) {
LOG.info("gjli: create AbstractFileStoreScan here");
this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType);
this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
Expand All @@ -101,6 +111,7 @@ public AbstractFileStoreScan(
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
this.scanManifestParallelism = scanManifestParallelism;
this.scanMetrics = null;
}

@Override
Expand Down Expand Up @@ -188,6 +199,13 @@ public FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter)
return this;
}

@Override
public FileStoreScan withMetrics(ScanMetrics metrics) {
LOG.info("gjli: with a scan metrics");
this.scanMetrics = metrics;
return this;
}

@Override
public Plan plan() {

Expand Down Expand Up @@ -223,6 +241,7 @@ public List<ManifestEntry> files() {

private Pair<Snapshot, List<ManifestEntry>> doPlan(
Function<ManifestFileMeta, List<ManifestEntry>> readManifest) {
long started = System.nanoTime();
List<ManifestFileMeta> manifests = specifiedManifests;
Snapshot snapshot = null;
if (manifests == null) {
Expand All @@ -237,6 +256,9 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
}
}

long startDataFiles =
manifests.stream().mapToLong(f -> f.numAddedFiles() + f.numDeletedFiles()).sum();

Iterable<ManifestEntry> entries =
ParallellyExecuteUtils.parallelismBatchIterable(
files ->
Expand All @@ -248,6 +270,9 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
manifests,
scanManifestParallelism);

long skippedDataFiles =
startDataFiles - StreamSupport.stream(entries.spliterator(), false).count();

List<ManifestEntry> files = new ArrayList<>();
for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
Expand Down Expand Up @@ -295,7 +320,17 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(
.filter(this::filterWholeBucketByStats)
.flatMap(Collection::stream)
.collect(Collectors.toList());

long scanDuration = (System.nanoTime() - started) / 1_000_000;
LOG.info(
"gjli: report scan: scanDuration={}, scannedManifests={}, skippedDataFiles={}, resultedTableFiles={}",
scanDuration,
manifests.size(),
skippedDataFiles,
files.size());
if (scanMetrics != null) {
scanMetrics.reportScan(
new ScanStats(scanDuration, manifests.size(), skippedDataFiles, files.size()));
}
return Pair.of(snapshot, files);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.metrics.scan.ScanMetrics;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.Filter;
Expand Down Expand Up @@ -62,6 +63,8 @@ public interface FileStoreScan {

FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);

FileStoreScan withMetrics(ScanMetrics metrics);

/** Produce a {@link Plan}. */
Plan plan();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public SnapshotReader newSnapshotReader() {
snapshotManager(),
splitGenerator(),
nonPartitionFilterConsumer(),
DefaultValueAssigner.create(tableSchema));
DefaultValueAssigner.create(tableSchema),
name());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,17 @@ public StartingContext startingContext() {

@Override
public Plan plan() {
LOG.info("gjli: plan here");
if (!initialized) {
LOG.info("gjli: initScanner here");
initScanner();
}

if (nextSnapshotId == null) {
LOG.info("gjli: tryFirstPlan here");
return tryFirstPlan();
} else {
LOG.info("gjli: nextPlan here");
return nextPlan();
}
}
Expand Down Expand Up @@ -167,19 +171,22 @@ private Plan nextPlan() {
// first check changes of overwrite
if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE
&& supportStreamingReadOverwrite) {
LOG.info("gjli: meet overwrite");
LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId);
SnapshotReader.Plan overwritePlan =
followUpScanner.getOverwriteChangesPlan(nextSnapshotId, snapshotReader);
currentWatermark = overwritePlan.watermark();
nextSnapshotId++;
return overwritePlan;
} else if (followUpScanner.shouldScanSnapshot(snapshot)) {
LOG.info("gjli: need to scan snapshot {}", snapshot.id());
LOG.debug("Find snapshot id {}.", nextSnapshotId);
SnapshotReader.Plan plan = followUpScanner.scan(nextSnapshotId, snapshotReader);
currentWatermark = plan.watermark();
nextSnapshotId++;
return plan;
} else {
LOG.info("gjli: no need to scan snapshot {}, ++", nextSnapshotId);
nextSnapshotId++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.metrics.scan.ScanMetrics;
import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.predicate.Predicate;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class SnapshotReaderImpl implements SnapshotReader {

private ScanMode scanMode = ScanMode.ALL;
private RecordComparator lazyPartitionComparator;
private final ScanMetrics scanMetrics;

public SnapshotReaderImpl(
FileStoreScan scan,
Expand All @@ -79,8 +81,10 @@ public SnapshotReaderImpl(
SnapshotManager snapshotManager,
SplitGenerator splitGenerator,
BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
DefaultValueAssigner defaultValueAssigner) {
this.scan = scan;
DefaultValueAssigner defaultValueAssigner,
String table) {
this.scanMetrics = new ScanMetrics(table);
this.scan = scan.withMetrics(scanMetrics);
this.tableSchema = tableSchema;
this.options = options;
this.snapshotManager = snapshotManager;
Expand Down
Loading

0 comments on commit 995590b

Please sign in to comment.