Skip to content

Commit

Permalink
[flink] Return creationTimeEpochMills directly
Browse files Browse the repository at this point in the history
  • Loading branch information
schnappi17 committed Aug 11, 2023
1 parent e9d2958 commit b3034bb
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;

import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down Expand Up @@ -77,9 +76,8 @@ public boolean isStreaming() {
return isStreaming;
}

public Optional<DataFileMeta> getLatestFile() {
return this.dataFiles.stream()
.max(Comparator.comparingLong(f -> f.creationTime().getMillisecond()));
public OptionalLong getLatestFileCreationEpochMillis() {
return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReader.RecordIterator;
import org.apache.paimon.table.source.DataSplit;
Expand All @@ -45,7 +44,6 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;

Expand Down Expand Up @@ -181,11 +179,7 @@ private void checkSplitOrStartNext() throws IOException {

// update metric when split changes
if (sourceReaderMetrics != null && nextSplit.split() instanceof DataSplit) {
Optional<DataFileMeta> latestFile = ((DataSplit) nextSplit.split()).getLatestFile();
long eventTime =
latestFile.isPresent()
? latestFile.get().creationTimeEpochMillis()
: FileStoreSourceReaderMetrics.UNDEFINED;
long eventTime = ((DataSplit) nextSplit.split()).getLatestFileCreationEpochMillis().orElse(FileStoreSourceReaderMetrics.UNDEFINED);
sourceReaderMetrics.recordSnapshotUpdate(eventTime);
}

Expand Down

0 comments on commit b3034bb

Please sign in to comment.