Skip to content

Commit

Permalink
[FLINK-33354][runtime] Using the InputStream instead of byte array to…
Browse files Browse the repository at this point in the history
… avoid contiguous huge memory usage
  • Loading branch information
1996fanrui committed Nov 7, 2023
1 parent b1786ba commit b759794
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.util.GroupCache;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

import javax.annotation.Nullable;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.util.List;

/**
Expand Down Expand Up @@ -281,7 +282,7 @@ public void loadBigData(
// enters a terminal state)
jobInformation =
InstantiationUtil.deserializeObject(
FileUtils.readAllBytes(dataFile.toPath()),
new BufferedInputStream(Files.newInputStream(dataFile.toPath())),
getClass().getClassLoader());
jobInformationCache.put(jobId, jobInfoKey, jobInformation);
}
Expand All @@ -303,7 +304,7 @@ public void loadBigData(
// enters a terminal state)
taskInformation =
InstantiationUtil.deserializeObject(
FileUtils.readAllBytes(dataFile.toPath()),
new BufferedInputStream(Files.newInputStream(dataFile.toPath())),
getClass().getClassLoader());
taskInformationCache.put(jobId, taskInfoKey, taskInformation);
}
Expand Down

0 comments on commit b759794

Please sign in to comment.