Merge pull request apache#66 from markhamstra/csd-1.4
SKIPME merging Apache branch-1.4 bug fixes
markhamstra committed Jul 19, 2015
2 parents 98e8af7 + cb102d5 commit e8336ec
Showing 6 changed files with 159 additions and 37 deletions.
10 changes: 3 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -352,8 +352,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/**
* Comparison function that defines the sort order for application attempts within the same
- * application. Order is: running attempts before complete attempts, running attempts sorted
- * by start time, completed attempts sorted by end time.
+ * application. Order is: attempts are sorted by descending start time.
+ * Most recent attempt state matches with current state of the app.
*
* Normally applications should have a single running attempt; but failure to call sc.stop()
* may cause multiple running attempts to show up.
@@ -363,11 +363,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private def compareAttemptInfo(
a1: FsApplicationAttemptInfo,
a2: FsApplicationAttemptInfo): Boolean = {
- if (a1.completed == a2.completed) {
- if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
- } else {
- !a1.completed
- }
+ a1.startTime >= a2.startTime
}
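The rewritten comparator above boils the ordering down to one rule: later start time sorts first, so the head attempt is the most recent one and mirrors the app's current state. A minimal illustrative sketch of that rule in Python (hypothetical, not the Scala code in this diff):

    attempts = [
        {"attemptId": "attempt1", "startTime": 1, "completed": True},
        {"attemptId": "attempt2", "startTime": 3, "completed": False},
    ]
    # Descending start time: the newest attempt (still running here) comes first.
    ordered = sorted(attempts, key=lambda a: a["startTime"], reverse=True)
    assert ordered[0]["attemptId"] == "attempt2"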

/**
24 changes: 11 additions & 13 deletions core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -239,13 +239,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
appListAfterRename.size should be (1)
}

test("apps with multiple attempts") {
test("apps with multiple attempts with order") {
val provider = new FsHistoryProvider(createTestConf())

- val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
writeFile(attempt1, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
- SparkListenerApplicationEnd(2L)
+ SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
)

updateAndCheck(provider) { list =>
@@ -255,7 +254,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
)

updateAndCheck(provider) { list =>
@@ -264,30 +263,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
list.head.attempts.head.attemptId should be (Some("attempt2"))
}

- val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
- attempt2.delete()
- writeFile(attempt2, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
+ writeFile(attempt3, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
SparkListenerApplicationEnd(4L)
)

updateAndCheck(provider) { list =>
list should not be (null)
list.size should be (1)
- list.head.attempts.size should be (2)
- list.head.attempts.head.attemptId should be (Some("attempt2"))
+ list.head.attempts.size should be (3)
+ list.head.attempts.head.attemptId should be (Some("attempt3"))
}

val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
- writeFile(attempt2, true, None,
+ writeFile(attempt1, true, None,
SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
SparkListenerApplicationEnd(6L)
)

updateAndCheck(provider) { list =>
list.size should be (2)
list.head.attempts.size should be (1)
- list.last.attempts.size should be (2)
+ list.last.attempts.size should be (3)
list.head.attempts.head.attemptId should be (Some("attempt1"))

list.foreach { case app =>
9 changes: 7 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -332,9 +332,9 @@ object GraphImpl {
edgeStorageLevel: StorageLevel,
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
- .withTargetStorageLevel(edgeStorageLevel).cache()
+ .withTargetStorageLevel(edgeStorageLevel)
val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
- .withTargetStorageLevel(vertexStorageLevel).cache()
+ .withTargetStorageLevel(vertexStorageLevel)
GraphImpl(vertexRDD, edgeRDD)
}

@@ -346,9 +346,14 @@ object GraphImpl {
def apply[VD: ClassTag, ED: ClassTag](
vertices: VertexRDD[VD],
edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {

vertices.cache()

// Convert the vertex partitions in edges to the correct type
val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
.cache()

GraphImpl.fromExistingRDDs(vertices, newEdges)
}

2 changes: 1 addition & 1 deletion pom.xml
@@ -146,7 +146,7 @@
<chill.version>0.5.0</chill.version>
<ivy.version>2.4.0</ivy.version>
<oro.version>2.0.8</oro.version>
- <codahale.metrics.version>3.1.0</codahale.metrics.version>
+ <codahale.metrics.version>3.1.2</codahale.metrics.version>
<avro.version>1.7.7</avro.version>
<avro.mapred.classifier>hadoop2</avro.mapred.classifier>
<jets3t.version>0.7.1</jets3t.version>
10 changes: 8 additions & 2 deletions python/pyspark/rdd.py
@@ -849,6 +849,9 @@ def func(iterator):
for obj in iterator:
acc = op(obj, acc)
yield acc
# collecting result of mapPartitions here ensures that the copy of
# zeroValue provided to each partition is unique from the one provided
# to the final reduce call
vals = self.mapPartitions(func).collect()
return reduce(op, vals, zeroValue)

@@ -878,8 +881,11 @@ def func(iterator):
for obj in iterator:
acc = seqOp(acc, obj)
yield acc

- return self.mapPartitions(func).fold(zeroValue, combOp)
+ # collecting result of mapPartitions here ensures that the copy of
+ # zeroValue provided to each partition is unique from the one provided
+ # to the final reduce call
+ vals = self.mapPartitions(func).collect()
+ return reduce(combOp, vals, zeroValue)
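The comments added to fold() and aggregate() capture the SPARK-9021 reasoning: when zeroValue is mutable and the ops update it in place, every partition needs its own copy, and the final driver-side reduce needs a copy untouched by any partition. A minimal local sketch of the hazard, in plain Python with no SparkContext (seq_op, partitions and the other names here are illustrative, not PySpark API):

    from functools import reduce

    def seq_op(acc, x):
        acc.append(x)        # mutates the accumulator in place
        return acc

    partitions = [[1, 2], [3, 4]]

    # Sharing one zeroValue object across partitions corrupts it:
    zero = []
    bad = [reduce(seq_op, part, zero) for part in partitions]
    assert bad == [[1, 2, 3, 4], [1, 2, 3, 4]]   # both partials alias `zero`

    # A fresh copy per partition, merged against a pristine zero, mirrors the
    # collect()-then-reduce(...) pattern used in the hunks above:
    partials = [reduce(seq_op, part, []) for part in partitions]
    assert reduce(lambda a, b: a + b, partials, []) == [1, 2, 3, 4]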

def treeAggregate(self, zeroValue, seqOp, combOp, depth=2):
"""
141 changes: 129 additions & 12 deletions python/pyspark/tests.py
@@ -529,10 +529,127 @@ def test_deleting_input_files(self):

def test_sampling_default_seed(self):
# Test for SPARK-3995 (default seed setting)
- data = self.sc.parallelize(range(1000), 1)
+ data = self.sc.parallelize(xrange(1000), 1)
subset = data.takeSample(False, 10)
self.assertEqual(len(subset), 10)
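A likely reason for the range to xrange switches throughout this file (an inference; the commit message does not say): on Python 2, range materializes a full list up front while xrange yields values lazily, which matters for sizes such as 1 << 20 used in later tests. A small sketch, runnable on Python 2 and a no-op on Python 3:

    import sys

    if sys.version_info[0] == 2:
        n = 1 << 20
        eager = range(n)     # builds an n-element list immediately
        lazy = xrange(n)     # constant-size object; values produced on demand
        assert len(eager) == len(lazy) == n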

def test_aggregate_mutable_zero_value(self):
# Test for SPARK-9021; uses aggregate and treeAggregate to build dict
# representing a counter of ints
# NOTE: dict is used instead of collections.Counter for Python 2.6
# compatibility
from collections import defaultdict

# Show that single or multiple partitions work
data1 = self.sc.range(10, numSlices=1)
data2 = self.sc.range(10, numSlices=2)

def seqOp(x, y):
x[y] += 1
return x

def comboOp(x, y):
for key, val in y.items():
x[key] += val
return x

counts1 = data1.aggregate(defaultdict(int), seqOp, comboOp)
counts2 = data2.aggregate(defaultdict(int), seqOp, comboOp)
counts3 = data1.treeAggregate(defaultdict(int), seqOp, comboOp, 2)
counts4 = data2.treeAggregate(defaultdict(int), seqOp, comboOp, 2)

ground_truth = defaultdict(int, dict((i, 1) for i in range(10)))
self.assertEqual(counts1, ground_truth)
self.assertEqual(counts2, ground_truth)
self.assertEqual(counts3, ground_truth)
self.assertEqual(counts4, ground_truth)

def test_aggregate_by_key_mutable_zero_value(self):
# Test for SPARK-9021; uses aggregateByKey to make a pair RDD that
# contains lists of all values for each key in the original RDD

# list(range(...)) for Python 3.x compatibility (can't use * operator
# on a range object)
# list(zip(...)) for Python 3.x compatibility (want to parallelize a
# collection, not a zip object)
tuples = list(zip(list(range(10))*2, [1]*20))
# Show that single or multiple partitions work
data1 = self.sc.parallelize(tuples, 1)
data2 = self.sc.parallelize(tuples, 2)

def seqOp(x, y):
x.append(y)
return x

def comboOp(x, y):
x.extend(y)
return x

values1 = data1.aggregateByKey([], seqOp, comboOp).collect()
values2 = data2.aggregateByKey([], seqOp, comboOp).collect()
# Sort lists to ensure clean comparison with ground_truth
values1.sort()
values2.sort()

ground_truth = [(i, [1]*2) for i in range(10)]
self.assertEqual(values1, ground_truth)
self.assertEqual(values2, ground_truth)

def test_fold_mutable_zero_value(self):
# Test for SPARK-9021; uses fold to merge an RDD of dict counters into
# a single dict
# NOTE: dict is used instead of collections.Counter for Python 2.6
# compatibility
from collections import defaultdict

counts1 = defaultdict(int, dict((i, 1) for i in range(10)))
counts2 = defaultdict(int, dict((i, 1) for i in range(3, 8)))
counts3 = defaultdict(int, dict((i, 1) for i in range(4, 7)))
counts4 = defaultdict(int, dict((i, 1) for i in range(5, 6)))
all_counts = [counts1, counts2, counts3, counts4]
# Show that single or multiple partitions work
data1 = self.sc.parallelize(all_counts, 1)
data2 = self.sc.parallelize(all_counts, 2)

def comboOp(x, y):
for key, val in y.items():
x[key] += val
return x

fold1 = data1.fold(defaultdict(int), comboOp)
fold2 = data2.fold(defaultdict(int), comboOp)

ground_truth = defaultdict(int)
for counts in all_counts:
for key, val in counts.items():
ground_truth[key] += val
self.assertEqual(fold1, ground_truth)
self.assertEqual(fold2, ground_truth)

def test_fold_by_key_mutable_zero_value(self):
# Test for SPARK-9021; uses foldByKey to make a pair RDD that contains
# lists of all values for each key in the original RDD

tuples = [(i, range(i)) for i in range(10)]*2
# Show that single or multiple partitions work
data1 = self.sc.parallelize(tuples, 1)
data2 = self.sc.parallelize(tuples, 2)

def comboOp(x, y):
x.extend(y)
return x

values1 = data1.foldByKey([], comboOp).collect()
values2 = data2.foldByKey([], comboOp).collect()
# Sort lists to ensure clean comparison with ground_truth
values1.sort()
values2.sort()

# list(range(...)) for Python 3.x compatibility
ground_truth = [(i, list(range(i))*2) for i in range(10)]
self.assertEqual(values1, ground_truth)
self.assertEqual(values2, ground_truth)

def test_aggregate_by_key(self):
data = self.sc.parallelize([(1, 1), (1, 1), (3, 2), (5, 1), (5, 3)], 2)

@@ -624,8 +741,8 @@ def test_zip_with_different_serializers(self):

def test_zip_with_different_object_sizes(self):
# regress test for SPARK-5973
- a = self.sc.parallelize(range(10000)).map(lambda i: '*' * i)
- b = self.sc.parallelize(range(10000, 20000)).map(lambda i: '*' * i)
+ a = self.sc.parallelize(xrange(10000)).map(lambda i: '*' * i)
+ b = self.sc.parallelize(xrange(10000, 20000)).map(lambda i: '*' * i)
self.assertEqual(10000, a.zip(b).count())

def test_zip_with_different_number_of_items(self):
@@ -647,7 +764,7 @@ def test_zip_with_different_number_of_items(self):
self.assertRaises(Exception, lambda: a.zip(b).count())

def test_count_approx_distinct(self):
- rdd = self.sc.parallelize(range(1000))
+ rdd = self.sc.parallelize(xrange(1000))
self.assertTrue(950 < rdd.countApproxDistinct(0.03) < 1050)
self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.03) < 1050)
self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.03) < 1050)
@@ -777,7 +894,7 @@ def test_distinct(self):
def test_external_group_by_key(self):
self.sc._conf.set("spark.python.worker.memory", "1m")
N = 200001
- kv = self.sc.parallelize(range(N)).map(lambda x: (x % 3, x))
+ kv = self.sc.parallelize(xrange(N)).map(lambda x: (x % 3, x))
gkv = kv.groupByKey().cache()
self.assertEqual(3, gkv.count())
filtered = gkv.filter(lambda kv: kv[0] == 1)
@@ -871,7 +988,7 @@ def test_narrow_dependency_in_join(self):

# Regression test for SPARK-6294
def test_take_on_jrdd(self):
- rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
+ rdd = self.sc.parallelize(xrange(1 << 20)).map(lambda x: str(x))
rdd._jrdd.first()

def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
@@ -1503,13 +1620,13 @@ def run():
self.fail("daemon had been killed")

# run a normal job
- rdd = self.sc.parallelize(range(100), 1)
+ rdd = self.sc.parallelize(xrange(100), 1)
self.assertEqual(100, rdd.map(str).count())

def test_after_exception(self):
def raise_exception(_):
raise Exception()
- rdd = self.sc.parallelize(range(100), 1)
+ rdd = self.sc.parallelize(xrange(100), 1)
with QuietTest(self.sc):
self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
self.assertEqual(100, rdd.map(str).count())
@@ -1525,22 +1642,22 @@ def test_after_jvm_exception(self):
with QuietTest(self.sc):
self.assertRaises(Exception, lambda: filtered_data.count())

- rdd = self.sc.parallelize(range(100), 1)
+ rdd = self.sc.parallelize(xrange(100), 1)
self.assertEqual(100, rdd.map(str).count())

def test_accumulator_when_reuse_worker(self):
from pyspark.accumulators import INT_ACCUMULATOR_PARAM
acc1 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM)
- self.sc.parallelize(range(100), 20).foreach(lambda x: acc1.add(x))
+ self.sc.parallelize(xrange(100), 20).foreach(lambda x: acc1.add(x))
self.assertEqual(sum(range(100)), acc1.value)

acc2 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM)
- self.sc.parallelize(range(100), 20).foreach(lambda x: acc2.add(x))
+ self.sc.parallelize(xrange(100), 20).foreach(lambda x: acc2.add(x))
self.assertEqual(sum(range(100)), acc2.value)
self.assertEqual(sum(range(100)), acc1.value)

def test_reuse_worker_after_take(self):
- rdd = self.sc.parallelize(range(100000), 1)
+ rdd = self.sc.parallelize(xrange(100000), 1)
self.assertEqual(0, rdd.first())

def count():
