
[SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.2] Shuffle+Repartition on a DataFrame could lead to incorrect answers #22079

Closed
wants to merge 6 commits

Conversation

@bersprockets (Contributor) commented Aug 11, 2018

What changes were proposed in this pull request?

Back port of #20393.

Currently, shuffle repartition uses RoundRobinPartitioning; the generated result is nondeterministic because the ordering of the input rows is not determined.

The bug can be triggered when a repartition call follows a shuffle (which leads to non-deterministic row ordering), as in the pattern below:
upstream stage -> repartition stage -> result stage
(-> indicates a shuffle)
When one of the executor processes goes down, some tasks of the repartition stage will be retried and generate an inconsistent ordering, and then some tasks of the result stage will be retried and produce different data.

The following code returns 931532, instead of 1000000:

```
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  // On the first attempt of the first two tasks, kill the executor JVM to force a retry
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```

In this PR, we propose the most straightforward way to fix this problem: perform a local sort before partitioning. Once the input row ordering is deterministic, the function from rows to partitions is fully deterministic too.
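A minimal sketch of the idea in plain Scala (not Spark's actual implementation, which sorts binary row data with a specialized comparator): once the input is locally sorted, the row-to-partition mapping becomes a pure function of the data, so a retried task reproduces exactly the same partitions.

```
// Sketch: local sort makes round-robin partitioning deterministic.
def deterministicRoundRobin[T](rows: Seq[T], numPartitions: Int)
                              (implicit ord: Ordering[T]): Map[Int, Seq[T]] =
  rows.sorted                                      // local sort fixes the input order
    .zipWithIndex
    .groupBy { case (_, i) => i % numPartitions }  // round-robin assignment
    .map { case (p, group) => p -> group.map(_._1) }
```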

The downside of the approach is that with the extra local sort inserted, the performance of repartition() will go down, so we add a new config named spark.sql.execution.sortBeforeRepartition to control whether this patch is applied. The patch is enabled by default to be safe-by-default, but users may choose to turn it off manually to avoid the performance regression.
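For example, the flag can be flipped at runtime (the config name is from this PR; disabling it trades retry-safety for repartition() speed):

```
// Disable the defensive sort when repartition speed matters more than
// correctness under task retries (safe only if tasks are never retried).
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
```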

This patch also changes the output row ordering of repartition(), which causes a number of test cases to fail because they compare results directly.

Add unit test in ExchangeSuite.

With this patch (and spark.sql.execution.sortBeforeRepartition set to true), the following query returns 1000000:

```
import scala.sys.process._

import org.apache.spark.TaskContext

spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")

val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()

res7: Long = 1000000
```

Author: Xingbo Jiang [email protected]

How was this patch tested?

Ran all SBT unit tests for org.apache.spark.sql.*.

Ran pyspark tests for module pyspark-sql.

@gatorsmile (Member)

cc @jiangxb1987

@gatorsmile (Member)

The original fix efccc02 has been merged to Spark 2.3. After 5+ months, we have not received any report of a correctness regression caused by this fix, although it definitely introduces a performance regression. I think we should merge it: the overall risk is not very high, and it resolves a serious correctness bug.

@SparkQA commented Aug 11, 2018

Test build #94619 has finished for PR 22079 at commit efccc02.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class RecordBinaryComparator extends RecordComparator

@bersprockets (Contributor, Author)

The test "model load / save" in ChiSqSelectorSuite fails because of this line in
ChiSqSelector.scala

spark.createDataFrame(dataArray).repartition(1).write.parquet(Loader.dataPath(path))

In 2.4, the line is:

spark.createDataFrame(sc.makeRDD(dataArray, 1)).write.parquet(Loader.dataPath(path))

If you change 2.4 to also have that line, and also remove the follow-up PR (#20426) to avoid sorting when there is one partition, this test also fails on 2.4 in the same way.

So I am not sure which way to go: Update ChiSqSelector.scala to be like 2.4 (simply a one line change), or make the test accept this new order.

…ncorrect answers

Author: Xingbo Jiang <[email protected]>

Closes apache#20393 from jiangxb1987/shuffle-repartition.
@SparkQA commented Aug 12, 2018

Test build #94633 has finished for PR 22079 at commit 495cba5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class RecordBinaryComparator extends RecordComparator

@jiangxb1987 (Contributor)

We shall also include #20088 in this backport PR.

@bersprockets (Contributor, Author)

@jiangxb1987

We shall also include #20088 in this backport PR.

I did that shortly after commenting, which allowed the tests to pass. I squashed it into the first commit, so it wasn't obvious I did it.

Should I also include #20426 in this PR, or treat that separately?

@jiangxb1987 (Contributor)

Both seem fine to me; it's just a minor improvement. Normally we don't backport an improvement, but since it's a simple and small change, I'm confident it is safe to include it in this backport PR.

@SparkQA commented Aug 13, 2018

Test build #94656 has finished for PR 22079 at commit 8d2d558.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bersprockets (Contributor, Author)

Hmmm... I somehow managed to break the SparkR tests by fixing a comment. It seems to have auto-retried and broken the second time too.

@bersprockets (Contributor, Author)

retest this please

@SparkQA commented Aug 13, 2018

Test build #94665 has finished for PR 22079 at commit 8d2d558.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…repartition(1)

In `ShuffleExchangeExec`, we don't need to insert an extra local sort before round-robin partitioning if the new partitioning has only 1 partition, because in that case all output rows go to the same partition.
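A hedged sketch of the check (illustrative names, not the exact `ShuffleExchangeExec` code): the defensive local sort is pointless when every row lands in the same partition anyway.

```
def needLocalSort(sortBeforeRepartition: Boolean,
                  isRoundRobin: Boolean,
                  numPartitions: Int): Boolean =
  // Sorting buys determinism only when rows are spread across multiple
  // partitions; with one partition the output set is fixed regardless.
  sortBeforeRepartition && isRoundRobin && numPartitions > 1
```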

Tested with the existing test cases.

Author: Xingbo Jiang <[email protected]>

Closes apache#20426 from jiangxb1987/repartition1.
```
while (i <= leftLen - 8) {
  res = (int) ((Platform.getLong(leftObj, leftOff + i) -
    Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
  if (res != 0) return res;
```
@mridulm (Contributor):
It is possible for two objects to be unequal and yet we consider them as equal with this code, if the long values are separated by Int.MaxValue.
Any particular reason why this code is written like this, with the inaccuracy, rather than as the idiomatic comparator comparison?

Contributor:

good question, @jiangxb1987 any thoughts on this? (it was this way in the original)

@jiangxb1987 (Contributor):
As far as I can recall, it's actually a mistake. At first the function returned a Long value, and later I changed the return type to Integer. Let me fix it. Thanks for discovering this @mridulm!
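A concrete illustration of the inaccuracy, with hypothetical values (the fix later landed in #22101):

```
// Two distinct longs whose difference is a multiple of Integer.MAX_VALUE
// compare as "equal" under the subtract-and-mod scheme.
val a = Int.MaxValue.toLong            // 2147483647
val b = 0L
val res = ((a - b) % Int.MaxValue).toInt
assert(res == 0 && a != b)             // passes: res is 0, yet a != b
// The idiomatic alternative, java.lang.Long.compare(a, b), never collapses
// distinct values to 0.
```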

```
@@ -144,7 +144,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
   val dataArray = Array.tabulate(model.selectedFeatures.length) { i =>
     Data(model.selectedFeatures(i))
   }
-  spark.createDataFrame(dataArray).repartition(1).write.parquet(Loader.dataPath(path))
+  spark.createDataFrame(sc.makeRDD(dataArray, 1)).write.parquet(Loader.dataPath(path))
```
@dongjoon-hyun (Member):
@jiangxb1987 and @bersprockets. SPARK-22905 consists of two commits.

If we want to include SPARK-22905 here, it had better be explicit and complete: put [SPARK-22905] into the PR title and include both patches.

@bersprockets (Contributor, Author):
@dongjoon-hyun Thanks, I will include the other commit and also update the title.

## What changes were proposed in this pull request?
Make sure model data is stored in order. WeichenXu123

## How was this patch tested?
existing tests

Author: Zheng RuiFeng <[email protected]>

Closes apache#20113 from zhengruifeng/gmm_save.
@bersprockets bersprockets changed the title [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on a DataFrame could lead to incorrect answers [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shuffle+Repartition on a DataFrame could lead to incorrect answers Aug 13, 2018
@SparkQA commented Aug 13, 2018

Test build #94701 has finished for PR 22079 at commit 81f57fe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 13, 2018

Test build #94704 has finished for PR 22079 at commit bab8e68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Aug 14, 2018

Overall I'm in favor of backporting this; the only changes relative to the original look very small.

@bersprockets (Contributor, Author)

@jiangxb1987

Here are some of the differences from the original PR

@bersprockets (Contributor, Author)

@jiangxb1987 gentle ping.

asfgit pushed a commit that referenced this pull request Aug 21, 2018
…en two words is divisible by Integer.MAX_VALUE.

See #22079 (comment): it is possible for two objects to be unequal and yet be considered equal by this code, if the long values are separated by Int.MaxValue.
This PR fixes the issue.

Add new test cases in `RecordBinaryComparatorSuite`.

Closes #22101 from jiangxb1987/fix-rbc.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
(cherry picked from commit 4fb96e5)
Signed-off-by: Xiao Li <[email protected]>
@gatorsmile (Member)

The PR #22101 has been merged. Please merge the latest one to this PR. Thanks!

@bersprockets (Contributor, Author)

@gatorsmile So I should include all the related PRs merged to master as a single PR here? Just verifying.

## What changes were proposed in this pull request?

Add a new test suite to test RecordBinaryComparator.

## How was this patch tested?

New test suite.

Author: Xingbo Jiang <[email protected]>

Closes apache#21570 from jiangxb1987/rbc-test.
…en two words is divisible by Integer.MAX_VALUE.

## What changes were proposed in this pull request?

See apache#22079 (comment): it is possible for two objects to be unequal and yet be considered equal by this code, if the long values are separated by Int.MaxValue.
This PR fixes the issue.

## How was this patch tested?
Add new test cases in `RecordBinaryComparatorSuite`.

Closes apache#22101 from jiangxb1987/fix-rbc.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
@bersprockets bersprockets changed the title [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shuffle+Repartition on a DataFrame could lead to incorrect answers [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.2] Shuffle+Repartition on a DataFrame could lead to incorrect answers Aug 21, 2018
@gatorsmile (Member)

ok to test

@SparkQA commented Aug 22, 2018

Test build #95053 has finished for PR 22079 at commit 2edad85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 22, 2018

Test build #95054 has finished for PR 22079 at commit 2edad85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

cc @jiangxb1987

@jiangxb1987 (Contributor)

LGTM, thanks!

@gatorsmile (Member)

Thanks! Merged to 2.2

@bersprockets (Contributor, Author)

@gatorsmile Weird, I don't see it on branch-2.2. Is that a sync issue?

asfgit pushed a commit that referenced this pull request Aug 23, 2018
…2] Shuffle+Repartition on a DataFrame could lead to incorrect answers

Closes #22079 from bersprockets/SPARK-23207.

Lead-authored-by: Xingbo Jiang <[email protected]>
Co-authored-by: Bruce Robbins <[email protected]>
Co-authored-by: Zheng RuiFeng <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
@gatorsmile (Member)

d7c3aae Done

henryr pushed a commit to henryr/spark that referenced this pull request Aug 23, 2018
…1] Shuffle+Repartition on a DataFrame could lead to incorrect answers

## What changes were proposed in this pull request?

Back port of apache#20393 and apache#22079.

Closes apache#22079 from bersprockets/SPARK-23207.

Lead-authored-by: Xingbo Jiang <[email protected]>
Co-authored-by: Bruce Robbins <[email protected]>
Co-authored-by: Zheng RuiFeng <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
@gatorsmile (Member)

@bersprockets Could you please close this PR?

csd-jenkins pushed a commit to alteryx/spark that referenced this pull request Aug 27, 2018
* [PYSPARK] Updates to Accumulators

(cherry picked from commit 15fc237)

* [SPARK-24987][SS] - Fix Kafka consumer leak when no new offsets for TopicPartition

## What changes were proposed in this pull request?

This small fix adds a `consumer.release()` call to `KafkaSourceRDD` in the case where we've retrieved offsets from Kafka, but the `fromOffset` is equal to the `lastOffset`, meaning there is no new data to read for a particular topic partition. Up until now, we'd just return an empty iterator without closing the consumer, which would cause an FD leak.
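A hedged sketch of the fix (the types below are illustrative stand-ins, not the real `KafkaSourceRDD` API):

```
trait ConsumerHandle {
  def release(): Unit
  def read(from: Long, until: Long): Iterator[Array[Byte]]
}

def computePartition(from: Long, until: Long, consumer: ConsumerHandle): Iterator[Array[Byte]] =
  if (from == until) {
    consumer.release()   // previously skipped here, leaking a file descriptor
    Iterator.empty
  } else {
    consumer.read(from, until)
  }
```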

If accepted, this pull request should be merged into master as well.

## How was this patch tested?

Haven't run any specific tests; would love help on how to test methods running inside `RDD.compute`.

Author: Yuval Itzchakov <[email protected]>

Closes apache#21997 from YuvalItzchakov/master.

(cherry picked from commit b7fdf8e)
Signed-off-by: cody koeninger <[email protected]>

* [SPARK-25015][BUILD] Update Hadoop 2.7 to 2.7.7

## What changes were proposed in this pull request?

Update Hadoop 2.7 to 2.7.7 to pull in bug and security fixes.

## How was this patch tested?

Existing tests.

Author: Sean Owen <[email protected]>

Closes apache#21987 from srowen/SPARK-25015.

(cherry picked from commit 5f9633d)
Signed-off-by: Sean Owen <[email protected]>

* [SPARK-24948][SHS][BACKPORT-2.3] Delegate check access permissions to the file system

## What changes were proposed in this pull request?

In `SparkHadoopUtil.checkAccessPermission`, we consider only basic permissions in order to check whether a user can access a file or not. This is not a complete check, as it ignores ACLs and other policies a file system may apply internally. So this can result in wrongly returning that a user cannot access a file (even though they actually can).

The PR proposes to delegate the check of whether a file is accessible or not to the file system itself, in order to return the right result. A caching layer is added for performance reasons.
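A hedged sketch of the delegation, assuming Hadoop's `FileSystem.access` (available since Hadoop 2.6), which throws `AccessControlException` when access is denied; a real implementation would put a cache in front of this call.

```
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.security.AccessControlException

def canAccess(fs: FileSystem, path: Path, action: FsAction): Boolean =
  try { fs.access(path, action); true }        // let the FS apply ACLs etc.
  catch { case _: AccessControlException => false }
```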

## How was this patch tested?

added UT

Author: Marco Gaido <[email protected]>

Closes apache#22021 from mgaido91/SPARK-24948_2.3.

* [MINOR][BUILD] Update Jetty to 9.3.24.v20180605

Update Jetty to 9.3.24.v20180605 to pick up security fix

Existing tests.

Closes apache#22055 from srowen/Jetty9324.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit eb9a696)
Signed-off-by: Sean Owen <[email protected]>

* [SPARK-25076][SQL] SQLConf should not be retrieved from a stopped SparkSession

## What changes were proposed in this pull request?

When a `SparkSession` is stopped, `SQLConf.get` should use the fallback conf to avoid weird issues like
```
sbt.ForkMain$ForkError: java.lang.IllegalStateException: LiveListenerBus is stopped.
	at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:97)
	at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:80)
	at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:93)
	at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120)
	at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120)
	at scala.Option.getOrElse(Option.scala:121)
...
```
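A hedged sketch of the guard described above (simplified; the real `SQLConf.get` plumbing in Spark is more involved):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

def currentConf(fallback: SQLConf): SQLConf =
  SparkSession.getActiveSession match {
    case Some(s) if !s.sparkContext.isStopped => s.sessionState.conf
    case _ => fallback   // never touch a stopped session's state
  }
```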

## How was this patch tested?

a new test suite

Closes apache#22056 from cloud-fan/session.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
(cherry picked from commit fec67ed)
Signed-off-by: Xiao Li <[email protected]>

* [SPARK-24950][SQL] DateTimeUtilsSuite daysToMillis and millisToDays fails w/java 8 181-b13

## What changes were proposed in this pull request?

- Update DateTimeUtilsSuite so that when testing round-tripping in daysToMillis and millisToDays, multiple skip dates can be specified.
- Updated the test so that both New Year's Eve 2014 and New Year's Day 2015 are skipped for Kiribati time zones. This is necessary as Java versions before 181-b13 considered New Year's Day 2015 to be skipped, while subsequent versions corrected this to New Year's Eve.

## How was this patch tested?
Unit tests

Author: Chris Martin <[email protected]>

Closes apache#21901 from d80tb7/SPARK-24950_datetimeUtilsSuite_failures.

(cherry picked from commit c5b8d54)
Signed-off-by: Sean Owen <[email protected]>

* Preparing Spark release v2.3.2-rc4

* Preparing development version 2.3.3-SNAPSHOT

* [MINOR][BUILD] Add ECCN notice required by http://www.apache.org/dev/crypto.html

Add ECCN notice required by http://www.apache.org/dev/crypto.html
See https://issues.apache.org/jira/browse/LEGAL-398

This should probably be backported to 2.3, 2.2, as that's when the key dep (commons crypto) turned up. BC is actually unused, but still there.

N/A

Closes apache#22064 from srowen/ECCN.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 91cdab5)
Signed-off-by: Sean Owen <[email protected]>

* [SPARK-25081][CORE] Nested spill in ShuffleExternalSorter should not access released memory page

## What changes were proposed in this pull request?

This issue is pretty similar to [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907).

"allocateArray" in [ShuffleInMemorySorter.reset](https://github.com/apache/spark/blob/9b8521e53e56a53b44c02366a99f8a8ee1307bbf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java#L99) may trigger a spill and cause ShuffleInMemorySorter access the released `array`. Another task may get the same memory page from the pool. This will cause two tasks access the same memory page. When a task reads memory written by another task, many types of failures may happen. Here are some examples I  have seen:

- JVM crash. (This is easy to reproduce in a unit test as we fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes which usually points to an invalid memory address)
- java.lang.IllegalArgumentException: Comparison method violates its general contract!
- java.lang.NullPointerException at org.apache.spark.memory.TaskMemoryManager.getPage(TaskMemoryManager.java:384)
- java.lang.UnsupportedOperationException: Cannot grow BufferHolder by size -536870912 because the size after growing exceeds size limitation 2147483632

This PR resets the state in `ShuffleInMemorySorter.reset` before calling `allocateArray` to fix the issue.
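A hedged sketch of the reordering (illustrative Scala, not the actual Java `ShuffleInMemorySorter`): drop the reference to the old array *before* the allocation that may spill, so a nested spill cannot observe a released page.

```
final class SorterSketch(initialCapacity: Int) {
  private var array: Array[Long] = new Array[Long](initialCapacity)
  private var pos: Int = 0

  def reset(): Unit = {
    array = null                     // clear state first: a nested spill now sees "empty"
    pos = 0
    array = new Array[Long](initialCapacity)  // the risky allocation happens last
  }
}
```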

## How was this patch tested?

The new unit test will make JVM crash without the fix.

Closes apache#22062 from zsxwing/SPARK-25081.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
(cherry picked from commit f5aba65)
Signed-off-by: Shixiong Zhu <[email protected]>

* [SPARK-24908][R][STYLE] removing spaces to make lintr happy

## What changes were proposed in this pull request?

during my travails in porting spark builds to run on our centos worker, i managed to recreate (as best i could) the centos environment on our new ubuntu-testing machine.

while running my initial builds, lintr was crashing on some extraneous spaces in test_basic.R (see:  https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6-ubuntu-test/862/console)

after removing those spaces, the ubuntu build happily passed the lintr tests.

## How was this patch tested?

i then tested this against a modified spark-master-test-sbt-hadoop-2.6 build (see https://amplab.cs.berkeley.edu/jenkins/view/RISELab%20Infra/job/testing-spark-master-test-with-updated-R-crap/4/), which scp'ed a copy of test_basic.R in to the repo after the git clone.  everything seems to be working happily.

Author: shane knapp <[email protected]>

Closes apache#21864 from shaneknapp/fixing-R-lint-spacing.

(cherry picked from commit 3efdf35)
Signed-off-by: Sean Owen <[email protected]>

* [SPARK-25084][SQL][BACKPORT-2.3] "distribute by" on multiple columns (wrap in brackets) may lead to codegen issue

## What changes were proposed in this pull request?

Backport apache#22066 to branch-2.3

Use different API in 2.3 here
```scala
|${ctx.JAVA_INT} $childResult = 0;
```

"distribute by" on multiple columns (wrap in brackets) may lead to codegen issue.

Simple way to reproduce:

```scala
  val df = spark.range(1000)
  val columns = (0 until 400).map{ i => s"id as id$i" }
  val distributeExprs = (0 until 100).map(c => s"id$c").mkString(",")
  df.selectExpr(columns : _*).createTempView("test")
  spark.sql(s"select * from test distribute by ($distributeExprs)").count()
```

## How was this patch tested?

UT in Jenkins

Closes apache#22077 from LantaoJin/SPARK-25084_2.3.

Authored-by: LantaoJin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-25028][SQL] Avoid NPE when analyzing partition with NULL values

## What changes were proposed in this pull request?

`ANALYZE TABLE ... PARTITION(...) COMPUTE STATISTICS` can fail with an NPE if a partition column contains a NULL value.

The PR avoids the NPE, replacing the `NULL` values with the default partition placeholder.
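A hedged sketch of that substitution (the map shape is illustrative; "__HIVE_DEFAULT_PARTITION__" is the usual Hive placeholder, which Spark likely references via `ExternalCatalogUtils`):

```
def normalize(spec: Map[String, Option[String]]): Map[String, String] =
  spec.map { case (k, v) =>
    // NULL partition values become the default partition placeholder
    k -> v.getOrElse("__HIVE_DEFAULT_PARTITION__")
  }
```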

## How was this patch tested?

added UT

Closes apache#22036 from mgaido91/SPARK-25028.

Authored-by: Marco Gaido <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c220cc4)
Signed-off-by: Wenchen Fan <[email protected]>

* Preparing Spark release v2.3.2-rc5

* Preparing development version 2.3.3-SNAPSHOT

* [MINOR][SQL][DOC] Fix `to_json` example in function description and doc

## What changes were proposed in this pull request?

This PR fixes the an example for `to_json` in doc and function description.

- http://spark.apache.org/docs/2.3.0/api/sql/#to_json
- `describe function extended`

## How was this patch tested?

Pass the Jenkins with the updated test.

Closes apache#22096 from dongjoon-hyun/minor_json.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
(cherry picked from commit e2ab7de)
Signed-off-by: hyukjinkwon <[email protected]>

* [SPARK-25051][SQL] FixNullability should not stop on AnalysisBarrier

## What changes were proposed in this pull request?

The introduction of `AnalysisBarrier` prevented `FixNullability` to go through all the nodes. This introduced a bug, which can lead to wrong results, as the nullability of the output attributes of an outer join can be wrong.

The PR makes `FixNullability` going through the `AnalysisBarrier`s.

## How was this patch tested?

added UT

Author: Marco Gaido <[email protected]>

Closes apache#22102 from mgaido91/SPARK-25051.

* [MINOR][DOC][SQL] use one line for annotation arg value

## What changes were proposed in this pull request?

Put annotation args in one line, or API doc generation will fail.

~~~
[error] /Users/meng/src/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:1559: annotation argument needs to be a constant; found: "_FUNC_(expr) - Returns the character length of string data or number of bytes of ".+("binary data. The length of string data includes the trailing spaces. The length of binary ").+("data includes binary zeros.")
[error]     "binary data. The length of string data includes the trailing spaces. The length of binary " +
[error]                                                                                                  ^
[info] No documentation generated with unsuccessful compiler run
[error] one error found
[error] (catalyst/compile:doc) Scaladoc generation failed
[error] Total time: 27 s, completed Aug 17, 2018 3:20:08 PM
~~~

## How was this patch tested?

sbt catalyst/compile:doc passed

Closes apache#22137 from mengxr/minor-doc-fix.

Authored-by: Xiangrui Meng <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
(cherry picked from commit f454d52)
Signed-off-by: hyukjinkwon <[email protected]>

* [SPARK-25144][SQL][TEST][BRANCH-2.3] Free aggregate map when task ends

## What changes were proposed in this pull request?

[SPARK-25144](https://issues.apache.org/jira/browse/SPARK-25144) reports memory leaks on Apache Spark 2.0.2 ~ 2.3.2-RC5.

```scala
scala> case class Foo(bar: Option[String])
scala> val ds = List(Foo(Some("bar"))).toDS
scala> val result = ds.flatMap(_.bar).distinct
scala> result.rdd.isEmpty
18/08/19 23:01:54 WARN Executor: Managed memory leak detected; size = 8650752 bytes, TID = 125
res0: Boolean = false
```

This is a backport of cloud-fan's apache#21738, which is a single commit among the 3 commits of SPARK-21743. In addition, I added a test case to prevent regressions in branch-2.3 and branch-2.2. Although SPARK-21743 was reverted due to a regression, this subpatch can go to branch-2.3 and branch-2.2. This will be merged as cloud-fan's commit.

## How was this patch tested?

Pass the jenkins with a newly added test case.

Closes apache#22150 from dongjoon-hyun/SPARK-25144.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>

* [DOCS] Fixed NDCG formula issues

When j is 0, log(j+1) will be 0, and this leads to a division-by-zero issue.
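For reference, the usual DCG convention uses 1-based ranks, which keeps every denominator positive (a sketch of the standard formula, not necessarily the exact notation used in the docs):

```
\mathrm{DCG@k} = \sum_{j=1}^{k} \frac{rel_j}{\log_2(j+1)},
\qquad \log_2(j+1) \ge 1 \quad \text{for } j \ge 1.
```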


Closes apache#22090 from yueguoguo/patch-1.

Authored-by: Zhang Le <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 219ed7b)
Signed-off-by: Sean Owen <[email protected]>

* [SPARK-25114][CORE] Fix RecordBinaryComparator when subtraction between two words is divisible by Integer.MAX_VALUE.

apache#22079 (comment) It is possible for two objects to be unequal and yet we consider them as equal with this code, if the long values are separated by Int.MaxValue.
This PR fixes the issue.

Add new test cases in `RecordBinaryComparatorSuite`.

Closes apache#22101 from jiangxb1987/fix-rbc.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
(cherry picked from commit 4fb96e5)
Signed-off-by: Xiao Li <[email protected]>

* [SPARK-25114][2.3][CORE][FOLLOWUP] Fix RecordBinaryComparatorSuite build failure

## What changes were proposed in this pull request?

Fix RecordBinaryComparatorSuite build failure

## How was this patch tested?

Existing tests.

Closes apache#22166 from jiangxb1987/SPARK-25114-2.3.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: Xiao Li <[email protected]>

* [SPARK-25205][CORE] Fix typo in spark.network.crypto.keyFactoryIterations

Closes apache#22195 from squito/SPARK-25205.

Authored-by: Imran Rashid <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
(cherry picked from commit 0ce09ec)
Signed-off-by: hyukjinkwon <[email protected]>

* [SPARK-25234][SPARKR] avoid integer overflow in parallelize

## What changes were proposed in this pull request?

`parallelize` uses integer multiplication to determine the split indices. It might cause integer overflow.
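A hedged sketch of the overflow-safe slicing (the actual fix is in SparkR's context.R; this shows the same idea in Scala): promote to Long before multiplying, so `i * length` cannot overflow Int.

```
def sliceBoundaries(length: Int, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map { i =>
    val start = ((i.toLong * length) / numSlices).toInt        // Long arithmetic
    val end = (((i + 1).toLong * length) / numSlices).toInt
    (start, end)
  }
```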

## How was this patch tested?

unit test

Closes apache#22225 from mengxr/SPARK-25234.

Authored-by: Xiangrui Meng <[email protected]>
Signed-off-by: Xiangrui Meng <[email protected]>
(cherry picked from commit 9714fa5)
Signed-off-by: Xiangrui Meng <[email protected]>

* [SPARK-25124][ML] VectorSizeHint setSize and getSize don't return values backport to 2.3

## What changes were proposed in this pull request?
In feature.py, VectorSizeHint setSize and getSize don't return a value. Add the returns.

## How was this patch tested?

Unit Test added

Closes apache#22228 from huaxingao/spark-25124-2.3.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Joseph K. Bradley <[email protected]>
asfgit pushed a commit that referenced this pull request Aug 27, 2018
…1] Shuffle+Repartition on a DataFrame could lead to incorrect answers

## What changes were proposed in this pull request?

Back port of #20393 and #22079.

Author: Xingbo Jiang <[email protected]>
Author: Henry Robinson <[email protected]>

Closes #22211 from henryr/spark-23207-branch-2.1.
@bersprockets bersprockets deleted the SPARK-23207 branch December 30, 2018 17:31
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 25, 2019
…2] Shuffle+Repartition on a DataFrame could lead to incorrect answers

Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 26, 2019
…2] Shuffle+Repartition on a DataFrame could lead to incorrect answers

Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 27, 2019
…2] Shuffle+Repartition on a DataFrame could lead to incorrect answers
