error decoding response body after upgrade to object store 0.10 #5882

Open
ion-elgreco opened this issue Jun 13, 2024 · 63 comments
@ion-elgreco

ion-elgreco commented Jun 13, 2024

Describe the bug
We bumped object store to 0.10 in delta-rs, and we are already seeing a couple of reports of the following error: error decoding response body. It happens on Azure and S3.

See delta-io/delta-rs#2595 and delta-io/delta-rs#2592

To Reproduce
Seems to occur when reading tables or doing operations on them.

Expected behavior
There should be no issue decoding the response body.

Additional context

@thomasfrederikhoeck @k-ye

@tustvold
Contributor

I think we would need a reproducer to action this; the linked issues aren't even clearly implicating object_store.

@Xuanwo
Member

Xuanwo commented Jun 15, 2024

Please also print the source of the error via its Debug representation. Usually, it is caused by a connection reset or similar network-related errors.

@ion-elgreco
Author

@thomasfrederikhoeck @k-ye can you guys provide additional details, please?

@thomasfrederikhoeck

@Xuanwo I would love to be of more help but I don't know how to do this in delta-rs (and in turn object_store). It didn't help to set the timeout to 300s.

@ion-elgreco Can you point me in the direction of how I can provide better logs?

@Xuanwo
Member

Xuanwo commented Jun 24, 2024

@Xuanwo I would love to be of more help but I don't know how to do this in delta-rs (and in turn object_store). It didn't help to set the timeout to 300s.

Hi, if you can consistently reproduce this issue, please change the following places:

https://github.com/delta-io/delta-rs/blob/d17ed97b5bda0cadbc0df959f8fb38e275570c87/python/src/error.rs#L41-L51

fn object_store_to_py(err: ObjectStoreError) -> PyErr {
    match err {
        ObjectStoreError::NotFound { .. } => PyFileNotFoundError::new_err(err.to_string()),
        ObjectStoreError::Generic { source, .. }
            if source.to_string().contains("AWS_S3_ALLOW_UNSAFE_RENAME") =>
        {
            DeltaProtocolError::new_err(source.to_string())
        }
        _ => PyIOError::new_err(err.to_string()),
    }
}

Don't use err.to_string(); print its Debug representation instead.
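
For example, a minimal sketch of that change against the function above (illustrative only, not a drop-in patch; imports as in delta-rs' python/src/error.rs):

fn object_store_to_py(err: ObjectStoreError) -> PyErr {
    match err {
        ObjectStoreError::NotFound { .. } => PyFileNotFoundError::new_err(format!("{err:?}")),
        ObjectStoreError::Generic { ref source, .. }
            if source.to_string().contains("AWS_S3_ALLOW_UNSAFE_RENAME") =>
        {
            DeltaProtocolError::new_err(format!("{source:?}"))
        }
        // Debug formatting keeps the nested source chain (reqwest/hyper errors)
        // that Display drops, which is what we need to diagnose this issue.
        _ => PyIOError::new_err(format!("{err:?}")),
    }
}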

@thomasfrederikhoeck

@Xuanwo Ah, thanks!! I get the following consistently:

Generic {
    store: "MicrosoftAzure",
    source: reqwest::Error {
        kind: Decode,
        source: reqwest::Error {
            kind: Body,
            source: TimedOut,
        },
    },
}

I also tried bumping the timeout to 600s. I still get _internal.DeltaError: Failed to parse parquet: Parquet error: Z-order failed while scanning data: ArrowError(ExternalError(General("ParquetObjectReader::get_byte_ranges error: Generic MicrosoftAzure error: error decoding response body")), None), but I never hit the debug print in this case. I am, however, seeing a lot of:

[2024-06-24T21:09:33Z INFO  object_store::client::retry] Encountered transport error backing off for 0.1 seconds, retry 1 of 10: error sending request for url (REDACTED)
[2024-06-24T21:13:00Z DEBUG hyper_util::client::legacy::client] client connection error: error shutting down connection

@Xuanwo
Member

Xuanwo commented Jun 25, 2024

[2024-06-24T21:09:33Z INFO object_store::client::retry] Encountered transport error backing off for 0.1 seconds, retry 1 of 10: error sending request for url (REDACTED)

I suspect there's an issue with the network connection between your environment and Azure.

Could you provide more details about your setup?

  • Where are you conducting the tests?
  • Which Azure region or zone are you using, or are you just using Azurite?
  • What is the average latency and bandwidth between your location and Azure?
  • Can you try using azcopy to read/write a large file?

@thomasfrederikhoeck

thomasfrederikhoeck commented Jun 25, 2024

@Xuanwo It might be network related, but I have a feeling it is related to how object_store or delta-rs handles lower throughput than within an Azure data center (some connections going stale while waiting for something else).

  • Running from local laptop in Europe
  • Azure region is Westeurope
  • I just benchmarked with azcopy bench "https://ACCOUNT.blob.core.windows.net/CONTAINER?SAS" --file-count 20 --size-per-file 10000M. So 20 files of 10 GB each, and here I get a throughput of 145 Mb/s. It runs through with no failures.

The benchmark took 1+ hours with no failure while the delta-rs call fails within a few minutes.

@ion-elgreco
Author

@Xuanwo @tustvold I can confirm this also happens to us in v0.18.1/2; I can see logs in LakeFS that say "context canceled".

Somewhere in object store the connection is getting dropped constantly with large files. Can you give suggestions on how to debug?

For me, I am connecting within a VNet in EU (Amsterdam).

@tustvold
Contributor

Are you mixing IO with CPU-bound work? I wonder if you are stalling out the tokio runtime.

@ion-elgreco
Author

ion-elgreco commented Aug 14, 2024

Hmm I am not sure, I started working on delta-rs a year ago and most of this FileSystem handling code was already there.

We essentially create a DeltaFileSystemHandler which we expose to Python. In python we create a DeltaStorageHandler which inherits the pyarrow FileSystemHandler methods, which we implement to call the Rust DeltaFileSystemHandler.

I think PyArrow just calls read on an ObjectInputFile, which in Rust calls get_range on the underlying object store (https://github.com/delta-io/delta-rs/blob/c446b1287dedba122b941d8d1d4ae6290aa86d5c/python/src/filesystem.rs#L467-L495):

    fn read<'py>(&mut self, nbytes: Option<i64>, py: Python<'py>) -> PyResult<Bound<'py, PyBytes>> {
        self.check_closed()?;
        let range = match nbytes {
            Some(len) => {
                let end = i64::min(self.pos + len, self.content_length) as usize;
                std::ops::Range {
                    start: self.pos as usize,
                    end,
                }
            }
            _ => std::ops::Range {
                start: self.pos as usize,
                end: self.content_length as usize,
            },
        };
        let nbytes = (range.end - range.start) as i64;
        self.pos += nbytes;
        let data = if nbytes > 0 {
            py.allow_threads(|| {
                rt().block_on(self.store.get_range(&self.path, range))
                    .map_err(PythonError::from)
            })?
        } else {
            "".into()
        };
        // TODO: PyBytes copies the buffer. If we move away from the limited CPython
        // API (the stable C API), we could implement the buffer protocol for
        // bytes::Bytes and return this zero-copy.
        Ok(PyBytes::new_bound(py, data.as_ref()))
    }

Here rt() just creates a runtime if it doesn't exist yet. So I don't see where CPU-bound work is happening here, unless I have to consider the CPU-bound work in Python, which is done by PyArrow itself?
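
For context, a minimal sketch of what such an rt() helper typically looks like; this is an illustration, not the exact delta-rs implementation:

use std::sync::OnceLock;
use tokio::runtime::Runtime;

// Lazily create one shared tokio runtime on first use and reuse it afterwards.
// Hypothetical helper for illustration only.
fn rt() -> &'static Runtime {
    static RUNTIME: OnceLock<Runtime> = OnceLock::new();
    RUNTIME.get_or_init(|| Runtime::new().expect("failed to create tokio runtime"))
}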

@tustvold
Contributor

That at least looks plausible. How big are the ranges we're fetching, and how long are we fetching for? I wonder if we're running into some Azure limit; it sounds like they're hanging up for some reason.

@ion-elgreco
Author

What do you mean by "looks plausible"? :)

@tustvold I'll put some print statements in the ranges, to see what is being requested! Will get back to you on that!

@tustvold
Contributor

What do you mean by "looks plausible"?

I can't see anything obviously wrong, but also don't know much about pyarrow so can't say definitively if it is correct

@ion-elgreco
Author

ion-elgreco commented Aug 14, 2024

@tustvold It seems PyArrow fetches 4 files in parallel and then reads around 30 MB each time:

https://gist.github.com/ion-elgreco/e2339990843755b40475dbd6e72e4697

@tustvold
Contributor

That's on the chonkier end of optimal, but not ludicrous. How long do the fetches take?

@ion-elgreco
Author

Hmm, what would you suggest is more optimal? Like 10 MB?

Right now my VPN connection throughput is working fine, and it seems each fetch takes around 4-8 seconds.

[python/src/filesystem.rs:501:17] elapsed.as_secs() = 6
[python/src/filesystem.rs:473:9] (&self.path, &nbytes) = (
    Path {
        raw: "product_line_code=DUMMY/100-f1cafe66-476f-4818-8199-5c5a4a6eb4ef-0.parquet",
    },
    Some(
        33231426,
    ),
)

@tustvold
Contributor

VPN connection

Oh... This is almost certainly what is causing this issue; very few VPNs will support large-volume data transfer. It is almost certainly dropping the connections in the interest of preserving QoS for other users. Shuttling data through a VPN box is not only likely to be the cause of your issue, it is also likely very expensive.

@ion-elgreco
Author

I see, that could explain it for me; however, my colleague saw timeouts on his Azure Compute instance, so an Azure <-> Azure connection within our VNet.

@tustvold
Contributor

tustvold commented Aug 14, 2024

I'm afraid I don't have any other ideas, something outside of object_store is dropping the connection. This could be Azure itself, Azure blob storage definitely gives off the impression of being an MVP that somehow got shipped, but it is more likely to be some middleware network appliance, like a VPN, NAT gateway or similar. AWS has private gateway endpoints that must be configured for S3, I am not sure if Azure needs something similar.

@thomasfrederikhoeck

@Xuanwo It might be network related, but I have a feeling it is related to how object_store or delta-rs handles lower throughput than within an Azure data center (some connections going stale while waiting for something else).

  • Running from local laptop in Europe
  • Azure region is Westeurope
  • I just benchmarked with azcopy bench "https://ACCOUNT.blob.core.windows.net/CONTAINER?SAS" --file-count 20 --size-per-file 10000M. So 20 files of 10 GB each, and here I get a throughput of 145 Mb/s. It runs through with no failures.

The benchmark took 1+ hours with no failure while the delta-rs call fails within a few minutes.

@tustvold The weird thing is that I can run some rather large data operations (taking 1+ hour) with azcopy bench without seeing any dropped connection or something like that.

I can maybe add: before this PR in polars we sometimes saw similar issues, but I'm very far from knowledgeable on networking.

@tustvold
Contributor

Before this PR in polars

This is why I asked about starving the tokio threadpool; this does not appear to be the issue @ion-elgreco is running into, from what he has shared.

with azcopy bench without seeing any dropped connection or something like that.

Azcopy will be using multipart uploads, which use smaller requests that are therefore less susceptible to dropped connections.

@ion-elgreco
Author

ion-elgreco commented Aug 14, 2024

I got a bit confused myself here, but @thomasfrederikhoeck you have issues during Optimize, where the data is being read differently. @tustvold, here it seems we read a Parquet object within a tokio task; should this be a rayon threadpool instead?

let stream = match operations {
            OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
                .flat_map(|(_, (partition, bins))| {
                    futures::stream::iter(bins).map(move |bin| (partition.clone(), bin))
                })
                .map(|(partition, files)| {
                    debug!(
                        "merging a group of {} files in partition {:?}",
                        files.len(),
                        partition,
                    );
                    for file in files.iter() {
                        debug!("  file {}", file.location);
                    }
                    let object_store_ref = log_store.object_store();
                    let batch_stream = futures::stream::iter(files.clone())
                        .then(move |file| {
                            let object_store_ref = object_store_ref.clone();
                            async move {
                                let file_reader = ParquetObjectReader::new(object_store_ref, file);
                                ParquetRecordBatchStreamBuilder::new(file_reader)
                                    .await?
                                    .build()
                            }
                        })
                        .try_flatten()
                        .boxed();

                    let rewrite_result = tokio::task::spawn(Self::rewrite_files(
                        self.task_parameters.clone(),
                        partition,
                        files,
                        log_store.object_store().clone(),
                        futures::future::ready(Ok(batch_stream)),
                    ));
                    util::flatten_join_error(rewrite_result)
                })
                .boxed(),

Later down in the code we read that stream one by one and cast each RecordBatch, which is CPU-bound I guess? And then we write it as Parquet again:

while let Some(maybe_batch) = read_stream.next().await {
            let mut batch = maybe_batch?;

            batch = super::cast::cast_record_batch(
                &batch,
                task_parameters.file_schema.clone(),
                false,
                true,
            )?;
            partial_metrics.num_batches += 1;
            writer.write(&batch).await.map_err(DeltaTableError::from)?;
        }

@tustvold
Contributor

tustvold commented Aug 14, 2024

You should avoid doing any non-trivial CPU-bound work on the tokio threadpool that you use for IO. The way I've seen this done successfully is running DF in one tokio threadpool and then spawning IO from it into a different one. There was some work in the past to make this easier, see #4040, but I never got it over the line. I'll file a ticket.

Edit: Filed #6248
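
A minimal sketch of that pattern, assuming two tokio runtimes; the io_runtime and spawn_io names are illustrative, not an existing delta-rs or object_store API. CPU-bound work stays on the caller's runtime while each IO future is spawned onto a dedicated runtime and awaited through its JoinHandle:

use std::future::Future;
use std::sync::OnceLock;
use tokio::runtime::{Builder, Runtime};

// A runtime reserved for network IO, so CPU-heavy work on the main runtime
// cannot starve in-flight HTTP requests. Illustrative only.
fn io_runtime() -> &'static Runtime {
    static IO_RUNTIME: OnceLock<Runtime> = OnceLock::new();
    IO_RUNTIME.get_or_init(|| {
        Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("failed to build IO runtime")
    })
}

// Run a future on the IO runtime from code executing on the CPU-bound runtime.
async fn spawn_io<F>(fut: F) -> F::Output
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    io_runtime()
        .spawn(fut)
        .await
        .expect("IO task panicked or was cancelled")
}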

@ion-elgreco
Author

@tustvold thanks for the support! And the insights 😄 I'm not an expert on Rust async yet, so I might need to look into how I could allocate or split up these pools.

How fast do you think #6248 could land?

@tustvold
Contributor

tustvold commented Sep 7, 2024

If we identify the CPU intensive sections of Datafusion and use spawn_blocking combined with a channel we could move all the blocking tasks to that separate threadpool and use the default tokio threadpool for IO.

I think the challenge for DataFusion is that almost all of its processing is CPU bound, so it makes more sense to move it wholesale to its own threadpool. Calling spawn_blocking for every CPU-bound section would not only be extremely verbose, but would also have terrible performance, as the per-task overhead for spawn_blocking is very high. Ultimately the decision was made to make DataFusion execution async; some might weigh the tradeoffs involved differently and argue this is unfortunate, but whatever the case, this means DataFusion needs to run in an async scheduler. This could be tokio or something else, but tokio is the default choice.

You may be interested to look into apache/datafusion#2199 where I tried to move DF away from async/tokio, but it was an extremely complicated project back then, and is likely even less practical now.

If this is something of interest I would recommend filing a ticket in DataFusion. I am loath to, as I do not have the time or capacity to drive such an initiative myself, but I personally think it would be valuable.

@adriangb
Contributor

adriangb commented Sep 7, 2024

If I understand correctly, the point is that we don't want to move the CPU work somewhere else, because that's happening in too many places to keep track of. Instead we want to "special case" the IO work and move that around. Is that right?

@crepererum
Contributor

If I understand correctly, the point is that we don't want to move the CPU work somewhere else, because that's happening in too many places to keep track of. Instead we want to "special case" the IO work and move that around. Is that right?

This is what we do at InfluxData and it works reasonably well. You have to be slightly careful so that you don't miss some IO calls or that you don't hand IO handles (e.g. sockets, or HTTP connections wrapping them) from the IO runtime to the CPU runtime. But other than that, it works and improves both our end-to-end latency and the tokio internal metrics.

@alamb
Contributor

alamb commented Sep 9, 2024

If we identify the CPU intensive sections of Datafusion and use spawn_blocking combined with a channel we could move all the blocking tasks to that separate threadpool and use the default tokio threadpool for IO.

If we did this I think it is important to do performance tests -- by default tokio potentially uses many (100s, I think?) threads for this blocking thread pool, and if we are not careful, launching CPU-bound work on them will mean the threads are oversubscribed (more threads than CPUs), which will reduce effectiveness.
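
To make that concrete, a hedged sketch (tokio's blocking pool is capped at 512 threads by default; the numbers below are illustrative, not a recommendation):

use tokio::runtime::Builder;

fn main() -> std::io::Result<()> {
    let cpus = std::thread::available_parallelism()?.get();
    // Cap the blocking pool so spawn_blocking cannot oversubscribe the CPUs.
    let runtime = Builder::new_multi_thread()
        .worker_threads(cpus)
        .max_blocking_threads(cpus)
        .enable_all()
        .build()?;
    runtime.block_on(async { /* run the CPU-bound work here */ });
    Ok(())
}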

This is what we do at InfluxData and it works reasonably well. You have to be slightly careful so that you don't miss some IO calls or that you don't hand IO handles (e.g. sockets, or HTTP connections wrapping them) from the IO runtime to the CPU runtime.

It would be really helpful to document / write a blog about how this works -- I think it would be widely read and appreciated. @ion-elgreco any interest / chance that you or someone else in the delta lake team would be able to? I would be happy to collaborate.

@zalmane

zalmane commented Sep 9, 2024

Just experienced this issue as well. This is an Azure VM accessing Azure blob storage. It happens intermittently. We have 60 tasks running in parallel; it happens occasionally and retrying resolves it. We set the pyarrow IO thread count to 2 and the CPU thread count to 1 on a machine with 120 vCPUs.
Using pyarrow 15 and delta-rs 19.2 (although the error is from arrow-rs so seems unrelated to delta).
Any suggestions for a workaround?

@tustvold
Contributor

tustvold commented Sep 9, 2024

I've created apache/datafusion#12393 to track this on DataFusion's end.

I am also going to close this issue, as I don't believe there is any work planned in this repository related to this; ultimately the issue lies in how DataFusion and its consumers are wiring up IO.

@tustvold tustvold closed this as not planned Sep 9, 2024
@zalmane

zalmane commented Sep 9, 2024

Why is this being closed? We are experiencing this bug on pyarrow/arrow-rs without Datafusion.

@tustvold
Contributor

tustvold commented Sep 9, 2024

We are experiencing this bug on pyarrow/arrow-rs without Datafusion

delta-rs makes use of DataFusion, as discussed in #5882 (comment).

I am closing this issue to make it clear that no work is planned in this repository related to this.

@zalmane

zalmane commented Sep 9, 2024

I see. Even though the trace points to pyarrow? I was under the impression that when reading the table itself, delta-rs is no longer in the middle:

  File "pyarrow/_dataset.pyx", line 562, in pyarrow._dataset.Dataset.to_table
  File "pyarrow/_dataset.pyx", line 3804, in pyarrow._dataset.Scanner.to_table
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 88, in pyarrow.lib.check_status
OSError: Generic MicrosoftAzure error: error decoding response body
, Message: Generic MicrosoftAzure error: error decoding response body

Should I repost this on the DataFusion issue, or do you think this is different and related to pyarrow/arrow-rs?

@ion-elgreco
Author

If we identify the CPU intensive sections of Datafusion and use spawn_blocking combined with a channel we could move all the blocking tasks to that separate threadpool and use the default tokio threadpool for IO.

If we did this I think it is important to do performance tests -- by default tokio potentially uses many (100s, I think?) threads for this blocking thread pool, and if we are not careful, launching CPU-bound work on them will mean the threads are oversubscribed (more threads than CPUs), which will reduce effectiveness.

This is what we do at InfluxData and it works reasonably well. You have to be slightly careful so that you don't miss some IO calls or that you don't hand IO handles (e.g. sockets, or HTTP connections wrapping them) from the IO runtime to the CPU runtime.

It would be really helpful to document / write a blog about how this works -- I think it would be widely read and appreciated. @ion-elgreco any interest / chance that you or someone else in the delta lake team would be able to? I would be happy to collaborate.

@alamb what's the question exactly?

Currently our logstore is spawning all tasks in a separate runtime that can be configured

@tustvold
Contributor

tustvold commented Sep 9, 2024

Even though the trace points to pyarrow
Should I repost this on the DataFusion issue, or do you think this is different and related to pyarrow/arrow-rs?

The issue pertains to how CPU-bound work is starving IO; this side channel will not be reflected in stack traces. Additionally, there is something in between that is connecting pyarrow to object_store; we don't provide such an integration. The delta-rs people will likely be best placed to comment on what this is.

@alamb alamb reopened this Sep 9, 2024
@alamb
Contributor

alamb commented Sep 9, 2024

Let's leave this ticket open until we sort out the next steps (though I agree with @tustvold that I don't predict any code changes in arrow-rs)

@alamb
Contributor

alamb commented Sep 9, 2024

@alamb what's the question exactly?

Currently our logstore is spawning all tasks in a separate runtime that can be configured

My question is "would you be willing to summarize this ticket / write up a blog post (perhaps on the DataFusion blog) explaining how to spawn IO related tasks on a different thread pool?

I am 🎣 for help as I would like to write this blog too (so we can distill down this ticket and others for future discussion) but I am struggling to find time

@crepererum
Contributor

crepererum commented Sep 10, 2024

I think there's something else going on that is independent of the tokio runtime issues. I was able to reproduce this locally with a somewhat broken kubectl port-forward (which is kinda famous for its unstable connection handling when transferring large amounts of data). I'm hitting the same error case, and it is NOT retried at all. It seems that it is this error case here:

https://github.com/seanmonstar/reqwest/blob/09884ed0a09d43ebd5c67491caf4ad5683fba995/src/error.rs#L191

So I think we should extend the retry logic to capture this case. However, this might be difficult in the streaming case (i.e. when the error occurs mid-stream), see:

let stream = response
    .bytes_stream()
    .map_err(|source| crate::Error::Generic {
        store: T::STORE,
        source: Box::new(source),
    })
    .boxed();

At least we should try to improve the error message. It seems that the Display impl is not super helpful and we might want to use Debug instead, see seanmonstar/reqwest#2373.

@crepererum crepererum self-assigned this Sep 10, 2024
@tustvold
Contributor

tustvold commented Sep 10, 2024

Retrying interrupted streaming requests is tracked by #6287.

I'm a bit wary of this ticket just becoming a general dumping ground for any networking related issue, which is part of why I closed it...

@crepererum
Contributor

Error display improvements tracked by #6377.

@tustvold
Contributor

Ok so here is my summary of this ticket, and the action items going forward.

Problem

The error decoding response body error occurs whenever an HTTP response body is interrupted whilst streaming. This is after the HTTP status code has been returned and determined to be OK. I am not aware of any object stores using HTTP trailers to indicate errors; they just terminate the request. We do not currently support retrying requests interrupted mid-stream in this way, and there are complexities involved in supporting this (#6287).

Causes

There are two related causes of this:

  1. Mixing IO and CPU on the same runtime, stalling out IO and causing the server to hang up - #5882 (comment)
  2. An unstable network connection through some sort of proxy or VPN - #5882 (comment), #5882 (comment)

Outcome

As for the follow-on work:

Please let me know if I have missed anything; otherwise I will look to close this issue in favour of the linked issues in the next few days. I think this issue has been very helpful, and I'm grateful to everyone who has participated, but I am keen to put this on a more actionable footing.

@ion-elgreco
Author

@alamb what's the question exactly?

Currently our logstore is spawning all tasks in a separate runtime that can be configured

My question is "would you be willing to summarize this ticket / write up a blog post (perhaps on the DataFusion blog) explaining how to spawn IO related tasks on a different thread pool?

I am 🎣 for help as I would like to write this blog too (so we can distill down this ticket and others for future discussion) but I am struggling to find time

I could, but not soon; I recently started a new job, so that's keeping me quite busy.

@ion-elgreco
Author

ion-elgreco commented Sep 10, 2024

What I still don't quite get is that we are still seeing errors, but only on the reading side through the DeltaFileHandler, which is exposed as a pyarrow filesystem; there should be zero CPU-bound tasks on that tokio runtime.

@alamb
Contributor

alamb commented Sep 10, 2024

What I still don't quite get is that we are still seeing errors, but only on the reading side through the DeltaFileHandler, which is exposed as a pyarrow filesystem; there should be zero CPU-bound tasks on that tokio runtime.

Perhaps it is just related to some network errors, and #6287 tracks retrying interrupted streaming requests.

@ion-elgreco
Author

@alamb delta-io/delta-rs#2595 (comment)

I asked them to increase the timeout and that resolved it. I guess it would already help a lot if the true error surfaced; it might be that all those folks have low network throughput.
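
For reference, a hedged sketch of that timeout workaround using object_store's Azure builder directly; delta-rs users would pass the equivalent storage option instead, and the account/container values below are placeholders:

use std::time::Duration;

use object_store::azure::MicrosoftAzureBuilder;
use object_store::{ClientOptions, ObjectStore};

fn build_store() -> object_store::Result<impl ObjectStore> {
    // Raise the per-request timeout so large range reads over slow links
    // are not cut off mid-stream by the client itself.
    let options = ClientOptions::new().with_timeout(Duration::from_secs(600));
    MicrosoftAzureBuilder::new()
        .with_account("ACCOUNT")          // placeholder
        .with_container_name("CONTAINER") // placeholder
        .with_client_options(options)
        .build()
}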

@alamb
Contributor

alamb commented Sep 10, 2024

@alamb delta-io/delta-rs#2595 (comment)

I asked them to increase the timeout and that resolved it. I guess it would already help a lot if the true error surfaced; it might be that all those folks have low network throughput.

I believe @itsjunetime may be able to take a look at improving the errors and retries. We'll keep the tickets updated

@erratic-pattern
Contributor

I've opened PR #6519, which will retry on reqwest::Error::Decode errors. I'm not sure which issue to associate it with, but it seems related to this one.
