
Caused by: javax.net.ssl.SSLException: java.net.SocketException: Broken pipe (Write failed) while writing #410

Closed

afilipchik opened this issue Jul 3, 2020 · 6 comments

afilipchik commented Jul 3, 2020

Hello,

I'm working on native GCS support for Apache Flink. I implemented a native FS by wrapping GoogleHadoopFileSystem, and it works fine most of the time, until it hits a wall at around 250 open files.

The errors I see in the logs are:

java.io.IOException: Upload failed for '2020/06/30/02/part-2-73.avro_chunks/0'
        at com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(BaseAbstractGoogleAsyncWriteChannel.java:297)
        at com.google.cloud.hadoop.util.BaseAbstractGoogleAsyncWriteChannel.close(BaseAbstractGoogleAsyncWriteChannel.java:214)
        at java.nio.channels.Channels$1.close(Channels.java:178)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:128)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
        at com.css.flink.fs.gcs.writer.GcsRecoverableFsDataOutputStream.closeChunk(GcsRecoverableFsDataOutputStream.java:192)
        at com.css.flink.fs.gcs.writer.GcsRecoverableFsDataOutputStream.close(GcsRecoverableFsDataOutputStream.java:302)
        at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
        at org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.dispose(PartFileWriter.java:77)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.disposePartFile(Bucket.java:248)
        at java.util.HashMap$Values.forEach(HashMap.java:981)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.close(Buckets.java:310)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.close(StreamingFileSink.java:476)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:651)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:562)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:480)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error writing request body to server
        at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3597)
        at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3580)
        at com.google.api.client.util.ByteStreams.copy(ByteStreams.java:53)
        at com.google.api.client.util.IOUtils.copy(IOUtils.java:87)
        at com.google.api.client.http.AbstractInputStreamContent.writeTo(AbstractInputStreamContent.java:67)
        at com.google.api.client.http.MultipartContent.writeTo(MultipartContent.java:108)
        at com.google.api.client.http.javanet.NetHttpRequest$DefaultOutputWriter.write(NetHttpRequest.java:76)
        at com.google.api.client.http.javanet.NetHttpRequest.writeContentToOutputStream(NetHttpRequest.java:171)
        at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:117)
        at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:551)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:568)
        at com.google.api.client.googleapis.media.MediaHttpUploader.directUpload(MediaHttpUploader.java:360)
        at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:334)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:551)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:475)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:592)
        at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:107)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

and sometimes it hits an OOM.

I tried playing with different modes, like setting fs.gs.outputstream.direct.upload.enable to true/false, but I can't find anything that works reliably.

The way the wrapper is implemented: it returns fs.create(new org.apache.hadoop.fs.Path(currentChunkName)); to Flink, and Flink is responsible for closing the stream. So a stream can be open for several minutes (depending on the job's config).
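A minimal sketch of that shape (the class and method names are hypothetical; the actual wrapper code isn't shown in this thread):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch: each chunk is a plain Hadoop stream created on the
    // GCS-backed FileSystem; Flink owns the stream's lifecycle after this call.
    class GcsChunkWriter {
        private final FileSystem fs; // a GoogleHadoopFileSystem instance

        GcsChunkWriter(FileSystem fs) {
            this.fs = fs;
        }

        // The returned stream is handed to Flink, which may keep it open for
        // several minutes (until the next checkpoint) before closing it.
        FSDataOutputStream openChunk(String currentChunkName) throws IOException {
            return fs.create(new Path(currentChunkName));
        }
    }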

I'm thinking maybe I should switch to writing files locally and uploading them when the stream is closed. Is there a way to configure GoogleHadoopFileSystem to do it that way (write locally and upload on close)? Or am I doing something wrong by letting Flink open an unlimited number of files?

Any advice would be appreciated. I will contribute this to Flink once it is stable enough.

Thank you

medb (Contributor) commented Jul 6, 2020

Could you provide more details on why you want to implement a native FS for Flink using the GCS connector, instead of using Flink's HDFS support with the GCS connector?

Also, could you provide the following information:

  1. How many files do you write, and what is their average size?
  2. Do you write all of them into the same single directory, or into a directory tree?
  3. How many files do you write at the same time?
  4. Is the java.io.IOException: Error writing request body to server exception the only cause of the failures in your case, or are there others too?

Regarding OOMs: the GCS connector allocates 64 MiB + 1 MiB + 8 MiB = 73 MiB for each written/created file, which is why, if you write many files in the same JVM, you can hit an OOM unless you allocate enough memory for the JVM.
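As a back-of-the-envelope check against that figure (using the ~250 open files reported above):

    // Rough arithmetic using the 73 MiB-per-open-file figure above.
    public class GcsHeapBudget {
        public static void main(String[] args) {
            long perFileMiB = 64 + 1 + 8; // upload chunk + buffers, per open file
            long openFiles = 250;         // roughly where the job above hit a wall
            // 73 MiB * 250 files = 18250 MiB, i.e. ~18 GiB of heap for the
            // connector's buffers alone, before any Flink state or user objects.
            System.out.println((perFileMiB * openFiles) + " MiB");
        }
    }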

afilipchik (Author) commented

I had to go with a custom one because the HDFS connector doesn't support Flink's StreamingFileSink; only hdfs:// can be used with this sink, as it supports the trim operation. StreamingFileSink is nice because it natively integrates with Flink checkpointing.

  1. Technically it depends on the actual job, but it was dying at around 200 files. Size-wise, files were 1 MB -> 50 MB (test data).
  2. A directory tree. I'm prototyping kafka -> gcs, with the dir structure: [kafka_topic]/yyyy/mm/dd/files
  3. It depends. Normal-operation worst case: kafka_topics * partitions / number of workers. Backfill worst case: kafka_topics * partitions / number of workers * [number of days it can read between Flink checkpoints].
  4. That one was the only cause, but I was using the direct upload flag and wondering whether that was a good idea. When I set it to true, I just get OOMs.

But 73 MiB explains it. I thought the buffers were around 1 MiB. Over the long weekend I rewrote it by adding a local file buffer and limiting upload parallelism to a preconfigured number, and I was able to run it on 3 GB of heap.
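Roughly this shape, as a sketch of that rewrite (all names are hypothetical; the actual code isn't shown in this thread):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    // Hypothetical sketch: buffer each chunk to a local temp file, then cap
    // concurrent uploads with a semaphore so heap usage stays bounded no
    // matter how many files Flink has open.
    class BoundedGcsUploader {
        private final Semaphore permits;
        private final ExecutorService pool = Executors.newCachedThreadPool();

        BoundedGcsUploader(int maxParallelUploads) {
            this.permits = new Semaphore(maxParallelUploads);
        }

        // Called once a chunk's local file is complete (e.g. on persist/close).
        void upload(Path localChunk, String gcsObjectName) {
            pool.submit(() -> {
                permits.acquireUninterruptibly(); // at most N uploads in flight
                try {
                    copyToGcs(localChunk, gcsObjectName);
                    Files.deleteIfExists(localChunk);
                } catch (IOException e) {
                    // real code would retry here or fail the checkpoint
                } finally {
                    permits.release();
                }
            });
        }

        private void copyToGcs(Path localChunk, String gcsObjectName) throws IOException {
            // stream the local file into fs.create(...) on the
            // GoogleHadoopFileSystem; omitted for brevity
        }
    }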

medb (Contributor) commented Jul 6, 2020

Interesting. Does this mean that for a Flink native FS and StreamingFileSink you don't need trim or similar functionality that is required for HDFS?

If you are writing small files, you can reduce the GCS upload chunk size (64 MiB by default) to a lower value (8 MiB, for example: fs.gs.outputstream.upload.chunk.size=8388608) to work around OOMs without increasing the JVM heap size. This will reduce upload throughput to GCS, but that doesn't matter for sub-64 MiB files, and it should still be faster than writing to a local FS and uploading to GCS after that.
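For example, setting that property programmatically on the Hadoop Configuration the connector reads (the wrapper class is just for illustration):

    import org.apache.hadoop.conf.Configuration;

    public class GcsChunkSizeConfig {
        public static Configuration withSmallChunks() {
            Configuration conf = new Configuration();
            // 8 MiB instead of the 64 MiB default: much lower per-open-file
            // heap usage at the cost of some upload throughput.
            conf.setLong("fs.gs.outputstream.upload.chunk.size", 8 * 1024 * 1024); // = 8388608
            return conf;
        }
    }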

For small files, direct upload makes sense: it's more efficient, and you don't need resumable upload anyway for files that can be uploaded in a single request and/or retried from a local FS cache.

But generally speaking, GCS is inefficient at processing many small files, which is why you may want to change your pipeline to write fewer, bigger files; ideally you should write 500+ MiB files to GCS, or at least 100 MiB files. Even if you make writing many small files work, it will be less efficient when you later need to read and process those files.

It seems like java.io.IOException: Error writing request body to server is retryable; we will add retries for it in the next GCS connector release to improve reliability.

afilipchik (Author) commented

I use an approach similar to the one used in the S3 FS: https://github.com/apache/flink/tree/master/flink-filesystems/flink-s3-fs-base.
This is how Flink defines it: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.java

There are 2 important methods: persist and closeForCommit. The pipeline controls when those methods are called (with a bulk sink and 10-minute checkpoints, that happens every 10 minutes). So my original implementation was to open a GCS stream, keep writing until persist is called, close the file on persist, and open a new one. I use compose to create the final file when closeForCommit is called. Rollback then just removes non-persisted files.
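Roughly, the flow looks like this (a simplified sketch; names are hypothetical, and the real contract is the Flink RecoverableFsDataOutputStream linked above):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class GcsChunkedStream {
        private final List<String> persistedChunks = new ArrayList<>();
        private int chunkIndex = 0;
        private String currentChunk = "part.chunk-0";

        // Called at each Flink checkpoint: seal the current GCS object so it
        // is durable, then continue writing into a fresh one.
        void persist() throws IOException {
            closeCurrentChunk();
            persistedChunks.add(currentChunk);
            currentChunk = "part.chunk-" + (++chunkIndex);
        }

        // Called when the part file is finalized: stitch the persisted chunks
        // into the final object, then clean up.
        void closeForCommit() throws IOException {
            persist();
            composeIntoFinalObject(persistedChunks);
        }

        private void closeCurrentChunk() throws IOException {
            // close the FSDataOutputStream backing currentChunk (omitted)
        }

        private void composeIntoFinalObject(List<String> chunks) throws IOException {
            // issue a GCS compose request over the chunk objects (omitted)
        }
    }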

Another quick question: how does it deal with streams that are open for a long time (minutes) but don't receive much data? Like, the stream is open for 10 minutes, but all the data is written in the first couple of seconds? Or every couple of seconds?

medb (Contributor) commented Jul 14, 2020

A stream that is open for a long time should not be a problem, because the GCS connector caches data until there is enough of it to send a request to the GCS API, so an open stream doesn't mean an open GCS connection. That said, direct upload opens a single long-running request, which is less reliable because it cannot recover from transient network failures during long uploads.

medb (Contributor) commented Jul 14, 2020

Coming back to retrying IOExceptions: all of them are already retried:

    HttpBackOffIOExceptionHandler exceptionHandler =
        new HttpBackOffIOExceptionHandler(new ExponentialBackOff());

It must mean that for your usage pattern the number of retries is not sufficient. You can adjust it through the fs.gs.http.max.retry property, but this will be inefficient and slow, so you may instead want to avoid writing so many small files to GCS.
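For illustration, the two knobs side by side: raising fs.gs.http.max.retry on the Hadoop Configuration, and the kind of per-request back-off handler shown above (the wrapper class is hypothetical; the google-http-client types are real):

    import com.google.api.client.http.HttpBackOffIOExceptionHandler;
    import com.google.api.client.http.HttpRequest;
    import com.google.api.client.http.HttpRequestInitializer;
    import com.google.api.client.util.ExponentialBackOff;
    import org.apache.hadoop.conf.Configuration;

    public class GcsRetryTuning {
        // Raise the connector-level retry count for your usage pattern.
        public static Configuration withMoreRetries(Configuration conf, int maxRetries) {
            conf.setInt("fs.gs.http.max.retry", maxRetries);
            return conf;
        }

        // The kind of handler the connector installs per request: retry
        // IOExceptions with exponential back-off.
        public static HttpRequestInitializer backOffInitializer() {
            return (HttpRequest request) ->
                request.setIOExceptionHandler(
                    new HttpBackOffIOExceptionHandler(new ExponentialBackOff()));
        }
    }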

Closing this issue because retries are already in place.

medb closed this as completed Jul 14, 2020