Skip to content

Commit

Permalink
空表统一返回空df (#92)
Browse files Browse the repository at this point in the history
* 空表统一返回空df

* 修复失败的测试用例
  • Loading branch information
hellozepp authored Dec 9, 2022
1 parent 0fd7a92 commit 64e6f05
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package tech.mlsql.test.plugins.render

import org.apache.spark.streaming.SparkOperationUtil
import org.scalatest.FunSuite
import org.scalatest.funsuite.AnyFunSuite
import streaming.core.strategy.platform.SparkRuntime
import streaming.dsl.ScriptSQLExec
import tech.mlsql.job.JobManager
import tech.mlsql.plugins.render.ExceptionReplaceRender
import tech.mlsql.runtime.plugins.request_cleaner.RequestCleanerManager
import tech.mlsql.test.BasicMLSQLConfig

class ExceptionReplaceRenderSuite extends FunSuite with SparkOperationUtil with BasicMLSQLConfig {
class ExceptionReplaceRenderSuite extends AnyFunSuite with SparkOperationUtil with BasicMLSQLConfig {
test("ExceptionReplaceRender should work") {
withBatchContext(setupBatchContext(batchParamsWithoutHive)) { runtime: SparkRuntime =>
JobManager.init(runtime.sparkSession, initialDelay = 2 , checkTimeInterval = 3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ class FigTest extends AnyFunSuite {
| unemp: "失业率"""".stripMargin)
}

"yaml" should "with matrix" in {

test("yaml should with matrix") {
printCode(
"""
|runtime:
Expand All @@ -103,7 +102,7 @@ class FigTest extends AnyFunSuite {
| cbar: False""".stripMargin)
}

"yaml" should "with auc" in {
test("yaml should with auc") {

printCode(
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class SQLDescriptiveMetrics(override val uid: String) extends SQLAlg with MllibF
if (metricSize <= 0) {
throw new IllegalArgumentException("The limit parameter `metricSize` is not allowed to be less than 1!")
}
if (df.isEmpty){
return df.sparkSession.emptyDataFrame
}
import spark.implicits._
val descriptiveRes = getDescriptiveMetrics(df, metricSize)
spark.createDataset[(String, String)](descriptiveRes).toDF("columnName", "descriptiveMetrics")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class SQLPatternDistribution(override val uid: String) extends SQLAlg with Mllib

val find_patterns_udf = udf(SQLPatternDistribution.find_patterns(_, internalChLimit))
val find_alternative_pattern_udf = udf(SQLPatternDistribution.find_alternativePatterns(_, internalChLimit))
if (df.isEmpty){
return df.sparkSession.emptyDataFrame
}
val strColumns = df.schema.filter(_.dataType == StringType).map(s => col(s.name))
val fDf = df.select(strColumns: _*)
val res = fDf.schema.par.map(sc => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ class SQLDataSummaryV2Test extends AnyFunSuite with SparkOperationUtil with Basi
println(res1DF.collect()(1).mkString(","))
println(res1DF.collect()(2).mkString(","))
println(res1DF.collect()(3).mkString(","))
assert(res1DF.collect()(0).mkString(",") === "name,1,,,,0.1667,5,string,elena,5,,AA,0,6,0.0,,,,1.0,,1,")
assert(res1DF.collect()(1).mkString(",") === "age,2,23.25,35.0,47.5,0.0,4,integer,57.0,,34.67,10.0,,6,0.0,-0.11,17.77,7.26,1.0,6,1,")
assert(res1DF.collect()(2).mkString(",") === "income,3,,,,0.0,6,string,533000.0,6,,432000.0,6,6,0.0,,,,0.6667,,0,433000.0")
assert(res1DF.collect()(3).mkString(",") === "date,4,,,,0.0,8,timestamp,2021-03-08 18:00:00,,,2021-03-08 18:00:00,,6,0.0,,,,0.1667,,0,2021-03-08 18:00:00")
assert(res1DF.collect()(0).mkString(",") === "name,1,,,,0.1667,5,string,elena,5,,AA,0,6,0.0,,,,1.0,5,1,")
assert(res1DF.collect()(1).mkString(",") === "age,2,23.25,35.0,47.5,0.0,4,integer,57.0,,34.67,10.0,,6,0.0,-0.11,17.77,7.26,1.0,,1,")
assert(res1DF.collect()(2).mkString(",") === "income,3,,,,0.0,6,string,533000.0,6,,432000.0,6,6,0.0,,,,0.6667,4,0,433000.0")
assert(res1DF.collect()(3).mkString(",") === "date,4,,,,0.0,8,timestamp,2021-03-08 18:00:00,,,2021-03-08 18:00:00,,6,0.0,,,,0.1667,1,0,2021-03-08 18:00:00")

val sseq = Seq(
("elena", 57, 57, 110L, "433000", Timestamp.valueOf(LocalDateTime.of(2021, 3, 8, 18, 0)), 110F, true, null, null, BigDecimal.valueOf(12), 1.123D),
Expand All @@ -109,18 +109,18 @@ class SQLDataSummaryV2Test extends AnyFunSuite with SparkOperationUtil with Basi
println(res2DF.collect()(9).mkString(","))
println(res2DF.collect()(10).mkString(","))
println(res2DF.collect()(11).mkString(","))
assert(res2DF.collect()(0).mkString(",") === "name,1,,,,0.1667,5,string,elena,5,,AA,0,6,0.0,,,,1.0,,1,")
assert(res2DF.collect()(1).mkString(",") === "favoriteNumber,2,14.25,57.0,57.0,0.0,4,integer,57.0,,37.83,-1.0,,6,0.0,-0.71,29.69,12.12,0.5,3,0,57.0")
assert(res2DF.collect()(2).mkString(",") === "age,3,23.25,35.0,47.5,0.0,4,integer,57.0,,34.67,10.0,,6,0.0,-0.11,17.77,7.26,1.0,6,1,")
assert(res2DF.collect()(3).mkString(",") === "mock_col1,4,112.5,125.0,145.0,0.0,8,long,160.0,,128.33,100.0,,6,0.0,0.22,23.17,9.46,1.0,6,1,")
assert(res2DF.collect()(4).mkString(",") === "income,5,,,,0.1667,6,string,533000.0,6,,432000.0,0,6,0.0,,,,0.8,,0,433000.0")
assert(res2DF.collect()(5).mkString(",") === "date,6,,,,0.0,8,timestamp,2021-03-08 18:00:00,,,2021-03-08 18:00:00,,6,0.0,,,,0.1667,,0,2021-03-08 18:00:00")
assert(res2DF.collect()(6).mkString(",") === "mock_col2,7,117.5,125.0,135.0,0.0,4,float,150.0,,127.5,110.0,,6,0.3333,0.43,17.08,8.54,1.0,4,1,")
assert(res2DF.collect()(7).mkString(",") === "alived,8,,,,0.0,1,boolean,true,,,false,,6,0.0,,,,0.3333,,0,true")
assert(res2DF.collect()(8).mkString(",") === "extra,9,,,,0.0,,void,,,,,,0,1.0,,,,0.0,,0,")
assert(res2DF.collect()(9).mkString(",") === "extra1,10,,,,0.0,,void,,,,,,0,1.0,,,,0.0,,0,")
assert(res2DF.collect()(10).mkString(",") === "extra2,11,2.0,2.0,2.0,0.0,16,decimal(38,18),12.0,,3.67,2.0,,6,0.0,1.79,4.08,1.67,0.3333,2,0,2.0")
assert(res2DF.collect()(11).mkString(",") === "extra3,12,1.34,2.11,3.09,0.0,8,double,3.38,,2.2,1.12,,6,0.0,0.15,1.01,0.41,0.6667,4,0,")
assert(res2DF.collect()(0).mkString(",") === "name,1,,,,0.1667,5,string,elena,5,,AA,0,6,0.0,,,,1.0,5,1,")
assert(res2DF.collect()(1).mkString(",") === "favoriteNumber,2,14.25,57.0,57.0,0.0,4,integer,57.0,,37.83,-1.0,,6,0.0,-0.71,29.69,12.12,0.5,,0,57.0")
assert(res2DF.collect()(2).mkString(",") === "age,3,23.25,35.0,47.5,0.0,4,integer,57.0,,34.67,10.0,,6,0.0,-0.11,17.77,7.26,1.0,,1,")
assert(res2DF.collect()(3).mkString(",") === "mock_col1,4,112.5,125.0,145.0,0.0,8,long,160.0,,128.33,100.0,,6,0.0,0.22,23.17,9.46,1.0,,1,")
assert(res2DF.collect()(4).mkString(",") === "income,5,,,,0.1667,6,string,533000.0,6,,432000.0,0,6,0.0,,,,0.8,4,0,433000.0")
assert(res2DF.collect()(5).mkString(",") === "date,6,,,,0.0,8,timestamp,2021-03-08 18:00:00,,,2021-03-08 18:00:00,,6,0.0,,,,0.1667,1,0,2021-03-08 18:00:00")
assert(res2DF.collect()(6).mkString(",") === "mock_col2,7,117.5,125.0,135.0,0.0,4,float,150.0,,127.5,110.0,,6,0.3333,0.43,17.08,8.54,1.0,,1,")
assert(res2DF.collect()(7).mkString(",") === "alived,8,,,,0.0,1,boolean,true,,,false,,6,0.0,,,,0.3333,2,0,true")
assert(res2DF.collect()(8).mkString(",") === "extra,9,,,,0.0,,void,,,,,,0,1.0,,,,0.0,0,0,")
assert(res2DF.collect()(9).mkString(",") === "extra1,10,,,,0.0,,void,,,,,,0,1.0,,,,0.0,0,0,")
assert(res2DF.collect()(10).mkString(",") === "extra2,11,2.0,2.0,2.0,0.0,16,decimal(38,18),12.0,,3.67,2.0,,6,0.0,1.79,4.08,1.67,0.3333,,0,2.0")
assert(res2DF.collect()(11).mkString(",") === "extra3,12,1.34,2.11,3.09,0.0,8,double,3.38,,2.2,1.12,,6,0.0,0.15,1.01,0.41,0.6667,,0,")

val sseq2: Seq[(Null, Null)] = Seq(
(null, null),
Expand All @@ -133,8 +133,8 @@ class SQLDataSummaryV2Test extends AnyFunSuite with SparkOperationUtil with Basi
res3DF.show()
println(res3DF.collect()(0).mkString(","))
println(res3DF.collect()(1).mkString(","))
assert(res3DF.collect()(0).mkString(",") === "col1,1,,,,0.0,,void,,,,,,0,1.0,,,,0.0,,0,")
assert(res3DF.collect()(1).mkString(",") === "col2,2,,,,0.0,,void,,,,,,0,1.0,,,,0.0,,0,")
assert(res3DF.collect()(0).mkString(",") === "col1,1,,,,0.0,,void,,,,,,0,1.0,,,,0.0,0,0,")
assert(res3DF.collect()(1).mkString(",") === "col2,2,,,,0.0,,void,,,,,,0,1.0,,,,0.0,0,0,")

val colNames = Array("id", "name", "age", "birth")
val schema = StructType(colNames.map(fieldName => StructField(fieldName, StringType, nullable = true)))
Expand Down

0 comments on commit 64e6f05

Please sign in to comment.