
[SPARK-6190][core] create LargeByteBuffer for eliminating 2GB block limit #5400

Closed · wants to merge 33 commits

Conversation

@squito
Contributor

squito commented Apr 7, 2015

} else {
  val r = buffer.get() & 0xFF
  if (buffer.remaining() == 0) {
    cleanUp()

Contributor

Isn't this better done as part of close()?

Contributor

(In fact you do need close() in case the stream is closed before EOF is reached.)

Contributor Author

If anyone is watching on the sidelines -- Marcelo and I chatted about this for a while and realized there is an issue with the existing use of ByteBufferInputStream (where this code was copied from) that prevents it from getting properly disposed in all cases. I've opened https://issues.apache.org/jira/browse/SPARK-6839
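
(For readers following along, here is a minimal sketch of the close()-based cleanup the two review comments above are asking for. It is illustrative only, not the PR's actual code; the class name and the dispose callback are assumptions standing in for whatever actually frees the buffer.)

import java.io.InputStream
import java.nio.ByteBuffer

// Sketch only: an InputStream over a ByteBuffer whose close() always releases
// the underlying buffer, so cleanup happens even if the caller closes the
// stream before reaching EOF.
class DisposingByteBufferInputStream(
    private var buffer: ByteBuffer,
    dispose: ByteBuffer => Unit) extends InputStream {

  override def read(): Int = {
    if (buffer == null || buffer.remaining() == 0) {
      close()  // nothing left to read; release the buffer
      -1
    } else {
      buffer.get() & 0xFF
    }
  }

  // Idempotent, so it is safe to call both from read() at EOF and from user code.
  override def close(): Unit = {
    if (buffer != null) {
      dispose(buffer)
      buffer = null
    }
  }
}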

@SparkQA

SparkQA commented Apr 7, 2015

Test build #29808 has finished for PR 5400 at commit 9f53203.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LargeByteBufferOutputStream(chunkSize: Int = 65536)
    • public class BufferTooLargeException extends IOException
    • public class LargeByteBufferHelper
    • public class WrappedLargeByteBuffer implements LargeByteBuffer
  • This patch does not change any dependencies.
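
(The class names in the report above come from the patch itself. As a rough illustration of the underlying idea -- not the PR's implementation -- a buffer larger than 2GB can be represented by several smaller ByteBuffer chunks addressed through a Long position:)

import java.nio.ByteBuffer

// Sketch only: translate a Long position into (chunk index, offset within chunk)
// so the total size can exceed Integer.MAX_VALUE even though each chunk cannot.
class ChunkedBuffer(chunks: Array[ByteBuffer]) {
  val size: Long = chunks.map(_.remaining().toLong).sum

  def get(pos: Long): Byte = {
    require(pos >= 0 && pos < size, s"position $pos out of range")
    var idx = 0
    var offset = pos
    while (offset >= chunks(idx).remaining()) {
      offset -= chunks(idx).remaining()
      idx += 1
    }
    // absolute read, relative to the chunk's current position
    chunks(idx).get(chunks(idx).position() + offset.toInt)
  }
}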

private var _pos = 0

override def write(b: Int): Unit = {
  output.write(b)

Contributor

Don't you need to update _pos?

Contributor

In fact, it seems like _pos is not really used.
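
(A minimal sketch of what the two comments above are pointing at, assuming _pos is meant to track the number of bytes written: either every write path keeps it in sync, or the field should be dropped. Names are illustrative, not the PR's code.)

import java.io.OutputStream

// Sketch only: an OutputStream wrapper whose position stays in sync with
// every write path.
class CountingOutputStream(output: OutputStream) extends OutputStream {
  private var _pos = 0L

  override def write(b: Int): Unit = {
    output.write(b)
    _pos += 1
  }

  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    output.write(b, off, len)
    _pos += len
  }

  def pos: Long = _pos
}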

@vanzin
Contributor

vanzin commented Apr 7, 2015

I didn't look at the tests in detail; I found some discrepancies between the code and the LargeByteBuffer interface that should probably be fixed one way or another (either the interface needs updating, or the code needs fixing).

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29942 has started for PR 5400 at commit a759242.

@shaneknapp
Contributor

jenkins, test this please

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29944 has finished for PR 5400 at commit a759242.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LargeByteBufferOutputStream(chunkSize: Int = 65536)
    • public class BufferTooLargeException extends IOException
    • public class LargeByteBufferHelper
    • public class WrappedLargeByteBuffer implements LargeByteBuffer
  • This patch does not change any dependencies.

@squito
Contributor Author

squito commented Apr 9, 2015

jenkins, test this please

@SparkQA

SparkQA commented Apr 9, 2015

Test build #29972 has finished for PR 5400 at commit a759242.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LargeByteBufferOutputStream(chunkSize: Int = 65536)
    • public class BufferTooLargeException extends IOException
    • public class LargeByteBufferHelper
    • public class WrappedLargeByteBuffer implements LargeByteBuffer
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 13, 2015

Test build #30190 has finished for PR 5400 at commit c3efa4c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LargeByteBufferOutputStream(chunkSize: Int = 65536)
    • public class BufferTooLargeException extends IOException
    • public class LargeByteBufferHelper
    • public class WrappedLargeByteBuffer implements LargeByteBuffer
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 14, 2015

Test build #30249 has finished for PR 5400 at commit e1d8fa8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LargeByteBufferOutputStream(chunkSize: Int = 65536)
    • public class BufferTooLargeException extends IOException
    • public class LargeByteBufferHelper
    • public class WrappedLargeByteBuffer implements LargeByteBuffer
  • This patch adds the following new dependencies:
    • commons-math3-3.4.1.jar
  • This patch removes the following dependencies:
    • commons-math3-3.1.1.jar

@SparkQA

SparkQA commented Aug 19, 2015

Test build #41274 timed out for PR 5400 at commit 80c4032 after a configured wait of 175m.

@tgravescs
Contributor

How long does the >2GB test take to run?

@snnn

snnn commented Oct 19, 2015

How is it going? Is it still WIP? I can help to test.

}
}

// only for testing

Contributor

nit: comment is redundant.

@vanzin
Contributor

vanzin commented Nov 2, 2015

I had already reviewed this and I don't see any changes, so my only worry is still the same as Tom's: how long the large file test takes. I guess it's not horrible if it's a single test taking 10s, but if we could avoid it and still be reasonably sure that things work, that would be better.

@squito
Contributor Author

squito commented Nov 4, 2015

Thanks for taking another look @vanzin, fixed those last issues. Sorry I never responded earlier @tgravescs -- that one test takes about 10s on my laptop; it looks like it took 15s on the last Jenkins run. Personally I feel better with the test in there, but I agree it's not adding a ton of value -- happy to scrap it if you prefer.

@squito
Contributor Author

squito commented Nov 4, 2015

jenkins, retest this please

@SparkQA

SparkQA commented Nov 4, 2015

Test build #45032 has finished for PR 5400 at commit 3447bb9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class LargeByteBufferInputStream extends InputStream
    • public class LargeByteBufferOutputStream extends OutputStream
    • public class JavaAssociationRulesExample
    • public class JavaPrefixSpanExample
    • public class JavaSimpleFPGrowth
    • public class BufferTooLargeException extends IOException
    • public class LargeByteBufferHelper
    • public class WrappedLargeByteBuffer implements LargeByteBuffer
    • class StreamInterceptor implements TransportFrameDecoder.Interceptor
    • public final class ChunkFetchSuccess extends ResponseWithBody
    • public abstract class ResponseWithBody implements ResponseMessage
    • public final class StreamFailure implements ResponseMessage
    • public final class StreamRequest implements RequestMessage
    • public final class StreamResponse extends ResponseWithBody
    • public class TransportFrameDecoder extends ChannelInboundHandlerAdapter

@rxin
Contributor

rxin commented Dec 31, 2015

I'm going to close this pull request. If this is still relevant and you are interested in pushing it forward, please open a new pull request. Thanks!

@asfgit asfgit closed this in 93b52ab Dec 31, 2015
@scwf
Contributor

scwf commented Jan 7, 2016

Hi @squito, can you explain in which situations users will hit the 2GB limit? Will a job processing very large data (such as PB-scale data) reach this limit?

@snnn

snnn commented Jan 8, 2016

@scwf ,

  1. The shuffle output from one mapper to one reducer cannot be more than 2GB.
  2. Partitions of an RDD cannot exceed 2GB.

@rxin
Contributor

rxin commented Jan 8, 2016

@snnn 2 is not true. Partitions can be arbitrarily large. The cached size cannot be greater than 2GB.
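
(Background, summarized here rather than quoted from the thread: the 2GB ceiling exists because JVM arrays and ByteBuffers are indexed with an Int, so anything that must live in a single buffer is capped at Integer.MAX_VALUE bytes. That is also where the "Size exceeds Integer.MAX_VALUE" error reported further down comes from.)

import java.nio.ByteBuffer

// Sketch only: the hard upper bound for any block held in one ByteBuffer.
object TwoGbCeiling {
  def main(args: Array[String]): Unit = {
    println(Int.MaxValue)  // 2147483647 bytes, i.e. just under 2 GiB
    // ByteBuffer.allocate takes an Int capacity, so this is the largest
    // single buffer that can ever be requested:
    // ByteBuffer.allocate(Int.MaxValue)
  }
}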

@scwf
Contributor

scwf commented Jan 8, 2016

The cached size cannot be greater than 2GB.

@rxin How should I understand the cached size? Is it the partition size of a cached RDD?

@vijay1106

Hey, does this address the issue that spark.sql.autoBroadcastJoinThreshold cannot be more than 2GB?

@jkhalid

jkhalid commented Feb 12, 2019

@squito @SparkQA @vanzin @shaneknapp @tgravescs

I am using spark.sql on AWS Glue to generate a single large gzip-compressed CSV file (it is the client's requirement to have a single file), which is definitely greater than 2GB. I am running into this issue:

write(transformed_feed)
  File "script_2019-02-12-15-57-55.py", line 161, in write
    output_path_premium, header=True, compression="gzip")
  File "/mnt/yarn/usercache/root/appcache/application_1549986900582_0001/container_1549986900582_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 766, in csv
  File "/mnt/yarn/usercache/root/appcache/application_1549986900582_0001/container_1549986900582_0001_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/mnt/yarn/usercache/root/appcache/application_1549986900582_0001/container_1549986900582_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt/yarn/usercache/root/appcache/application_1549986900582_0001/container_1549986900582_0001_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o210.csv.
...
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, ip-172-32-189-222.ec2.internal, executor 1): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

Below is the Python code used to write the file (the part-file glob in the hdfs commands appears to have been mangled by formatting; reconstructed here as '/part-*'):

def write(dataframe):
    # write two files, premium listings and non-premium listings (criteria: listing_priority > 30 = premium)
    dataframe.filter(dataframe["listing_priority"] >= 30).drop('listing_priority').drop('image_count').write.csv(
        output_path_premium, header=True, compression="gzip")
    shell_command = "hdfs dfs -mv " + output_path_premium + '/part-*' + ' ' + output_path_premium + output_file_premium
    os.system(shell_command)
    dataframe.filter(dataframe["listing_priority"] < 30).drop('listing_priority').drop('image_count').write.csv(
        output_path_nonpremium, header=True, compression="gzip")
    shell_command = "hdfs dfs -mv " + output_path_nonpremium + '/part-*' + ' ' + output_path_nonpremium + output_file_nonpremium
    os.system(shell_command)

I am assuming it's because the file is greater than 2GB. Has this issue been fixed?

@squito
Contributor Author

squito commented Feb 12, 2019

@jkhalid What Spark version were you on? Many fixes were not available until Spark 2.4.

Can you share the entire stack trace, particularly the Java portion?

The limit still exists for single records which are over 2GB. It shouldn't be a problem for reading a whole file which is over 2GB, but there may be some problem when going back and forth from Python; I'm not very familiar with that part.

@jkhalid

jkhalid commented Feb 12, 2019

@squito
:( My bad, I guess AWS is using 2.2.1:
https://aws.amazon.com/about-aws/whats-new/2018/04/aws-glue-now-supports-apache-spark-221/

and I was encountering the problem while writing the file, not reading it.

I am attaching the stack trace. Please let me know if you see something I missed.
stacktrace.txt

@jkhalid

jkhalid commented Feb 13, 2019

@squito
Hey, any idea from the stack trace what I might be doing wrong?

@vanzin
Contributor

vanzin commented Feb 13, 2019

Can we move discussions unrelated to a long-closed PR to either jira or the mailing list? Thanks.
