
[SPARK-23486]cache the function name from the external catalog for lookupFunctions #20795

Closed
wants to merge 12 commits

Conversation

@kevinyu98 (Contributor) commented Mar 11, 2018

What changes were proposed in this pull request?

This PR caches the function names from the external catalog for use by lookupFunctions in the analyzer; the cache is built per query plan. The original problem is reported in SPARK-19737.
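
In rough terms, the rule keeps a per-plan set of already-verified function identifiers (a minimal sketch of the idea along the lines discussed below, not the exact committed code):

  object LookupFunctions extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = {
      // The cache lives only for this single plan traversal.
      val cachedNameSet = new mutable.HashSet[FunctionIdentifier]()
      plan.transformAllExpressions {
        // Already verified against the catalog: skip the (possibly remote) lookup.
        case f: UnresolvedFunction if cachedNameSet.contains(f.name) => f
        // First sighting: ask the catalog once, then remember the name.
        case f: UnresolvedFunction if catalog.functionExists(f.name) =>
          cachedNameSet.add(f.name)
          f
        case f: UnresolvedFunction =>
          withPosition(f) {
            throw new NoSuchFunctionException(
              f.name.database.getOrElse(catalog.getCurrentDatabase), f.name.funcName)
          }
      }
    }
  }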

How was this patch tested?

Created a new test file, LookupFunctionsSuite, and added test cases to SessionCatalogSuite.

@hvanhovell (Contributor)

Ok to test

@viirya (Member) left a comment

We should add a test for this too.

throw new NoSuchFunctionException(f.name.database.getOrElse("default"), f.name.funcName)
override def apply(plan: LogicalPlan): LogicalPlan = {
  val catalogFunctionNameSet = new mutable.HashSet[FunctionIdentifier]()
  plan.transformAllExpressions {
Member:

style: indent.

Contributor Author:

Changed, thanks.

@viirya (Member) commented Mar 11, 2018

I've added a test like this:

class LookupFunctionsSuite extends PlanTest {

  test("SPARK-23486: LookupFunctions should not check the same function name more than once") {
    val externalCatalog = new CustomInMemoryCatalog
    val analyzer = {
      val conf = new SQLConf()
      val catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin, conf)
      catalog.createDatabase(
        CatalogDatabase("default", "", new URI("loc"), Map.empty),
        ignoreIfExists = false)
      new Analyzer(catalog, conf)
    }

    val unresolvedFunc = UnresolvedFunction("func", Seq.empty, false)
    val plan = Project(
      Seq(Alias(unresolvedFunc, "call1")(), Alias(unresolvedFunc, "call2")()),
      table("TaBlE"))
    analyzer.LookupFunctions.apply(plan)
    assert(externalCatalog.getFunctionExistsCalledTimes == 1)
  }
}

class CustomInMemoryCatalog extends InMemoryCatalog {

  private var functionExistsCalledTimes: Int = 0

  override def functionExists(db: String, funcName: String): Boolean = synchronized {
    functionExistsCalledTimes = functionExistsCalledTimes + 1
    true
  }

  def getFunctionExistsCalledTimes: Int = functionExistsCalledTimes
}

Maybe others have a better idea.

@SparkQA commented Mar 11, 2018

Test build #88161 has finished for PR 20795 at commit 701100c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kevinyu98 (Contributor Author)

@viirya Thanks a lot. I will create a new test file LookupFunctionsSuite under sql/catalyst/analysis.

plan.transformAllExpressions {
  case f: UnresolvedFunction if catalogFunctionNameSet.contains(f.name) => f
  case f: UnresolvedFunction if catalog.functionExists(f.name) =>
    catalogFunctionNameSet.add(f.name)
Member:

Normalize the name before adding it to the cache? This can cover more cases.

Contributor Author:

Sure, I will do that. Thanks.

override def apply(plan: LogicalPlan): LogicalPlan = {
  val catalogFunctionNameSet = new mutable.HashSet[FunctionIdentifier]()
  plan.transformAllExpressions {
    case f: UnresolvedFunction if catalogFunctionNameSet.contains(f.name) => f
Member:

Normalize the FunctionIdentifier when looking it up too?

Contributor Author:

I will normalize the lookup too. Thanks.

}

private def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = {
  FunctionIdentifier(name.funcName.toLowerCase(Locale.ROOT), name.database)
Member:

name.database.getOrElse("default")?

Contributor Author:

The FunctionIdentifier's database field is an Option, not a String. Since this is only used in this local cache, I think it is OK not to convert None to the "default" string. I noticed that when we do registerFunction in FunctionRegistry.scala, we don't put "default" into normalizeFuncName either. What do you think? Thanks.

Contributor Author:

@viirya Hello Simon, I will leave this as it is for now, but if you think it is better to have Option(name.database.getOrElse("default")) in the catalogFunctionNameSet, let me know. Thanks.

Member:

Ah, sorry for replying late. What I thought is that if there are two FunctionIdentifiers, one with the default database name Some("default") and the other with None, they should be equal to each other here.

I actually mean name.database.orElse(Some("default")).
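
Concretely (illustrative only):

  // The same function seen through two identifiers that are not equal as raw cache keys:
  assert(FunctionIdentifier("f1", Some("default")) != FunctionIdentifier("f1", None))
  // name.database.orElse(Some("default")) maps both identifiers to one key:
  assert(FunctionIdentifier("f1", None).database.orElse(Some("default")) == Some("default"))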

Contributor:

@viirya Shouldn't we be using the current database if the database is not specified? I am trying to understand why we should use "default" here.

Member:

@kevinyu98 For built-in functions, we don't need to normalize their database name.

Rethinking it, this is actually not for function resolution, so I think it is OK to leave it as name.database.

Contributor:

@viirya @kevinyu98 We need to check what happens in the following case:

use currentdb;
select currentdb.function1(), function1() from ....

In this case, the 2nd function should be resolved from the local cache if this optimization is to work. If we just use name.database instead of defaulting to the current database, will it still happen?

Contributor Author:

@dilipbiswal @viirya Thanks for pointing this out. If we just use name.database, the cache will store None for the database name, and the 2nd function will not be resolved from the local cache. We need to use catalog.getCurrentDatabase for the database name in the cache.
After running more test cases, I think it is better to cache only the external function names, not the built-in functions. If we all agree on this approach, I can submit the code for review.

Member:

For built-in functions, it may be no big deal if we don't find them in this cache, since querying built-in functions should be very fast. I remember the main issue of this ticket is external function lookup, which means more load on the connection to the metastore.

Contributor:

I agree @viirya

@SparkQA commented Mar 13, 2018

Test build #88188 has finished for PR 20795 at commit 99cc3b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 13, 2018

Test build #88189 has finished for PR 20795 at commit 211abcb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class LookupFunctionsSuite extends PlanTest
  • class CustomInMemoryCatalog extends InMemoryCatalog

@SparkQA commented Mar 16, 2018

Test build #88292 has finished for PR 20795 at commit 1e5ba02.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

      f
    case f: UnresolvedFunction =>
      withPosition(f) {
        throw new NoSuchFunctionException(f.name.database.getOrElse("default"),
Member:

Then I think this should be the current database instead of default.

Contributor:

@viirya Yeah..

Contributor Author:

@viirya @dilipbiswal @gatorsmile @liancheng OK, I will change this.

@gatorsmile (Member)

Please ping me if this is ready to review.

@SparkQA commented Mar 19, 2018

Test build #88386 has finished for PR 20795 at commit 93b115e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 21, 2018

Test build #88447 has finished for PR 20795 at commit 17f7e74.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -175,6 +175,8 @@ private[sql] class HiveSessionCatalog(
    super.functionExists(name) || hiveFunctions.contains(name.funcName)
  }

  override def externalFunctionExists(name: FunctionIdentifier): Boolean = functionExists(name)
Contributor:

According to your logic, I think HiveSessionCatalog should override both builtinFunctionExists and externalFunctionExists, like:

override def builtinFunctionExists(name: FunctionIdentifier): Boolean = {
  super.builtinFunctionExists(name) || hiveFunctions.contains(name.funcName)
}
override def externalFunctionExists(name: FunctionIdentifier): Boolean = {
  super.externalFunctionExists(name)
}

Contributor Author:

@WeichenXu123 Thanks very much for reviewing. I am a little confused. HiveSessionCatalog's builtinFunctionExists is essentially the same as its parent's; that is the reason I didn't override it in HiveSessionCatalog. However, the logic to look up an external function is different in HiveSessionCatalog, as we also have to handle the special function "histogram_numeric". That's why I chose to override externalFunctionExists. One clarification: builtinFunctionExists solely looks at the FunctionRegistry to look up a function.

  1. builtinFunctionExists => looks up a function in the FunctionRegistry (same for both SessionCatalog and HiveSessionCatalog).
  2. externalFunctionExists => looks up the external catalog to find the function. HiveSessionCatalog adds one extra piece of semantics to handle the special function histogram_numeric.
  3. As you point out, I need to change the externalFunctionExists override to this:

    override def externalFunctionExists(name: FunctionIdentifier): Boolean = {
      super.externalFunctionExists(name) || hiveFunctions.contains(name.funcName)
    }

Contributor:

Oh, you mean functions like "histogram_numeric" should be regarded as external functions in the Hive context? I am not sure about this, but if that's right, your current code is OK :)

Contributor Author:

Yes, it seems "histogram_numeric" is not natively supported in Spark yet. I think once this JIRA is closed (https://issues.apache.org/jira/browse/SPARK-16280), we won't need this code in HiveSessionCatalog.

  requireDbExists(db)
  externalCatalog.functionExists(db, name.funcName)
}

Contributor:

To avoid code duplication, you should modify functionExists like:

def functionExists(name: FunctionIdentifier): Boolean = {
  builtinFunctionExists(name) || externalFunctionExists(name)
}

Contributor Author:

OK, I will change it.

@WeichenXu123 (Contributor)

I don't think it needs to split into built-in and external existence checks in this case. The following code alone works fine:

  object LookupFunctions extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = {
      val cachedNameSet = new mutable.HashSet[FunctionIdentifier]()
      plan.transformAllExpressions {
        case f: UnresolvedFunction
          if cachedNameSet.contains(normalizeFuncName(f.name)) => f
        case f: UnresolvedFunction if catalog.functionExists(f.name) =>
          cachedNameSet.add(normalizeFuncName(f.name))
          f
        case f: UnresolvedFunction =>
          withPosition(f) {
            throw new NoSuchFunctionException(f.name.database.getOrElse(catalog.getCurrentDatabase),
              f.name.funcName)
          }
      }
    }

    private def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = ...
  }

Isn't it?

@kevinyu98 (Contributor Author)

@WeichenXu123 I didn't split until this discussion (#20795 (comment)). The original JIRA report is about lookups through HiveSessionCatalog, so I thought caching just the external function names would avoid code complexity.

@viirya (Member) commented Mar 22, 2018

I'm also a bit confused about why we need to split built-in and external functions.

@WeichenXu123 (Contributor)

Yeah, I understand the reason to split built-in and external is that you only want to cache external function names. But caching all the function names used in a query does not cost much, so the split may not be worth it.

@kevinyu98 (Contributor Author)

The reason I was thinking of splitting is the scenario below. To avoid caching an external function name twice, as in the scenario Dilip described, we decided to use getCurrentDatabase during normalizeFuncName.

But that fails for Spark's built-in functions. For example:

use currentdb;
select function1(), currentdb.function1() from ...

Suppose function1 is a built-in function, say max, and currentdb does not have a function max.

The first time, max is found by the built-in function check (functionRegistry.functionExists(name)); Spark's built-in check does not use the database name unless you specify one explicitly. So the cache stores the built-in function max as

currentdb.max

The second function, currentdb.max, is then found in the cache, even though currentdb has no max function.

But during ResolveFunctions in the analyzer, currentdb.max can't be resolved, and we get a NoSuchFunctionException for max.
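
A compact restatement of the pitfall (illustrative sketch; currentdb is the hypothetical current database from the example above):

  // 1) `max` passes the registry check (functionRegistry.functionExists), which ignores
  //    the database; normalizing with getCurrentDatabase would cache it as:
  val cached = FunctionIdentifier("max", Some("currentdb"))
  // 2) The explicitly qualified call later normalizes to the same key ...
  assert(cached == FunctionIdentifier("max", Some("currentdb")))
  // ... so LookupFunctions would wave currentdb.max through, yet ResolveFunctions later
  // throws NoSuchFunctionException because no such persistent function exists.
  // Hence: cache only the external (persistent) function names.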

@viirya (Member) commented Mar 22, 2018

Can we just skip caching for built-in functions?

@kevinyu98 (Contributor Author)

@viirya Yes, my latest submitted code caches only the external functions and skips the built-in functions.
@WeichenXu123 I will change this. Let me know if you have any concerns. Thanks.

@SparkQA commented Mar 26, 2018

Test build #88576 has finished for PR 20795 at commit 029ee6c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

retest this please

@SparkQA commented Mar 26, 2018

Test build #88580 has finished for PR 20795 at commit 029ee6c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  builtinFunctionExists(name) || externalFunctionExists(name)
}

def builtinFunctionExists(name: FunctionIdentifier): Boolean = {
Member:

Are all the functions in functionRegistry built-in?

Contributor Author:

@gatorsmile Sorry for the delay. Not all the functions in functionRegistry are built-in; there could be UDFs. I am thinking of changing builtinFunctionExists to registryFunctionExists. What do you think? I also need to change the PR description if we only cache the external function names. Thanks.

Member:

Let us not add these APIs to SessionCatalog. Just create private functions?

Contributor Author:

OK, sure.

Member:

  /**
   * Returns whether this function has been registered in the function registry of the current
   * session. If it has not been, returns false.
   */
  def isRegisteredFunction(name: FunctionIdentifier): Boolean = {
    functionRegistry.functionExists(name)
  }

  /**
   * Returns whether it is a persistent function. If it does not exist, returns false.
   */
  def isPersistentFunction(name: FunctionIdentifier): Boolean = {
    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
    databaseExists(db) && externalCatalog.functionExists(db, name.funcName)
  }

Member:

Please also add the unit test cases to SessionCatalogSuite.
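
Something along these lines might do (a minimal sketch reusing the constructs already shown in this thread, not the exact committed test):

  test("SPARK-23486: isRegisteredFunction and isPersistentFunction") {
    val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, new SQLConf())
    catalog.createDatabase(
      CatalogDatabase("default", "", new URI("loc"), Map.empty), ignoreIfExists = false)
    // Built-in functions live in the function registry of the session.
    assert(catalog.isRegisteredFunction(FunctionIdentifier("max")))
    // An unknown name is neither registered nor persistent.
    assert(!catalog.isRegisteredFunction(FunctionIdentifier("undefined_fn")))
    assert(!catalog.isPersistentFunction(FunctionIdentifier("undefined_fn")))
  }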

Member:

Move them after def isTemporaryFunction(name: FunctionIdentifier): Boolean.

Contributor Author:

@gatorsmile Sorry for the delay; I was working on something else. Thanks very much for the help. I will add these two new APIs to SessionCatalog and add test cases to SessionCatalogSuite.

@kevinyu98 changed the title from "[SPARK-23486]cache the function name from the catalog for lookupFunctions" to "[SPARK-23486]cache the function name from the external catalog for lookupFunctions" on Apr 10, 2018
@SparkQA commented Apr 10, 2018

Test build #89142 has finished for PR 20795 at commit d1ee9cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kevinyu98 (Contributor Author)

Sorry for the delay; I was working on some other projects. I am back and focusing on addressing the comments now.

@HyukjinKwon (Member)

ok to test

@SparkQA commented Jun 26, 2018

Test build #92333 has finished for PR 20795 at commit d1ee9cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

ping @kevinyu98 @dilipbiswal

@kevinyu98 (Contributor Author)

@gatorsmile Hi Sean, I am so sorry for the long delay. I will address the comments today and submit the code for review.

Thanks very much!
Kevin

@SparkQA commented Jul 11, 2018

Test build #92849 has finished for PR 20795 at commit 8dceda9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

assert(analyzer.LookupFunctions.normalizeFuncName(unresolvedFunc.name).database ==
  Some("default"))
assert(catalog.isRegisteredFunction(unresolvedFunc.name) == false)
assert(catalog.isRegisteredFunction(FunctionIdentifier("max")) == true)
Member:

I mean adding another test case to check that LookupFunctions does not resolve the same registered function more than once.

Member:

We do not need to add the assert.

Contributor Author:

I see. I added the test case; can you verify? Thanks a lot.


def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = {
  FunctionIdentifier(name.funcName.toLowerCase(Locale.ROOT),
    name.database.orElse(Some(catalog.getCurrentDatabase)))
Contributor:

@kevinyu98 I have a question. We normalize the funcName here; how about name.database? Is that already normalized by the time we get here?

Contributor:

@kevinyu98 How about taking conf.caseSensitiveAnalysis into consideration?

Contributor Author:

Yes, I will change the code for name.database. Thanks.
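
For reference, a sketch of how normalizeFuncName might honor the flag (assuming the rule can read the session's SQLConf as conf; an illustration, not necessarily the committed code):

  def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = {
    // Fold case only when analysis is case-insensitive.
    val funcName = if (conf.caseSensitiveAnalysis) {
      name.funcName
    } else {
      name.funcName.toLowerCase(Locale.ROOT)
    }
    FunctionIdentifier(funcName, name.database.orElse(Some(catalog.getCurrentDatabase)))
  }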

@SparkQA commented Jul 12, 2018

Test build #92915 has finished for PR 20795 at commit 0db2826.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class CustomerFunctionRegistry extends SimpleFunctionRegistry

@dilipbiswal (Contributor)

retest this please

@SparkQA commented Jul 12, 2018

Test build #92919 has finished for PR 20795 at commit 0db2826.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class CustomerFunctionRegistry extends SimpleFunctionRegistry

}
}

class CustomerFunctionRegistry extends SimpleFunctionRegistry {
Contributor:

@kevinyu98 Instead of extending FunctionRegistry and the catalog, what do you think of extending SessionCatalog and overriding isRegisteredFunction and isPersistentFunction? Then, after an invocation of LookupFunctions, we get counts of how many times isRegisteredFunction and isPersistentFunction were called. We could also create a single analyzer instance with the extended SessionCatalog and reuse it across tests. Would that be simpler?
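
In other words, something like this (a sketch of the suggestion using only constructs already shown in this thread; CountingSessionCatalog is a hypothetical name):

  class CountingSessionCatalog(external: ExternalCatalog)
    extends SessionCatalog(external, FunctionRegistry.builtin, new SQLConf()) {

    var registeredFunctionCalls = 0
    var persistentFunctionCalls = 0

    // Count registry lookups triggered by LookupFunctions.
    override def isRegisteredFunction(name: FunctionIdentifier): Boolean = {
      registeredFunctionCalls += 1
      super.isRegisteredFunction(name)
    }

    // Count external-catalog lookups triggered by LookupFunctions.
    override def isPersistentFunction(name: FunctionIdentifier): Boolean = {
      persistentFunctionCalls += 1
      super.isPersistentFunction(name)
    }
  }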

Member:

Either is fine with me. The major goal of these test cases is to count the number of invocations of functionExists; that is why the current way is more straightforward for reviewers.

Contributor:

@gatorsmile Sure, Sean.

Contributor Author:

Thanks.

}
}

def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = {
Member:

This is a common utility function. We can refactor the code later.

@gatorsmile (Member)

LGTM

@SparkQA commented Jul 13, 2018

Test build #92954 has finished for PR 20795 at commit 26f2f54.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

Thanks! Merged to master.

@asfgit closed this in 0ce11d0 on Jul 13, 2018