Merge branch 'master' of github.com:apache/spark into feature
Conflicts:
	python/pyspark/mllib/feature.py
davies committed Oct 17, 2014
2 parents a405ae7 + e678b9f commit 59781b9
Showing 41 changed files with 326 additions and 352 deletions.
225 changes: 32 additions & 193 deletions core/src/main/java/org/apache/spark/TaskContext.java
@@ -18,252 +18,91 @@
 package org.apache.spark;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
 import scala.Function0;
 import scala.Function1;
 import scala.Unit;
-import scala.collection.JavaConversions;
 
 import org.apache.spark.annotation.DeveloperApi;
 import org.apache.spark.executor.TaskMetrics;
 import org.apache.spark.util.TaskCompletionListener;
-import org.apache.spark.util.TaskCompletionListenerException;
 
 /**
- * :: DeveloperApi ::
- * Contextual information about a task which can be read or mutated during execution.
- */
-@DeveloperApi
-public class TaskContext implements Serializable {
-
-  private int stageId;
-  private int partitionId;
-  private long attemptId;
-  private boolean runningLocally;
-  private TaskMetrics taskMetrics;
-
-  /**
-   * :: DeveloperApi ::
-   * Contextual information about a task which can be read or mutated during execution.
-   *
-   * @param stageId stage id
-   * @param partitionId index of the partition
-   * @param attemptId the number of attempts to execute this task
-   * @param runningLocally whether the task is running locally in the driver JVM
-   * @param taskMetrics performance metrics of the task
-   */
-  @DeveloperApi
-  public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally,
-      TaskMetrics taskMetrics) {
-    this.attemptId = attemptId;
-    this.partitionId = partitionId;
-    this.runningLocally = runningLocally;
-    this.stageId = stageId;
-    this.taskMetrics = taskMetrics;
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Contextual information about a task which can be read or mutated during execution.
-   *
-   * @param stageId stage id
-   * @param partitionId index of the partition
-   * @param attemptId the number of attempts to execute this task
-   * @param runningLocally whether the task is running locally in the driver JVM
-   */
-  @DeveloperApi
-  public TaskContext(int stageId, int partitionId, long attemptId, boolean runningLocally) {
-    this.attemptId = attemptId;
-    this.partitionId = partitionId;
-    this.runningLocally = runningLocally;
-    this.stageId = stageId;
-    this.taskMetrics = TaskMetrics.empty();
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Contextual information about a task which can be read or mutated during execution.
-   *
-   * @param stageId stage id
-   * @param partitionId index of the partition
-   * @param attemptId the number of attempts to execute this task
-   */
-  @DeveloperApi
-  public TaskContext(int stageId, int partitionId, long attemptId) {
-    this.attemptId = attemptId;
-    this.partitionId = partitionId;
-    this.runningLocally = false;
-    this.stageId = stageId;
-    this.taskMetrics = TaskMetrics.empty();
+ * Contextual information about a task which can be read or mutated during
+ * execution. To access the TaskContext for a running task use
+ * TaskContext.get().
+ */
+public abstract class TaskContext implements Serializable {
+  /**
+   * Return the currently active TaskContext. This can be called inside of
+   * user functions to access contextual information about running tasks.
+   */
+  public static TaskContext get() {
+    return taskContext.get();
   }
 
   private static ThreadLocal<TaskContext> taskContext =
     new ThreadLocal<TaskContext>();
 
-  /**
-   * :: Internal API ::
-   * This is spark internal API, not intended to be called from user programs.
-   */
-  public static void setTaskContext(TaskContext tc) {
+  static void setTaskContext(TaskContext tc) {
     taskContext.set(tc);
   }
 
-  public static TaskContext get() {
-    return taskContext.get();
-  }
-
-  /** :: Internal API :: */
-  public static void unset() {
+  static void unset() {
     taskContext.remove();
   }
 
-  // List of callback functions to execute when the task completes.
-  private transient List<TaskCompletionListener> onCompleteCallbacks =
-    new ArrayList<TaskCompletionListener>();
-
-  // Whether the corresponding task has been killed.
-  private volatile boolean interrupted = false;
-
-  // Whether the task has completed.
-  private volatile boolean completed = false;
-
   /**
-   * Checks whether the task has completed.
+   * Whether the task has completed.
    */
-  public boolean isCompleted() {
-    return completed;
-  }
+  public abstract boolean isCompleted();
 
   /**
-   * Checks whether the task has been killed.
+   * Whether the task has been killed.
    */
-  public boolean isInterrupted() {
-    return interrupted;
-  }
+  public abstract boolean isInterrupted();
+
+  /** @deprecated use isRunningLocally() */
+  @Deprecated
+  public abstract boolean runningLocally();
+
+  public abstract boolean isRunningLocally();
 
   /**
    * Add a (Java friendly) listener to be executed on task completion.
    * This will be called in all situations - success, failure, or cancellation.
    * <p/>
    * An example use is for HadoopRDD to register a callback to close the input stream.
    */
-  public TaskContext addTaskCompletionListener(TaskCompletionListener listener) {
-    onCompleteCallbacks.add(listener);
-    return this;
-  }
+  public abstract TaskContext addTaskCompletionListener(TaskCompletionListener listener);
 
   /**
    * Add a listener in the form of a Scala closure to be executed on task completion.
    * This will be called in all situations - success, failure, or cancellation.
    * <p/>
    * An example use is for HadoopRDD to register a callback to close the input stream.
    */
-  public TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f) {
-    onCompleteCallbacks.add(new TaskCompletionListener() {
-      @Override
-      public void onTaskCompletion(TaskContext context) {
-        f.apply(context);
-      }
-    });
-    return this;
-  }
+  public abstract TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f);
 
   /**
    * Add a callback function to be executed on task completion. An example use
    * is for HadoopRDD to register a callback to close the input stream.
    * Will be called in any situation - success, failure, or cancellation.
    *
-   * Deprecated: use addTaskCompletionListener
+   * @deprecated use addTaskCompletionListener
    *
    * @param f Callback function.
    */
   @Deprecated
-  public void addOnCompleteCallback(final Function0<Unit> f) {
-    onCompleteCallbacks.add(new TaskCompletionListener() {
-      @Override
-      public void onTaskCompletion(TaskContext context) {
-        f.apply();
-      }
-    });
-  }
-
-  /**
-   * ::Internal API::
-   * Marks the task as completed and triggers the listeners.
-   */
-  public void markTaskCompleted() throws TaskCompletionListenerException {
-    completed = true;
-    List<String> errorMsgs = new ArrayList<String>(2);
-    // Process complete callbacks in the reverse order of registration
-    List<TaskCompletionListener> revlist =
-      new ArrayList<TaskCompletionListener>(onCompleteCallbacks);
-    Collections.reverse(revlist);
-    for (TaskCompletionListener tcl: revlist) {
-      try {
-        tcl.onTaskCompletion(this);
-      } catch (Throwable e) {
-        errorMsgs.add(e.getMessage());
-      }
-    }
-
-    if (!errorMsgs.isEmpty()) {
-      throw new TaskCompletionListenerException(JavaConversions.asScalaBuffer(errorMsgs));
-    }
-  }
-
-  /**
-   * ::Internal API::
-   * Marks the task for interruption, i.e. cancellation.
-   */
-  public void markInterrupted() {
-    interrupted = true;
-  }
-
-  /** @deprecated use getStageId() */
-  @Deprecated
-  public int stageId() {
-    return stageId;
-  }
-
-  /** @deprecated use getPartitionId() */
-  @Deprecated
-  public int partitionId() {
-    return partitionId;
-  }
-
-  /** @deprecated use getAttemptId() */
-  @Deprecated
-  public long attemptId() {
-    return attemptId;
-  }
-
-  /** @deprecated use isRunningLocally() */
-  @Deprecated
-  public boolean runningLocally() {
-    return runningLocally;
-  }
-
-  public boolean isRunningLocally() {
-    return runningLocally;
-  }
+  public abstract void addOnCompleteCallback(final Function0<Unit> f);
 
-  public int getStageId() {
-    return stageId;
-  }
+  public abstract int stageId();
 
-  public int getPartitionId() {
-    return partitionId;
-  }
+  public abstract int partitionId();
 
-  public long getAttemptId() {
-    return attemptId;
-  }
+  public abstract long attemptId();
 
-  /** ::Internal API:: */
-  public TaskMetrics taskMetrics() {
-    return taskMetrics;
-  }
+  /** ::DeveloperApi:: */
+  @DeveloperApi
+  public abstract TaskMetrics taskMetrics();
 }
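
The new doc comment points user code at TaskContext.get(). A minimal Scala sketch of how a task body might read its context and register a cleanup listener (the RDD, the println bodies, and the SparkContext named `sc` are illustrative, not part of this commit):

  import org.apache.spark.TaskContext

  // Assumes an active SparkContext named `sc`.
  val rdd = sc.parallelize(1 to 100, 4)
  rdd.mapPartitions { iter =>
    // TaskContext.get() reads the thread-local the executor sets before the task body runs.
    val ctx = TaskContext.get()
    println(s"stage=${ctx.stageId()} partition=${ctx.partitionId()}")
    // Runs on success, failure, or cancellation - the same hook HadoopRDD
    // uses to close its input stream.
    ctx.addTaskCompletionListener { (c: TaskContext) =>
      println(s"partition ${c.partitionId()} finished, interrupted=${c.isInterrupted()}")
    }
    iter
  }.count()

Note that addTaskCompletionListener returns the TaskContext itself, so registrations can be chained.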
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -238,7 +238,6 @@ class SparkContext(config: SparkConf) extends Logging {
       // For tests, do not enable the UI
       None
     }
-  ui.foreach(_.bind())
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
@@ -342,6 +341,10 @@ class SparkContext(config: SparkConf) extends Logging {
   postEnvironmentUpdate()
   postApplicationStart()
 
+  // Bind the SparkUI after starting the task scheduler
+  // because certain pages and listeners depend on it
+  ui.foreach(_.bind())
+
   private[spark] var checkpointDir: Option[String] = None
 
   // Thread Local variable that can be used by users to pass information down the stack
@@ -815,6 +818,8 @@ class SparkContext(config: SparkConf) extends Logging {
    */
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
+    val callSite = getCallSite
+    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }
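
The two SparkContext changes are an ordering fix (the SparkUI binds only after the task scheduler is up) and an observability fix (each broadcast now logs the call site that created it). A hedged sketch of the driver-side call that triggers the new log line (`sc` and the data are illustrative):

  // Assumes an active SparkContext named `sc`. With this change the driver
  // log gains a line such as "Created broadcast 0 from <call site>".
  val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
  val resolved = sc.parallelize(Seq("a", "b", "c")).map(k => lookup.value.getOrElse(k, 0)).collect()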
29 changes: 29 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContextHelper.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * This class exists to restrict the visibility of TaskContext setters.
+ */
+private[spark] object TaskContextHelper {
+
+  def setTaskContext(tc: TaskContext): Unit = TaskContext.setTaskContext(tc)
+
+  def unset(): Unit = TaskContext.unset()
+
+}
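
TaskContextHelper bridges a visibility gap: the Java setters are package-private to org.apache.spark, so Scala code in subpackages (e.g. the scheduler) cannot call them directly, while this private[spark] object can and re-exports them Spark-wide. A hypothetical sketch of the set/run/unset pattern a task runner might follow (`runTask` and `context` are illustrative names, not code from this commit):

  // Publish the context to the thread-local before user code runs,
  // and always clear it afterwards, even if the task throws.
  TaskContextHelper.setTaskContext(context)
  try {
    runTask(context)  // user functions may now call TaskContext.get()
  } finally {
    TaskContextHelper.unset()
  }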
