Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #6832] Impl Spark DSv2 YARN Connector that supports reading YARN aggregation logs #6856

Open
wants to merge 41 commits into
base: master
Choose a base branch
from

Conversation

naive-zhang
Copy link
Contributor

@naive-zhang naive-zhang commented Dec 20, 2024

see #6832

Why are the changes needed?

Impl Spark DSv2 YARN Connector that supports reading YARN aggregation logs and YARN Apps

How was this patch tested?

Add tests of YarnAppQuerySuite, YarnCatalogSuite and YarnLogQuerySuite

Was this patch authored or co-authored using generative AI tooling?

Be nice. Be informative.

@codecov-commenter
Copy link

codecov-commenter commented Dec 20, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 0.00%. Comparing base (b265ccb) to head (1d4d45d).

Additional details and impacted files
@@          Coverage Diff           @@
##           master   #6856   +/-   ##
======================================
  Coverage    0.00%   0.00%           
======================================
  Files         687     687           
  Lines       42463   42463           
  Branches     5796    5796           
======================================
  Misses      42463   42463           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class YarnCatalog extends TableCatalog with SupportsNamespaces with Logging {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does not support namespace, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've remove relevant code~

structType: StructType,
transforms: Array[Transform],
map: util.Map[String, String]): Table = {
throw new UnsupportedOperationException("Create table is not supported")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's canonicalize all error messages to The tables in catalog ${catalogName} does not support ALTER TABLE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx for your review, I've modified.

class YarnLogTable extends Table with SupportsRead {
override def name(): String = "app_logs"

override def schema(): StructType =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark also has SupportsMetadataColumns, maybe we should consider converting some cols to metadata col

import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.sources.Filter

trait BasicScanBuilder
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this abstract layer is really helpful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it seems unnecessary and I've removed it

// fet apps
val applicationReports: java.util.List[ApplicationReport] =
yarnAppPartition.filters match {
case filters if filters.isEmpty => yarnClient.getApplications
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will it retrieve all apps into memory, or streamly?

// type => in (a,b,c), batch query
case filters =>
filters.collectFirst {
case EqualTo("id", appId: String) => java.util.Collections.singletonList(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one application may have multiple attempts

StructField("start_time", LongType, nullable = false),
StructField("finish_time", LongType, nullable = false),
StructField("tracking_url", StringType, nullable = false),
StructField("original_tracking_url", StringType, nullable = false)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the tracking_url is always present

extensions/spark/kyuubi-spark-connector-yarn/pom.xml Outdated Show resolved Hide resolved

private val remoteAppLogDir = {
val dir = SparkSession.active.sparkContext
.getConf.getOption(remoteAppLogDirKey) match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is incorrect.

use SparkSession.conf in SQL cases instead of SparkContext.getConf in SQL cases because the latter returns the global static conf, which is immutable after Spark is launched.

and I think it is not worth having an additional spark conf key, just use SparkSession.sessionState.newHadoopConf to create a Configuration, and read the hadoop conf directly.

private def tryPushDownPredicates(): mutable.Seq[FileStatus] = {
filters match {
case pushed if pushed.isEmpty => listFiles(remoteAppLogDir)
case pushed => pushed.collectFirst {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should support multiple predications ...

case EqualTo("container_id", containerId: String) =>
listFiles(s"${remoteAppLogDir}/*/*/*/*/${containerId}") ++
// compatible for hadoop2
listFiles(s"${remoteAppLogDir}/*/*/*/${containerId}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you leave some comments to explain the directory structure and Hadoop code/JIRA reference?

val fileIterator = fs.listFiles(status.getPath, true)
while (fileIterator.hasNext) {
val fileStatus = fileIterator.next()
if (fileStatus.isFile) logFiles += fileStatus
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if dir?

@pan3793
Copy link
Member

pan3793 commented Dec 23, 2024

@naive-zhang it may take some time to merge the whole feature into the master, to speed up the process, you may want to split it into several PRs, for example, the first PR just include a YarnCatalog, and then each PR focus one table

@pan3793
Copy link
Member

pan3793 commented Jan 21, 2025

@naive-zhang Please let me know if you are still interested in this PR, if not, or if you don't have time to complete it, it's also good to let me know so that others who are interested in it can take over~

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants