
[BUG] Non-deterministic output from MapOutputTracker.getStatistics() with AQE on GPU #598

Closed
andygrove opened this issue Aug 20, 2020 · 7 comments
Labels
bug Something isn't working

Comments

@andygrove
Contributor

andygrove commented Aug 20, 2020

Describe the bug

The unit tests being added in #536 fail intermittently because the data returned by MapOutputTracker.getStatistics is not consistent between runs for 2 of the 100 partitions. The tests rely on the statistics being correct because the skewed join optimization is triggered based on them.

For example, I modified Spark 3.1 to add some debug logging in MapOutputTracker.getStatistics for one of these partitions:

    // Debug logging added inside MapOutputTracker.getStatistics: while the
    // per-reduce-partition totals are accumulated, also record each map
    // output's reported block size for reduce partition 53.
    val buffer = new ListBuffer[Long]()
    for (s <- statuses) {
      for (i <- 0 until totalSizes.length) {
        val blockSize = s.getSizeForBlock(i)
        totalSizes(i).addAndGet(blockSize)
        if (i == 53) {
          buffer += blockSize
        }
      }
    }
    println(s"partition 53: ${buffer.mkString(",")}")

Here are some example output values from multiple runs of the same test:

789,789,334,88,106,88,88,0,0,0
717,593,334,88,106,88,88,0,0,0
717,789,334,88,106,88,88,0,0,0

My first assumption was that values were being dropped when combining the sizes of partitions from different map outputs. There does appear to be a bug of exactly that kind in MapOutputTracker.getStatistics when it aggregates the sizes with multiple threads, but not when it runs single-threaded, which is the code path taken when running these tests.
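
For illustration of what dropping values when combining from multiple threads could look like, here is a minimal standalone Scala sketch (not Spark's code; the totalSizes array below is just a hypothetical stand-in for the per-partition totals): unsynchronized accumulation can silently lose updates.

    import java.util.concurrent.Executors
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, ExecutionContext, Future}

    object LostUpdateDemo extends App {
      private val pool = Executors.newFixedThreadPool(8)
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

      // A plain Long slot: `+=` is a read-modify-write, so concurrent updates
      // can overwrite each other and the final total comes up short.
      val totalSizes = new Array[Long](1)

      val updates = (1 to 8).map { _ =>
        Future {
          for (_ <- 0 until 100000) totalSizes(0) += 1
        }
      }
      Await.result(Future.sequence(updates), Duration.Inf)

      // Expected 800000; a run that loses updates prints something smaller.
      println(s"total = ${totalSizes(0)}")
      pool.shutdown()
    }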

The logging above suggests that getSizeForBlock is somehow reporting incorrect results.

Steps/Code to reproduce bug

Run the new test added in #536 until it fails. I have not been able to reproduce the failure when running from IntelliJ, only from mvn verify, and only against Spark 3.1 so far. Since this is most likely a race condition, it is possible that it occurs with other Spark versions and I just haven't seen it yet.

Expected behavior

MapOutputTracker.getStatistics should return the same values on each run.

Environment details (please complete the following information)
I have seen the tests fail both locally and in CI.

Additional context
None.

@andygrove andygrove added the bug (Something isn't working) and ? - Needs Triage (Need team to review and classify) labels Aug 20, 2020
@andygrove andygrove added this to the Aug 17 - Aug 28 milestone Aug 20, 2020
@andygrove andygrove self-assigned this Aug 20, 2020
@andygrove
Contributor Author

After debugging this some more, I can see that the data sizes reported by BypassMergeSortShuffleWriter vary between runs. The data is serialized by GpuColumnarBatchSerializer, and for some reason the amount of data the serializer writes to disk for a given column is not deterministic.

@jlowe
Member

jlowe commented Aug 21, 2020

IIRC the serializer is using the shuffle compression codec, and therefore the output size will be sensitive to the ordering of the data. Many GPU operations do not guarantee output ordering, like hash aggregates, hash joins, etc. Could that be happening here?
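
For illustration (not the actual shuffle write path or anything in Spark or the plugin), a minimal Scala sketch using lz4-java's LZ4BlockOutputStream shows that the same bytes compress to different sizes depending on their order; the lz4Size helper and object name below are hypothetical.

    import java.io.ByteArrayOutputStream
    import net.jpountz.lz4.LZ4BlockOutputStream
    import scala.util.Random

    object CompressionOrderDemo extends App {
      // Illustrative helper: compress a byte array with LZ4 block framing
      // and return the compressed size.
      def lz4Size(data: Array[Byte]): Int = {
        val bytes = new ByteArrayOutputStream()
        val lz4 = new LZ4BlockOutputStream(bytes)
        lz4.write(data)
        lz4.close()
        bytes.size()
      }

      // The same multiset of low-cardinality values in two different orders.
      val values = Array.fill[Byte](64 * 1024)(Random.nextInt(4).toByte)
      val sorted = values.sorted

      // The uncompressed size is identical, but the compressed sizes differ
      // because LZ4 finds much longer matches in the sorted layout.
      println(s"unsorted: ${lz4Size(values)} bytes, sorted: ${lz4Size(sorted)} bytes")
    }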

@andygrove
Contributor Author

I keep hoping to see compression because that would explain this. We are calling JCudfSerialization.writeToStream(columns, dOut, startRow, numRows) to write the data out, and I don't see compression involved there, but maybe I am missing something?

Also the serialization size reported by JCudfSerialization.getSerializedSizeInBytes(columns, startRow, numRows) stays consistent even as the actual written data size varies.

@jlowe
Member

jlowe commented Aug 21, 2020

I believe that either the stream being written to is wrapped by a codec stream somewhere along the stream stack, or the data captured on that stream is compressed afterwards. Spark compresses the shuffle data with the configured shuffle compression codec, so it's definitely getting compressed somewhere before it hits disk.

@andygrove
Contributor Author

Thanks. I do see now that there is an LZ4BlockOutputStream in the chain of streams that we are writing to.

@andygrove
Contributor Author

I think my conclusion based on this is that Spark is deterministic for the AQE skew join test on CPU because the LZ4 compression is applied per row, and compression is consistent in that case (or does not really compress anything, since these rows are just one or two long values), whereas the values written when we are using the GPU are whole columns, where the compression is more effective and also variable, depending on the order of the data within the column.
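
For illustration, a further sketch under the same assumptions as the one above (not the actual CPU row serializer or GPU columnar path): compressing each tiny row independently gives a total that does not depend on row order, while compressing the whole column at once gives a size that does.

    import java.io.ByteArrayOutputStream
    import net.jpountz.lz4.LZ4BlockOutputStream
    import scala.util.Random

    object RowVsColumnDemo extends App {
      def lz4Size(data: Array[Byte]): Int = {
        val bytes = new ByteArrayOutputStream()
        val lz4 = new LZ4BlockOutputStream(bytes)
        lz4.write(data)
        lz4.close()
        bytes.size()
      }

      // 10,000 small "rows" of 16 bytes each (think one or two long values per
      // row), drawn from a handful of distinct values, in two different orders.
      val rows: Seq[Array[Byte]] = Seq.fill(10000) {
        val v = Random.nextInt(4).toByte
        Array.fill[Byte](16)(v)
      }
      val grouped = rows.sortBy(_.head) // same rows, grouped by value

      // Row-at-a-time framing: each row compresses the same way wherever it
      // appears, so the summed size does not depend on row order.
      println(s"per-row totals: ${rows.map(lz4Size).sum} vs ${grouped.map(lz4Size).sum}")

      // Columnar framing: the whole buffer is compressed together, so the
      // compressed size does depend on how the rows happen to be ordered.
      println(s"whole column:   ${lz4Size(rows.flatten.toArray)} vs ${lz4Size(grouped.flatten.toArray)}")
    }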

I can probably close this issue then. I will have to re-think how we write unit tests for the skew join optimization since the Spark tests are not going to work as written for our use case.

@jlowe
Member

jlowe commented Aug 21, 2020

If we want to key off of the size of the shuffle output, then the shuffle needs to be deterministic. I'm not sure the CPU is even guaranteed to be deterministic for this unless total order partitioning is used. In many cases partitions can arrive from the shuffle in different orders, so in theory the output could be in a different order even on the CPU, unless something forces the shuffle to fetch partitions in a very specific order (like total order partitioning).

@sameerz sameerz removed the ? - Needs Triage (Need team to review and classify) label Sep 29, 2020
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this issue Nov 30, 2023
…IDIA#598)
