diff --git a/docs/SparkContext.md b/docs/SparkContext.md
index cb2898da5b..61f69801ab 100644
--- a/docs/SparkContext.md
+++ b/docs/SparkContext.md
@@ -354,7 +354,7 @@ Multiple external cluster managers registered for the url [url]: [serviceLoaders
 
 `getClusterManager` is used when `SparkContext` is requested for a [SchedulerBackend and TaskScheduler](#createTaskScheduler).
 
-## Running Job Synchronously
+## Running Job (Synchronously) { #runJob }
 
 ```scala
 runJob[T, U: ClassTag](
@@ -368,7 +368,7 @@ runJob[T, U: ClassTag](
   rdd: RDD[T],
   func: (TaskContext, Iterator[T]) => U,
   partitions: Seq[Int]): Array[U]
-runJob[T, U: ClassTag](
+runJob[T, U: ClassTag]( // (1)!
   rdd: RDD[T],
   func: (TaskContext, Iterator[T]) => U,
   partitions: Seq[Int],
@@ -386,9 +386,11 @@ runJob[T, U: ClassTag](
   partitions: Seq[Int]): Array[U]
 ```
 
+1. Requests the [DAGScheduler](#dagScheduler) to [run a job](scheduler/DAGScheduler.md#runJob)
+
 ![Executing action](images/spark-runjob.png)
 
-`runJob` finds the [call site](#getCallSite) and [cleans up](#clean) the given `func` function.
+`runJob` determines the [call site](#getCallSite) and [cleans up](#clean) the given `func` function.
 
 `runJob` prints out the following INFO message to the logs:
 
@@ -403,19 +405,23 @@ RDD's recursive dependencies:
 [toDebugString]
 ```
 
-`runJob` requests the [DAGScheduler](#dagScheduler) to [run a job](scheduler/DAGScheduler.md#runJob).
+`runJob` requests the [DAGScheduler](#dagScheduler) to [run a job](scheduler/DAGScheduler.md#runJob) with the following:
 
-`runJob` requests the [ConsoleProgressBar](#progressBar) to [finishAll](ConsoleProgressBar.md#finishAll) if defined.
+* The given `rdd`
+* The given `func` [cleaned up](#clean)
+* The given `partitions`
+* The [call site](#getCallSite)
+* The given `resultHandler` function (_procedure_)
+* The [local properties](#localProperties)
 
-In the end, `runJob` requests the given `RDD` to [doCheckpoint](rdd/RDD.md#doCheckpoint).
+!!! note
+    `runJob` blocks until the job has finished (whether it succeeded or failed).
 
-`runJob` throws an `IllegalStateException` when `SparkContext` is [stopped](#stopped):
+`runJob` requests the [ConsoleProgressBar](#progressBar) (if available) to [finishAll](ConsoleProgressBar.md#finishAll).
 
-```text
-SparkContext has been shutdown
-```
+In the end, `runJob` requests the given `RDD` to [doCheckpoint](rdd/RDD.md#doCheckpoint).
 
-### Demo
+### Demo { #runJob-demo }
 
 `runJob` is essentially executing a `func` function on all or a subset of partitions of an RDD and returning the result as an array (with elements being the results per partition).
 
@@ -495,6 +501,18 @@ maxNumConcurrentTasks(
 
 * `DAGScheduler` is requested to [checkBarrierStageWithNumSlots](scheduler/DAGScheduler.md#checkBarrierStageWithNumSlots)
 
+## withScope { #withScope }
+
+```scala
+withScope[U](
+  body: => U): U
+```
+
+`withScope` executes the given `body` using [RDDOperationScope.withScope](rdd/RDDOperationScope.md#withScope) with this `SparkContext`.
+
+!!! note
+    `withScope` is used for most (if not all) `SparkContext` API operators.
+
 ## Logging
 
 Enable `ALL` logging level for `org.apache.spark.SparkContext` logger to see what happens inside.
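For reference, the `runJob` overload documented above can be exercised as follows (a minimal sketch, not the Demo from the page itself; the `RunJobDemo` object name, the local master and the partition indices are arbitrary assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RunJobDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("runJob-demo").setMaster("local[2]"))

    val rdd = sc.parallelize(1 to 10, numSlices = 4)

    // Execute func on partitions 0 and 2 only.
    // runJob blocks until the job has finished and returns one result per requested partition.
    val sums: Array[Int] = sc.runJob(
      rdd,
      (_: TaskContext, it: Iterator[Int]) => it.sum,
      Seq(0, 2))

    println(sums.mkString(", "))
    sc.stop()
  }
}
```

The result is an `Array[Int]` with one element per requested partition, matching the `partitions: Seq[Int]): Array[U]` signature in the diff.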
diff --git a/docs/rdd/RDD.md b/docs/rdd/RDD.md
index a13b213127..0bcf909b73 100644
--- a/docs/rdd/RDD.md
+++ b/docs/rdd/RDD.md
@@ -1,10 +1,15 @@
+---
+title: RDD
+subtitle: Resilient Distributed Dataset
+---
+
 # RDD — Description of Distributed Computation
 
 `RDD[T]` is an [abstraction](#contract) of [fault-tolerant resilient distributed datasets](#implementations) that are mere descriptions of computations over a distributed collection of records (of type `T`).
 
 ## Contract
 
-###  Computing Partition
+### Computing Partition { #compute }
 
 ```scala
 compute(
@@ -18,7 +23,7 @@ compute(
 Used when:
 
 * `RDD` is requested to [computeOrReadCheckpoint](#computeOrReadCheckpoint)
-###  getPartitions
+### getPartitions { #getPartitions }
 
 ```scala
 getPartitions: Array[Partition]
@@ -75,7 +80,7 @@ isBarrier(): Boolean
 * `ShuffleDependency` is requested to [canShuffleMergeBeEnabled](ShuffleDependency.md#canShuffleMergeBeEnabled)
 * `DAGScheduler` is requested to [checkBarrierStageWithRDDChainPattern](../scheduler/DAGScheduler.md#checkBarrierStageWithRDDChainPattern), [checkBarrierStageWithDynamicAllocation](../scheduler/DAGScheduler.md#checkBarrierStageWithDynamicAllocation), [checkBarrierStageWithNumSlots](../scheduler/DAGScheduler.md#checkBarrierStageWithNumSlots), [handleTaskCompletion](../scheduler/DAGScheduler.md#handleTaskCompletion) (`FetchFailed` case to mark a map stage as broken)
 
-### isBarrier_ { #isBarrier_ }
+### isBarrier\_ { #isBarrier_ }
 
 ```scala
 isBarrier_ : Boolean // (1)!
@@ -241,17 +246,26 @@ checkpointRDD: Option[CheckpointRDD[T]]
 
 `checkpointRDD` returns the [CheckpointRDD](RDDCheckpointData.md#checkpointRDD) of the [RDDCheckpointData](#checkpointData) (if defined and so this `RDD` checkpointed).
 
+---
+
 `checkpointRDD` is used when:
 
 * `RDD` is requested for the [dependencies](#dependencies), [partitions](#partitions) and [preferred locations](#preferredLocations) (all using _final_ methods!)
 
-## doCheckpoint
+## doCheckpoint { #doCheckpoint }
 
 ```scala
 doCheckpoint(): Unit
 ```
 
-`doCheckpoint` executes in `checkpoint` scope.
+!!! note "RDD.doCheckpoint, SparkContext.runJob and Dataset.checkpoint"
+    `doCheckpoint` is called at the end of every Spark job (submitted using [SparkContext.runJob](../SparkContext.md#runJob)).
+
+    In other words, checkpointing is materialized by job execution, not by the `RDD.checkpoint` call itself.
+
+    `doCheckpoint` is also triggered when the `Dataset.checkpoint` operator ([Spark SQL]({{ book.spark_sql }}/Dataset/#checkpoint)) is executed with the `eager` flag on, as it submits one or more Spark jobs over the underlying RDD.
+
+`doCheckpoint` executes in [checkpoint](RDDOperationScope.md#withScope) scope.
 
 `doCheckpoint` turns the [doCheckpointCalled](#doCheckpointCalled) flag on (to prevent multiple executions).
 
@@ -382,6 +396,18 @@ rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
 * [RDD.sortBy](spark-rdd-transformations.md#sortBy)
 * [PairRDDFunctions.combineByKey](PairRDDFunctions.md#combineByKey)
 
+## withScope { #withScope }
+
+```scala
+withScope[U](
+  body: => U): U
+```
+
+`withScope` executes the given `body` using [RDDOperationScope.withScope](RDDOperationScope.md#withScope) with this RDD's [SparkContext](#sc).
+
+!!! note
+    `withScope` is used for most (if not all) `RDD` API operators.
+
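The `doCheckpoint` note above (checkpointing is materialized at the end of a job, not at `RDD.checkpoint` time) can be observed from user code (a minimal sketch; the `DoCheckpointDemo` object name, the local master and the checkpoint directory are arbitrary assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DoCheckpointDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("doCheckpoint-demo").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/rdd-checkpoints")

    val nums = sc.parallelize(1 to 100, numSlices = 4).map(_ * 2)
    nums.checkpoint() // only marks the RDD for checkpointing; no job runs yet

    // The first action submits a Spark job (SparkContext.runJob), which calls
    // RDD.doCheckpoint at the end and materializes the checkpoint.
    println(nums.count())

    // The lineage should now be truncated at a checkpoint RDD.
    println(nums.toDebugString)

    sc.stop()
  }
}
```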