Implementation flaw of the middleware prevents concurrent requests #27

Open
hongyuan1306 opened this issue Jan 8, 2022 · 2 comments
Labels
bug Something isn't working

Comments

@hongyuan1306

I think the current version of the msgpack middleware has a serious implementation flaw that will cause errors when requests are processed in parallel.

During each request, some request-scoped variables, such as receive, send, should_decode_from_msgpack_to_json and initial_message, are stored on the middleware instance itself:

        self.should_decode_from_msgpack_to_json = (
            "application/x-msgpack" in headers.get("content-type", "")
        )
        # Take an initial guess, although we eventually may not
        # be able to do the conversion.
        self.should_encode_from_json_to_msgpack = (
            "application/x-msgpack" in headers.getlist("accept")
        )
        self.receive = receive
        self.send = send

The problem is that there is only one instance of the middleware, but multiple parallel requests are normally in progress, so these variables will get mixed up between the requests. When for example receive_with_msgpack is called to process a request, self.receive could already have been overwritten by a subsequent request.

The proper way to pass request-scoped values between the various instance methods would be to use request- or function-scoped storage, such as the ASGI scope, a wrapped function (closure), or a partial function.
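As an illustration of the problem and of the partial-function fix, here is a minimal sketch using only the standard library. It is not the middleware's actual code: the class names SharedStateMiddleware and PerRequestMiddleware, the "id" key in the scope, and the "demo" message type are all hypothetical and exist only for this example.

```python
import asyncio
from functools import partial


class SharedStateMiddleware:
    """Hypothetical middleware that stores request state on the instance."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        self.send = send  # overwritten by every concurrent request
        await self.app(scope, receive, self.wrapped_send)

    async def wrapped_send(self, message):
        await self.send(message)  # may be another request's send


class PerRequestMiddleware:
    """Hypothetical fix: bind the request's own send with functools.partial."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # Nothing request-specific is stored on self.
        await self.app(scope, receive, partial(self.wrapped_send, send))

    async def wrapped_send(self, send, message):
        await send(message)


async def app(scope, receive, send):
    # Yield to the event loop so the other request can start in between.
    await asyncio.sleep(0)
    await send({"type": "demo", "id": scope["id"]})


async def run(middleware_cls):
    """Run two overlapping requests and record what each client receives."""
    middleware = middleware_cls(app)
    outboxes = {0: [], 1: []}

    def make_send(request_id):
        async def send(message):
            outboxes[request_id].append(message["id"])
        return send

    async def receive():
        return {"type": "demo"}

    await asyncio.gather(
        *(middleware({"id": i}, receive, make_send(i)) for i in (0, 1))
    )
    return outboxes


if __name__ == "__main__":
    print("shared state:", asyncio.run(run(SharedStateMiddleware)))
    print("per request: ", asyncio.run(run(PerRequestMiddleware)))
```

With the shared-state variant, the response of the first request ends up in the second request's outbox, because self.send was overwritten while the first request was suspended. With the partial-based variant, each request only ever sees its own send.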

@florimondmanca
Owner

Hi @hongyuan1306, just seeing this now as I come back around to this repo. I think your analysis is correct. Middleware state should be avoided! An alternative to storing inside scope would be to use contextvars. Don't really mind either way, both should work.
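For reference, the contextvars alternative could look roughly like the sketch below. It is an assumption-laden illustration, not the project's code: the variable _send_var, the class ContextVarMiddleware, the "id" scope key and the "demo" message type are hypothetical. It relies on the fact that each asyncio task runs in its own copy of the context, so a value set while handling one request is not visible to another request running in a different task.

```python
import asyncio
import contextvars

# Hypothetical variable: holds the `send` callable of the current request.
_send_var: contextvars.ContextVar = contextvars.ContextVar("send")


class ContextVarMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # The value is stored in the current task's context, not on self.
        token = _send_var.set(send)
        try:
            await self.app(scope, receive, self.wrapped_send)
        finally:
            _send_var.reset(token)

    async def wrapped_send(self, message):
        send = _send_var.get()  # the send of the request being handled
        await send(message)


async def app(scope, receive, send):
    await asyncio.sleep(0)  # let the other request start in between
    await send({"type": "demo", "id": scope["id"]})


async def run():
    middleware = ContextVarMiddleware(app)
    outboxes = {0: [], 1: []}

    def make_send(request_id):
        async def send(message):
            outboxes[request_id].append(message["id"])
        return send

    async def receive():
        return {"type": "demo"}

    # gather() wraps each coroutine in a task with its own context copy.
    await asyncio.gather(
        *(middleware({"id": i}, receive, make_send(i)) for i in (0, 1))
    )
    return outboxes


if __name__ == "__main__":
    print(asyncio.run(run()))
```

This only isolates requests as long as each request is handled in its own task (or context copy), which is an assumption about how the ASGI server schedules requests.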

@florimondmanca florimondmanca added the bug Something isn't working label Mar 31, 2022
@florimondmanca florimondmanca changed the title from "Implementation flaw of the middleware" to "Implementation flaw of the middleware prevents concurrent requests" Mar 31, 2022
@mgucare

mgucare commented Oct 5, 2022

Hi @hongyuan1306, @florimondmanca,
I can't find the concurrent requests issue. Yes, the MessagePackMiddleware instance is unique, but only app, packb and unpackb are attached to this instance, which is not a problem from my point of view. The main job of this middleware is done in _MessagePackResponder, and each time the __call__ method of MessagePackMiddleware is called, a new instance of the responder is created.
I ran the following ugly test multiple times to confirm what I was thinking, and the result is 100% correct each time.
Am I missing something? How can I reproduce the issue?

import asyncio

import httpx
import msgpack
import pytest
from msgpack_asgi import MessagePackMiddleware
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from starlette.types import Receive, Scope, Send


@pytest.mark.asyncio
async def test_msgpack_concurrent() -> None:
    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        # Echo the content type and raw body as seen behind the middleware.
        request = Request(scope, receive=receive)
        content_type = request.headers["content-type"]
        message = await request.body()
        text = f"content_type={content_type!r} message={message!r}"
        response = PlainTextResponse(text)
        await response(scope, receive, send)

    app = MessagePackMiddleware(app)

    test_cases = []
    # Header byte of the msgpack fixstr holding the message (0xa0 | length).
    strlen = 'xaf'
    for i in range(0, 10000, 2):
        # Even index: msgpack request, expected to reach the app as JSON.
        packed_message = msgpack.packb({"message": "Hello, world! " + str(i)})
        test_cases.append({
            "content": {"message": "Hello, world! " + str(i)},
            "headers": {"content-type": "application/x-msgpack"},
            "body": packed_message,
            "expected_text": "content_type='application/json' message=b'" + '{"message": "Hello, world! ' + str(i) + '"}' + "'"
        })
        # Odd index: JSON content type, so the body must pass through untouched.
        packed_message = msgpack.packb({"message": "Hello, world! " + str(i+1)})
        if 9 < i < 100:
            strlen = 'xb0'
        elif 99 < i < 1000:
            strlen = 'xb1'
        elif 999 < i < 10000:
            strlen = 'xb2'
        test_cases.append({
            "content": {"message": "Hello, world! " + str(i+1)},
            "headers": {"content-type": "application/json"},
            "body": packed_message,
            "expected_text": "content_type='application/json' message=b'\\x81\\xa7message\\" + strlen + "Hello, world! "  + str(i+1) + "'"
        })

    async def load_url(data):
        transport = httpx.ASGITransport(app=app)
        async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
            r = await client.post(
                "/", content=data["body"], headers=data["headers"]
            )
            assert r.status_code == 200
            assert r.text == data["expected_text"]
            return r.text == data["expected_text"]

    # Run all requests concurrently on the event loop. Submitting an async
    # function to a ThreadPoolExecutor only creates coroutine objects, and
    # awaiting them one by one would execute the requests sequentially.
    control = await asyncio.gather(*(load_url(data) for data in test_cases))
    assert control.count(False) == 0
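
The per-request responder pattern described in this comment can be sketched as follows. This is a simplified, hypothetical stand-in and not the actual _MessagePackResponder: the class names _Responder and Middleware, the "id" scope key and the "demo" message type are invented for the example. The point is only that state stored on an object created inside __call__ belongs to a single request.

```python
import asyncio


class _Responder:
    """Hypothetical per-request object; one instance per request."""

    def __init__(self, app):
        self.app = app
        self.send = None

    async def __call__(self, scope, receive, send):
        self.send = send  # safe: this instance is never shared
        await self.app(scope, receive, self.wrapped_send)

    async def wrapped_send(self, message):
        await self.send(message)


class Middleware:
    """Holds only request-independent configuration."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # A fresh responder is created for every request.
        responder = _Responder(self.app)
        await responder(scope, receive, send)


async def app(scope, receive, send):
    await asyncio.sleep(0)  # let the other request start in between
    await send({"type": "demo", "id": scope["id"]})


async def run():
    middleware = Middleware(app)
    outboxes = {0: [], 1: []}

    def make_send(request_id):
        async def send(message):
            outboxes[request_id].append(message["id"])
        return send

    async def receive():
        return {"type": "demo"}

    await asyncio.gather(
        *(middleware({"id": i}, receive, make_send(i)) for i in (0, 1))
    )
    return outboxes


if __name__ == "__main__":
    print(asyncio.run(run()))
```

If the middleware is already structured this way, attributes set on the responder do not leak between overlapping requests, which would be consistent with the test above passing.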
