[SPARK-39620][WEB UI] Use same condition in history server page and API to filter applications

### What changes were proposed in this pull request?

Updated the REST API `/api/v1/applications` to use the same condition as the history server page when filtering completed/incomplete applications.

### Why are the changes needed?

When the summary page is opened, the history server follows this logic:

- If there is at least one completed/incomplete application, the page embeds a script in the response, which uses AJAX to call the REST API and fetch the filtered list.
- If there is no such application, the page only returns a message saying that nothing was found.

The issue is that the page and the REST API use different conditions to filter applications. In `HistoryPage`, an application is considered completed as long as its last attempt is completed, but in `ApplicationListResource`, every attempt must be completed. This inconsistency causes a problem in a corner case.
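As a rough sketch of the mismatch (simplified, hypothetical types: `AttemptInfo` stands in for the v1 `ApplicationAttemptInfo`, and `attempts` is ordered most-recent-first, as in the real listing):

```scala
// Sketch only: simplified stand-ins for the real classes.
case class AttemptInfo(completed: Boolean)

// HistoryPage: completed as soon as the most recent attempt is completed.
def pageConsidersCompleted(attempts: Seq[AttemptInfo]): Boolean =
  attempts.nonEmpty && attempts.head.completed

// ApplicationListResource before this change: running if any attempt is incomplete,
// i.e. completed only when every attempt is completed.
def apiConsidersRunning(attempts: Seq[AttemptInfo]): Boolean =
  attempts.exists(!_.completed)
```

An application whose latest attempt finished but whose older attempt lost its end event satisfies `pageConsidersCompleted` and `apiConsidersRunning` at the same time, which is exactly the corner case described below.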

In the driver, event queues have a bounded capacity to protect memory. When too many events arrive, some are dropped and the event log file ends up incomplete. For an application with multiple attempts, it is therefore possible that the last attempt is completed while a previous attempt is considered incomplete because its application-end event was lost.

For this type of application, the page considers it completed while the API considers it still running. When the summary page is opened:
- When viewing completed applications, the page embeds the script, but the API returns nothing.
- When viewing incomplete applications, the page returns nothing.

So the user cannot see this application in the history server at all.

### Does this PR introduce _any_ user-facing change?

Yes, the behavior of the `/api/v1/applications` API and the history server summary page changes.

When calling the API, an application like the one described above was previously reported as running; after this change it is reported as completed, so the same filter can return a different result. This should be acceptable, because attempts are executed sequentially and incrementally: if an attempt with a higher ID has completed, the earlier attempts can be considered completed as well.
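Concretely, the API now adopts the page's condition; a minimal sketch of the updated predicate (reusing the simplified, hypothetical `AttemptInfo` from the sketch above), matching the change to `ApplicationListResource` in the diff below:

```scala
// After this change: only the most recent attempt decides whether the app is running.
// An app with no attempts at all is still treated as running.
def apiConsidersRunning(attempts: Seq[AttemptInfo]): Boolean =
  attempts.isEmpty || !attempts.head.completed
```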

On the history server summary page, the user previously could not see such an application at all; now it appears under completed applications.

### How was this patch tested?

Added a new unit test suite, `HistoryServerPageSuite`, which checks whether `HistoryPage` behaves the same as `ApplicationListResource` when filtering applications. To support the test, `HistoryPage` was changed slightly to expose a method, `shouldDisplayApplications`, that reports whether the summary page will display applications.

The test verifies that:
- If no completed/incomplete application is found, `HistoryPage` should not display applications, and the API should return an empty list.
- Otherwise, `HistoryPage` should display applications, and the API should return a non-empty list.

Two scenarios are currently covered:
- An application whose last attempt is completed but whose previous attempt is incomplete.
- An application whose last attempt is incomplete but whose previous attempt is completed.

Closes #37008 from kuwii/kuwii/hs-fix.

Authored-by: kuwii <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
a0x8o committed Jul 8, 2022
1 parent 159e874 commit 2dccd73
Showing 29 changed files with 686 additions and 315 deletions.
3 changes: 3 additions & 0 deletions R/pkg/NAMESPACE
@@ -474,9 +474,11 @@ export("as.DataFrame",
"createDataFrame",
"createExternalTable",
"createTable",
"currentCatalog",
"currentDatabase",
"dropTempTable",
"dropTempView",
"listCatalogs",
"listColumns",
"listDatabases",
"listFunctions",
@@ -493,6 +495,7 @@ export("as.DataFrame",
"refreshByPath",
"refreshTable",
"setCheckpointDir",
"setCurrentCatalog",
"setCurrentDatabase",
"spark.lapply",
"spark.addFile",
60 changes: 60 additions & 0 deletions R/pkg/R/catalog.R
@@ -17,6 +17,66 @@

# catalog.R: SparkSession catalog functions

#' Returns the current default catalog
#'
#' Returns the current default catalog.
#'
#' @return name of the current default catalog.
#' @rdname currentCatalog
#' @name currentCatalog
#' @examples
#' \dontrun{
#' sparkR.session()
#' currentCatalog()
#' }
#' @note since 3.4.0
currentCatalog <- function() {
sparkSession <- getSparkSession()
catalog <- callJMethod(sparkSession, "catalog")
callJMethod(catalog, "currentCatalog")
}

#' Sets the current default catalog
#'
#' Sets the current default catalog.
#'
#' @param catalogName name of the catalog
#' @rdname setCurrentCatalog
#' @name setCurrentCatalog
#' @examples
#' \dontrun{
#' sparkR.session()
#' setCurrentCatalog("spark_catalog")
#' }
#' @note since 3.4.0
setCurrentCatalog <- function(catalogName) {
sparkSession <- getSparkSession()
if (class(catalogName) != "character") {
stop("catalogName must be a string.")
}
catalog <- callJMethod(sparkSession, "catalog")
invisible(handledCallJMethod(catalog, "setCurrentCatalog", catalogName))
}

#' Returns a list of catalog available
#'
#' Returns a list of catalog available.
#'
#' @return a SparkDataFrame of the list of catalog.
#' @rdname listCatalogs
#' @name listCatalogs
#' @examples
#' \dontrun{
#' sparkR.session()
#' listCatalogs()
#' }
#' @note since 3.4.0
listCatalogs <- function() {
sparkSession <- getSparkSession()
catalog <- callJMethod(sparkSession, "catalog")
dataFrame(callJMethod(callJMethod(catalog, "listCatalogs"), "toDF"))
}

#' (Deprecated) Create an external table
#'
#' Creates an external table based on the dataset in a data source,
5 changes: 4 additions & 1 deletion R/pkg/pkgdown/_pkgdown_template.yml
@@ -261,16 +261,20 @@ reference:

- title: "SQL Catalog"
- contents:
- currentCatalog
- currentDatabase
- dropTempTable
- dropTempView
- listCatalogs
- listColumns
- listDatabases
- listFunctions
- listTables
- refreshByPath
- refreshTable
- recoverPartitions
- setCurrentCatalog
- setCurrentDatabase
- tableNames
- tables
- uncacheTable
@@ -283,7 +287,6 @@ reference:
- getLocalProperty
- install.spark
- setCheckpointDir
- setCurrentDatabase
- setJobDescription
- setJobGroup
- setLocalProperty
11 changes: 11 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4011,6 +4011,17 @@ test_that("Collect on DataFrame when NAs exists at the top of a timestamp column
expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt"))
})

test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", {
expect_equal(currentCatalog(), "spark_catalog")
expect_error(setCurrentCatalog("spark_catalog"), NA)
expect_error(setCurrentCatalog("zxwtyswklpf"),
paste0("Error in setCurrentCatalog : ",
"org.apache.spark.sql.connector.catalog.CatalogNotFoundException: ",
"Catalog 'zxwtyswklpf' plugin class not found: ",
"spark.sql.catalog.zxwtyswklpf is not defined"))
catalogs <- collect(listCatalogs())
})

test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
expect_equal(currentDatabase(), "default")
expect_error(setCurrentDatabase("default"), NA)
2 changes: 1 addition & 1 deletion core/src/main/resources/error/error-classes.json
@@ -194,7 +194,7 @@
},
"INVALID_ARRAY_INDEX" : {
"message" : [
"The index <indexValue> is out of bounds. The array has <arraySize> elements. If necessary set <ansiConfig> to \"false\" to bypass this error."
"The index <indexValue> is out of bounds. The array has <arraySize> elements. Use `try_element_at` and increase the array index by 1(the starting array index is 1 for `try_element_at`) to tolerate accessing element at invalid index and return NULL instead. If necessary set <ansiConfig> to \"false\" to bypass this error."
]
},
"INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : {
@@ -30,8 +30,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val requestedIncomplete = Option(request.getParameter("showIncomplete"))
.getOrElse("false").toBoolean

val displayApplications = parent.getApplicationList()
.exists(isApplicationCompleted(_) != requestedIncomplete)
val displayApplications = shouldDisplayApplications(requestedIncomplete)
val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess()
val lastUpdatedTime = parent.getLastUpdatedTime()
val providerConfig = parent.getProviderConfig()
@@ -91,6 +90,10 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
UIUtils.basicSparkPage(request, content, "History Server", true)
}

def shouldDisplayApplications(requestedIncomplete: Boolean): Boolean = {
parent.getApplicationList().exists(isApplicationCompleted(_) != requestedIncomplete)
}

private def makePageLink(request: HttpServletRequest, showIncomplete: Boolean): String = {
UIUtils.prependBaseUri(request, "/?" + "showIncomplete=" + showIncomplete)
}
@@ -38,7 +38,7 @@ private[v1] class ApplicationListResource extends ApiRequestContext {
val includeRunning = status.isEmpty || status.contains(ApplicationStatus.RUNNING)

uiRoot.getApplicationInfoList.filter { app =>
val anyRunning = app.attempts.exists(!_.completed)
val anyRunning = app.attempts.isEmpty || !app.attempts.head.completed
// if any attempt is still running, we consider the app to also still be running;
// keep the app if *any* attempts fall in the right time window
((!anyRunning && includeCompleted) || (anyRunning && includeRunning)) &&

4 large diffs are not rendered by default.

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import java.net.URL
import javax.servlet.http.HttpServletResponse

import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods.parse
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Tests._
import org.apache.spark.status.api.v1.ApplicationStatus
import org.apache.spark.util.Utils

class HistoryServerPageSuite extends SparkFunSuite with BeforeAndAfter {
private implicit val format: DefaultFormats.type = DefaultFormats

private val logDirs = Seq(
getTestResourcePath("spark-events-broken/previous-attempt-incomplete"),
getTestResourcePath("spark-events-broken/last-attempt-incomplete")
)

private var server: Option[HistoryServer] = None
private val localhost: String = Utils.localHostNameForURI()
private var port: Int = -1

private def startHistoryServer(logDir: String): Unit = {
assert(server.isEmpty)
val conf = new SparkConf()
.set(HISTORY_LOG_DIR, logDir)
.set(UPDATE_INTERVAL_S.key, "0")
.set(IS_TESTING, true)
val provider = new FsHistoryProvider(conf)
provider.checkForLogs()
val securityManager = HistoryServer.createSecurityManager(conf)
val _server = new HistoryServer(conf, provider, securityManager, 18080)
_server.bind()
provider.start()
server = Some(_server)
port = _server.boundPort
}

private def stopHistoryServer(): Unit = {
server.foreach(_.stop())
server = None
}

private def callApplicationsAPI(requestedIncomplete: Boolean): Seq[JObject] = {
val param = if (requestedIncomplete) {
ApplicationStatus.RUNNING.toString.toLowerCase()
} else {
ApplicationStatus.COMPLETED.toString.toLowerCase()
}
val (code, jsonOpt, errOpt) = HistoryServerSuite.getContentAndCode(
new URL(s"http://$localhost:$port/api/v1/applications?status=$param")
)
assert(code == HttpServletResponse.SC_OK)
assert(jsonOpt.isDefined)
assert(errOpt.isEmpty)
val json = parse(jsonOpt.get).extract[List[JObject]]
json
}

override def afterEach(): Unit = {
super.afterEach()
stopHistoryServer()
}

test("SPARK-39620: should behaves the same as REST API when filtering applications") {
logDirs.foreach { logDir =>
startHistoryServer(logDir)
val page = new HistoryPage(server.get)
Seq(true, false).foreach { requestedIncomplete =>
val apiResponse = callApplicationsAPI(requestedIncomplete)
if (page.shouldDisplayApplications(requestedIncomplete)) {
assert(apiResponse.nonEmpty)
} else {
assert(apiResponse.isEmpty)
}
}
stopHistoryServer()
}
}
}
1 change: 1 addition & 0 deletions dev/.rat-excludes
@@ -138,3 +138,4 @@ over10k
exported_table/*
ansible-for-test-node/*
node_modules
spark-events-broken/*
@@ -48,7 +48,7 @@
*
*/
public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Closeable {
protected final Logger logger = LoggerFactory.getLogger(RowBasedKeyValueBatch.class);
protected static final Logger logger = LoggerFactory.getLogger(RowBasedKeyValueBatch.class);

private static final int DEFAULT_CAPACITY = 1 << 16;

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Evolving;

import java.io.Serializable;

/**
* Represent an extract function, which extracts and returns the value of a
* specified datetime field from a datetime or interval value expression.
* <p>
* The currently supported field names follow the ISO standard:
* <ol>
* <li> <code>SECOND</code> Since 3.4.0 </li>
* <li> <code>MINUTE</code> Since 3.4.0 </li>
* <li> <code>HOUR</code> Since 3.4.0 </li>
* <li> <code>MONTH</code> Since 3.4.0 </li>
* <li> <code>QUARTER</code> Since 3.4.0 </li>
* <li> <code>YEAR</code> Since 3.4.0 </li>
* <li> <code>DAY_OF_WEEK</code> Since 3.4.0 </li>
* <li> <code>DAY</code> Since 3.4.0 </li>
* <li> <code>DAY_OF_YEAR</code> Since 3.4.0 </li>
* <li> <code>WEEK</code> Since 3.4.0 </li>
* <li> <code>YEAR_OF_WEEK</code> Since 3.4.0 </li>
* </ol>
*
* @since 3.4.0
*/

@Evolving
public class Extract implements Expression, Serializable {

private String field;
private Expression source;

public Extract(String field, Expression source) {
this.field = field;
this.source = source;
}

public String field() { return field; }
public Expression source() { return source; }

@Override
public Expression[] children() { return new Expression[]{ source() }; }
}
@@ -346,6 +346,24 @@
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>DATE_ADD</code>
* <ul>
* <li>SQL semantic: <code>DATE_ADD(start_date, num_days)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>DATE_DIFF</code>
* <ul>
* <li>SQL semantic: <code>DATE_DIFF(end_date, start_date)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* <li>Name: <code>TRUNC</code>
* <ul>
* <li>SQL semantic: <code>TRUNC(date, format)</code></li>
* <li>Since version: 3.4.0</li>
* </ul>
* </li>
* </ol>
* Note: SQL semantic conforms ANSI standard, so some expressions are not supported when ANSI off,
* including: add, subtract, multiply, divide, remainder, pmod.