
Fix pool concurrency #46

Merged: argaen merged 2 commits into master from fix_asyncio_queue_pool on May 24, 2017
Conversation

@argaen (Member) commented May 11, 2017

The previous design of the pool had various flaws:

  • It created more connections than maxsize (although, because of maxsize, the extra ones were discarded).
  • There were race conditions in the control of max size and in connection creation. Now the size is controlled by the queue + the _in_use set.
  • Although an asyncio.Queue was being used, none of its blocking features were. Now clients will block when trying to acquire a connection.

Fixes #43

EDIT: I had to fix the case where minsize is 0 and clients got blocked forever. Fixed in ff4dbc1
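
A minimal sketch of the new accounting, as I understand it (my illustration only; method bodies are simplified and this is not the PR's exact code):

import asyncio


class MemcachePool:
    def __init__(self, minsize, maxsize):
        self._minsize = minsize
        self._maxsize = maxsize
        self._pool = asyncio.Queue()   # idle connections, unbounded on purpose
        self._in_use = set()           # connections currently handed out

    def size(self):
        # derived from the containers themselves, no counter to drift
        return self._pool.qsize() + len(self._in_use)

    async def acquire(self):
        conn = await self._pool.get()  # blocks until a connection is free
        self._in_use.add(conn)
        return conn

    def release(self, conn):
        self._in_use.remove(conn)
        self._pool.put_nowait(conn)    # synchronous, never suspends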

Previous design of the pool had various flaws:

- Created more connections than maxsize (although
  because of maxsize they were discarded).
- There were race conditions in the control of
  max size and when creating connection. Now the
  size is controlled by the queue + _in_use set
- Although asyncio.Queue was being used, no
  blocking feature was being used. Now clients
  will block when trying to acquire a connection
@argaen (Member, Author) commented May 11, 2017

Again, although I've added tests, I've also used this script together with ab for load testing:

import uuid
import logging
import asyncio
import aiomcache

from aiohttp import web

logger = logging.getLogger(__name__)


class CacheManager:
    def __init__(self):
        self.cache = aiomcache.Client("127.0.0.1", 11211, pool_size=4)

    async def get(self, key):
        return await asyncio.wait_for(self.cache.get(key), 0.1)

    async def set(self, key, value):
        return await asyncio.wait_for(self.cache.set(key, value), 0.1)


async def handler_get(req):
    try:
        data = await req.app['cache'].get(b'testkey')
        assert req.app['cache'].cache._pool.size() <= 4
        if data:
            return web.Response(text=data.decode())
        data = str(uuid.uuid4()).encode()
        await req.app['cache'].set(b'testkey', data)
        return web.Response(text=str(data))
    except asyncio.TimeoutError:
        data = str(uuid.uuid4()).encode()
        await req.app['cache'].set(b'testkey', data)
        return web.Response(status=404)


if __name__ == '__main__':
    app = web.Application()
    app['cache'] = CacheManager()
    app.router.add_route('GET', '/', handler_get)
    web.run_app(app)

ab -n 3500 -c 1000 http://127.0.0.1:8080/

Concurrency Level:      1000
Time taken for tests:   4.366 seconds
Complete requests:      3500
Failed requests:        38
   (Connect: 0, Receive: 0, Length: 38, Exceptions: 0)
Non-2xx responses:      38
Total transferred:      653268 bytes
HTML transferred:       124632 bytes
Requests per second:    801.74 [#/sec] (mean)
Time per request:       1247.288 [ms] (mean)
Time per request:       1.247 [ms] (mean, across all concurrent requests)
Transfer rate:          146.14 [Kbytes/sec] received

@pfreixes @achedeuzot it would be nice if you could test this.

@@ -16,9 +16,8 @@ def __init__(self, host, port, *, minsize, maxsize, loop=None):
         self._minsize = minsize
         self._maxsize = maxsize
         self._loop = loop
-        self._pool = asyncio.Queue(maxsize, loop=loop)
+        self._pool = asyncio.Queue(loop=loop)
@argaen (Member, Author) commented:

No need for a maxsize here: control is done outside the data structure. Having a maxsize on the queue is misleading and can also hide effects like creating more connections than needed (because they get queued or discarded).
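
For illustration (my example, not code from the PR): a bounded queue silently forces exactly that discard behaviour:

import asyncio

q = asyncio.Queue(maxsize=2)
q.put_nowait("conn-1")
q.put_nowait("conn-2")
try:
    q.put_nowait("conn-3")    # bounded queue rejects the extra item
except asyncio.QueueFull:
    pass                      # a freshly opened connection would be dropped here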

@fafhrd91 (Member) commented:

could you try "-k -n 10000"?

        conn = _conn

        if conn is None:
            _conn = yield from self._pool.get()
@argaen (Member, Author) commented:

Clients will now block here until a connection is available. If the retrieved connection has an error, they will close it and try to create a new one.
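
Roughly, the flow is (my paraphrase; the _do_close and _create_new_conn names are assumptions based on the rest of this diff):

_conn = yield from self._pool.get()   # suspends until someone releases a connection

if _conn.reader.at_eof() or _conn.reader.exception():
    self._do_close(_conn)                       # broken connection: drop it
    _conn = yield from self._create_new_conn()  # and open a fresh one instead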

        if self.size() < self._maxsize:
            reader, writer = yield from asyncio.open_connection(
                self._host, self._port, loop=self._loop)
            if self.size() < self._maxsize:
@argaen (Member, Author) commented:

This avoids a race condition when yielding from open_connection: while we were waiting for the new connection, other clients may also have created connections, so we have to check the size again and close ours if necessary.

            else:
                reader.feed_eof()
                writer.close()
                return None
        else:
            return None

    def size(self):
@argaen (Member, Author) commented:

The size of the pool is now derived from the underlying data structures used to store the connections.
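
Presumably along these lines (my sketch, following the queue + _in_use design described above):

def size(self):
    # computed on demand from the containers, so it cannot drift
    # out of sync the way a manually maintained counter can
    return self._pool.qsize() + len(self._in_use)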

@argaen (Member, Author) commented May 11, 2017

@fafhrd91 I've updated the script (I had forgotten to add the wait_for) and updated the results with it.

Concurrency Level:      1000
Time taken for tests:   29.337 seconds
Complete requests:      10000
Failed requests:        5585
   (Connect: 0, Receive: 0, Length: 5585, Exceptions: 0)
Non-2xx responses:      5585
Keep-Alive requests:    5564
Total transferred:      1790477 bytes
HTML transferred:       158940 bytes
Requests per second:    340.87 [#/sec] (mean)
Time per request:       2933.681 [ms] (mean)
Time per request:       2.934 [ms] (mean, across all concurrent requests)
Transfer rate:          59.60 [Kbytes/sec] received

With master, ab -k -n 10000 -c 200 http://127.0.0.1:8080/ (with -c 1000 it never finishes):

Concurrency Level:      200
Time taken for tests:   22.011 seconds
Complete requests:      10000
Failed requests:        9904
   (Connect: 0, Receive: 0, Length: 9904, Exceptions: 0)
Non-2xx responses:      9904
Keep-Alive requests:    101
Total transferred:      1476309 bytes
HTML transferred:       3456 bytes
Requests per second:    454.33 [#/sec] (mean)
Time per request:       440.213 [ms] (mean)
Time per request:       2.201 [ms] (mean, across all concurrent requests)
Transfer rate:          65.50 [Kbytes/sec] received

Even worse, after the load test the server doesn't respond to ANY request, which was the original issue reported by @achedeuzot.

@codecov-io commented May 11, 2017

Codecov Report

Merging #46 into master will decrease coverage by 0.22%.
The diff coverage is 100%.


@@            Coverage Diff            @@
##           master     #46      +/-   ##
=========================================
- Coverage   91.72%   91.5%   -0.23%     
=========================================
  Files           5       5              
  Lines         266     259       -7     
  Branches       39      38       -1     
=========================================
- Hits          244     237       -7     
  Misses         11      11              
  Partials       11      11
Impacted Files Coverage Δ
aiomcache/pool.py 100% <100%> (ø) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update fb875c0...ff4dbc1.

@argaen (Member, Author) commented May 14, 2017

@fafhrd91 would you mind reviewing it? I think it's a real improvement.

            if _conn is None:
                break
-            yield from self._pool.put(_conn)
+            self._pool.put_nowait(_conn)

@pfreixes commented:

Here, if I'm not wrong, the code still has a chance to overflow the pool size requested by the user. The closer the values of minsize and maxsize are, the higher that chance becomes.

@argaen (Member, Author) replied:

Maybe I'm missing something, but I don't see it happening. put_nowait is synchronous: it calls _put, which inserts directly into the internal deque:

    def _put(self, item):
        self._queue.append(item)

This means that after self._pool.put_nowait, the object is inserted without ever suspending the coroutine.

Also, I added these tests, which should cover this case (note they check every time that the pool size is never bigger than maxsize): https://github.com/aio-libs/aiomcache/pull/46/files#diff-44251e44a09d028bd96e8275ea1a0ec9R92
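
A quick illustration of why no other task can interleave here (my example, not from the PR):

import asyncio

q = asyncio.Queue()      # unbounded, so put_nowait can never raise QueueFull
q.put_nowait("conn")     # a plain function call: no await, no suspension point
assert q.qsize() == 1    # the item is visible immediately, within the same
                         # event-loop step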

@pfreixes replied:

Yep, my mistake. I can see that _create_new_connection already takes care of that kind of situation.

@@ -39,29 +37,20 @@ def acquire(self):

         :return: ``tuple`` (reader, writer)
         """
-        while self._size < self._minsize:
+        while self.size() == 0 or self.size() < self._minsize:

@pfreixes commented:

why not just evaluate self.size() < self._minsize?

@argaen (Member, Author) replied:

Because _minsize can be 0, and then the pool would end up blocked because no connections exist.

@pfreixes replied:

OK, got it. I would prefer to do the two things without mixing them, for readability (I've read it three or four times). But it's not a big deal for me.
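
For the record, one hypothetical way to unmix the two conditions (same behaviour, sketch only):

while True:
    pool_is_empty = self.size() == 0          # minsize == 0: an empty pool would
                                              # otherwise block acquire forever
    below_minimum = self.size() < self._minsize
    if not (pool_is_empty or below_minimum):
        break
    _conn = yield from self._create_new_conn()
    if _conn is None:                         # already at maxsize
        break
    self._pool.put_nowait(_conn)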

@pfreixes commented:

LGTM, good work!!!

@argaen (Member, Author) commented May 23, 2017

Hey @asvetlov, would you mind reviewing this? I'll wait until next Monday and, as discussed with @fafhrd91, if there are no more reviews I'll merge and publish a new release with the changes.

@asvetlov (Member) commented:

LGTM!
Please go ahead

@argaen argaen merged commit cd902d6 into master May 24, 2017
@argaen argaen deleted the fix_asyncio_queue_pool branch May 24, 2017 21:50
Linked issue closed by this pull request: connection pool misbehaving with task cancellation