[support] LimitConcurrentRequestsMiddleware vs stream body response #411

Closed · quazardous opened this issue Jul 10, 2021 · 4 comments

@quazardous commented Jul 10, 2021

I'm using responses with a streamed body as explained here: https://github.com/reactphp/http#streaming-outgoing-response

I'm not sure LimitConcurrentRequestsMiddleware knows how to handle a response with a streamed body:

    // from src/Middleware/LimitConcurrentRequestsMiddleware.php
    // ...
    // happy path: if next request handler returned immediately,
    // we can simply try to invoke the next queued request
    if ($response instanceof ResponseInterface) {
        $this->processQueue();
        return $response;
    }
    // ...

So even if the stream is still alive, it is not counted as "concurrent", right?
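
For reference, a minimal sketch of the kind of handler I mean (the interval and payload here are made up for illustration): the response object is returned immediately, while its body keeps streaming afterwards.

    use Psr\Http\Message\ServerRequestInterface;
    use React\EventLoop\Loop;
    use React\Http\Message\Response;
    use React\Stream\ThroughStream;

    $handler = function (ServerRequestInterface $request) {
        $stream = new ThroughStream();

        // keep emitting NDJSON lines long after the response was returned
        $timer = Loop::addPeriodicTimer(0.1, function () use ($stream) {
            $stream->write(json_encode(['tick' => microtime(true)]) . "\n");
        });
        $stream->on('close', function () use ($timer) {
            Loop::cancelTimer($timer);
        });

        // returned immediately: the middleware's "happy path" above fires here
        // and the next queued request is let through right away
        return new Response(200, ['Content-Type' => 'application/x-ndjson'], $stream);
    };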

@clue (Member) commented Jul 25, 2021

@quazardous Your observation is correct; this is indeed by design.

The LimitConcurrentRequestsMiddleware is designed to limit "pending" requests by keeping track of pending promises from the next request handlers. As soon as the request handler returns a response (or fulfills a promise with a response), it will no longer be considered "pending". This happens irrespective of whether the response is in a streaming state.

I'm open to discussing this design. Do you have a specific use case in mind where a streaming response should be considered "pending"?
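
To illustrate with a quick sketch (the delay and handler are made up): the request below counts against the concurrency limit only while the promise is pending, not while the client is still receiving the body.

    use Psr\Http\Message\ServerRequestInterface;
    use React\EventLoop\Loop;
    use React\Http\Message\Response;
    use React\Promise\Promise;

    $handler = function (ServerRequestInterface $request) {
        return new Promise(function ($resolve) {
            // "pending" for two seconds: queued requests are held back
            Loop::addTimer(2.0, function () use ($resolve) {
                // from here on the request no longer counts as "pending",
                // even if we had fulfilled with a streaming body instead
                $resolve(new Response(200, ['Content-Type' => 'text/plain'], "done\n"));
            });
        });
    };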

@quazardous (Author)

Hi,

I'm building an API that deals with input and output NDJSON feeds that can be quite large.

The API generates responses with a streaming body, so the stream can stay alive long after the response itself has been returned.

For now I've created a kind of lock mechanism in my central business server object (just an array keyed by spl_object_hash()) to keep track of the number of long-running streams.

The lock is acquired at the beginning of the (possibly long-running) handler.

    // handler start
    $lock = $this->server->acquireLock(0, function () {
        // cleanup on force release
    });

    // ...

    // called when the response is done or on error
    $this->server->releaseLock($lock);

It's raw but effective. The main loop can force-release a lock if it has been held for too long (calling the cleanup callback).
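
For context, a simplified sketch of that registry (the class name and force-release details below are illustrative; only acquireLock()/releaseLock() match my real code):

    class Server
    {
        /** @var array<string, callable> cleanup callbacks keyed by lock hash */
        private $locks = [];

        public function acquireLock(int $flags, callable $onForceRelease): object
        {
            $lock = new \stdClass();
            $this->locks[spl_object_hash($lock)] = $onForceRelease;
            return $lock;
        }

        public function releaseLock(object $lock): void
        {
            unset($this->locks[spl_object_hash($lock)]);
        }

        // the main loop calls this to force-release a lock held for too long
        public function forceRelease(object $lock): void
        {
            $hash = spl_object_hash($lock);
            if (isset($this->locks[$hash])) {
                ($this->locks[$hash])(); // run the cleanup callback
                unset($this->locks[$hash]);
            }
        }

        // number of long-running streams currently alive
        public function countLocks(): int
        {
            return \count($this->locks);
        }
    }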

Maybe it could be integrated like this:

    if ($response instanceof ResponseInterface) {
        if ($response instanceof WithLockResponseInterface) {
            $lock = $response->getLock();
            // test the lock / delegate the response to a promise/timer/whatever
            // that knows how to handle the lock and pause the stream...
            // I'm not fluent enough with ReactPHP to guess the right way to do that...
        } else {
            $this->processQueue();
            return $response;
        }
    }
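
Or maybe, instead of a dedicated lock interface, the middleware could watch the streaming body itself; something like this, perhaps (untested, just to illustrate the idea):

    use Psr\Http\Message\ResponseInterface;
    use React\Stream\ReadableStreamInterface;

    if ($response instanceof ResponseInterface) {
        $body = $response->getBody();
        if ($body instanceof ReadableStreamInterface) {
            // keep this slot occupied until the streaming body finishes,
            // then let the next queued request through
            $body->on('close', function () {
                $this->processQueue();
            });
        } else {
            $this->processQueue();
        }
        return $response;
    }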

@quazardous (Author)

On another front, when creating long streamed responses using ticks/timers, I've seen huge memory growth when testing in production.

I realized that, given a slower network, the internal stream pipe was somehow using more and more memory. It's logical: the fake stream was looping fast and creating data, while the piped-through stream could not send that data to the network fast enough, so it was piling up somewhere along the chain.

So I've added a memory-throttling check to the tick/timer loop that prevents it from using too much memory (the tick/timer just fires more often to compensate).
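
Roughly like this (the limit and producer function are made up for illustration):

    use React\EventLoop\Loop;
    use React\Stream\ThroughStream;

    $stream = new ThroughStream();
    $softLimit = 64 * 1024 * 1024; // arbitrary 64 MiB cap

    $timer = Loop::addPeriodicTimer(0.01, function () use ($stream, $softLimit) {
        if (memory_get_usage(true) > $softLimit) {
            // too much data buffered along the chain: skip this tick and
            // let the network side drain before producing more
            return;
        }
        // nextNdjsonLine() is a hypothetical producer for this sketch
        $stream->write(nextNdjsonLine());
    });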

I thought memory throttling could maybe be generalized in a middleware or in a dedicated stream like ThroughStream.

@clue (Member) commented Aug 13, 2021

> On another front, when creating long streamed responses using ticks/timers, I've seen huge memory growth when testing in production.
>
> I realized that, given a slower network, the internal stream pipe was somehow using more and more memory. It's logical: the fake stream was looping fast and creating data, while the piped-through stream could not send that data to the network fast enough, so it was piling up somewhere along the chain.

@reactphp supports throttling streaming data (backpressure), but it sounds like your implementation may not take advantage of this at the moment?

As per https://github.com/reactphp/http#streaming-outgoing-response, you may return any ReadableStreamInterface as the body of your response. If it emits any data, it will automatically be sent as part of the HTTP response body over the transport connection. If the buffer of this transport connection is full, it will automatically throttle your stream by calling pause() on it. Once its buffer drains, it will automatically resume() your stream.
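
For example, a timer-driven producer can cooperate with this mechanism by watching the return value of write() and the 'drain' event (a sketch, details illustrative):

    use React\EventLoop\Loop;
    use React\Stream\ThroughStream;

    $stream = new ThroughStream();
    $timer = null;

    $produce = function () use ($stream, &$timer) {
        // write() returns false once the stream has been pause()d by the
        // server (connection buffer full): stop producing until it drains
        if (!$stream->write(json_encode(['tick' => microtime(true)]) . "\n")) {
            Loop::cancelTimer($timer);
            $timer = null;
        }
    };

    $timer = Loop::addPeriodicTimer(0.01, $produce);

    // resume() on the stream fires 'drain': start producing again
    $stream->on('drain', function () use (&$timer, $produce) {
        if ($timer === null) {
            $timer = Loop::addPeriodicTimer(0.01, $produce);
        }
    });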

> I thought memory throttling could maybe be generalized in a middleware or in a dedicated stream like ThroughStream.

You should have access to the response body in a middleware if you need this kind of control. Check $response->getBody() instanceof ReadableStreamInterface if you want to manually throttle the stream.
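
A rough sketch of such a middleware (the throttling logic itself is left out; this only shows where to hook in):

    use Psr\Http\Message\ResponseInterface;
    use Psr\Http\Message\ServerRequestInterface;
    use React\Stream\ReadableStreamInterface;

    $middleware = function (ServerRequestInterface $request, callable $next) {
        $response = $next($request);
        // (if $next returns a promise, apply the same check in ->then())
        if ($response instanceof ResponseInterface) {
            $body = $response->getBody();
            if ($body instanceof ReadableStreamInterface) {
                // pause()/resume() and stream events are available here,
                // e.g. to account for bytes sent or apply custom throttling
                $body->on('data', function ($chunk) {
                    // inspect or count $chunk here
                });
            }
        }
        return $response;
    };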

I believe this has been answered, so I'm closing this for now. Please come back with more details if this problem persists and we can always reopen this 👍

clue closed this as completed Aug 13, 2021