
[16.0][FIX] fastapi: Avoid process stuck in case of retry of Post request w… #440

Merged
merged 3 commits into OCA:16.0 from 16.0-fix-retry-on-post-with-body on Jul 8, 2024

Conversation

lmignon
Contributor

@lmignon lmignon commented Jun 26, 2024

…ith body content.

In case of retry, we must ensure that the stream passed to the FastAPI application is reset to the beginning so that it can be consumed again. Unfortunately, the stream object from the werkzeug request is not always seekable. In such a case, we wrap the stream in a new SeekableStream object, so that it becomes possible to reset the stream to the beginning without first reading it fully into memory.
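A minimal sketch of that idea (the helper name and environ handling are illustrative, not the actual patch; SeekableStream is the wrapper class shown later in this thread):

def ensure_seekable(environ):
    """Sketch: make the WSGI request body replayable across retries."""
    stream = environ["wsgi.input"]
    if not (hasattr(stream, "seekable") and stream.seekable()):
        # Wrap the non-seekable werkzeug stream instead of reading it
        # fully into memory up front.
        environ["wsgi.input"] = SeekableStream(stream)
    return environ


# Before each retry, rewind so the FastAPI app can consume the body again:
# environ["wsgi.input"].seek(0)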

@lmignon lmignon marked this pull request as draft June 26, 2024 15:12
@lmignon lmignon force-pushed the 16.0-fix-retry-on-post-with-body branch 2 times, most recently from 5e626b8 to df3228a on June 26, 2024 15:52
@lmignon lmignon force-pushed the 16.0-fix-retry-on-post-with-body branch from df3228a to e871a22 on June 26, 2024 16:33
@lmignon lmignon marked this pull request as ready for review June 26, 2024 16:34
@lmignon
Contributor Author

lmignon commented Jun 26, 2024

ping @paradoxxxzero

@lmignon
Contributor Author

lmignon commented Jun 27, 2024

ping @benwillig @qgroulard

For the record, this addon works successfully with:

At the very least, you must take care to use starlette>=0.36.3 (as required by fastapi 0.110.0) and a2wsgi>=1.10.6 (this last version fixes a lot of issues that could leave processes stuck). I wonder if I should force minimum versions in the declaration of the addon's external dependencies @sbidoul
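For reference, a sketch of what pinning these in the addon manifest could look like (the exact requirement strings are my assumption):

# __manifest__.py (sketch)
{
    "name": "fastapi",
    # ...
    "external_dependencies": {
        "python": [
            "fastapi>=0.110.0",  # itself requires starlette>=0.36.3
            "a2wsgi>=1.10.6",  # earlier versions could leave processes stuck
        ],
    },
}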

@paradoxxxzero
Contributor

@lmignon Thanks for the ping, looks good to me. It comes with the drawback of having to load the entire stream into memory, but you can't have it all, and it should be fine in most realistic cases.

@sbidoul
Member

sbidoul commented Jun 27, 2024

I wonder if I should force minimum versions in the declaration of the addon's external dependencies @sbidoul

As you have had hard-to-diagnose issues with older versions, yes, let's declare the minimum versions of fastapi and a2wsgi to spare others the pain. It should not be necessary to constrain starlette here, as fastapi does that already.

These minimal versions ensure that the retrying mechanism from Odoo works fine with the way the werkzeug request is passed from Odoo to the FastAPI app.
@sbidoul
Member

sbidoul commented Jun 27, 2024

IIUC, the SeekableStream implementation is useful only if a retryable error occurs while the input stream has only been partially read, as in that case it would consume less memory than a simple BytesIO wrapper. Also, it may have a performance impact in the common case compared to BytesIO, which is implemented in C. Hard to tell.

@lmignon
Contributor Author

lmignon commented Jun 27, 2024

@sbidoul @paradoxxxzero Bearing in mind the question of maintenance, it is undoubtedly preferable to use a standard IO class rather than specific code (SeekableStream) for a benefit that has yet to be demonstrated. At the same time, we can also keep memory consumption under control by using the SpooledTemporaryFile class. (This integration of fastapi into Odoo is complex enough in itself, so I should avoid making it even more complex.) In terms of performance, if we compare SpooledTemporaryFile to io.BytesIO, both are equal as long as the size doesn't exceed the max_size specified for the SpooledTemporaryFile. This max_size could be set as a system parameter with a default value of, say, 128 MB. What do you think about it?

On the other hand, the specialized SeekableStream class ensures that the stream is only read once as long as there has been no retryable error, whereas with a standard IO class the stream will always be read at least twice.
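To illustrate the SpooledTemporaryFile behaviour discussed above (a standalone sketch; the 128 MB threshold is just the value floated here):

import tempfile

# The buffer stays in memory until max_size bytes are written,
# then transparently rolls over to a temporary file on disk.
buf = tempfile.SpooledTemporaryFile(max_size=128 * 1024 * 1024)
buf.write(b"A" * 1024)  # small body: kept in memory
buf.seek(0)  # rewind, e.g. before retrying the request
assert buf.read(4) == b"AAAA"
buf.close()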

@lmignon
Contributor Author

lmignon commented Jun 28, 2024

In order to determine the best approach for keeping the incoming stream so that it can be reused in the event of a retryable error, I ran a few performance and memory-consumption tests. I compared three approaches:

The basic one:

stream = io.BytesIO(input_stream.read())
stream.read()

The use of my SeekableStream:

stream = SeekableStream(input_stream)
stream.read()

The use of a SeekableStream class using a tempfile.SpooledTemporaryFile as buffer:

stream = SeekableStream2(input_stream)
stream.read()

In SeekableStream2, a limit of 20 MB is declared as the maximum in-memory size for the SpooledTemporaryFile buffer.

The benchmark was carried out with a 10 MB input stream and a 100 MB one. I used a temporary file as the original input stream in order to simulate a real use case where the stream comes from a request body or a file upload.

Here is the code used:

# Copyright 2024 ACSONE SA/NV
# License LGPL-3.0 or later (http://www.gnu.org/licenses/LGPL).
import io
import os
import time
import tracemalloc
import statistics
import tempfile
from contextlib import contextmanager


class SeekableStream(io.RawIOBase):
    def __init__(self, original_stream):
        super().__init__()
        self.original_stream = original_stream
        self.buffer = bytearray()
        self.buffer_position = 0
        self.seek_position = 0
        self.end_of_stream = False

    def read(self, size=-1):  # pylint: disable=method-required-super
        if size == -1:
            # Read all remaining data: first what is already buffered,
            # then whatever is left on the original stream.
            data_from_buffer = bytes(self.buffer[self.buffer_position :])
            self.buffer_position = len(self.buffer)

            # Read remaining data from the original stream if not already buffered
            remaining_data = self.original_stream.read()
            self.buffer.extend(remaining_data)
            self.end_of_stream = True
            return data_from_buffer + remaining_data

        buffer_len = len(self.buffer)
        remaining_buffer = buffer_len - self.buffer_position

        if remaining_buffer >= size:
            # Read from the buffer if there is enough data
            data = self.buffer[self.buffer_position : self.buffer_position + size]
            self.buffer_position += size
            return bytes(data)
        else:
            # Read remaining buffer data
            data = self.buffer[self.buffer_position :]
            self.buffer_position = buffer_len

            # Read the rest from the original stream
            additional_data = self.original_stream.read(size - remaining_buffer)
            if additional_data is None:
                additional_data = b""

            # Store read data in the buffer
            self.buffer.extend(additional_data)
            self.buffer_position += len(additional_data)
            if len(additional_data) < (size - remaining_buffer):
                self.end_of_stream = True
            return bytes(data + additional_data)

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            new_position = offset
        elif whence == io.SEEK_CUR:
            new_position = self.buffer_position + offset
        elif whence == io.SEEK_END:
            if not self.end_of_stream:
                # Read the rest of the stream to buffer it.
                # This is needed to know the total size of the stream.
                self.read()
            new_position = len(self.buffer) + offset
        else:
            raise ValueError("Invalid value for `whence`: {}".format(whence))

        if new_position < 0:
            raise ValueError("Negative seek position {}".format(new_position))

        if new_position <= len(self.buffer):
            self.buffer_position = new_position
        else:
            # Read from the original stream to fill the buffer up to the new position
            to_read = new_position - len(self.buffer)
            additional_data = self.original_stream.read(to_read)
            if additional_data is None:
                additional_data = b""
            self.buffer.extend(additional_data)
            if len(self.buffer) < new_position:
                raise io.UnsupportedOperation(
                    "Cannot seek beyond the end of the stream"
                )
            self.buffer_position = new_position

        return self.buffer_position

    def tell(self):
        return self.buffer_position

    def readable(self):
        return True


class SeekableStream2(io.RawIOBase):
    def __init__(
        self, original_stream, max_buffer_size=20 * 1024 * 1024
    ):  # Default 20 MB buffer
        super().__init__()
        self.original_stream = original_stream
        self.buffer = tempfile.SpooledTemporaryFile(max_size=max_buffer_size)
        self.buffer_position = 0
        self.end_of_stream = False

    def read(self, size=-1):
        self.buffer.seek(self.buffer_position)

        if size == -1:
            # Read all remaining data
            remaining_data = self.original_stream.read()
            self.buffer.write(remaining_data)
            self.buffer_position = self.buffer.tell()
            self.end_of_stream = True
            self.buffer.seek(0)
            return self.buffer.read()

        buffer_len = self.buffer.seek(0, io.SEEK_END)
        remaining_buffer = buffer_len - self.buffer_position

        if remaining_buffer >= size:
            # Read from the buffer if there is enough data
            self.buffer.seek(self.buffer_position)
            data = self.buffer.read(size)
            self.buffer_position += size
            return data
        else:
            # Read remaining buffer data
            self.buffer.seek(self.buffer_position)
            data = self.buffer.read(remaining_buffer)
            self.buffer_position = buffer_len

            # Read the rest from the original stream
            additional_data = self.original_stream.read(size - remaining_buffer)
            if additional_data is None:
                additional_data = b""

            # Store read data in the buffer
            self.buffer.write(additional_data)
            self.buffer_position += len(additional_data)
            if len(additional_data) < (size - remaining_buffer):
                self.end_of_stream = True
            return data + additional_data

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            new_position = offset
        elif whence == io.SEEK_CUR:
            new_position = self.buffer_position + offset
        elif whence == io.SEEK_END:
            # Read all data to find the end of the stream if not already done
            if not self.end_of_stream:
                self.read()
            buffer_len = self.buffer.seek(0, io.SEEK_END)
            new_position = buffer_len + offset
        else:
            raise ValueError("Invalid value for `whence`: {}".format(whence))

        if new_position < 0:
            raise ValueError("Negative seek position {}".format(new_position))

        if new_position <= self.buffer.seek(0, io.SEEK_END):
            self.buffer_position = new_position
        else:
            # Read from the original stream to fill the buffer up to the new position
            to_read = new_position - self.buffer.seek(0, io.SEEK_END)
            additional_data = self.original_stream.read(to_read)
            if additional_data is None:
                additional_data = b""
            self.buffer.write(additional_data)
            if self.buffer.seek(0, io.SEEK_END) < new_position:
                raise io.UnsupportedOperation(
                    "Cannot seek beyond the end of the stream"
                )
            self.buffer_position = new_position

        return self.buffer_position

    def tell(self):
        return self.buffer_position

    def readable(self):
        return True


# Generate a large stream for testing
large_data = b"A" * 10**7  # 10 MB of data
extra_large_data = b"A" * 10**8  # 100 MB of data


def benchmark_bytesio(data_file_name, n_runs):
    times = []
    peak_memories = []

    for _ in range(n_runs):
        with open(data_file_name, "rb") as stream:
            start_time = time.time()
            tracemalloc.start()

            seekable = io.BytesIO(stream.read())
            seekable.read()

            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            end_time = time.time()

            elapsed_time = end_time - start_time
            times.append(elapsed_time)
            peak_memories.append(peak)

    avg_time = statistics.mean(times)
    avg_memory = statistics.mean(peak_memories)
    return avg_time, avg_memory


def benchmark_seekablestream(stream_class, data_file_name, n_runs):
    times = []
    peak_memories = []

    for _ in range(n_runs):
        with open(data_file_name, "rb") as stream:

            start_time = time.time()
            tracemalloc.start()

            seekable = stream_class(stream)
            seekable.read()

            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            end_time = time.time()

            elapsed_time = end_time - start_time
            times.append(elapsed_time)
            peak_memories.append(peak)

    avg_time = statistics.mean(times)
    avg_memory = statistics.mean(peak_memories)
    return avg_time, avg_memory


for data in [large_data, extra_large_data]:
    # tempfile.mktemp() is deprecated and racy; mkstemp() is the safe variant.
    fd, data_file_name = tempfile.mkstemp()
    os.close(fd)
    with open(data_file_name, "wb") as f:
        f.write(data)
    try:
        n_runs = 20  # Number of runs for averaging
        data_size = len(data) / 10**6  # Convert bytes to MB

        # Benchmark BytesIO approach
        time_bytesio, memory_bytesio = benchmark_bytesio(data_file_name, n_runs)

        # Benchmark SeekableStream approach
        time_seekablestream, memory_seekablestream = benchmark_seekablestream(
            SeekableStream, data_file_name, n_runs
        )

        # Benchmark SeekableStream approach with SpooledTemporaryFile as buffer
        time_seekablestream2, memory_seekablestream2 = benchmark_seekablestream(
            SeekableStream2, data_file_name, n_runs
        )
    finally:
        try:
            os.remove(data_file_name)
        except FileNotFoundError:
            pass

    print(f"Data Size: {data_size:.2f} MB")
    print(
        f"BytesIO - Avg Time: {time_bytesio:.4f} seconds, Avg Peak Memory: {memory_bytesio / 10**6:.2f} MB"
    )
    print(
        f"SeekableStream - Avg Time: {time_seekablestream:.4f} seconds, Avg Peak Memory: {memory_seekablestream / 10**6:.2f} MB"
    )
    print(
        f"SeekableStream2 - Avg Time: {time_seekablestream2:.4f} seconds, Avg Peak Memory: {memory_seekablestream2 / 10**6:.2f} MB"
    )

The results really surprised me…

> python test_seekabke.py 
Data Size: 10.00 MB
BytesIO - Avg Time: 0.0016 seconds, Avg Peak Memory: 10.00 MB
SeekableStream - Avg Time: 0.0079 seconds, Avg Peak Memory: 20.00 MB
SeekableStream2 - Avg Time: 0.0122 seconds, Avg Peak Memory: 30.00 MB
Data Size: 100.00 MB
BytesIO - Avg Time: 0.0425 seconds, Avg Peak Memory: 100.00 MB
SeekableStream - Avg Time: 0.0849 seconds, Avg Peak Memory: 200.00 MB
SeekableStream2 - Avg Time: 0.1501 seconds, Avg Peak Memory: 200.13 MB

In any case, the simplest solution performs best, and the memory numbers behave differently than I expected.
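One plausible explanation for the flat BytesIO peak (a sketch relying on a CPython optimization, so observed numbers may vary): a BytesIO initialized from a bytes object shares that object until the buffer is modified, and reading the full content can return the shared object without an extra copy, whereas the custom classes keep one copy in their internal buffer and return another from read().

import io
import tracemalloc

payload = b"A" * 10**7  # 10 MB, allocated before tracing starts

tracemalloc.start()
buf = io.BytesIO(payload)  # shares the initial bytes object, no copy yet
data = buf.read()  # reading the whole unmodified buffer can avoid a copy
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"peak: {peak / 10**6:.2f} MB")  # on CPython, typically far below 10 MB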

lmignon added a commit to acsone/rest-framework that referenced this pull request Jun 28, 2024
The use of a BytesIO in place of our specialized SeekableStream class, to keep the input stream in case we need to process the request again due to a retryable error, outperforms it in both speed and memory consumption.

see OCA#440 (comment) for more info.
@lmignon lmignon force-pushed the 16.0-fix-retry-on-post-with-body branch from 1b0e0ad to 749ab3f on June 28, 2024 10:15
Member

@sbidoul sbidoul left a comment


Interesting analysis :) Thanks!

@lmignon
Contributor Author

lmignon commented Jul 8, 2024

/ocabot merge patch

@OCA-git-bot
Contributor

What a great day to merge this nice PR. Let's do it!
Prepared branch 16.0-ocabot-merge-pr-440-by-lmignon-bump-patch, awaiting test results.

@OCA-git-bot OCA-git-bot merged commit 624b404 into OCA:16.0 Jul 8, 2024
7 checks passed
@OCA-git-bot
Contributor

Congratulations, your PR was merged at 2380e30. Thanks a lot for contributing to OCA. ❤️

@lmignon lmignon deleted the 16.0-fix-retry-on-post-with-body branch July 8, 2024 11:18
lmignon added a commit to acsone/rest-framework that referenced this pull request Oct 3, 2024
cormaza pushed a commit to cormaza/rest-framework that referenced this pull request Nov 8, 2024
dnplkndll pushed a commit to Kencove/rest-framework that referenced this pull request Dec 14, 2024
kobros-tech pushed a commit to kobros-tech/rest-framework that referenced this pull request Jan 16, 2025