[Bug]: Performance degradation of S3 Filesystem when working with large splittable files when closed before all data is consumed. #25991

Closed
2 of 15 tasks
vatanrathi opened this issue Mar 27, 2023 · 8 comments · Fixed by #26114

Comments

vatanrathi commented Mar 27, 2023

What happened?

Our Beam Java pipeline encounters severe performance issues on all Beam versions later than 2.30.0. After troubleshooting, we identified the enhancement made under BEAM-12329 as the likely root cause of the degradation.
Note that the pipeline runs fine for small datasets of a few MBs but is significantly slower for larger datasets (~10 GB and up).
It appears that on version 2.30 and lower, the amazon-web-services module was not draining the input stream, and the AWS SDK warned "Not all bytes were read from .... ", but in our case that warning is overly cautious: it makes an invalid assumption about the amount of remaining data and the cost of reading it versus aborting and reconnecting.
In versions after 2.30.0, the input stream is drained when close() is called, and subsequent requests for the remaining data have to reconnect and seek back to the position. That is significantly slower in our case: the pipeline keeps running for hours, as opposed to a few minutes on version 2.30 and lower.
Do you have any suggestions on how to achieve the same performance? I have tried both amazon-web-services and amazon-web-services2 on all versions after 2.30, and all show the same poor performance.
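
For context, here is a minimal standalone sketch (not Beam code; bucket, key and sizes are made up) of the trade-off described above: a ranged GET with AWS SDK v2 that is closed before the whole range is consumed, contrasting draining with aborting.

import java.io.IOException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class RangedReadCloseSketch {
  public static void main(String[] args) throws IOException {
    String bucket = "my-bucket";              // hypothetical
    String key = "large-splittable-file.csv"; // hypothetical
    long position = 64L * 1024 * 1024;        // resume reading from 64 MiB

    try (S3Client s3 = S3Client.create()) {
      // Request an open-ended byte range, "bytes=<position>-", i.e. everything
      // from the current position to the end of the object.
      GetObjectRequest request = GetObjectRequest.builder()
          .bucket(bucket)
          .key(key)
          .range("bytes=" + position + "-")
          .build();
      ResponseInputStream<GetObjectResponse> in = s3.getObject(request);

      byte[] buffer = new byte[8192];
      int read = in.read(buffer); // consume only a small prefix of the range
      System.out.println("Read " + read + " bytes before closing");

      // Draining before close() would have to download the rest of the range,
      // which for a 10 GB+ object can take minutes per reader. abort() skips
      // that download at the cost of discarding the underlying connection.
      in.abort();
      in.close();
    }
  }
}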

Issue Priority

Priority: 3 (minor)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@vatanrathi
Author

@iemejia can you kindly help here?


iemejia commented Mar 30, 2023

Hi, I have been less involved in Beam recently. I wonder if what is going on is that the Amazon library is somehow slow with the connections. One thing to check is whether the delays happen when trying to close/drain the connections in the AWS IO connector. Note that I added this code because the library warns about not doing it, but from a quick look I have seen few connectors in other systems doing it.

Maybe this manifests because the Spark runner has many workers; is that the case? If so, it could definitely be a factor.

I cannot think of an easy fix apart from reverting the change, but we first need to check with the current maintainers, @aromanenko-dev @mosche, whether they are OK with that or whether there are other ideas.

@vatanrathi
Author

@iemejia You might be correct in saying that there could be an underlying issue with the AWS SDK.

This is what I did so far:

  1. beam-sdks-java-io-amazon-web-services - I tried a patch that removes the "drainInputStream" call from close(), and performance is the same across all the latest versions. But then the previous AWS warning about "Not all bytes read" comes back.

  2. beam-sdks-java-io-amazon-web-services2 - Applying the same patch to skip draining resulted in improved performance, but still a lot worse than SDK v1. I noticed there seems to be an issue with closing the ResponseInputStream, which appears to wait for a long time. In a sample test it took around 6 minutes to close, so I added an "abort()" call before the close/drain, and to my surprise that resulted in significantly improved performance, which is what I would expect from the latest Beam + Spark 3.

The logs below suggest that the program waited ~6 minutes to close the ResponseInputStream:
21:27:23 dtime="2023-03-30 21:27:15.978", thread="idle-connection-reaper", lvl="DEBUG", logger="software.amazon.awssdk.http.apache.internal.net.SdkSslSocket", ctx="debug", jobId="xxxxx", executionId="xxxxx", closing xxxxx.s3.ap-southeast-2.amazonaws.com/52.95.131.46:443
21:33:44 dtime="2023-03-30 21:33:33.406", thread="Executor task launch worker for task 4.0 in stage 0.0 (TID 4)", lvl="INFO", logger="org.apache.spark.storage.memory.MemoryStore", ctx="logInfo", jobId="", executionId="", Block rdd_8_4 stored as values in memory (estimated size 67.4 MiB, free 15.8 GiB)

After adding "abort" call before draining (https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3ReadableSeekableByteChannel.java#L168) on sdk2, I did not observe any wait ...
However, I am not sure If adding an "abort" call would cause any issue to my program or is it a bad choice

@aromanenko-dev
Contributor

@vatanrathi

I tried putting patch to remove "drainInputStream" call from close() and performance is same across all latest versions.

Do you mean that it was not improved with that patch?

I'm wondering if it's even possible that close() will be called under normal circumstances before all data is read?

@vatanrathi
Author

@aromanenko-dev Sorry if I was not clear before; let me explain.

Currently we are on Beam 2.23.0 and the given job finishes in around 10 minutes. I tried to upgrade to 2.45.0 and noticed performance issues with both AWS SDK v1 and v2. So I upgraded versions step by step, and that is where I noticed that performance started degrading from version 2.31.0, and where I found this change, which I believe is the root cause.

Below are my final findings, based on several iterations of tests.

  1. With AWS SDK v1, if drainInputStream is removed from the close() call, then execution time is the same across versions.
  2. However, with SDK v2 and the drainInputStream call in close(), the pipeline runs for hours, whereas it takes only ~10 minutes to finish on AWS SDK v1. If drainInputStream is removed, performance improves, but it still takes ~30 minutes to finish. But if s3ResponseInputStream.abort() is called before s3ResponseInputStream.close() in close(), then performance is significantly improved and the pipeline finishes within 3 minutes.
  @Override
  public void close() throws IOException {
    if (s3ResponseInputStream != null) {
      s3ResponseInputStream.abort(); // added: abort before draining/closing
      drainInputStream(s3ResponseInputStream);
      s3ResponseInputStream.close();
    }
    open = false;
  }

I found a bug, aws/aws-sdk-java-v2#2117, raised against aws-sdk-java-v2 for the close() call, which also complains that close() waits unexpectedly.

For your question "I'm wondering if it's even possible that close() will be called under normal circumstances before all data is read?": I don't know the exact answer, but I think because Beam reads data in bursts, S3 tries to close the connection while the data from the first fetch is still being processed.

If you think we can avoid the close() call by tweaking some HTTP connection parameter in the pipeline options, or in some other way, kindly let me know.


mosche commented Apr 3, 2023

@vatanrathi Thanks for raising this, your pointer to aws/aws-sdk-java-v2/issues/2117 is very helpful 👍 Trying to drain the input stream in all cases is certainly dangerous, considering that files might be very large and the byte range requested runs from "position" to the very end.

As a quick workaround, using abort (if position != contentLength) seems perfectly reasonable, particularly when dealing with large files. The drawback, though, is that the respective connections cannot be reused.

If it is not desired to read remaining data from the stream, you can explicitly abort the connection via abort(). Note that this will close the underlying connection and require establishing an HTTP connection which may outweigh the cost of reading the additional data.
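
For illustration, a conditional variant of the patch shared above might look roughly like the following sketch. It assumes the channel tracks position and contentLength as in that snippet, and it is not necessarily the fix that was eventually merged.

  @Override
  public void close() throws IOException {
    if (s3ResponseInputStream != null) {
      if (position != contentLength) {
        // Unread data remains in the requested range: abort rather than drain,
        // accepting that the underlying connection cannot be reused.
        s3ResponseInputStream.abort();
      } else {
        // Range fully consumed: draining is cheap and keeps the connection reusable.
        drainInputStream(s3ResponseInputStream);
      }
      s3ResponseInputStream.close();
    }
    open = false;
  }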

I'll have a closer look in the next few days and think about alternatives. It will probably make sense to read the data in chunks, to minimize the overhead when closing while still allowing connections to be reused.

@vatanrathi
Author

@mosche Thanks a lot for agreeing to look into it; it was causing a lot of trouble for us.

I would also need your comment on connection reuse. We have very large files (some are more than 200 GB) to process.
I have set up my HTTP client configuration as below:

options.setHttpClientConfiguration(HttpClientConfiguration.builder()
    .connectionTimeout(1000 * 60 * 60 * 10) // 10 hours
    .socketTimeout(1000 * 60 * 60 * 10)     // 10 hours
    .connectionMaxIdleTime(1000 * 10)       // 10 seconds
    .build());

This is to ensure that we DO NOT REUSE a connection from the pool that has been idle for more than 10 seconds, since S3 closes idle connections after 20 seconds, which could result in using an already-closed connection. This idle timeout matters because Beam processes data in bursts.

So I think the connection is closed every 10 seconds, which invokes the close() call. Do you think my configuration above is fine for this use case?

At this stage, I have upgraded to Beam 2.45.0 with Spark 3 and AWS SDK v2. I have patched the aws2 module to include the abort() call within the close() function, which is giving me the best performance in my SIT environment. Do you think I can take it to prod until a proper fix/workaround is implemented in the Beam SDK?


mosche commented Apr 3, 2023

@vatanrathi Note that if you use abort, connections won't be kept open; they are force-closed anyway.
But configuring the idle timeout this way should be fine 👍

connectionTimeout should be orders of magnitude lower; that is the timeout for establishing the connection!

The amount of time to wait when initially establishing a connection before giving up and timing out.

socketTimeout can be high. Nevertheless, as your large files are hopefully splittable, they are never read in one go, so you should be fine using a much lower timeout here as well.
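
Applied to the configuration shared earlier, that advice might look roughly like the sketch below (the concrete values are illustrative assumptions, not recommendations; it assumes the same HttpClientConfiguration builder as above, with values in milliseconds):

options.setHttpClientConfiguration(HttpClientConfiguration.builder()
    .connectionTimeout(10 * 1000)     // 10 seconds to establish a connection
    .socketTimeout(5 * 60 * 1000)     // 5 minutes per blocking socket read/write
    .connectionMaxIdleTime(10 * 1000) // 10 seconds, as before
    .build());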

@mosche mosche added aws and removed awaiting triage labels Apr 3, 2023
@mosche mosche changed the title [Bug]: BEAM-12329 causes performance issues [Bug]: Performance degradation of S3 Filesystem when working with large splittable files when closed before all data is consumed. Apr 3, 2023
@mosche mosche self-assigned this Apr 5, 2023
mosche pushed a commit to mosche/beam that referenced this issue Apr 5, 2023
@github-actions github-actions bot added this to the 2.48.0 Release milestone Apr 7, 2023