Skip to content

Commit

Permalink
[CELEBORN-1692] Set mount point in fromPbFileInfoMap
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa authored and SteNicholas committed Nov 7, 2024
1 parent 612b1e4 commit 10530fa
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
import com.google.protobuf.InvalidProtocolBufferException

import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, ApplicationMeta, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.PartitionLocation.Mode
import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
Expand Down Expand Up @@ -131,7 +131,8 @@ object PbSerDeUtils {
@throws[InvalidProtocolBufferException]
def fromPbFileInfoMap(
data: Array[Byte],
cache: ConcurrentHashMap[String, UserIdentifier]): ConcurrentHashMap[String, DiskFileInfo] = {
cache: ConcurrentHashMap[String, UserIdentifier],
mountPoints: util.HashSet[String]): ConcurrentHashMap[String, DiskFileInfo] = {
val pbFileInfoMap = PbFileInfoMap.parseFrom(data)
val fileInfoMap = JavaUtils.newConcurrentHashMap[String, DiskFileInfo]
pbFileInfoMap.getValuesMap.entrySet().asScala.foreach { entry =>
Expand All @@ -141,10 +142,16 @@ object PbSerDeUtils {
val userIdentifierKey = pbUserIdentifier.getTenantId + "-" + pbUserIdentifier.getName
if (!cache.containsKey(userIdentifierKey)) {
val fileInfo = fromPbFileInfo(pbFileInfo)
if (fileInfo.getFileMeta.isInstanceOf[MapFileMeta]) {
fileInfo.setMountPoint(DeviceInfo.getMountPoint(fileInfo.getFilePath, mountPoints))
}
cache.put(userIdentifierKey, fileInfo.getUserIdentifier)
fileInfoMap.put(fileName, fileInfo)
} else {
val fileInfo = fromPbFileInfo(pbFileInfo, cache.get(userIdentifierKey))
if (fileInfo.getFileMeta.isInstanceOf[MapFileMeta]) {
fileInfo.setMountPoint(DeviceInfo.getMountPoint(fileInfo.getFilePath, mountPoints))
}
fileInfoMap.put(fileName, fileInfo)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.protocol.{PartitionLocation, PbPackedWorkerResource, PbWorkerResource, StorageInfo}
import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
import org.apache.celeborn.common.quota.ResourceConsumption
Expand Down Expand Up @@ -75,9 +75,28 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
new ReduceFileMeta(chunkOffsets2, 123),
file2.getAbsolutePath,
6000L)
// Fixtures for the mount-point round-trip test: two DiskFileInfo instances backed by
// MapFileMeta that reuse the paths of file1 and file2.
val mapFileInfo1 = new DiskFileInfo(
userIdentifier1,
true,
new MapFileMeta(1024, 10),
file1.getAbsolutePath,
6000L)
val mapFileInfo2 = new DiskFileInfo(
userIdentifier2,
true,
new MapFileMeta(1024, 10),
file2.getAbsolutePath,
6000L)
val fileInfoMap = JavaUtils.newConcurrentHashMap[String, DiskFileInfo]()
// Expected mount point for both map-file fixtures; the mount-point test compares the
// restored values against these.
// NOTE(review): presumably file1/file2 live under "/mnt" so that the lookup during
// deserialization resolves to the same value; confirm against their definitions.
mapFileInfo1.setMountPoint("/mnt")
mapFileInfo2.setMountPoint("/mnt")

// The map holds both the ReduceFileMeta-backed entries and the MapFileMeta-backed entries.
fileInfoMap.put("file1", fileInfo1)
fileInfoMap.put("file2", fileInfo2)
fileInfoMap.put("mapFile1", mapFileInfo1)
fileInfoMap.put("mapFile2", mapFileInfo2)
// Candidate mount points passed to PbSerDeUtils.fromPbFileInfoMap by the tests.
val mountPoints = new util.HashSet[String]
mountPoints.add("/mnt")
val cache = JavaUtils.newConcurrentHashMap[String, UserIdentifier]()

val resourceConsumption1 = ResourceConsumption(1000, 2000, 3000, 4000)
Expand Down Expand Up @@ -204,7 +223,7 @@ class PbSerDeUtilsTest extends CelebornFunSuite {

test("fromAndToPbFileInfoMap") {
val pbFileInfoMap = PbSerDeUtils.toPbFileInfoMap(fileInfoMap)
val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap, cache)
val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap, cache, mountPoints)
val restoredFileInfo1 = restoredFileInfoMap.get("file1")
val restoredFileInfo2 = restoredFileInfoMap.get("file2")

Expand All @@ -221,6 +240,16 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
assert(restoredFileInfo2.getUserIdentifier.equals(fileInfo2.getUserIdentifier))
}

// Round-trips fileInfoMap through toPbFileInfoMap / fromPbFileInfoMap and checks that the
// entries stored under "mapFile1" and "mapFile2" come back with the same mount point as
// the original fixtures.
test("fromAndToPBFileInfoMapMountPoint") {
  val pbFileInfoMap = PbSerDeUtils.toPbFileInfoMap(fileInfoMap)
  val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap, cache, mountPoints)
  val restoredMapFileInfo1 = restoredFileInfoMap.get("mapFile1")
  val restoredMapFileInfo2 = restoredFileInfoMap.get("mapFile2")

  // A missing entry should surface as an assertion failure, not as a
  // NullPointerException on the dereference below.
  assert(restoredMapFileInfo1 != null)
  assert(restoredMapFileInfo2 != null)

  // `==` is null-safe, so a mount point that was not restored fails the assertion
  // instead of throwing from `.equals` on a null receiver.
  assert(restoredMapFileInfo1.getMountPoint == mapFileInfo1.getMountPoint)
  assert(restoredMapFileInfo2.getMountPoint == mapFileInfo2.getMountPoint)
}

test("fromAndToPbUserIdentifier") {
val pbUserIdentifier = PbSerDeUtils.toPbUserIdentifier(userIdentifier1)
val restoredUserIdentifier = PbSerDeUtils.fromPbUserIdentifier(pbUserIdentifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
if (key.startsWith(SHUFFLE_KEY_PREFIX)) {
val shuffleKey = parseDbShuffleKey(key)
try {
val files = PbSerDeUtils.fromPbFileInfoMap(entry.getValue, cache)
val files = PbSerDeUtils.fromPbFileInfoMap(entry.getValue, cache, mountPoints)
logDebug(s"Reload DB: $shuffleKey -> $files")
diskFileInfos.put(shuffleKey, files)
committedFileInfos.put(shuffleKey, files)
Expand Down

0 comments on commit 10530fa

Please sign in to comment.