[core] support reader pending records
schnappi17 committed Aug 3, 2023
1 parent 9f8e151 commit db236ae
Showing 3 changed files with 85 additions and 3 deletions.
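
At a high level, the change lets each FileStoreSourceSplitReader track how many rows remain in its assigned splits and publish that number through Flink's standard pendingRecords gauge on the SourceReaderMetricGroup. A minimal sketch of the Flink API involved (the class and field names below are illustrative, not part of this commit):

import org.apache.flink.metrics.groups.SourceReaderMetricGroup;

import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: publish a reader-local pending-record count as Flink's pendingRecords gauge. */
public class PendingRecordsGaugeSketch {

    private final AtomicLong pendingRecords = new AtomicLong();

    public PendingRecordsGaugeSketch(SourceReaderMetricGroup metricGroup) {
        // Flink polls the gauge periodically, so it only needs to read the latest value.
        metricGroup.setPendingRecordsGauge(pendingRecords::get);
    }

    /** Called by the reader whenever its remaining record count changes. */
    public void update(long remainingRecords) {
        pendingRecords.set(remainingRecords);
    }
}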
FileStoreSourceReader.java
@@ -18,6 +18,7 @@

package org.apache.paimon.flink.source;

+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.TableRead;

import org.apache.flink.api.connector.source.SourceReader;
@@ -41,7 +42,7 @@ public FileStoreSourceReader(
SourceReaderContext readerContext, TableRead tableRead, @Nullable Long limit) {
// limiter is created in SourceReader, it can be shared in all split readers
super(
-() -> new FileStoreSourceSplitReader(tableRead, RecordLimiter.create(limit)),
+() -> new FileStoreSourceSplitReader(
+        tableRead,
+        RecordLimiter.create(limit),
+        new FileStoreSourceReaderMetrics(readerContext.metricGroup()),
+        readerContext.getIndexOfSubtask()),
FlinkRecordsWithSplitIds::emitRecord,
readerContext.getConfiguration(),
readerContext);
@@ -55,7 +56,7 @@ public FileStoreSourceReader(
elementsQueue) {
super(
elementsQueue,
-() -> new FileStoreSourceSplitReader(tableRead, RecordLimiter.create(limit)),
+() -> new FileStoreSourceSplitReader(
+        tableRead,
+        RecordLimiter.create(limit),
+        new FileStoreSourceReaderMetrics(readerContext.metricGroup()),
+        readerContext.getIndexOfSubtask()),
FlinkRecordsWithSplitIds::emitRecord,
readerContext.getConfiguration(),
readerContext);
FileStoreSourceSplitReader.java
@@ -20,6 +20,7 @@

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReader.RecordIterator;
import org.apache.paimon.table.source.Split;
@@ -63,13 +64,19 @@ public class FileStoreSourceSplitReader

private boolean paused;

-public FileStoreSourceSplitReader(TableRead tableRead, @Nullable RecordLimiter limiter) {
+private final FileStoreSourceReaderMetrics fileStoreSourceReaderMetrics;
+private long currentPendingRecords;
+private int subTask;
+
+public FileStoreSourceSplitReader(
+        TableRead tableRead,
+        @Nullable RecordLimiter limiter,
+        FileStoreSourceReaderMetrics fileStoreSourceReaderMetrics,
+        int subTask) {
this.tableRead = tableRead;
this.limiter = limiter;
this.splits = new LinkedList<>();
this.pool = new Pool<>(1);
this.pool.add(new FileStoreRecordIterator());
this.paused = false;
+this.fileStoreSourceReaderMetrics = fileStoreSourceReaderMetrics;
+this.subTask = subTask;
}

@Override
@@ -121,6 +128,7 @@ public void handleSplitsChanges(SplitsChange<FileStoreSourceSplit> splitsChange)
}

splits.addAll(splitsChange.splits());
+currentPendingRecords =
+        splits.stream().map(split -> split.split().rowCount()).reduce(0L, Long::sum);
}

/**
@@ -174,7 +182,10 @@ private void checkSplitOrStartNext() throws IOException {
}
if (currentNumRead > 0) {
seek(currentNumRead);
+currentPendingRecords -= currentNumRead;
}
+// Track this reader's record lag
+fileStoreSourceReaderMetrics.updateRecordsLag(subTask, currentPendingRecords);
}

private void seek(long toSkip) throws IOException {
@@ -293,4 +304,8 @@ public Set<String> finishedSplits() {
return Collections.emptySet();
}
}

+public long getCurrentPendingRecords() {
+    return currentPendingRecords;
+}
}
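
The pending-record bookkeeping above boils down to simple arithmetic: splits assigned to the reader contribute their rowCount() to currentPendingRecords, and rows already read from a finished split are subtracted before the lag is reported. A tiny standalone illustration (hypothetical class name, plain Java, no Paimon or Flink types):

/** Illustrative only: the arithmetic behind currentPendingRecords in the split reader above. */
public class PendingRecordsArithmeticDemo {

    public static void main(String[] args) {
        // Two splits are assigned, carrying 1_000 and 500 rows: all of them are still pending.
        long currentPendingRecords = 1_000L + 500L;
        System.out.println(currentPendingRecords); // prints 1500

        // The reader finishes the first split after reading its 1_000 rows,
        // so those rows no longer count as pending.
        long currentNumRead = 1_000L;
        currentPendingRecords -= currentNumRead;
        System.out.println(currentPendingRecords); // prints 500
    }
}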
FileStoreSourceReaderMetrics.java (new file)
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.source.metrics;

import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Source reader metrics; currently tracks and reports the reader's pending (not yet read) records. */
public class FileStoreSourceReaderMetrics {
private static final Logger LOG = LoggerFactory.getLogger(FileStoreSourceReaderMetrics.class);

// Source reader metric group
private final SourceReaderMetricGroup sourceReaderMetricGroup;

// Per-subtask records lag; the pendingRecords gauge sums these values on every read
private final ConcurrentMap<Integer, Long> recordsLagMetrics = new ConcurrentHashMap<>();

public FileStoreSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) {
this.sourceReaderMetricGroup = sourceReaderMetricGroup;
this.sourceReaderMetricGroup.setPendingRecordsGauge(
() -> {
long pendingRecordsTotal = 0;
for (long recordsLag : this.recordsLagMetrics.values()) {
pendingRecordsTotal += recordsLag;
}
return pendingRecordsTotal;
});
}

/**
* Updates the records-lag value tracked for the given subtask.
*
* <p>The {@link org.apache.flink.runtime.metrics.MetricNames#PENDING_RECORDS} gauge registered
* on the {@link SourceReaderMetricGroup} in the constructor sums these per-subtask values each
* time it is read.
*/
public void updateRecordsLag(int subTask, long recordsLag) {
// put (rather than putIfAbsent) so that later updates overwrite the previous lag for this subtask
recordsLagMetrics.put(subTask, recordsLag);
}
}
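
For intuition about what the registered gauge reports at runtime, here is a small, Flink-free illustration (class and variable names are hypothetical, not part of this commit) of the same aggregation: each subtask writes its current lag into the map, and the gauge sums the map every time it is read.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** Illustrative only: how per-subtask lags add up to the pendingRecords total. */
public class PendingRecordsAggregationDemo {

    public static void main(String[] args) {
        ConcurrentMap<Integer, Long> recordsLagPerSubtask = new ConcurrentHashMap<>();

        // Stand-in for the gauge: sums the map on every read.
        Supplier<Long> pendingRecordsGauge =
                () -> recordsLagPerSubtask.values().stream().mapToLong(Long::longValue).sum();

        recordsLagPerSubtask.put(0, 1_200L); // subtask 0 still has 1 200 rows to read
        recordsLagPerSubtask.put(1, 300L);   // subtask 1 still has 300 rows to read
        System.out.println(pendingRecordsGauge.get()); // prints 1500

        recordsLagPerSubtask.put(0, 0L);     // subtask 0 finished its splits
        System.out.println(pendingRecordsGauge.get()); // prints 300
    }
}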
