Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mamogaaa committed Jun 19, 2024
1 parent f4f7e91 commit 7ec990c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import tech.ytsaurus.client.request.{CompleteOperation, GetOperation, UpdateOper
import tech.ytsaurus.client.rpc.YTsaurusClientAuth
import tech.ytsaurus.core.GUID
import tech.ytsaurus.spyt.wrapper.YtWrapper
import tech.ytsaurus.ysontree.{YTree, YTreeMapNode, YTreeNode, YTreeTextSerializer}
import tech.ytsaurus.ysontree.{YTree, YTreeListNode, YTreeMapNode, YTreeNode, YTreeTextSerializer}

import java.nio.file.Paths
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -313,18 +313,7 @@ private[spark] object YTsaurusOperationManager extends Logging {

val releaseConfig: YTreeMapNode = getDocument(ytClient, releaseConfigPath)

val portoLayers = conf.get(YTSAURUS_PORTO_LAYER_PATHS).map(layers => {
val builder = YTree.listBuilder()
layers.split(',').foreach(layer => {
builder.value(YTree.stringNode(layer))
})
builder.buildList()
}).getOrElse(releaseConfig.getListO("layer_paths").orElse(YTree.listBuilder().buildList()))
conf.get(YTSAURUS_EXTRA_PORTO_LAYER_PATHS).foreach(extraPortoLayers => {
extraPortoLayers.split(',').foreach(layer => {
portoLayers.add(YTree.stringNode(layer))
})
})
val portoLayers = getPortoLayers(conf, releaseConfig.getListO("layer_paths").orElse(YTree.listBuilder().buildList()))

val filePaths = releaseConfig.getListO("file_paths").orElse(YTree.listBuilder().buildList())
val pythonPaths = globalConfig.getMapO("python_cluster_paths").orElse(YTree.mapBuilder().buildMap())
Expand Down Expand Up @@ -396,6 +385,22 @@ private[spark] object YTsaurusOperationManager extends Logging {
}
}

private[ytsaurus] def getPortoLayers(conf: SparkConf, defaultLayers: YTreeListNode): YTreeNode = {
val layers = conf.get(YTSAURUS_PORTO_LAYER_PATHS).map(layers => {
val builder = YTree.listBuilder()
layers.split(',').foreach(layer => {
builder.value(YTree.stringNode(layer))
})
builder.buildList()
}).getOrElse(defaultLayers)
conf.get(YTSAURUS_EXTRA_PORTO_LAYER_PATHS).foreach(extraPortoLayers => {
extraPortoLayers.split(',').foreach(layer => {
layers.add(YTree.stringNode(layer))
})
})
layers
}

private[ytsaurus] def getDocument(ytClient: YTsaurusClient, path: String): YTreeMapNode = {
if (ytClient.existsNode(path).join()) {
ytClient.getNode(path).join().mapNode()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@

package org.apache.spark.scheduler.cluster.ytsaurus

import org.apache.spark.deploy.ytsaurus.Config.SPARK_PRIMARY_RESOURCE
import org.apache.spark.deploy.ytsaurus.Config.{SPARK_PRIMARY_RESOURCE, YTSAURUS_EXTRA_PORTO_LAYER_PATHS, YTSAURUS_PORTO_LAYER_PATHS}
import org.apache.spark.internal.config.{FILES, JARS, SUBMIT_PYTHON_FILES}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should.Matchers
import tech.ytsaurus.spyt.test.LocalYt
import tech.ytsaurus.ysontree.YTree

import scala.collection.JavaConverters._
import java.util.stream.Collectors

class YTsaurusOperationManagerSuite extends SparkFunSuite with BeforeAndAfter with Matchers {

Expand Down Expand Up @@ -48,4 +53,31 @@ class YTsaurusOperationManagerSuite extends SparkFunSuite with BeforeAndAfter wi

result shouldBe empty
}

test("Test layer_paths override") {
val conf = new SparkConf()
conf.set(SPARK_PRIMARY_RESOURCE, "spark-shell")
conf.set(YTSAURUS_PORTO_LAYER_PATHS, "//path/to/layers/1,//path/to/layers/2")

val result = YTsaurusOperationManager.getPortoLayers(conf, YTree.listBuilder().buildList()).asList().asScala.map(x => x.stringValue())

result should contain theSameElementsAs Seq("//path/to/layers/1", "//path/to/layers/2")
}

test("Test layer_paths override + extra layers") {
val conf = new SparkConf()
conf.set(SPARK_PRIMARY_RESOURCE, "spark-shell")
conf.set(YTSAURUS_PORTO_LAYER_PATHS, "//path/to/layers/1,//path/to/layers/2")
conf.set(YTSAURUS_EXTRA_PORTO_LAYER_PATHS, "//path/to/layers/3,//path/to/layers/4")

val result = YTsaurusOperationManager.getPortoLayers(conf, YTree.listBuilder().buildList()).asList().asScala.map(x => x.stringValue())

result should contain theSameElementsAs Seq(
"//path/to/layers/1",
"//path/to/layers/2",
"//path/to/layers/3",
"//path/to/layers/4",
)
}

}

0 comments on commit 7ec990c

Please sign in to comment.